PySpark Day 3 — Optimization + Performance
Senior-level deep dive: Catalyst, Tungsten, AQE, join strategies, skew, configs. This is the #1 differentiator topic at the Senior Data Engineer level.
THE SENIOR ENGINEER MINDSET
🧠 Memory Map
When someone says "the Spark job is slow", I don't guess.
I open Spark UI:
1. Stages tab→skewed tasks (one task 10x slower than median?)
2. SQL tab→physical plan (Sort-Merge Joins that should be Broadcast?)
3. Executors tab→GC time > 10%? → memory pressure
4. Storage tab→missing cache that's recomputed?
5. Environment tab→shuffle.partitions = 200 for 10 GB data?
The answer is ALWAYS one of:
Data skew → salting or AQE skew handling
Wrong join strategy → broadcast hint or AQE
Partitions wrong → repartition / coalesce / shuffle.partitions
DataFrame recomputed multiple times → cache()
GC pressure → G1GC + memory config tuning
PART 1: CATALYST OPTIMIZER
The 4 Phases
🧠 Memory Map
USER CODE (DataFrame/SQL)
↓
1. ANALYSIS PHASE
Parse query → Unresolved Logical Plan
Resolve columns/tables against Catalog
Result: Resolved Logical Plan
↓
2. LOGICAL OPTIMIZATION PHASE
Apply rule-based optimizations:
• Predicate Pushdown→move filters as early as possible
• Column Pruning→only read needed columns
• Constant Folding→1 + 1 → 2 at plan time
• Join Reordering→smaller table first
Result: Optimized Logical Plan
↓
3. PHYSICAL PLANNING PHASE
Generate multiple Physical Plans (join strategies, scan strategies)
Cost-Based Optimizer (CBO) picks cheapest plan
Result: Selected Physical Plan
↓
4. CODE GENERATION (Tungsten Whole-Stage CodeGen)
Generates Java code for the physical plan (compiled to JVM bytecode by Janino)
Eliminates virtual function calls
Result: Executed compiled code
Predicate Pushdown — When It Works
python — editable
# WORKS — Spark pushes filter into Parquet reader
from pyspark.sql.functions import col, year
df = spark.read.parquet("data/")
df.filter(col("year") == 2024).select("name", "amount")
# → Parquet reader skips row groups where year ≠ 2024
# WORKS — Pushes into JDBC source
df = spark.read.jdbc(...).filter(col("id") > 1000)
# → Sends WHERE id > 1000 to database
# DOES NOT WORK — UDF blocks pushdown
df.filter(my_udf(col("year")) == 2024) # Catalyst can't push UDF into source
# DOES NOT WORK — computed column loses pushdown
df.withColumn("yr", year(col("date"))).filter(col("yr") == 2024)
# ↑ use df.filter(year(col("date")) == 2024) BEFORE withColumn instead
Column Pruning
python — editable
# Catalyst automatically prunes: reads only "id" and "name" from Parquet
df = spark.read.parquet("data/").select("id", "name")
# Parquet is columnar — unread columns are never deserialized
Force Catalyst to Show Plan
python — editable
df.explain() # Physical plan only
df.explain(True) # All 4 plans
df.explain("formatted") # Formatted with indentation
df.explain("cost") # With cost statistics
df.explain("codegen") # Generated Java code
PART 2: TUNGSTEN ENGINE
3 Key Features
1. OFF-HEAP MEMORY MANAGEMENT
Custom memory allocator (bypasses Java heap/GC)
Stores data in binary format (not Java objects)
No GC pressure for data storage
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = 4g
2. CACHE-AWARE COMPUTATION
Organizes data access patterns for CPU cache efficiency
Sort and hash operations optimized for L1/L2 cache
Column-based in-memory format
3. WHOLE-STAGE CODE GENERATION (CodeGen)
Collapses multiple operators into one compiled JVM function
Eliminates virtual function calls per row
~10x improvement in CPU efficiency
Applied automatically by Catalyst
PART 3: REPARTITION vs COALESCE
python — editable
# repartition(N) — WIDE transformation (full shuffle)
df.repartition(200) # increase OR decrease partitions
df.repartition(200, "col") # hash partition by column (for joins!)
df.repartition(200, "col1", "col2") # hash by multiple columns
# coalesce(N) — NARROW transformation (no shuffle)
df.coalesce(10) # only DECREASE partitions (merge adjacent)
# cannot increase partition count
WHEN TO USE:
repartition():
- Before a join (ensure even distribution)
- When increasing partition count
- When you want balanced partitions by column value
- Upstream of a large shuffle operation
coalesce():
- After a filter that reduces data significantly
- Before writing output (reduce file count)
- When current partitions are already somewhat balanced
⚠️ TRAP: coalesce(1) creates 1 partition → writes 1 file → bottleneck!
Use repartition(N) then write, not coalesce(1) for large data.
spark.sql.shuffle.partitions Tuning
python — editable
# Default: 200 (good for ~200 GB data)
# Rule of thumb: 1 partition per 100 MB-200 MB of shuffled data
spark.conf.set("spark.sql.shuffle.partitions", "400")
# TOO FEW partitions (e.g., 10 for 1 TB data):
# → Each partition too large → OOM, slow, no parallelism
# TOO MANY partitions (e.g., 200 for 1 MB data):
# → Thousands of tiny files → scheduler overhead, small file problem
# AQE automatically adjusts this! (see Part 5)
# Check current setting:
spark.conf.get("spark.sql.shuffle.partitions")
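The rule of thumb above can be turned into a quick sizing helper. A plain-Python sketch (the function name and the 150 MB midpoint are my own choices, not a Spark API):

```python
def suggest_shuffle_partitions(shuffle_bytes, target_mb=150, min_partitions=1):
    """Suggest a spark.sql.shuffle.partitions value from shuffled data volume.

    Applies the 100-200 MB-per-partition rule of thumb (target_mb defaults
    to the midpoint). Purely illustrative — AQE can coalesce at runtime too.
    """
    target_bytes = target_mb * 1024 * 1024
    return max(min_partitions, -(-shuffle_bytes // target_bytes))  # ceiling division

# 1 TB of shuffled data at ~150 MB per partition
print(suggest_shuffle_partitions(1 * 1024**4))  # → 6991
```

With AQE enabled, set this to the expected peak and let coalescing shrink it for small runs.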
PART 4: JOIN STRATEGIES
The 5 Join Strategies
1. BROADCAST HASH JOIN (BHJ) — FASTEST
─────────────────────────────────────────────────────────
When: One table fits in memory (< autoBroadcastJoinThreshold)
How: Driver broadcasts small table to ALL Executors
Each Executor does local hash join — NO SHUFFLE
Best for: Fact-dimension joins (orders JOIN countries)
Trigger:
• Auto: spark.sql.autoBroadcastJoinThreshold = 10 MB (default)
• Manual hint: df1.join(broadcast(df2), "key")
2. SHUFFLE HASH JOIN (SHJ)
─────────────────────────────────────────────────────────
When: One side is small enough to build hash map in memory
(but too big to broadcast)
How: Shuffle both sides by join key
Build hash map from smaller side
Probe hash map with larger side (no sort needed)
Best for: Medium-sized tables
3. SORT-MERGE JOIN (SMJ) — DEFAULT for large tables
─────────────────────────────────────────────────────────
When: Both tables are large (neither fits in memory for hash)
How: Shuffle both sides by join key
Sort both sides by join key
Merge (like merge sort) — no hash map needed
Best for: Large-large joins, equi-joins
Note: Most robust, but most expensive (shuffle + sort + merge)
4. BROADCAST NESTED LOOP JOIN (BNLJ) — SLOW
─────────────────────────────────────────────────────────
When: Non-equi joins (>, <, >=, <=, !=, BETWEEN, LIKE)
How: Broadcasts smaller table, nested loops for each row pair
Very slow O(n×m) — avoid when possible
Use case: "Find all orders within 30 days of each event"
5. CARTESIAN JOIN — SLOWEST
─────────────────────────────────────────────────────────
When: CROSS JOIN with no join condition
Result: M × N rows (every combination)
Requires: spark.sql.crossJoin.enabled = true
Use case: Generating all combinations (usually intentional)
Broadcast Join — Full Details
python — editable
from pyspark.sql.functions import broadcast
# Method 1: Hint in code (override Catalyst's decision)
result = large_df.join(broadcast(small_df), "customer_id")
# Method 2: SQL hint
spark.sql("""
SELECT /*+ BROADCAST(countries) */ *
FROM orders o
JOIN countries c ON o.country_code = c.code
""")
# Threshold: auto-broadcast tables smaller than this
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "50m") # 50 MB
# Set to -1 to DISABLE auto-broadcast (for testing SMJ)
# When to manually broadcast:
# 1. Table is >10 MB but <driver_memory (auto-threshold is too conservative)
# 2. AQE hasn't kicked in yet (early in job)
# 3. You know table is small but stats aren't computed (no ANALYZE TABLE)
# ⚠️ DANGER: Broadcasting a 10 GB table → Driver/Executor OOM
Join Strategy Decision Tree
🧠 Memory Map
Question: How big are the tables?
Small + Any→BROADCAST HASH JOIN (always prefer!)
↓ (both large)
Equi-join?
Yes→SORT-MERGE JOIN (shuffle + sort + merge)
No→BROADCAST NESTED LOOP JOIN (slow, avoid if possible)
Non-equi but small side→BROADCAST NESTED LOOP (more tolerable)
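The decision tree can be sketched as a tiny plain-Python function. This approximates Catalyst's behavior rather than reproducing its code; real planning also weighs hints, statistics, and config, and the threshold below just mirrors the default autoBroadcastJoinThreshold:

```python
BROADCAST_THRESHOLD = 10 * 1024 * 1024  # default spark.sql.autoBroadcastJoinThreshold

def pick_join_strategy(left_bytes, right_bytes, equi_join=True):
    """Approximate the join-strategy decision tree (illustrative only)."""
    smaller = min(left_bytes, right_bytes)
    if equi_join:
        if smaller <= BROADCAST_THRESHOLD:
            return "BroadcastHashJoin"       # small side fits → broadcast, no shuffle
        return "SortMergeJoin"               # both large → shuffle + sort + merge
    # non-equi predicates can't use a hash map or merge on a key
    return "BroadcastNestedLoopJoin"

print(pick_join_strategy(100 * 1024**3, 5 * 1024**2))             # → BroadcastHashJoin
print(pick_join_strategy(100 * 1024**3, 80 * 1024**3))            # → SortMergeJoin
print(pick_join_strategy(1 * 1024**3, 1 * 1024**2, equi_join=False))  # → BroadcastNestedLoopJoin
```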
PART 5: AQE — ADAPTIVE QUERY EXECUTION
What is AQE?
AQE is a Spark 3.0+ feature that re-optimizes query plans at runtime using actual runtime statistics rather than estimates. It is enabled by default since Spark 3.2.
python — editable
# Enable AQE (default ON in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
AQE Feature 1: Coalescing Shuffle Partitions
PROBLEM
shuffle.partitions = 200 (default)
But your data is small→200 tiny partitions → scheduler overhead
AQE SOLUTION
After each shuffle stage, AQE looks at actual partition sizes
Merges small partitions into larger ones
Result: Fewer, more optimal partitions (no manual tuning!)
CONFIG
spark.sql.adaptive.coalescePartitions.enabled = true (default: true with AQE)
spark.sql.adaptive.advisoryPartitionSizeInBytes = 64m # target size per partition
spark.sql.adaptive.coalescePartitions.minPartitionNum = 1 # minimum after coalescing
AQE Feature 2: Dynamic Join Strategy Switching
PROBLEM
At plan time, Catalyst estimates table A is 500 MB (no broadcast)
But at runtime, after filtering, table A is actually 5 MB
Original plan: Sort-Merge Join (expensive)
AQE SOLUTION
At runtime, after filter executes, AQE rechecks actual table size
If now small enough→switches to Broadcast Hash Join on the fly!
No code change needed.
CONFIG
spark.sql.adaptive.localShuffleReader.enabled = true
# AQE can avoid network shuffle entirely when possible
AQE Feature 3: Skew Join Optimization
PROBLEM
One join key (e.g., "US") appears in 80% of rows
One task processes 80% of data→1 task runs for hours while others finish
AQE SOLUTION
Detects skewed partitions at runtime (using actual sizes)
Splits skewed partitions into smaller sub-partitions
Duplicates the matching data from the other side
Processes sub-partitions in parallel
CONFIG
spark.sql.adaptive.skewJoin.enabled = true (default: true with AQE)
spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5 # 5x median = skewed
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = 256m # and > 256 MB
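A partition is flagged as skewed only when it exceeds BOTH conditions: factor × median AND the absolute byte threshold. A plain-Python restatement of that rule (illustrative, not Spark source):

```python
from statistics import median

def skewed_partitions(sizes, factor=5, threshold_bytes=256 * 1024 * 1024):
    """Return indices of partitions AQE would flag as skewed.

    Mirrors the two conditions behind skewedPartitionFactor and
    skewedPartitionThresholdInBytes (a sketch, not the Spark implementation).
    """
    med = median(sizes)
    return [i for i, size in enumerate(sizes)
            if size > factor * med and size > threshold_bytes]

MB = 1024 * 1024
print(skewed_partitions([100 * MB, 120 * MB, 110 * MB, 2048 * MB]))  # → [3]
```

Note the second condition: a relatively huge partition in a tiny dataset is NOT flagged, because splitting sub-256 MB partitions isn't worth the overhead.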
AQE Full Config Block
python — editable
spark = SparkSession.builder \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5") \
.config("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m") \
.getOrCreate()
PART 6: DATA SKEW — DETECTION + SOLUTIONS
Detecting Skew
python — editable
# Method 1: Spark UI → Stages → look for one task MUCH slower than median
# "Task Duration: median=5s, max=500s" → SKEW
# Method 2: Check key distribution
from pyspark.sql.functions import col
df.groupBy("country_code").count().orderBy(col("count").desc()).show(20)
# "US: 50M rows, UK: 2M, FR: 1M" → US is skewed key
# Method 3: Check partition sizes
from pyspark.sql.functions import spark_partition_id, col
df.withColumn("pid", spark_partition_id()) \
.groupBy("pid").count() \
.orderBy(col("count").desc()) \
.show(20)
# Large variance in count → skew
Solution 1: Salting (Manual Fix)
python — editable
from pyspark.sql.functions import col, concat, lit, rand
# Step 1: Add salt to the large table's skewed key
SALT_FACTOR = 10
large_df_salted = large_df.withColumn(
"salted_key",
concat(col("country_code"), lit("_"), (rand() * SALT_FACTOR).cast("int").cast("string"))
)
# Step 2: Explode the small table to match all salt values
from pyspark.sql.functions import array, explode
small_df_exploded = small_df.withColumn(
"salt", array([lit(i) for i in range(SALT_FACTOR)])
).withColumn("salt", explode("salt")) \
.withColumn("salted_key",
concat(col("country_code"), lit("_"), col("salt").cast("string")))
# Step 3: Join on salted key
result = large_df_salted.join(small_df_exploded, "salted_key", "inner")
# Step 4: Drop salt columns
result = result.drop("salted_key", "salt")
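To see why this works outside Spark: the random suffix spreads the hot key's rows across SALT_FACTOR distinct join keys, while the exploded small side guarantees every salted row still finds its match. A plain-Python sketch of the key rewrite:

```python
import random

SALT_FACTOR = 10

def salt_key(key, rng):
    # large side: one random salt per row
    return f"{key}_{rng.randrange(SALT_FACTOR)}"

def explode_key(key):
    # small side: every salt value, so every salted row still matches
    return [f"{key}_{i}" for i in range(SALT_FACTOR)]

rng = random.Random(42)
hot_rows = [salt_key("US", rng) for _ in range(1000)]  # 1000 rows of the hot key
buckets = {k: hot_rows.count(k) for k in set(hot_rows)}
print(len(buckets))        # the hot key is now spread over 10 salted keys
print(sorted(explode_key("US"))[:3])  # → ['US_0', 'US_1', 'US_2']
```

The trade-off: the small side grows by SALT_FACTOR×, so salting only pays off when the small side really is small.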
Solution 2: Broadcast the Small Side (If Possible)
python — editable
# If the skewed join is small-large, just broadcast the small side
result = large_df.join(broadcast(small_df), "country_code")
Solution 3: Let AQE Handle It (Spark 3.0+)
python — editable
# Enable AQE skew join handling (splits hot partitions automatically)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# No code change! AQE detects and handles at runtime.
Solution 4: Two-Phase Aggregation (for groupBy skew)
python — editable
# Phase 1: Group by (key, salt) to partially aggregate within partitions
from pyspark.sql.functions import rand, sum  # note: shadows built-in sum
SALT = 10
partial = df.withColumn("salt", (rand() * SALT).cast("int")) \
.groupBy("country_code", "salt") \
.agg(sum("amount").alias("partial_sum"))
# Phase 2: Final aggregation drops salt
final = partial.groupBy("country_code") \
.agg(sum("partial_sum").alias("total_amount"))
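The two phases produce the exact same totals as a direct groupBy because sum is associative; the salt only changes how the work is split. A plain-Python check of that property:

```python
import random
from collections import defaultdict

SALT = 10
rng = random.Random(0)
rows = [("US", 5)] * 800 + [("UK", 3)] * 200  # skewed toward "US"

# Phase 1: aggregate by (key, salt) — spreads the hot key over SALT groups
partial = defaultdict(int)
for key, amount in rows:
    partial[(key, rng.randrange(SALT))] += amount

# Phase 2: drop the salt and combine the partial sums
final = defaultdict(int)
for (key, _salt), partial_sum in partial.items():
    final[key] += partial_sum

print(dict(final))  # identical to a direct group-and-sum over rows
```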
PART 7: CACHE vs PERSIST — DEEP DIVE
Storage Levels Comparison
| Level | Memory | Disk | Serialized | Replicated |
|---|---|---|---|---|
| MEMORY_ONLY | ✓ | ✗ | ✗ | ✗ |
| MEMORY_ONLY_2 | ✓ | ✗ | ✗ | ✓ |
| MEMORY_AND_DISK | ✓ | ✓ | ✗ | ✗ |
| MEMORY_AND_DISK_2 | ✓ | ✓ | ✗ | ✓ |
| MEMORY_ONLY_SER | ✓ | ✗ | ✓ | ✗ |
| MEMORY_AND_DISK_SER | ✓ | ✓ | ✓ | ✗ |
| DISK_ONLY | ✗ | ✓ | ✓ | ✗ |
| OFF_HEAP | ✓ (off-heap) | ✓ | ✓ | ✗ |
python — editable
from pyspark.storagelevel import StorageLevel
df.cache() # MEMORY_AND_DISK
df.persist(StorageLevel.MEMORY_ONLY) # fastest, may recompute
df.persist(StorageLevel.MEMORY_AND_DISK) # safest
df.persist(StorageLevel.DISK_ONLY) # slowest, reliable
df.persist(StorageLevel.MEMORY_ONLY_SER) # smaller memory, slower
df.persist(StorageLevel.OFF_HEAP) # Tungsten managed
df.unpersist() # release cache
# For RDD:
rdd.cache()
rdd.persist(StorageLevel.MEMORY_AND_DISK)
rdd.unpersist()
When to Cache
CACHE when:
✓ DataFrame used in 2+ actions (count + show + write)
✓ Iterative algorithms (MLlib PageRank-like)
✓ Interactive queries in notebooks (avoid recompute)
✓ Joins where one side is reused multiple times
DO NOT CACHE when:
✗ DataFrame used only once
✗ Data is too large for executor memory (causes cache thrashing)
✗ Data freshness required (cache can serve stale data)
✗ Simple linear pipeline with no branching
PART 8: GROUPBY AGGREGATION — REDUCEBYKEY vs AGGREGATEBYKEY vs GROUPBYKEY
| METHOD | Pre-aggregation | Output = Input | Use when |
|---|---|---|---|
| reduceByKey | YES (local) | YES (same type) | sum, max, min, product |
| aggregateByKey | YES (local) | NO (any type) | average, collect set |
| combineByKey | YES (local) | NO (any type) | most flexible, complex |
| groupByKey | NO | NO (iterable of values) | need ALL values as list |
python — editable
# Average per key using aggregateByKey (output type ≠ input type)
rdd = sc.parallelize([("a", 10), ("b", 20), ("a", 30), ("b", 40)])
seqOp = lambda acc, val: (acc[0] + val, acc[1] + 1) # (sum, count)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])
result = rdd.aggregateByKey((0, 0), seqOp, combOp) \
.mapValues(lambda x: x[0] / x[1])
# → [("a", 20.0), ("b", 30.0)]
# vs DataFrame way (preferred):
from pyspark.sql.functions import avg
df.groupBy("key").agg(avg("value"))  # let Catalyst handle it
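What aggregateByKey is doing under the hood: seqOp folds each value into a per-partition accumulator, combOp merges accumulators after the shuffle. A plain-Python replay of the RDD example above:

```python
from collections import defaultdict

zero = (0, 0)                                        # (sum, count)
seq_op = lambda acc, val: (acc[0] + val, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])

partitions = [[("a", 10), ("b", 20)], [("a", 30), ("b", 40)]]

# per-partition fold (what each executor does locally, BEFORE the shuffle)
local = []
for part in partitions:
    acc = defaultdict(lambda: zero)
    for k, v in part:
        acc[k] = seq_op(acc[k], v)
    local.append(acc)

# cross-partition merge (what happens AFTER the shuffle)
merged = defaultdict(lambda: zero)
for acc in local:
    for k, a in acc.items():
        merged[k] = comb_op(merged[k], a)

averages = {k: s / c for k, (s, c) in merged.items()}
print(averages)  # → {'a': 20.0, 'b': 30.0}
```

Because seqOp runs before the shuffle, only one (sum, count) pair per key per partition crosses the network — that's the whole point of pre-aggregation.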
PART 9: SMALL FILES PROBLEM
Detection
Signs:
Job takes long to start (many tasks scheduled for tiny files)
Executor logs: "Task completed in 10 ms" → overhead dominates
Output: ls shows 10,000 small files (< 10 MB each)
Causes
1. Streaming writes (each micro-batch writes many files)
2. Partitioned writes with many partition values (partitionBy with high cardinality)
3. After filter that produces tiny result sets per partition
4. coalesce(1) then repartition creates small outputs
Solutions
python — editable
# Solution 1: Coalesce before writing
df.coalesce(10).write.parquet("output/") # reduce to 10 output files
# Solution 2: Repartition by actual data volume
target_size_mb = 128
avg_row_size_mb = 0.001                       # your own per-row estimate — Spark has no built-in for this
data_size_mb = df.count() * avg_row_size_mb
n_partitions = max(1, int(data_size_mb / target_size_mb))
df.repartition(n_partitions).write.parquet("output/")
# Solution 3: Delta OPTIMIZE (compact small files into larger ones)
# OPTIMIZE delta.`/path/to/delta` ZORDER BY (col)
spark.sql("OPTIMIZE delta.`/path/` ZORDER BY (customer_id)")
# Solution 4: Hive/Spark Config (auto merge small files)
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") # 128 MB per partition
spark.conf.set("spark.sql.files.openCostInBytes", "4194304") # 4 MB open cost estimate
PART 10: SPARK CONFIGURATIONS — COMPLETE REFERENCE
Memory Configuration
python — editable
# --- EXECUTOR MEMORY ---
spark.executor.memory = "4g" # JVM heap for executors
spark.executor.memoryOverhead = "512m" # Off-heap overhead (Python, native)
# Default: max(executor.memory * 0.1, 384 MB)
spark.memory.fraction = 0.6 # fraction of heap for Spark (default: 0.6)
spark.memory.storageFraction = 0.5 # within Spark memory: storage vs execution
# Memory layout (4g executor):
# Total heap = 4g
# User memory = 4g × (1 - 0.6) = 1.6g (UDFs, user data structures)
# Spark memory = 4g × 0.6 = 2.4g
# Storage (cache) = 2.4g × 0.5 = 1.2g
# Execution (joins, shuffles) = 2.4g × 0.5 = 1.2g (both can borrow from each other)
# --- DRIVER MEMORY ---
spark.driver.memory = "4g" # Driver JVM heap
spark.driver.maxResultSize = "2g" # max result from collect() etc.
# --- OFF-HEAP (Tungsten) ---
spark.memory.offHeap.enabled = "true"
spark.memory.offHeap.size = "4g"
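The memory-layout arithmetic above as a small calculator (plain Python, names are mine; this ignores the ~300 MB of reserved memory Spark subtracts from the heap first):

```python
def memory_layout(heap_gb, fraction=0.6, storage_fraction=0.5):
    """Split an executor heap into the regions defined by
    spark.memory.fraction and spark.memory.storageFraction.
    Simplified: omits Spark's ~300 MB reserved system memory."""
    user = heap_gb * (1 - fraction)          # UDFs, user data structures
    spark_mem = heap_gb * fraction           # unified Spark memory
    storage = spark_mem * storage_fraction   # cache region
    execution = spark_mem - storage          # joins, shuffles, sorts
    return {"user": user, "spark": spark_mem,
            "storage": storage, "execution": execution}

print(memory_layout(4))  # the 4g example above: 1.6g user, 1.2g storage, 1.2g execution
```

Remember that storage and execution can borrow from each other at runtime; storageFraction only sets the floor storage memory is protected down to.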
Executor Configuration
python — editable
spark.executor.cores = "4" # cores per executor (2-5 recommended)
spark.executor.instances = "10" # number of executors (static allocation)
# Dynamic allocation config below
# Rule of thumb:
# cores: 4-5 per executor (too many → memory contention, too few → underutilized)
# memory: 4g per core (so 4 cores → 16g executor)
Shuffle Configuration
python — editable
spark.sql.shuffle.partitions = "200"     # default (tune: 1 partition per 100-200 MB)
spark.shuffle.compress = "true" # compress shuffle data (default: true)
spark.shuffle.spill.compress = "true" # compress spilled shuffle data
spark.io.compression.codec = "snappy" # snappy (fast), lz4, zstd (best ratio)
Parallelism Configuration
python — editable
spark.default.parallelism = "200"        # default for RDD operations (= shuffle partitions)
spark.sql.shuffle.partitions = "200" # for DataFrame/SQL operations
spark.sql.files.maxPartitionBytes = "134217728" # 128 MB max per input partition
spark.sql.files.openCostInBytes = "4194304" # 4 MB estimated open cost
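Roughly, Spark derives the actual input split size from these two configs plus total data size and parallelism. A simplified plain-Python paraphrase (treat this as an approximation of FilePartition's logic, not the exact implementation):

```python
def max_split_bytes(total_bytes, num_files,
                    max_partition_bytes=128 * 1024 * 1024,   # maxPartitionBytes default
                    open_cost=4 * 1024 * 1024,               # openCostInBytes default
                    default_parallelism=200):
    """Approximate the input split size Spark computes for file sources.

    Small files are 'padded' by open_cost so many tiny files still get
    packed into reasonably sized partitions.
    """
    bytes_per_core = (total_bytes + num_files * open_cost) // default_parallelism
    return min(max_partition_bytes, max(open_cost, bytes_per_core))

# 10 GB in 80 files with 200-way default parallelism → ~53 MB splits
print(max_split_bytes(10 * 1024**3, 80))
```

The takeaway: with small total data, splits shrink below 128 MB to keep all cores busy, but never below the open-cost floor.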
Dynamic Allocation
python — editable
spark.dynamicAllocation.enabled = "true" # auto scale executors
spark.dynamicAllocation.minExecutors = "1"
spark.dynamicAllocation.maxExecutors = "50"
spark.dynamicAllocation.initialExecutors = "5"
spark.dynamicAllocation.executorIdleTimeout = "60s" # remove idle after 60s
spark.dynamicAllocation.schedulerBacklogTimeout = "1s" # add executor after 1s backlog
spark.shuffle.service.enabled = "true" # required for dynamic allocation
Serialization Configuration
python — editable
spark.serializer = "org.apache.spark.serializer.KryoSerializer" # faster than Java
spark.kryo.registrationRequired = "false"
spark.kryo.registrator = "com.myapp.MyKryoRegistrator" # custom for custom classes
# Kryo vs Java serialization:
# Java: safe, works with any class, 3-4x larger output
# Kryo: faster, ~3-10x smaller, needs class registration for best performance
JVM GC Configuration
python — editable
# G1GC is recommended (better for large heaps)
spark.executor.extraJavaOptions = "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=4"
spark.driver.extraJavaOptions = "-XX:+UseG1GC"
Production SparkSession Template
python — editable
spark = SparkSession.builder \
.appName("ProductionJob") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memoryOverhead", "1g") \
.config("spark.driver.memory", "4g") \
.config("spark.driver.maxResultSize", "2g") \
.config("spark.sql.shuffle.partitions", "400") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.skewJoin.enabled", "true") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.autoBroadcastJoinThreshold", "50m") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.minExecutors", "2") \
.config("spark.dynamicAllocation.maxExecutors", "50") \
.config("spark.shuffle.service.enabled", "true") \
.config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
.getOrCreate()
PART 11: SPARK UI — HOW TO READ IT
Tabs and What to Look For
🧠 Memory Map
JOBS TAB
→ Each action = 1 job
→ Click job to see stages
STAGES TAB
→ Each stage has: Tasks completed, Duration, Input/Output/Shuffle R/W
→ "Task Duration" distribution: if max >> median→SKEW
→ "GC Time" > 10% of task time→memory pressure
→ Failed tasks→OOM or node failure
SQL TAB (most important!)
→ Click "Details" on a query
→ See Physical Plan with metrics
→ Look for:
• "FileScan"→"PushedFilters" → is predicate pushed down?
• "BroadcastHashJoin" vs "SortMergeJoin"
• "Exchange" = shuffle (wide transformation)
• "Sort" = sort happening
→ Numbers on each node: rows, bytes in/out
EXECUTORS TAB
→ GC Time column→if high → reduce executor memory fraction or use G1GC
→ Storage Memory Used→is cache working?
→ Failed Tasks→which executor is problematic?
STORAGE TAB
→ Cached RDDs/DataFrames
→ Fraction cached, memory used
→ If cached fraction < 1.0→data doesn't fit in cache
ENVIRONMENT TAB
→ All Spark configs active in this session
→ Verify your config changes took effect
Reading the SQL Physical Plan
FileScan parquet [col1, col2, col3] ← column pruning working
PushedFilters: [EqualTo(year, 2024)] ← predicate pushdown working
PartitionCount: 100 ← how many partitions scanned
HashAggregate(keys=[dept], functions=[sum]) ← final aggregation (after shuffle)
Exchange hashpartitioning(dept, 200) ← SHUFFLE (200 = shuffle.partitions)
HashAggregate(keys=[dept], functions=[partial_sum]) ← partial (local) pre-aggregation
BroadcastHashJoin [id = id] ← BROADCAST JOIN (fast!)
BroadcastExchange HashedRelationBroadcastMode ← small table broadcasted
SortMergeJoin [id = id] ← SORT-MERGE JOIN (shuffle + sort)
Sort [id ASC]
Exchange hashpartitioning(id, 200)
Sort [id ASC]
Exchange hashpartitioning(id, 200)
PART 12: CHECKPOINTING vs CACHING
CACHING
Purpose: Reuse data in same application (avoid recomputation)
Storage: Executor memory/disk (not persistent)
Lineage: Preserved (can recompute from lineage if lost)
Speed: Fast to read (memory), slower (disk)
Scope: Current SparkSession lifetime
df.cache()
df.persist(StorageLevel.MEMORY_AND_DISK)
CHECKPOINTING
Purpose: Cut DAG lineage for very long iterative jobs
Save state that survives application restart
Storage: HDFS / S3 (reliable, persistent)
Lineage: CUT (checkpoint IS the source — no upstream lineage)
Speed: Slower (HDFS write, but saves re-traversing long DAG)
Scope: Survives application restarts
spark.sparkContext.setCheckpointDir("hdfs:///checkpoints/")
df.checkpoint() # triggers action, writes to HDFS, returns new DF
WHEN TO CHECKPOINT (vs cache)
✓ Iterative ML-like jobs (PageRank, k-means): after each iteration
✓ Streaming: checkpoint state for fault tolerance
✓ Very long lineage chains (DAG > 50 stages) to avoid recomputation cost
✓ When data must survive Driver restart
PATTERN: Cache first, then checkpoint
df.cache()
df.checkpoint() # triggers action (reads from cache → writes to HDFS)
df.unpersist() # release cache after checkpoint
PART 13: SCENARIO-BASED QUESTIONS
Scenario 1: "Our Spark job takes 6 hours. How do you debug?"
python — editable
"""
STEP 1: Open Spark UI → Stages tab
- Find the slowest stage
- Check: is one task WAY slower than median? → DATA SKEW
- Check: is input size per task >1 GB? → TOO FEW PARTITIONS
STEP 2: SQL tab → physical plan
- SortMergeJoin where one side is small? → switch to BroadcastHashJoin
df.join(broadcast(small_df), "key")
- FileScan without PushedFilters? → predicate pushdown not working
Fix: filter BEFORE any UDF/withColumn computed columns
STEP 3: Check configurations
- shuffle.partitions: default 200 for 2 TB data? → set to 2000
- AQE enabled? spark.sql.adaptive.enabled = true
- Using groupByKey instead of reduceByKey? → high shuffle data
STEP 4: Check data distribution
df.groupBy("join_key").count().orderBy(col("count").desc()).show(10)
# If top key has 50% of rows → add salting
STEP 5: Check cache utilization
- Is an expensive DataFrame recomputed multiple times?
- Storage tab: is cache evicted (fraction < 1.0)?
"""
Scenario 2: "Executor OOM errors"
python — editable
"""
CAUSE ANALYSIS:
1. Partition too large (data per task > executor memory)
2. Caching more data than memory can hold
3. groupByKey collecting too many values per key
4. Large broadcast (broadcasting too-large table)
FIXES:
1. Increase partitions:
df.repartition(2 * int(spark.conf.get("spark.sql.shuffle.partitions")))
2. Increase executor memory:
spark.executor.memory = "16g"
spark.executor.memoryOverhead = "2g"
3. Replace groupByKey with reduceByKey/aggregateByKey:
# Bad:
rdd.groupByKey().mapValues(sum)
# Good:
rdd.reduceByKey(lambda a, b: a + b)
4. Use MEMORY_AND_DISK instead of MEMORY_ONLY:
df.persist(StorageLevel.MEMORY_AND_DISK)
5. Check broadcast threshold (maybe accidentally broadcasting large table):
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
"""
Scenario 3: "Job runs fine daily, but fails every Monday"
python — editable
"""
ANALYSIS: Monday has 7x data (weekend accumulation)
INVESTIGATION:
1. Check input file size on Monday vs other days
2. Check shuffle data in Spark UI (much larger on Monday?)
FIX — Make job adaptive to data volume:
1. Enable AQE (auto-coalesces/splits partitions at runtime):
spark.sql.adaptive.enabled = true
2. Set shuffle.partitions based on expected peak:
spark.sql.shuffle.partitions = 2000 # for Monday's 7x data
3. Use dynamic allocation:
spark.dynamicAllocation.enabled = true
spark.dynamicAllocation.maxExecutors = 100
4. Add checkpointing in middle of job:
after_heavy_join.checkpoint() # if job fails, resume from here
"""
Scenario 4: "Write to Parquet creates 10,000 small files"
python — editable
"""
PROBLEM: partitionBy("date", "region") with 365 dates × 50 regions = 18,250 partitions
Each partition writes its own file → tiny files
SOLUTIONS:
1. Coalesce WITHIN each partition before write (using repartition):
df.repartition(200, "date", "region") # 200 files total, co-located by key
.write.partitionBy("date", "region").parquet("output/")
2. Use Delta OPTIMIZE to compact after write:
spark.sql("OPTIMIZE delta.`/path/` ZORDER BY (date, region)")
3. Adjust spark.sql.files.maxPartitionBytes:
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456") # 256 MB
4. For streaming: trigger OPTIMIZE periodically (not per micro-batch)
"""
PART 14: DYNAMIC ALLOCATION
python — editable
# Dynamic Allocation = Spark requests/releases Executors based on workload
# ENABLE:
spark.dynamicAllocation.enabled = true
spark.shuffle.service.enabled = true # REQUIRED: keeps shuffle files after executor released
# SCALING UP:
spark.dynamicAllocation.schedulerBacklogTimeout = "1s" # add exec if backlog > 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout = "5s" # keep adding every 5s
# SCALING DOWN:
spark.dynamicAllocation.executorIdleTimeout = "60s" # release executor idle >60s
# LIMITS:
spark.dynamicAllocation.minExecutors = "2" # always keep at least 2
spark.dynamicAllocation.maxExecutors = "100" # never exceed 100
# WHY shuffle service is required:
# When executor is released, shuffle files it wrote are LOST
# External shuffle service (running on YARN NM) keeps those files available
# Without it: releasing executors causes downstream stages to fail
# WHEN TO USE dynamic allocation:
# ✓ Shared clusters (don't hog resources when idle)
# ✓ Jobs with varying workload stages (small start, large shuffle middle)
# ✓ Interactive workloads (notebooks)
# WHEN TO PREFER STATIC:
# ✓ Consistent workload (same data size every run)
# ✓ SLA-critical jobs (dynamic scaling adds latency)
PART 15: FULL OPTIMIZATION CHECKLIST
BEFORE WRITING SPARK CODE
□ Is data partitioned correctly for this query? (partition pruning)
□ Which join strategy should I use? (size of tables)
□ Should I broadcast the small table?
□ What's my target partition size? (100-200 MB per partition)
WHILE WRITING
□ Filters applied early (before joins/aggregations)
□ No groupByKey→use reduceByKey/aggregateByKey
□ UDFs avoided where built-ins suffice
□ DataFrame cached before reuse (2+ actions)
□ Explicit schema defined (no inferSchema=True)
□ JDBC reads have numPartitions configured
CONFIG CHECKLIST
□ spark.sql.adaptive.enabled = true
□ spark.sql.shuffle.partitions tuned (not default 200 for large data)
□ spark.executor.memory adequate (no OOM)
□ spark.sql.autoBroadcastJoinThreshold tuned (> 10 MB for modern clusters)
□ Kryo serializer enabled (for RDD-heavy jobs)
□ Dynamic allocation configured (for shared clusters)
AFTER JOB RUNS
□ Spark UI: any skewed stages? (max task >> median task)
□ Spark UI: SortMergeJoin that could be BroadcastHashJoin?
□ GC Time on Executors tab > 10%?→reduce memory fraction or tune GC
□ Output file count reasonable? (not 10,000 tiny files)
□ AQE actually triggered? (check Physical Plan for AQE annotations)
QUICK COMPARISON TABLES
Repartition vs Coalesce
| Property | repartition(N) | coalesce(N) |
|---|---|---|
| Transformation | WIDE (full shuffle) | NARROW (no shuffle) |
| Direction | Increase OR decrease | DECREASE only |
| Balance | Evenly balanced | May be uneven (merges adjacent partitions) |
| Speed | Slower (shuffle) | Faster (no shuffle) |
| When to use | Before joins, on skewed data | After filter, before write to reduce file count |
| Column-based | YES (repartition by column) | NO |
Join Strategy Comparison
| Strategy | When | Shuffle | Sort | Speed |
|---|---|---|---|---|
| BroadcastHashJoin | Small + Any size | NO | NO | FASTEST |
| ShuffleHashJoin | Medium + Medium | YES | NO | Fast |
| SortMergeJoin | Large + Large (equi-join) | YES | YES | Slow |
| BroadcastNLJ | Non-equi join | NO (broadcast) | NO | Slowest |
| CartesianJoin | No condition (CROSS) | YES | NO | Slowest |
AQE Features
| Feature | Problem Solved | Config |
|---|---|---|
| Coalesce Shuffle Partitions | Too many small partitions | coalescePartitions.enabled |
| Dynamic Join Strategy | SMJ → BHJ at runtime | (enabled with AQE) |
| Skew Join Optimization | Hot key tasks | skewJoin.enabled |