PySpark Day 1 — Quick Recall: Architecture + RDD
PySpark · Section 3 of 8
Legend: Must know · ⚠️ Trap · 🧠 Memory map · 📝 One-liner

🧠 MASTER MNEMONICS

🧠 SPARK ARCHITECTURE → "DCE"
D — Driver (orchestrates, runs main(), DAGScheduler lives here)
C — Cluster Manager (YARN / K8s / Standalone — resource allocation only)
E — Executors (JVM workers — store data + run tasks)
🧠 RDD PROPERTIES → "RD-ILP"
R — Resilient (fault tolerant via lineage)
D — Distributed (partitions across cluster)
I — Immutable (never modified in place)
L — Lazily evaluated (DAG built, not executed until action)
P — Partitioned (unit of parallelism)
🧠 DAG EXECUTION → "DAG → Stages → Tasks"
D — DAG built on every transformation
S — Stages broken at WIDE transformations (shuffle boundaries)
T — Tasks → one task per partition per stage
🧠 TRANSFORMATIONS → "N-W" (Narrow-Wide)
NARROW: map, flatMap, filter, union, coalesce (no shuffle)
WIDE: groupBy, join, sort, distinct, repartition (shuffle = stage boundary)
🧠 STORAGE LEVELS → "MDSRO"
M — MEMORY_ONLY (default for RDD cache() — deserialized Java objects, fast)
D — DISK_ONLY (stored only on disk)
S — MEMORY_AND_DISK (spill overflow to disk)
R — MEMORY_ONLY_SER (serialized — smaller memory, slower)
O — OFF_HEAP (Tungsten managed memory)

SECTION 1: ARCHITECTURE FLASH CARDS

Q: What are the 3 components of Spark architecture?

Driver + Cluster Manager + Executors (DCE)

Q: What does the Driver do?

Runs main(), creates SparkSession/SparkContext, builds DAG, schedules stages/tasks, collects results

Q: What does the Cluster Manager do?

Allocates resources (containers/memory/cores) to the Spark application — does NOT run tasks

Q: What do Executors do?

Run tasks, store RDD partitions in memory/disk, send results back to Driver

Q: SparkContext vs SparkSession — difference?

🧠 Memory Map
SparkContext → Spark 1.x entry point, RDD-only
SparkSession → Spark 2.x+ unified entry point, wraps SparkContext
→ spark.sparkContext gives you the underlying SC
Q: How many Drivers per Spark application?

Exactly 1 Driver. Many Executors (typically 1 per worker node, can be multiple)

Q: What is DAGScheduler vs TaskScheduler?

DAGScheduler → converts RDD lineage into stages (high-level)
TaskScheduler → sends tasks to Executors (low-level)
Both live inside the Driver.

SECTION 2: DAG + LAZY EVALUATION FLASH CARDS

Q: What is lazy evaluation?

Transformations only build a DAG (plan), NOT executed. Execution starts only when an ACTION is called.

Q: Name 5 transformations (lazy)

map, flatMap, filter, groupByKey, join (+ any function that returns an RDD/DataFrame)

Q: Name 5 actions (trigger execution)

collect(), count(), show(), take(N), saveAsTextFile() / write.save()
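The transformation/action split behaves much like Python generators — a hedged, pure-Python analogy (no Spark involved), not Spark's actual machinery:

```python
# Pure-Python analogy: a generator expression builds a plan, nothing runs yet
data = range(1, 6)
plan = (x * 2 for x in data)  # "transformation": defines the work, executes nothing

# "action": consuming the generator finally triggers the computation
result = sum(plan)  # → 30
```

Just as here, nothing in Spark is computed until something consumes the plan.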

Q: Why is lazy evaluation beneficial?

1. Catalyst can optimize the whole plan before running
2. Avoids unnecessary computation
3. Enables pipelining of narrow transformations

Q: What is a Stage?

A set of tasks that can run without a shuffle. Broken at wide transformation boundaries.

Q: What is a Task?

Smallest unit of work. One task per partition per stage. Runs on one Executor core.

Q: If you have 200 partitions and 2 stages, how many tasks total?

200 (stage 1) + 200 (stage 2) = 400 tasks

SECTION 3: NARROW vs WIDE FLASH CARDS

Q: Define Narrow transformation

Each partition of output depends on AT MOST ONE partition of input. No shuffle. Fast.

Q: Define Wide transformation

Each partition of output depends on MULTIPLE partitions of input. Requires shuffle. Slow. Creates new stage.

Q: Classify these: map, groupBy, filter, join, union, repartition, coalesce

NARROW: map, filter, union, coalesce (fewer partitions, no shuffle)
WIDE: groupBy, join, repartition (full shuffle — wide even if the partition count doesn't change)

⚠️ TRAP: coalesce is NARROW (reduces partitions without full shuffle). repartition is WIDE (full shuffle even if same count).
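The trap can be visualized with a small pure-Python sketch of the data movement (an illustration only, not Spark's implementation):

```python
# Four input partitions of a toy dataset
parts = [[1, 2], [3, 4], [5, 6], [7, 8]]

# coalesce(2) — NARROW: each output partition merges whole input partitions;
# no element is rehashed or sent to arbitrary destinations
coalesced = [parts[0] + parts[1], parts[2] + parts[3]]

# repartition(2) — WIDE: every element is hashed, so each output partition
# may receive data from EVERY input partition (a full shuffle)
repartitioned = [[], []]
for p in parts:
    for x in p:
        repartitioned[x % 2].append(x)  # toy hash: x % 2
```

Each `coalesced` partition reads from a fixed subset of inputs; each `repartitioned` partition reads from all of them — that all-to-all movement is the shuffle.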

Q: Why does a wide transformation create a new stage?

Spark must complete all tasks in the current stage (write shuffle files) before any task in the next stage can start reading those files.

SECTION 4: RDD FLASH CARDS

Q: What are the 5 properties of an RDD? (RD-ILP)

1. Resilient — rebuilds from lineage on failure
2. Distributed — partitions span multiple nodes
3. Immutable — transformations create new RDD
4. Lazy — only evaluated on action
5. Partitioned — parallel processing unit

Q: map() vs flatMap() — key difference?

python
# map: 1 input → 1 output (same number of elements)
rdd.map(lambda x: x.split())      # [["hello","world"], ["foo","bar"]]

# flatMap: 1 input → 0 or more outputs (flattens one level)
rdd.flatMap(lambda x: x.split())  # ["hello", "world", "foo", "bar"]

Q: When to use mapPartitions() instead of map()?

When setup cost is high per-element (DB connection, ML model load). mapPartitions = one connection per partition vs one per row.

python
def process_partition(rows):  # avoid shadowing the builtin `iter`
    conn = create_db_connection()  # one connection per partition, not per row
    for row in rows:
        yield conn.query(row)
    conn.close()

rdd.mapPartitions(process_partition)

Q: reduceByKey vs groupByKey — which to use and WHY?

Always reduceByKey. Never groupByKey for aggregation.

reduceByKey:
1. Pre-aggregates locally on each mapper (combiner step)
2. Only aggregated values cross network
3. Much less shuffle data
groupByKey:
1. ALL values for ALL keys shuffle across network
2. Collects all values into memory (OOM risk)
3. Aggregation happens only after the shuffle (no map-side combine)
Rule: If you do groupByKey().mapValues(sum) → use reduceByKey instead
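The shuffle-volume difference can be counted in plain Python — a hedged sketch of the semantics using two hypothetical mappers, not Spark internals:

```python
from collections import Counter

# Two mappers each hold some words to count
mapper1 = ["a", "b", "a", "a"]
mapper2 = ["a", "b"]

# groupByKey-style: every single (word, 1) record crosses the network
records_shuffled_group = len(mapper1) + len(mapper2)  # 6 records

# reduceByKey-style: map-side combine first → one record per key per mapper
local1, local2 = Counter(mapper1), Counter(mapper2)
records_shuffled_reduce = len(local1) + len(local2)   # 4 records (2 keys × 2 mappers)

# Reducers then merge the small pre-aggregated maps
totals = local1 + local2  # Counter({'a': 4, 'b': 2})
```

With millions of rows per key, the pre-aggregation shrinks the shuffle by orders of magnitude — that is the entire argument for reduceByKey.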
Q: When do you NEED groupByKey?

When you need ALL values (not just an aggregate) — e.g., groupByKey().mapValues(list) to collect full lists

Q: What is aggregateByKey? When to use?

Like reduceByKey but allows output type to differ from input type.

python
# Computing average (output = float, input = int)
seqOp  = lambda acc, val: (acc[0] + val, acc[1] + 1)   # (sum, count)
combOp = lambda a1, a2: (a1[0]+a2[0], a1[1]+a2[1])     # merge accumulators
rdd.aggregateByKey((0, 0), seqOp, combOp)\
   .mapValues(lambda x: x[0]/x[1])  # → average
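The same seqOp/combOp mechanics can be traced in plain Python with functools.reduce — a sketch to check the arithmetic, not Spark's execution:

```python
from functools import reduce

seq_op  = lambda acc, val: (acc[0] + val, acc[1] + 1)  # fold one value into (sum, count)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])      # merge two partition accumulators

# One key's values split across two hypothetical partitions
part1, part2 = [10, 20], [30]
acc1 = reduce(seq_op, part1, (0, 0))  # (30, 2) — the zero value (0, 0) seeds each partition
acc2 = reduce(seq_op, part2, (0, 0))  # (30, 1)
total = comb_op(acc1, acc2)           # (60, 3)
average = total[0] / total[1]         # 20.0
```

Note how the accumulator type (a tuple) differs from the value type (an int) — exactly what reduceByKey cannot express.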

SECTION 5: PERSISTENCE FLASH CARDS

Q: What is the difference between cache() and persist()?

python
from pyspark import StorageLevel

df.cache()                              # DataFrame default = persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)    # explicit storage level (RDD.cache() defaults to MEMORY_ONLY)

cache() is shorthand for persist() with default level.

Q: What are the 5 storage levels? (MDSRO)

🧠 Memory Map
MEMORY_ONLY → fast, may recompute if evicted (default for RDD cache())
MEMORY_AND_DISK → spill to disk if RAM full (safer; default for DataFrame cache())
DISK_ONLY → slowest, always on disk
MEMORY_ONLY_SER → serialized objects in RAM (smaller, slower to deserialize)
OFF_HEAP → Tungsten off-heap memory

Q: When to cache?

When a DataFrame/RDD is used 2+ times in different actions.

Q: When NOT to cache?

When used only once (wastes memory). When DataFrame is too large for memory.

Q: How to release cache?

python
df.unpersist()  # removes from memory/disk immediately

Q: RDD lineage — what is it?

The DAG of transformations that produced an RDD. Used to recompute lost partitions on failure.

Q: Checkpointing vs caching — difference?

🧠 Memory Map
Caching → stores data in memory/disk (fast, within app lifetime)
→ Lineage is KEPT (can still recompute)
→ Lost when app restarts
Checkpointing → writes to HDFS/S3 (persistent, survives restarts)
→ Lineage is CUT (cannot recompute, relies on checkpoint)
→ Use when lineage is very long (iterative algorithms like PageRank)
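A minimal checkpointing sketch, assuming a live SparkContext `sc` and a durable path you control (the HDFS path here is a hypothetical placeholder):

```python
# Assumption: `sc` is an existing SparkContext; the path is a placeholder
sc.setCheckpointDir("hdfs:///tmp/checkpoints")  # must be set BEFORE calling checkpoint()

rdd = sc.parallelize(range(100))
long_lineage = rdd.map(lambda x: x + 1)
long_lineage.checkpoint()  # marks the RDD; nothing is written yet (lazy)
long_lineage.count()       # first action materializes the checkpoint and cuts lineage
```

Checkpointing recomputes the RDD unless it is also cached, so in practice persist() before checkpoint() on expensive lineages.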

SECTION 6: BROADCAST + ACCUMULATORS FLASH CARDS

Q: What is a Broadcast Variable?

An immutable variable sent to all Executors once (not per-task). Cached locally on each Executor.

python
lookup = spark.sparkContext.broadcast({"A": 1, "B": 2})
rdd.map(lambda x: lookup.value.get(x, 0))  # access via .value

Q: Why use Broadcast instead of a regular variable?

Regular variable is serialized and sent with EVERY task. Broadcast sends once per Executor using BitTorrent-like distribution.

Q: What is an Accumulator?

A shared counter/aggregator that tasks can ADD to, but only the Driver can READ.

python
error_count = spark.sparkContext.accumulator(0)
def count_errors(row):
    if row.is_null: error_count.add(1)
    return row
rdd.foreach(count_errors)
print(error_count.value)  # only readable on Driver

⚠️ TRAP: Accumulators in transformations (map) may be counted multiple times due to retries. Use in foreach or actions for reliable counting.

Q: Can a task read an Accumulator value?

NO. Tasks can only ADD. Only the Driver can read the final value.

SECTION 7: WHAT-IF SCENARIOS

Q: Executor OOM (OutOfMemoryError) — causes + fixes?

CAUSE: Too much data per partition, large shuffles, big RDD cached
FIX:
1. Increase spark.executor.memory
2. Increase partitions: repartition(N) for larger N
3. Use MEMORY_AND_DISK instead of MEMORY_ONLY
4. Avoid groupByKey → use reduceByKey
5. Filter data early (predicate pushdown)

Q: Driver OOM — causes + fixes?

CAUSE: collect() on large dataset, massive broadcast variable
FIX:
1. Never collect() the full dataset — use take(100) for inspection
2. Use write() instead of collect() for output
3. Increase spark.driver.memory

Q: Job is slow — where do you look first?

1. Spark UI → Stages tab → skewed tasks (one task 10x slower than others)
2. Spark UI → SQL tab → physical plan (any Sort-Merge Join that should be Broadcast?)
3. spark.sql.shuffle.partitions = 200 (default) — too many for small data?
4. Is data cached when used multiple times?
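For the shuffle-partitions check, the default can be tuned per session — a config sketch assuming an existing SparkSession `spark` (the value 64 is an arbitrary example, not a recommendation):

```python
# Assumption: `spark` is an existing SparkSession
spark.conf.set("spark.sql.shuffle.partitions", "64")  # the 200 default is often too high for small data
print(spark.conf.get("spark.sql.shuffle.partitions"))
```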

🧠 DAY 1 ULTRA SUMMARY CARD

📐 Architecture Diagram
╔═══════════════════════════════════════════════════════════════╗
║         PYSPARK DAY 1 — ARCHITECTURE + RDD                    ║
╠═══════════════════════════════════════════════════════════════╣
║                                                               ║
║  ARCHITECTURE (DCE):                                          ║
║  Driver → DAGScheduler → TaskScheduler → Executors            ║
║  1 Driver per app | N Executors | CM = resource broker only   ║
║                                                               ║
║  DAG EXECUTION:                                               ║
║  Transformations = lazy (build DAG) | Actions = trigger       ║
║  Wide transform = stage boundary (shuffle required)           ║
║  1 Task = 1 Partition = 1 Core                                ║
║                                                               ║
║  NARROW vs WIDE:                                              ║
║  NARROW: map, flatMap, filter, union, coalesce (no shuffle)   ║
║  WIDE:   groupBy, join, sort, distinct, repartition (shuffle) ║
║  coalesce = narrow | repartition = wide (full shuffle)        ║
║                                                               ║
║  RDD OPERATIONS:                                              ║
║  map()          → 1-to-1 transform                            ║
║  flatMap()      → 1-to-N transform (flattens)                 ║
║  mapPartitions()→ 1 setup per partition (DB conn, model)      ║
║  reduceByKey()  → LOCAL pre-agg + shuffle (PREFER THIS)       ║
║  groupByKey()   → ALL values shuffle (AVOID for aggregation)  ║
║  aggregateByKey → output type ≠ input type (avg = sum/count)  ║
║                                                               ║
║  PERSISTENCE (MDSRO):                                         ║
║  MEMORY_ONLY > MEMORY_AND_DISK > DISK_ONLY > MEMORY_ONLY_SER  ║
║  cache() = persist(MEMORY_AND_DISK) shorthand                 ║
║  Checkpoint = HDFS/S3, cuts lineage, survives restart         ║
║                                                               ║
║  BROADCAST + ACCUMULATORS:                                    ║
║  Broadcast → read-only, sent once per Executor (not per task) ║
║  Accumulator → write-only in tasks, read-only on Driver       ║
║  ⚠ Accumulator in map() → may double-count on retry          ║
║                                                               ║
║  TOP TRAPS:                                                   ║
║  1. groupByKey → use reduceByKey for aggregation              ║
║  2. collect() on large data → Driver OOM                      ║
║  3. repartition ≠ coalesce (repartition = full shuffle)       ║
║  4. Accumulator readable only on Driver                       ║
║  5. cache() before using 2+ times (not only once)             ║
║                                                               ║
╚═══════════════════════════════════════════════════════════════╝

WORD COUNT — Classic RDD Interview Code

python
# The "Hello World" of Spark — know this cold!
rdd = sc.textFile("hdfs:///data/text")
word_count = (
    rdd
    .flatMap(lambda line: line.split())        # split each line into words
    .map(lambda word: (word, 1))               # (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)           # sum counts per word
    .sortBy(lambda x: x[1], ascending=False)   # sort by count descending
)
word_count.take(10)  # action — triggers execution

QUICK DECISION TABLE

🧠 SCENARIO → SOLUTION
────────────────────────────────────────────────────────────────
Count occurrences of each key       → reduceByKey(lambda a,b: a+b)
Compute average per key             → aggregateByKey with (sum, count)
Get all values per key as a list    → groupByKey (rare, ok here)
Process partition with shared conn  → mapPartitions()
Parse each line into multiple rows  → flatMap()
Share lookup table across tasks     → Broadcast Variable
Count global errors across tasks    → Accumulator
Reuse expensive DF computation      → cache() or persist()
Long iterative algorithm (ML-like)  → checkpoint() to cut lineage
Reduce partition count safely       → coalesce() (no full shuffle)
Redistribute data evenly            → repartition(N) (full shuffle)