PySpark Day 1 — Quick Recall: Architecture + RDD
🧠 MASTER MNEMONICS
SECTION 1: ARCHITECTURE FLASH CARDS
Driver + Cluster Manager + Executors (DCE)
Driver: Runs main(), creates SparkSession/SparkContext, builds DAG, schedules stages/tasks, collects results
Cluster Manager: Allocates resources (containers/memory/cores) to the Spark application — does NOT run tasks
Executors: Run tasks, store RDD partitions in memory/disk, send results back to Driver
How many of each? Exactly 1 Driver. Many Executors (typically 1 per worker node, can be multiple)
SECTION 2: DAG + LAZY EVALUATION FLASH CARDS
Transformations only build a DAG (plan), NOT executed. Execution starts only when an ACTION is called.
Transformations (examples): map, flatMap, filter, groupByKey, join (+ any function that returns an RDD/DataFrame)
Actions (examples): collect(), count(), show(), take(N), saveAsTextFile() / write.save()
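Laziness can be mimicked in plain Python with generators. This is a mental model only, not PySpark itself: building the pipeline does no work, and only the terminal call (the "action") executes it.

```python
# Pure-Python analogy for lazy evaluation: a generator expression is
# the "plan"; nothing runs until an action-like call consumes it.
log = []

def tag(x):
    log.append(x)    # records when work actually happens
    return x * 2

plan = (tag(x) for x in range(3))  # "transformation": nothing runs yet
assert log == []                   # no work done so far

result = list(plan)                # "action": triggers execution
assert result == [0, 2, 4]
assert log == [0, 1, 2]
```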
Stage: A set of tasks that can run without a shuffle. Broken at wide transformation boundaries.
Task: Smallest unit of work. One task per partition per stage. Runs on one Executor core.
Example: a job with 2 stages of 200 partitions each → 200 (stage 1) + 200 (stage 2) = 400 tasks
SECTION 3: NARROW vs WIDE FLASH CARDS
Narrow: Each partition of output depends on AT MOST ONE partition of input. No shuffle. Fast.
Wide: Each partition of output depends on MULTIPLE partitions of input. Requires shuffle. Slow. Creates a new stage.
⚠️ TRAP: coalesce is NARROW (reduces partitions without full shuffle). repartition is WIDE (full shuffle even if same count).
Spark must complete all tasks in the current stage (write shuffle files) before any task in the next stage can start reading those files.
SECTION 4: RDD FLASH CARDS
# map: 1 input → 1 output (same number of elements)
rdd.map(lambda x: x.split()) # [["hello","world"], ["foo","bar"]]
# flatMap: 1 input → 0 or more outputs (flattens one level)
rdd.flatMap(lambda x: x.split()) # ["hello", "world", "foo", "bar"]
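The same distinction can be shown in pure Python (the sample strings are hypothetical): map keeps one output per input, flatMap flattens one level.

```python
# Pure-Python analogy for map vs flatMap on hypothetical lines of text.
from itertools import chain

lines = ["hello world", "foo bar"]

# map-style: one output per input (a list of lists)
mapped = [line.split() for line in lines]

# flatMap-style: flatten one level (a single list of words)
flat = list(chain.from_iterable(line.split() for line in lines))

assert mapped == [["hello", "world"], ["foo", "bar"]]
assert flat == ["hello", "world", "foo", "bar"]
```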
When to use mapPartitions: when per-element setup cost is high (DB connection, ML model load). mapPartitions = one connection per partition instead of one per row.
def process_partition(rows):
    conn = create_db_connection()  # one connection per partition
    for row in rows:
        yield conn.query(row)
    conn.close()

rdd.mapPartitions(process_partition)
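A pure-Python sketch of the same pattern (the multiplier and the two sample partitions are hypothetical stand-ins for conn.query and real data), showing that the setup runs once per partition, not once per row:

```python
# Pure-Python sketch of the mapPartitions pattern: the "setup" list
# stands in for create_db_connection() and records each setup call.
setups = []

def process_partition(rows):
    setups.append("setup")   # expensive setup: once per partition
    for row in rows:
        yield row * 10       # stands in for conn.query(row)

partitions = [[1, 2], [3, 4, 5]]  # a hypothetical 2-partition RDD
out = [list(process_partition(p)) for p in partitions]

assert out == [[10, 20], [30, 40, 50]]
assert len(setups) == 2      # one setup per partition, not per row
```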
reduceByKey vs groupByKey: Always reduceByKey for aggregation: it pre-aggregates locally (map-side combine) before the shuffle. Never groupByKey for aggregation.
When groupByKey is OK: when you need ALL values (not just an aggregate), e.g., groupByKey().mapValues(list) to collect full lists
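The shuffle-volume difference can be sketched in plain Python (the two "partitions" of key/value pairs are hypothetical): with reduceByKey-style local combining, only one record per key per partition crosses the simulated shuffle, while groupByKey-style shipping would move every record.

```python
# Pure-Python sketch of why reduceByKey shuffles less than groupByKey.
from collections import defaultdict

partitions = [[("a", 1), ("a", 1), ("b", 1)],
              [("a", 1), ("b", 1), ("b", 1)]]

# reduceByKey-style: combine locally inside each partition first
shuffled = []
for part in partitions:
    local = defaultdict(int)
    for k, v in part:
        local[k] += v            # map-side combine
    shuffled.extend(local.items())

assert len(shuffled) == 4        # 2 keys x 2 partitions cross the wire

# groupByKey-style would ship every record: 6 in this example
assert sum(len(p) for p in partitions) == 6

# final merge on the "reduce" side
final = defaultdict(int)
for k, v in shuffled:
    final[k] += v
assert dict(final) == {"a": 3, "b": 3}
```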
aggregateByKey: Like reduceByKey, but allows the output/accumulator type to differ from the input value type.
# Computing a per-key average: the (sum, count) accumulator type differs from the int values
seqOp = lambda acc, val: (acc[0] + val, acc[1] + 1)     # fold a value into (sum, count)
combOp = lambda a1, a2: (a1[0] + a2[0], a1[1] + a2[1])  # merge accumulators
rdd.aggregateByKey((0, 0), seqOp, combOp) \
   .mapValues(lambda x: x[0] / x[1])                    # → average
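A plain-Python walk-through of the same seqOp/combOp logic, using hypothetical values for a single key split across two partitions:

```python
# Pure-Python trace of aggregateByKey's average: seqOp folds values
# into a (sum, count) accumulator within each partition; combOp merges
# the per-partition accumulators.
zero = (0, 0)
seq_op = lambda acc, val: (acc[0] + val, acc[1] + 1)
comb_op = lambda a1, a2: (a1[0] + a2[0], a1[1] + a2[1])

part1, part2 = [10, 20], [30]    # one key's values in two partitions

acc1 = zero
for v in part1:
    acc1 = seq_op(acc1, v)       # ends as (30, 2)
acc2 = zero
for v in part2:
    acc2 = seq_op(acc2, v)       # ends as (30, 1)

merged = comb_op(acc1, acc2)     # (60, 3)
assert merged == (60, 3)
assert merged[0] / merged[1] == 20.0   # the average
```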
SECTION 5: PERSISTENCE FLASH CARDS
df.cache()  # DataFrame default = persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)  # explicit storage level
cache() is shorthand for persist() with the default level (MEMORY_AND_DISK for DataFrames, MEMORY_ONLY for RDDs).
When to cache: when a DataFrame/RDD is reused by 2+ actions.
When NOT to cache: when it is used only once (wastes memory), or when it is too large for memory.
df.unpersist()  # removes from memory/disk (non-blocking by default)
Lineage: the DAG of transformations that produced an RDD. Used to recompute lost partitions on failure.
SECTION 6: BROADCAST + ACCUMULATORS FLASH CARDS
Broadcast variable: an immutable variable sent to all Executors once (not per task). Cached locally on each Executor.
lookup = spark.sparkContext.broadcast({"A": 1, "B": 2})
rdd.map(lambda x: lookup.value.get(x, 0)) # access via .value
Why broadcast instead of a plain closure variable? A regular variable is serialized and sent with EVERY task; a broadcast variable is sent once per Executor using BitTorrent-like (torrent) distribution.
Accumulator: a shared counter/aggregator that tasks can ADD to, but only the Driver can READ.
error_count = spark.sparkContext.accumulator(0)

def count_errors(row):
    if row.is_null:
        error_count.add(1)

rdd.foreach(count_errors)  # foreach discards return values
print(error_count.value)   # only readable on the Driver
⚠️ TRAP: Accumulators in transformations (map) may be counted multiple times due to retries. Use in foreach or actions for reliable counting.
Can a task READ an accumulator? NO. Tasks can only ADD. Only the Driver can read the final value.
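The add-only contract can be sketched with a minimal plain-Python class. This is a mental model only: Spark's real Accumulator also handles serialization, merging across Executors, and task retries.

```python
# Pure-Python sketch of accumulator semantics: workers get a handle
# that only supports add(); the final value is read on the "driver".
class Accumulator:
    def __init__(self, value=0):
        self._value = value

    def add(self, n):        # the only operation tasks may perform
        self._value += n

    @property
    def value(self):         # read on the "driver" side
        return self._value

acc = Accumulator(0)
rows = [{"is_null": True}, {"is_null": False}, {"is_null": True}]
for row in rows:             # stands in for rdd.foreach(count_errors)
    if row["is_null"]:
        acc.add(1)

assert acc.value == 2        # 2 null rows counted
```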
SECTION 7: WHAT-IF SCENARIOS
🧠 DAY 1 ULTRA SUMMARY CARD
╔═════════════════════════════════════════════════════════════════╗
║               PYSPARK DAY 1 — ARCHITECTURE + RDD                ║
╠═════════════════════════════════════════════════════════════════╣
║                                                                 ║
║ ARCHITECTURE (DCE):                                             ║
║   Driver → DAGScheduler → TaskScheduler → Executors             ║
║   1 Driver per app | N Executors | CM = resource broker only    ║
║                                                                 ║
║ DAG EXECUTION:                                                  ║
║   Transformations = lazy (build DAG) | Actions = trigger        ║
║   Wide transform = stage boundary (shuffle required)            ║
║   1 Task = 1 Partition = 1 Core                                 ║
║                                                                 ║
║ NARROW vs WIDE:                                                 ║
║   NARROW: map, flatMap, filter, union, coalesce (no shuffle)    ║
║   WIDE: groupBy, join, sort, distinct, repartition (shuffle)    ║
║   coalesce = narrow | repartition = wide (full shuffle)         ║
║                                                                 ║
║ RDD OPERATIONS:                                                 ║
║   map()           → 1-to-1 transform                            ║
║   flatMap()       → 1-to-N transform (flattens)                 ║
║   mapPartitions() → 1 setup per partition (DB conn, model)      ║
║   reduceByKey()   → LOCAL pre-agg + shuffle (PREFER THIS)       ║
║   groupByKey()    → ALL values shuffle (AVOID for aggregation)  ║
║   aggregateByKey  → output type ≠ input type (avg = sum/count)  ║
║                                                                 ║
║ PERSISTENCE (MDSRO):                                            ║
║   MEMORY_ONLY > MEMORY_AND_DISK > DISK_ONLY > MEMORY_ONLY_SER   ║
║   cache() = persist(MEMORY_AND_DISK) shorthand                  ║
║   Checkpoint = HDFS/S3, cuts lineage, survives restart          ║
║                                                                 ║
║ BROADCAST + ACCUMULATORS:                                       ║
║   Broadcast → read-only, sent once per Executor (not per task)  ║
║   Accumulator → write-only in tasks, read-only on Driver        ║
║   ⚠ Accumulator in map() → may double-count on retry            ║
║                                                                 ║
║ TOP TRAPS:                                                      ║
║   1. groupByKey → use reduceByKey for aggregation               ║
║   2. collect() on large data → Driver OOM                       ║
║   3. repartition ≠ coalesce (repartition = full shuffle)        ║
║   4. Accumulator readable only on Driver                        ║
║   5. cache() before using 2+ times (not only once)              ║
║                                                                 ║
╚═════════════════════════════════════════════════════════════════╝
WORD COUNT — Classic RDD Interview Code
# The "Hello World" of Spark — know this cold!
rdd = sc.textFile("hdfs:///data/text")

word_count = (
    rdd
    .flatMap(lambda line: line.split())       # split each line into words
    .map(lambda word: (word, 1))              # (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)          # sum counts per word
    .sortBy(lambda x: x[1], ascending=False)  # sort by count descending
)

word_count.take(10)  # action — triggers execution
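For checking what each stage produces, the same pipeline can be traced in plain Python on a tiny hypothetical input:

```python
# Pure-Python trace of the word-count pipeline, stage by stage.
from collections import Counter

lines = ["hello world", "hello spark"]  # hypothetical input lines

words = [w for line in lines for w in line.split()]  # flatMap
pairs = [(w, 1) for w in words]                      # map
counts = Counter()
for w, n in pairs:                                   # reduceByKey
    counts[w] += n
top = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)  # sortBy

assert top[0] == ("hello", 2)
assert dict(counts) == {"hello": 2, "world": 1, "spark": 1}
```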