PySpark Day 3 — Quick Recall: Optimization + Performance
🧠 MASTER MNEMONICS
SECTION 1: CATALYST FLASH CARDS
Predicate pushdown: moving filter conditions as close to the data source as possible (before joins/aggregations). For Parquet: skips row groups. For JDBC: adds a WHERE clause to the SQL sent to the database.
Column pruning: Catalyst reads only the columns referenced in the query. Parquet (columnar) makes this very efficient — unneeded columns are never deserialized from disk.
df.explain("formatted") # shows all 4 plan phases
df.explain("cost") # shows with CBO cost estimates
# Look for: PushedFilters in FileScan = pushdown working
# Look for: BroadcastHashJoin = correct join chosen
SECTION 2: TUNGSTEN FLASH CARDS
Tungsten: Spark's low-level execution engine for memory + CPU efficiency.
Whole-stage code generation: eliminates virtual function call overhead per row (e.g., calling filter() then map() on every row). Instead, Spark generates a single compiled Java method that processes all operators in one loop → ~10x CPU efficiency.
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")
SECTION 3: REPARTITION vs COALESCE FLASH CARDS
| Property | repartition(N) | coalesce(N) |
|---|---|---|
| Type | WIDE (full shuffle) | NARROW (no shuffle) |
| Direction | Up or down | DOWN ONLY |
| Balance | Perfectly even | May be uneven |
| Speed | Slow (shuffle cost) | Fast (local merge) |
| By column? | YES (.repartition(N,col)) | NO |
⚠️ TRAP: coalesce(1) = single partition = single writer = bottleneck for large data. Use repartition(small_N) instead.
SECTION 4: JOIN STRATEGIES FLASH CARDS
SECTION 5: AQE FLASH CARDS (MOST IMPORTANT!)
Adaptive Query Execution — Spark 3.0+, default ON in Spark 3.2+. Re-optimizes query plans at runtime using actual statistics (not estimates).
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
Does AQE make manual tuning unnecessary? Not completely. AQE helps with partition coalescing, join switching, and skew — but you still need to: tune executor memory, choose repartition/coalesce, add broadcast hints where statistics are missing, and handle the small files problem after writes.
SECTION 6: DATA SKEW FLASH CARDS
Salting works for JOIN skew (one key appears too much on one side). It appends a random number (0-9) to the hot key in the large table, then replicates the matching small table 10 times with each salt value. The hot key is distributed across 10 tasks instead of 1.
SECTION 7: CACHE + PERSIST FLASH CARDS
from pyspark import StorageLevel

df.cache()                                # = MEMORY_AND_DISK for DataFrames (Spark 3.x)
df.persist()                              # same as cache()
df.persist(StorageLevel.MEMORY_ONLY)      # fast, recomputes on eviction
df.persist(StorageLevel.MEMORY_AND_DISK)  # spills to disk, safer
df.persist(StorageLevel.DISK_ONLY)        # slowest, always disk
df.persist(StorageLevel.MEMORY_ONLY_SER)  # smaller memory, slower access (Scala/Java API; PySpark data is always serialized)
df.persist(StorageLevel.OFF_HEAP)         # Tungsten off-heap
df.unpersist()                            # release cache
| Property | Cache | Checkpoint |
|---|---|---|
| Storage | Executor mem/disk | HDFS / S3 (persistent) |
| Lineage | KEPT | CUT |
| Scope | App lifetime only | Survives restarts |
| Use when | Reuse in same job | Long iterative (cut DAG) |
SECTION 8: SPARK CONFIGS FLASH CARDS
spark.executor.memory # JVM heap for tasks + storage
spark.executor.memoryOverhead # off-heap (Python UDFs, native)
spark.driver.memory # Driver JVM heap (collect() lives here)
spark.driver.maxResultSize # max data sent to Driver (default 1g)
spark.memory.fraction # % of heap for Spark (default 0.6 = 60%)
spark.memory.storageFraction # % of Spark memory for cache (0.5 = 50%)
spark.sql.shuffle.partitions # number of post-shuffle partitions (default 200)
spark.shuffle.compress # compress shuffle data (default true)
spark.io.compression.codec # codec: snappy (fast) vs lz4 vs zstd
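Worked example of how spark.memory.fraction and spark.memory.storageFraction split an 8 GB executor heap (300 MB reserved memory is the Spark default; plain arithmetic, numbers approximate):

```python
heap_gb = 8.0
reserved_gb = 0.3                      # ~300 MB reserved by Spark itself
usable = heap_gb - reserved_gb         # 7.7 GB available to divide

spark_memory = usable * 0.6            # spark.memory.fraction = 0.6 → unified Spark memory
storage = spark_memory * 0.5           # spark.memory.storageFraction = 0.5 → cache share
execution = spark_memory - storage     # remainder for shuffles / joins / sorts

print(round(spark_memory, 2), round(storage, 2), round(execution, 2))
# 4.62 2.31 2.31
```

These are soft boundaries: within unified memory, execution can evict cached blocks down to the storage fraction, and storage can borrow free execution memory.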
When an Executor is released, it takes its shuffle files with it. The External Shuffle Service (running on YARN NodeManager) keeps shuffle files available after the Executor is gone. Without it, releasing executors breaks downstream stages.
SECTION 9: SPARK UI READING FLASH CARDS
How to spot skew: Stages tab → click a Stage → "Task Duration" summary table. If max >> median (e.g., max=500s, median=5s) → skew.
How to check the join strategy: SQL tab → click a query → expand the physical plan. Look for a BroadcastHashJoin node. If you see SortMergeJoin instead → opportunity to add a broadcast hint.
High GC time means: memory pressure. Too many Java objects in heap → GC runs too long. Fix: reduce executor memory fraction, use G1GC, use Kryo, use a serialized storage level for cache, or add more memory.
What an Exchange node means: a shuffle operation. Each Exchange = data moves across the network. Minimize Exchanges = minimize shuffles = faster job.
How to verify pushdown: SQL tab → physical plan → the FileScan parquet node should show PushedFilters: [EqualTo(...)]. If PushedFilters is empty, pushdown is not working.
SECTION 10: SMALL FILES FLASH CARDS
# 1. Coalesce before write
df.coalesce(10).write.parquet("output/")
# 2. Repartition by target file size (128 MB per file)
n_parts = max(1, estimated_gb * 1024 // 128)
df.repartition(n_parts).write.parquet("output/")
# 3. Delta OPTIMIZE (compact existing small files)
spark.sql("OPTIMIZE delta.`/path/` ZORDER BY (date)")
PYSPARK ULTRA CHEAT SHEET — ALL 3 DAYS
╔═════════════════════════════════════════════════════════════════════════╗
║ PYSPARK — ALL 3 DAYS CHEAT SHEET ║
╠═════════════════════════════════════════════════════════════════════════╣
║ ║
║ DAY 1: ARCHITECTURE + RDD ║
║ ───────────────────────────────────────────────────────────────────── ║
║ Architecture (DCE): Driver + Cluster Manager + Executors ║
║ DAG → Stages (at shuffle) → Tasks (1 per partition) ║
║ Narrow: map,flatMap,filter,union,coalesce → NO shuffle ║
║ Wide: groupBy,join,sort,distinct,repartition → shuffle+new stage ║
║ ║
║ RDD Properties (RD-ILP): Resilient|Distributed|Immutable|Lazy|Part ║
║ map() → 1-to-1 | flatMap() → 1-to-N (flattens) ║
║ mapPartitions() → 1 call per partition (DB conn, model load) ║
║ reduceByKey → LOCAL pre-agg + shuffle (PREFER) ║
║ groupByKey → ALL values shuffle (AVOID for aggregation) ║
║ aggregateByKey → output type ≠ input type (avg = sum/count) ║
║ ║
║ Persistence (MDSRO): ║
║ MEMORY_ONLY > MEMORY_AND_DISK > DISK_ONLY > MEMORY_ONLY_SER ║
║ cache() = persist(MEMORY_AND_DISK) ║
║ Broadcast = send once per Executor (not per task) ║
║ Accumulator = add-only in tasks, read-only on Driver ║
║ Checkpoint = HDFS, cuts lineage, survives restart ║
║ ║
║ DAY 2: DATAFRAME + SPARKSQL ║
║ ───────────────────────────────────────────────────────────────────── ║
║ Read (CPJ-OAD): CSV|Parquet|JSON|ORC|Avro|Delta ║
║ Always explicit StructType in prod (no inferSchema) ║
║ Multiple files: glob("path/*/"), list, directory, unionByName ║
║ input_file_name() → track source file ║
║ JDBC: numPartitions + partitionColumn (else 1 thread!) ║
║ ║
║ Window (POF): partitionBy().orderBy().rowsBetween() ║
║ ROW_NUMBER: unique | RANK: gaps | DENSE_RANK: no gaps ║
║ Top-N: DENSE_RANK | Dedup: ROW_NUMBER = 1 ║
║ LAG/LEAD: first row=NULL, last row=NULL ║
║ ║
║ UDF Speed: Built-in >> Pandas UDF (Arrow) >> Python UDF ║
║ Pandas UDF: batch-based, vectorized, Arrow zero-copy, 3-100x faster ║
║ UDFs are opaque to Catalyst → no optimization! ║
║ ║
║ Joins: inner|left|right|full|cross|left_anti|left_semi ║
║ left_anti = NOT IN | left_semi = EXISTS (no right cols) ║
║ Write modes (OACEI): overwrite|append|ignore|error ║
║ partitionBy → directories | bucketBy → hash for join opt ║
║ explode → drops null rows | explode_outer → keeps null rows ║
║ ║
║ DAY 3: OPTIMIZATION ║
║ ───────────────────────────────────────────────────────────────────── ║
║ Catalyst (ALPC): Analysis→Logical Opt→Physical→CodeGen ║
║ Predicate pushdown: filter early, NO UDFs in filter, no computed cols ║
║ Column pruning: Parquet reads only referenced columns ║
║ Tungsten (OCW): Off-heap|Cache-aware|Whole-stage CodeGen ║
║ ║
║ repartition(N): WIDE, any direction, even, by column ║
║ coalesce(N): NARROW, down only, faster, may be uneven ║
║ shuffle.partitions: tune to 1 per 100-200 MB shuffled data ║
║ ║
║ Joins (BSS-BC fastest→slowest): ║
║ BroadcastHash(<threshold/hint) → ShuffleHash → SortMerge ║
║ → BroadcastNestedLoop(non-equi) → Cartesian(CROSS) ║
║ Broadcast hint: large_df.join(broadcast(small_df), "key") ║
║ SMJ default for large-large equi-joins ║
║ ║
║ AQE (CDS) — enable: spark.sql.adaptive.enabled=true ║
║ C = Coalesce: merge tiny post-shuffle partitions automatically ║
║ D = Dynamic: SMJ→BHJ at runtime if table shrinks after filter ║
║ S = Skew: detect+split hot partitions, duplicate other side ║
║ ║
║ Skew solutions (LABS): ║
║ L=AQE auto | A=broadcast hint | B=salt keys | S=2-phase agg ║
║ ║
║ Key Configs: ║
║ executor.memory + memoryOverhead | driver.memory | shuffle.partitions ║
║ autoBroadcastJoinThreshold=50m | adaptive.enabled=true ║
║ dynamicAllocation.enabled=true (needs shuffle.service.enabled=true) ║
║ serializer=KryoSerializer (for RDD-heavy shuffle jobs) ║
║ ║
╠═════════════════════════════════════════════════════════════════════════╣
║ ║
║ TOP 10 INTERVIEW TRAPS: ║
║ 1. groupByKey → use reduceByKey (pre-aggregation = less shuffle) ║
║ 2. RANK ≠ DENSE_RANK: top-N with ties → DENSE_RANK ║
║ 3. collect() on large data → Driver OOM ║
║ 4. coalesce cannot increase partitions (use repartition) ║
║ 5. inferSchema=True → full extra scan (always explicit in prod) ║
║ 6. UDF in filter = no predicate pushdown ║
║ 7. JDBC without numPartitions = 1 thread (serial read) ║
║ 8. explode drops null rows (use explode_outer) ║
║ 9. Accumulator in map() may double-count on retries ║
║ 10. Dynamic allocation needs External Shuffle Service (or files lost) ║
║ ║
╠═════════════════════════════════════════════════════════════════════════╣
║ ║
║ DEBUGGING SLOW JOBS — WHERE TO LOOK: ║
║ Stages tab → max task >> median → SKEW ║
║ SQL tab → SortMergeJoin on small table → add broadcast hint ║
║ Executors tab → GC > 10% → memory pressure, tune G1GC ║
║ SQL plan → Exchange = shuffle = network cost = minimize ║
║ SQL plan → no PushedFilters → predicate pushdown broken ║
║ ║
╚═════════════════════════════════════════════════════════════════════════╝
QUICK DECISION TABLE — OPTIMIZATION
PRODUCTION CONFIG TEMPLATE
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("ProductionETL")
    # Memory
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memoryOverhead", "1g")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    # AQE (most important!)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Shuffle
    .config("spark.sql.shuffle.partitions", "400")  # tune for your data
    .config("spark.io.compression.codec", "snappy")
    # Joins
    .config("spark.sql.autoBroadcastJoinThreshold", "50m")  # 50 MB (default 10 MB)
    # Serialization
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Dynamic Allocation
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .config("spark.shuffle.service.enabled", "true")
    # GC
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")
    .getOrCreate()
)