PySpark
Day 1: Spark Architecture + RDD — Deep Interview Guide
PySpark · Section 2 of 8

🧠 MASTER MEMORY MAP — Day 1

🧠 SPARK ARCHITECTURE = "DCE" (Driver → Cluster Manager → Executors)
D = Driver: Brain of the app. Creates DAG, schedules tasks, ONE per app
C = Cluster Manager: Resource allocator (YARN/K8s/Mesos/Standalone)
E = Executors: JVM workers. Run tasks, hold cached data. Many per app
DAG FLOW = "DAG → Stages → Tasks → Partitions"
DAG breaks at WIDE transformations (shuffle boundaries)
Each Stage = group of tasks (no shuffle within)
Each Task = one partition processed
LAZY EVALUATION = "Plan first, execute only on ACTION"
Transformations → build DAG (nothing runs!)
Actions (collect, count, show, write) → trigger execution
RDD 5 PROPERTIES = "RD-ILP"
R = Resilient: fault-tolerant via lineage
D = Distributed: spread across cluster
I = Immutable: never modified, transformations make new RDDs
L = Lazily evaluated
P = Partitioned: split into partitions for parallelism
NARROW vs WIDE = "One-to-One vs Shuffle"
NARROW: each output partition from ONE input partition (map, filter, flatMap)
WIDE: output partitions from MULTIPLE input partitions → SHUFFLE! (groupBy, join, sort)
RDD TRANSFORMATIONS = "MFR-GJU"
M = map(), flatMap(), mapPartitions()
F = filter()
R = reduceByKey(), repartition()
G = groupByKey() (⚠️ AVOID — use reduceByKey!)
J = join(), cogroup()
U = union(), distinct()

SECTION 1: SPARK ARCHITECTURE

Q1: What Is Apache Spark?

Definition (1 line): Apache Spark is an open-source, in-memory distributed computing engine designed for large-scale data processing across clusters of machines.

Simple Explanation: Before Spark, Hadoop MapReduce was the standard. But MapReduce writes intermediate results to disk after every step — read from disk, process, write to disk, repeat. For jobs that iterate over data many times (like ML training), this disk I/O made things painfully slow.

Spark solved this by keeping data in memory between processing steps. Instead of reading/writing to disk 10 times during an ML training loop, Spark reads once and keeps the data in RAM across all 10 iterations.

Real-world Analogy: Imagine cooking a recipe that requires 5 steps. MapReduce is like putting all ingredients back in the fridge after each step, then taking them out again for the next step. Spark is like keeping everything on the kitchen counter — you only go to the fridge once at the start and once at the end. The result? Up to 100x faster for iterative workloads.

Key facts to memorize:

  • Written in Scala, runs on the JVM
  • Supports Python (PySpark), Scala, Java, R, SQL
  • Processes batch (large datasets) and streaming (real-time data)
  • Can run on YARN, Kubernetes, Mesos, or Standalone cluster
  • Default since Spark 2.0: DataFrame API (not RDD) with Catalyst optimizer

Interview Tip: "Spark's core advantage over MapReduce is in-memory computation — intermediate data stays in RAM instead of being written to disk. This makes iterative algorithms like ML training up to 100x faster. For single-pass ETL jobs, the speedup is smaller (~2-3x) because disk I/O isn't the bottleneck."

What NOT to say: "Spark is 100x faster than Hadoop." — This is an oversimplification. It's 100x faster for iterative workloads because of in-memory caching. For simple one-pass jobs, the difference is much smaller. Saying "100x faster" without qualification signals shallow understanding.

Q2: Explain Spark Architecture — the 3 Components

Definition (1 line): Spark uses a master-worker architecture with three components: Driver (brain), Cluster Manager (resource allocator), and Executors (workers).

Simple Explanation: When you submit a Spark job, one machine becomes the Driver — it plans the work and tells others what to do. The Cluster Manager (like YARN) finds available machines and allocates resources. The Executors are the actual workers — they receive tasks from the Driver, process data, and send results back.

Real-world Analogy: Think of a construction project:

  • Driver = The architect who creates the blueprint, decides what gets built in what order, and coordinates everyone
  • Cluster Manager = The HR department that assigns workers from the labor pool to this specific project
  • Executors = The construction workers who actually build things, following the architect's blueprint

If the architect (Driver) leaves, the entire project stops. If one worker (Executor) gets sick, the architect reassigns that work to another worker.

🧠 Memory Map
DRIVER (ONE per application)
Runs your main() / SparkSession code
Converts your code → Logical Plan → DAG
Splits DAG into Stages (at shuffle boundaries)
Splits Stages into Tasks (one per partition)
Talks to Cluster Manager to get resources
Talks to Executors to send tasks and get results
Hosts SparkContext and SparkSession objects
⚠️ If Driver dies → entire application fails!
⚠️ Driver memory must hold the result of collect() — never collect() huge data!
CLUSTER MANAGER (External)
Allocates resources (CPU, RAM) for the application
Options: YARN (Hadoop), Kubernetes (cloud-native), Mesos, Standalone
Spark just asks "give me N executors with X cores and Y GB RAM"
CM manages competing applications on the same cluster
EXECUTORS (MANY per application)
JVM processes launched on worker nodes by Cluster Manager
Each executor has: N CPU cores + M GB RAM
Run tasks assigned by Driver (one task per core at a time)
Store cached/persisted data in their memory
Each application gets its OWN dedicated executors (isolation!)
If executor dies → tasks fail, Driver reschedules on other executors
📐 Architecture Diagram
SPARK CLUSTER DIAGRAM:

    ┌─────────────────────────────┐
    │         DRIVER NODE          │
    │  ┌──────────────────────┐   │
    │  │    SparkSession       │   │
    │  │    DAGScheduler       │   │
    │  │    TaskScheduler      │   │
    │  └──────────┬───────────┘   │
    └─────────────┼───────────────┘
                  │ Task assignments
    ┌─────────────▼───────────────┐
    │      CLUSTER MANAGER         │
    │  (YARN / Kubernetes / etc)   │
    └──────┬──────────────┬────────┘
           │              │
    ┌──────▼──┐    ┌──────▼──┐
    │EXECUTOR 1│    │EXECUTOR 2│
    │ Core 1   │    │ Core 1   │
    │ Core 2   │    │ Core 2   │
    │ Memory   │    │ Memory   │
    │ Task A   │    │ Task B   │
    └──────────┘    └──────────┘

What the interviewer is testing: Can you explain how the three components interact? Do you understand the flow — that the Driver plans, CM allocates resources, and Executors execute? Can you explain what happens when something fails?

What NOT to say: "The Cluster Manager runs the tasks." — No! The CM only allocates resources (CPU/memory). It's the Executors that run tasks, and the Driver that schedules them.

Q3: SparkContext vs SparkSession — What's the Difference?

Definition (1 line): SparkContext was Spark 1.x's entry point for RDD-only operations. SparkSession (Spark 2.0+) is the unified entry point that wraps SparkContext + SQLContext + HiveContext into one object.

Simple Explanation: In Spark 1.x, you needed different objects for different APIs — SparkContext for RDDs, SQLContext for DataFrames, HiveContext for Hive tables. This was confusing. Spark 2.0 introduced SparkSession as a single entry point for everything. You create one SparkSession and it gives you access to all APIs.

Code:

python — editable
# SparkContext (Spark 1.x) — entry point for RDD API
from pyspark import SparkContext
sc = SparkContext("local[*]", "MyApp")

# SparkSession (Spark 2.0+) — entry point for EVERYTHING
# Wraps SparkContext + SQLContext + HiveContext in one object
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("yarn") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .config("spark.sql.shuffle.partitions", "200") \
    .enableHiveSupport() \
    .getOrCreate()               # creates new or returns existing session

sc = spark.sparkContext          # access SparkContext from SparkSession

Interview Tip: "In modern Spark, always start with SparkSession. SparkContext is still there under the hood — you can access it via spark.sparkContext when you need RDD operations. But SparkSession is the recommended entry point."

Q4: What Is a DAG? How Does Spark Execute Your Code?

Definition (1 line): A DAG (Directed Acyclic Graph) is Spark's internal execution plan — a graph of all transformations your code performs, organized so Spark can optimize and execute them efficiently.

Simple Explanation: When you write Spark code, nothing executes immediately (lazy evaluation). Instead, Spark builds a DAG — a plan that records every transformation and how they depend on each other. When you call an action (like count() or write()), Spark looks at the entire DAG, optimizes it (removes unnecessary steps, reorders operations), and then executes it.

Real-world Analogy: Think of planning a road trip. You don't start driving after deciding the first stop — you plan the entire route first. Once you see the full route, you might realize you can skip a detour, combine two stops into one, or take a faster highway. That's exactly what Spark does with your code — it sees the entire plan before executing, so it can optimize the route.

How DAG becomes execution:

📋 Overview
DAG → STAGES → TASKS
Step 1: DAG — Spark records every transformation as a graph node
Step 2: STAGES — Spark cuts the DAG at shuffle boundaries (wide transformations)
Step 3: TASKS — Each stage is split into tasks (one task per partition)
Step 4: EXECUTION — Tasks run in parallel across executors
Example:
textFile → filter → flatMap → map → reduceByKey → sortByKey → collect()
(narrow ops up to map; reduceByKey is WIDE → new stage; sortByKey is WIDE → new stage)
Result: 3 stages
Stage 1: textFile → filter → flatMap → map → write shuffle files
Stage 2: shuffle read → reduceByKey → write shuffle files
Stage 3: shuffle read → sortByKey → collect
If Stage 1 has 100 partitions → 100 tasks
Each task processes ONE partition
Tasks run IN PARALLEL across executors (up to num_cores tasks at once)

Key components inside the Driver:

Component          | What It Does                                  | Operates At
DAGScheduler       | Breaks DAG into stages at shuffle boundaries  | Stage level
TaskScheduler      | Assigns tasks to executors with data locality | Task level
Catalyst Optimizer | Optimizes the logical plan before execution   | Query level

Interview Tip: "A DAG is Spark's execution plan. It breaks at wide transformation boundaries into stages. Each stage has one task per partition. Tasks run in parallel on executors. The DAGScheduler manages stage scheduling; the TaskScheduler manages individual task assignment to executors with data locality preference."

What NOT to say: "Spark processes data step by step." — No! Spark plans the ENTIRE DAG first, then executes. This whole-plan view is what enables optimizations like predicate pushdown and column pruning.

Q5: What Is Lazy Evaluation? Why Does Spark Use It?

Definition (1 line): Lazy evaluation means Spark does NOT execute transformations immediately — it only builds a plan (DAG). Execution only happens when you call an action (like count(), show(), write()).

Simple Explanation: When you write df.filter("age > 30"), Spark doesn't actually filter anything. It just notes: "When you eventually need results, I'll filter age > 30." Only when you call df.count() does Spark look at the entire chain of transformations, optimize them, and execute them all at once.

Why this is powerful (3 reasons):

  1. Optimization — Catalyst can see the ENTIRE plan before executing. It can push filters earlier, prune unnecessary columns, and choose the best join strategy.
  2. Fault Recovery — Since Spark "remembers" how to compute each step (lineage), it can recompute lost partitions from the source.
  3. Efficiency — Spark never computes data you don't need. df.filter(...).first() only reads ONE partition, not the entire dataset.

Code — See lazy evaluation in action:

python — editable
# EXAMPLE: Lazy evaluation in action
df = spark.read.csv("/huge/file.csv")       # ← NO execution (just reads metadata)
df2 = df.filter("age > 30")                 # ← NO execution (builds DAG node)
df3 = df2.select("name", "age")             # ← NO execution (builds DAG node)
df4 = df3.withColumn("age_group",           # ← NO execution (builds DAG node)
        when(col("age") > 60, "senior").otherwise("adult"))

# ONLY NOW does Spark execute — when you call an ACTION:
result = df4.count()    # ← TRIGGERS execution of the ENTIRE plan
df4.show()              # ← TRIGGERS execution again (recomputes from scratch unless cached!)

Transformations vs Actions — Quick Reference:

Transformations (Lazy — build DAG) | Actions (Trigger execution)
filter(), select(), where()        | count(), show(), first()
map(), flatMap(), groupBy()        | collect(), take(n), top(n)
join(), union(), distinct()        | write.parquet(), write.csv()
withColumn(), drop(), agg()        | reduce(), foreach(), toPandas()

What NOT to say: "Lazy evaluation means Spark is slow to start." — No! It means Spark is smart about when to start. By waiting, it can see the full plan and optimize it. This makes execution FASTER, not slower.

Q6: What Is a Shuffle? Why Is It the Most Expensive Operation?

Definition (1 line): A shuffle is when Spark must redistribute data across the network — sending records from multiple input partitions to new output partitions — because the next operation needs data grouped by a different key.

Simple Explanation: Imagine you have 100 files of customer orders, each file containing orders from all states. Now someone asks: "Give me all orders grouped by state." You need to open all 100 files, pull out the California orders from each, and put them together. That data movement — pulling records from many sources and regrouping them — is a shuffle.

Why shuffles are expensive (4 costs):

🧠 Memory Map
A SHUFFLE INVOLVES
1. SERIALIZE → Convert in-memory objects to bytes (CPU cost)
2. DISK WRITE → Write shuffle files to local disk (I/O cost)
3. NETWORK → Transfer data across machines (network cost)
4. DESERIALIZE → Convert bytes back to in-memory objects (CPU cost)
A single shuffle can take 80% of your job's total runtime!

Real-world Analogy: Shuffles are like mailing packages. If all your data is already in the right place (narrow transformation), you just process it — no mailing needed. But if you need to regroup data by a different key (wide transformation), you have to package it up, put it on a truck, drive it to the right destination, and unpack it. That's slow and expensive.

Narrow vs Wide Transformations

🧠 Memory Map
NARROW TRANSFORMATION (no shuffle)
Each output partition depends on exactly ONE input partition
Can be "pipelined" — executed without waiting for other partitions
No data movement across the network
No stage boundary
Examples:
map(f) → apply function to each element
filter(f) → keep elements matching condition
flatMap(f) → map + flatten output
mapPartitions(f) → apply function to entire partition at once
union() → combine two RDDs (no shuffle)
sample() → random sample
coalesce(n) → reduce partitions (merges local partitions)
WIDE TRANSFORMATION (shuffle required)
Output partitions depend on MULTIPLE input partitions
Must wait for ALL map tasks to complete before reduce tasks start
Writes intermediate data to LOCAL DISK (shuffle files)
Creates a new STAGE boundary
Examples:
groupByKey() → group all values by key (EXPENSIVE shuffle!)
reduceByKey() → reduce locally then shuffle (smarter!)
groupBy() → SQL-style group by
join() → must shuffle both sides (unless broadcast)
repartition(n) → full shuffle to redistribute
distinct() → must shuffle to find unique values
sortBy() / sort() → must shuffle to globally sort
cogroup() → group multiple RDDs by key

Interview Tip: "Minimizing shuffles is the #1 performance optimization in Spark. I look for three things: Can I use reduceByKey instead of groupByKey? Can I broadcast the small table in a join? Can I pre-partition data to avoid shuffles in repeated operations?"

What NOT to say: "Shuffles are always bad." — Not always. Shuffles are necessary for correct results in operations like joins and aggregations. The goal is to minimize unnecessary shuffles, not avoid them entirely.

SECTION 2: RDD — RESILIENT DISTRIBUTED DATASET

Q7: What Is an RDD? Explain Its 5 Properties.

Definition (1 line): An RDD (Resilient Distributed Dataset) is Spark's fundamental data abstraction — an immutable, distributed collection of records that can be processed in parallel and recovered from failures using lineage.

Simple Explanation: An RDD is like a spreadsheet that's been split across multiple machines. Each machine holds a few rows (a partition). You can apply transformations to all partitions in parallel — filter rows, transform values, join with another RDD. If one machine crashes and loses its rows, Spark re-creates them by replaying the transformations from the original data (lineage).

Real-world Analogy: Think of an RDD like a recipe card for your data:

  • The recipe card tells you HOW to produce the data (lineage)
  • It doesn't store the data itself until you ask for it (lazy)
  • If you lose one batch (partition), re-run the recipe (recompute from lineage)
  • Multiple chefs (executors) can cook different portions simultaneously (parallel)

The 5 Properties (memorize "RD-ILP"):

Property         | What It Means                      | Why It Matters
Resilient        | Recovers from failures via lineage | No data loss even if machines crash
Distributed      | Data split across multiple nodes   | Enables parallel processing
Immutable        | Cannot be modified in place        | Simplifies fault recovery (just replay)
Lazily evaluated | Only computes on action            | Enables optimization before execution
Partitioned      | Data split into partitions         | Unit of parallelism (1 task per partition)

Code — Creating RDDs:

python — editable
sc = spark.sparkContext

# From a Python collection
rdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=4)  # 4 partitions
print(rdd.getNumPartitions())   # 4

# From a file
rdd = sc.textFile("hdfs:///path/to/file.txt", minPartitions=10)

# From another RDD (transformation)
filtered = rdd.filter(lambda x: x > 2)  # new RDD, not executed yet

When to use RDD vs DataFrame:

Use RDD When                        | Use DataFrame When
Low-level control over partitioning | Structured data with columns
Unstructured data (text, logs)      | SQL-like operations (filter, join, agg)
Custom serialization needed         | Want Catalyst optimizer benefits
Fine-grained data manipulation      | Performance-critical workloads

Interview Tip: "In modern Spark (2.0+), DataFrames are preferred over RDDs because they benefit from Catalyst optimization. But RDDs are still the foundation — every DataFrame operation compiles down to RDD operations internally. Understanding RDDs helps you debug performance issues and understand Spark's execution model."

What NOT to say: "RDD is deprecated." — RDDs are NOT deprecated. DataFrames are the recommended API for structured data, but RDDs are still fully supported and necessary for certain use cases (custom partitioning, unstructured data, low-level control).

Q8: Explain Key RDD Transformations with Code

Why this matters: Interviewers often ask you to write RDD-based code to test if you understand the fundamentals beyond the DataFrame API.

python — editable
# ─── map() — apply function to EACH element, one-to-one ───
rdd = sc.parallelize([1, 2, 3, 4])
doubled = rdd.map(lambda x: x * 2)
doubled.collect()   # [2, 4, 6, 8]

# ─── filter() — keep elements matching condition ───
evens = rdd.filter(lambda x: x % 2 == 0)
evens.collect()     # [2, 4]

# ─── flatMap() — one input → ZERO OR MORE outputs (flattened) ───
sentences = sc.parallelize(["hello world", "foo bar"])
words = sentences.flatMap(lambda s: s.split(" "))
words.collect()     # ['hello', 'world', 'foo', 'bar']
# map() would give: [['hello', 'world'], ['foo', 'bar']]  (nested lists!)
# flatMap() flattens: ['hello', 'world', 'foo', 'bar']     (flat list!)

# ─── mapPartitions() — function runs ON ENTIRE PARTITION (not row-by-row) ───
# MORE EFFICIENT for expensive initialization (DB connections, ML models)
def process_partition(iterator):
    # Setup once per partition (not once per row!)
    connection = create_db_connection()
    for record in iterator:
        yield connection.lookup(record)
    connection.close()

result = rdd.mapPartitions(process_partition)

mapPartitions vs map — Interview favorite:

Aspect          | map()                       | mapPartitions()
Function called | Once per ELEMENT            | Once per PARTITION
Setup cost      | Paid for every element      | Paid once per partition
Use when        | Simple transforms           | Expensive setup (DB, HTTP, model loading)
Memory          | Low (one element at a time) | Higher (entire partition in memory)
python — editable
# ─── reduceByKey() vs groupByKey() — CRITICAL interview question ───

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4)])

# reduceByKey — aggregates LOCALLY first, then shuffles partial results
total = pairs.reduceByKey(lambda a, b: a + b)
total.collect()     # [('a', 4), ('b', 6)]
# What happens: Each partition reduces locally, only small partial sums are shuffled

# groupByKey — shuffles ALL raw values, THEN groups them
grouped = pairs.groupByKey().mapValues(list)
grouped.collect()   # [('a', [1, 3]), ('b', [2, 4])]
# What happens: ALL individual values sent across network → much more data shuffled!

Why reduceByKey is better than groupByKey — visual explanation:

🧠 Memory Map
Dataset: [("a", 1), ("a", 1), ("a", 1), ("a", 1), ("a", 1)] (5 records for key "a")
Assume 2 partitions, each with ~2-3 records.
groupByKey flow:
Partition 1: ("a",1), ("a",1), ("a",1) ──shuffle──→ Reducer: ("a", [1,1,1,1,1]) → sum = 5
Partition 2: ("a",1), ("a",1) ──shuffle──→ (all 5 raw values cross the network!)
reduceByKey flow:
Partition 1: ("a",1), ("a",1), ("a",1) → local reduce → ("a", 3) ──shuffle──→ ("a", 3+2) = 5
Partition 2: ("a",1), ("a",1) → local reduce → ("a", 2) ──shuffle──→ (only 2 partial sums cross the network!)
⚠️ For a key with 1 million values:
groupByKey: sends 1,000,000 values over network → OOM risk!
reduceByKey: sends ~N values (one per partition) → fast and safe!
python — editable
# ─── aggregateByKey — when output type differs from input ───
# Use case: computing average (need sum AND count, not just sum)
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3), ("b", 4)])

zero_value = (0, 0)  # (sum, count)
seq_func = lambda acc, val: (acc[0] + val, acc[1] + 1)     # within partition
comb_func = lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])  # across partitions

avg = pairs.aggregateByKey(zero_value, seq_func, comb_func) \
           .mapValues(lambda x: x[0] / x[1])
avg.collect()   # [('a', 2.0), ('b', 3.0)]

# ─── join() — join two key-value RDDs by key ───
rdd1 = sc.parallelize([("a", 1), ("b", 2)])
rdd2 = sc.parallelize([("a", "x"), ("b", "y")])
joined = rdd1.join(rdd2)
joined.collect()    # [('a', (1, 'x')), ('b', (2, 'y'))]

# ─── coalesce() vs repartition() ───
rdd.repartition(10)    # full shuffle, increases OR decreases to 10 equal partitions
rdd.coalesce(4)         # no full shuffle, reduces (merges adjacent partitions)
# RULE: Use coalesce() to DECREASE partitions (no shuffle)
#       Use repartition() to INCREASE partitions (requires shuffle)

Q9: What Are RDD Actions? List the Key Ones.

Definition (1 line): Actions are RDD operations that trigger execution of the DAG and return results to the Driver or write data to storage.

Simple Explanation: Transformations are lazy — they build a plan. Actions are eager — they say "execute this plan NOW and give me results." Every Spark program must end with at least one action, otherwise nothing actually runs.

python — editable
# Actions TRIGGER computation and return results to Driver
rdd = sc.parallelize([1, 2, 3, 4, 5])

rdd.collect()               # returns ALL elements as Python list ⚠️ OOM risk on large RDDs!
rdd.count()                 # count of elements → 5
rdd.first()                 # first element → 1
rdd.take(3)                 # first N elements → [1, 2, 3]
rdd.top(3)                  # top N (sorted descending) → [5, 4, 3]
rdd.takeSample(False, 3)    # 3 random samples (False = without replacement)
rdd.reduce(lambda a, b: a + b)    # aggregate all elements → 15
rdd.fold(0, lambda a, b: a + b)   # like reduce but with zero value → 15
rdd.foreach(print)          # apply function to each element (no return value)
rdd.saveAsTextFile("/path/")    # write each partition as a text file
rdd.countByValue()          # count occurrences of each unique value
rdd.countByKey()            # for key-value RDDs

⚠️ TRAP — collect() is dangerous:

python — editable
# On a 100 GB dataset:
result = df.collect()  # ← tries to send ALL 100 GB to Driver memory → OutOfMemoryError!

# Safe alternatives:
df.show(20)              # show first 20 rows (small data to Driver)
df.take(100)             # take 100 rows only
df.write.parquet("/out") # write to storage (no data to Driver)
df.count()               # just returns a number

Q10: Explain RDD Persistence — cache() vs persist()

Definition (1 line): Persistence stores an RDD's computed data in memory/disk so it doesn't have to be recomputed every time it's used. cache() is a shortcut for persist() with a default storage level.

Simple Explanation: Without caching, every time you call an action on an RDD/DataFrame, Spark recomputes it from scratch — re-reading from disk, re-applying all transformations. If you use the same data multiple times (like training an ML model), caching saves the computed result so the second use is instant.

Real-world Analogy: Imagine you calculate a complex financial report from raw transaction data. Without caching, every time someone asks for a number from that report, you recalculate the entire thing from scratch. With caching, you calculate it once, pin it to the whiteboard, and everyone reads from the whiteboard.

Code:

python — editable
from pyspark import StorageLevel

# cache() = persist(MEMORY_AND_DISK) for DataFrames
# cache() = persist(MEMORY_ONLY) for RDDs
df.cache()

# persist() lets you choose storage level explicitly
rdd.persist(StorageLevel.MEMORY_ONLY)             # JVM heap (fastest access)
rdd.persist(StorageLevel.MEMORY_AND_DISK)         # Memory first, spill to disk if full
rdd.persist(StorageLevel.DISK_ONLY)               # Always on disk (slowest)
rdd.persist(StorageLevel.OFF_HEAP)                # Tungsten off-heap memory
# Note: the serialized levels (MEMORY_ONLY_SER, MEMORY_AND_DISK_SER) exist only
# in Scala/Java. PySpark always stores data serialized, so they aren't exposed.

# Unpersist when done (free up memory for other operations)
rdd.unpersist()

When to cache vs when NOT to:

Cache When ✓                                        | Don't Cache When ✗
Used MORE THAN ONCE in the same job                 | Used only once
Expensive computation (complex joins, aggregations) | Simple fast reads (small CSV)
Iterative algorithms (ML training loops)            | Data too large to fit in memory
Intermediate results reused in multiple branches    | Streaming (data changes constantly)

⚠️ TRAP — cache() is LAZY:

python — editable
rdd.cache()              # ← nothing happens yet! Just marks for caching
rdd.count()              # ← NOW the RDD is computed AND cached
rdd.collect()            # ← reads from cache (fast!)

Interview Tip: "Cache is lazy — it only stores data on the first action after cache() is called. And cache() for DataFrames defaults to MEMORY_AND_DISK, not MEMORY_ONLY. This is a common trap question."

Q11: What Is RDD Lineage? What Is Checkpointing?

Definition:

  • Lineage: The chain of transformations that produced an RDD. Spark tracks this to recompute lost data.
  • Checkpointing: Saving an RDD to reliable storage (HDFS) and cutting the lineage chain.

Simple Explanation: Lineage is Spark's fault tolerance mechanism. Instead of replicating data (like HDFS does with 3 copies), Spark remembers HOW to produce the data. If a partition is lost, it replays the transformations from the source data.

But for very long chains (like 100 ML iterations), the lineage becomes dangerously long — recomputing from scratch would take forever. Checkpointing saves the data to disk and "cuts" the lineage, so recovery only needs to go back to the checkpoint.

Code:

python — editable
# View lineage of an RDD
rdd1 = sc.textFile("/data/logs.txt")
rdd2 = rdd1.filter(lambda line: "ERROR" in line)
rdd3 = rdd2.map(lambda line: line.split(","))
rdd4 = rdd3.map(lambda parts: (parts[0], parts[1]))
rdd5 = rdd4.reduceByKey(lambda a, b: a + b)

print(rdd5.toDebugString())
# Shows: rdd5 ← rdd4 ← rdd3 ← rdd2 ← rdd1 ← textFile(logs.txt)

# PROBLEM: After 100 ML iterations, lineage is 100 steps deep
# If one partition fails → recompute ALL 100 steps from scratch!

# SOLUTION: Checkpoint! Saves to HDFS, truncates lineage
sc.setCheckpointDir("hdfs:///checkpoints/")
rdd5.checkpoint()   # mark for checkpoint
rdd5.count()        # triggers: compute, save to HDFS, new lineage starts from checkpoint

# Best practice: cache THEN checkpoint (avoids computing twice)
rdd5.cache()
rdd5.checkpoint()
rdd5.count()   # computes once → stores in cache AND saves to HDFS

Lineage vs Checkpointing comparison:

Aspect         | Lineage                   | Checkpointing
Storage        | In memory (metadata only) | On disk (HDFS/S3)
Recovery speed | Slow for long chains      | Fast (read from disk)
Storage cost   | Zero (just metadata)      | Stores full data
When to use    | Short pipelines           | Long iterative algorithms, streaming

Q12: What Are Broadcast Variables and Accumulators?

Definition:

  • Broadcast variable: A read-only variable efficiently shared with all executors (sent once, not per task).
  • Accumulator: A write-only variable that executors can add to, but only the Driver can read.

Simple Explanation: Normally, when your Spark function references a variable from the Driver, Spark serializes and sends a copy with EVERY task. If you have 1000 tasks and a 100 MB lookup table, that's 100 GB of unnecessary network traffic! Broadcast variables solve this by sending the data to each executor ONCE.

Accumulators solve the opposite problem — you want executors to report metrics back to the Driver (like counting bad records). Each executor adds to the counter, and the Driver reads the total.

Code:

python — editable
# ─── BROADCAST VARIABLES ───
# USE CASE: lookup tables, configuration data, ML model parameters

# WITHOUT broadcast (BAD for large lookups)
country_codes = {"US": "United States", "IN": "India", "GB": "United Kingdom"}
rdd.map(lambda code: country_codes.get(code, "Unknown"))
# ^ Python closure: serialized and sent with EVERY task!
# 1000 tasks × 100 MB table = 100 GB network traffic!

# WITH broadcast (GOOD)
bc_codes = sc.broadcast(country_codes)
rdd.map(lambda code: bc_codes.value.get(code, "Unknown"))
# ^ Sent ONCE to each executor, all tasks on that executor share it

bc_codes.unpersist()    # release from executor memory
bc_codes.destroy()      # remove from ALL executors immediately
python — editable
# ─── ACCUMULATORS ───
# USE CASE: counters (bad records, events, debug metrics)

error_count = sc.accumulator(0)
malformed_count = sc.accumulator(0)

def parse_line(line):
    try:
        parts = line.split(",")
        if len(parts) != 5:
            malformed_count.add(1)
            return None
        return parts
    except Exception:
        error_count.add(1)
        return None

results = rdd.map(parse_line).filter(lambda x: x is not None)
results.count()    # trigger action

print(f"Errors: {error_count.value}")
print(f"Malformed: {malformed_count.value}")

⚠️ TRAP — Accumulator double counting:

python — editable
# Accumulators in TRANSFORMATIONS can be counted MULTIPLE TIMES
# because Spark may retry failed tasks!
# If a map() task fails and retries → accumulator incremented AGAIN

# SAFE: update accumulators inside foreach() (an action) — Spark guarantees
# each task's updates are applied only once, even if the task is retried
rdd.foreach(lambda x: error_count.add(1) if x is None else None)

Interview Tip: "Broadcast variables are essential for broadcast joins — when one side of a join is small enough to fit in executor memory (typically <100 MB), broadcasting it avoids a full shuffle. This is one of the most impactful Spark optimizations."
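The mechanism behind that tip can be sketched in plain Python: every "executor" keeps one full copy of the small table, and each task joins its own partition locally, so no rows cross the network. (In DataFrame code the equivalent hint is `df_large.join(broadcast(df_small), "key")`, with `broadcast` imported from `pyspark.sql.functions`.) The data below is made up purely for illustration.

python — editable

```python
# Minimal sketch of a map-side (broadcast) hash join — illustrative data only
small_table = {"US": "United States", "IN": "India"}   # broadcast side, fits in memory

# The large side, already split into partitions across "executors"
partitions = [
    [("US", 100), ("IN", 200)],
    [("US", 300), ("GB", 400)],
]

def join_partition(partition, lookup):
    # Each task joins against its local copy of the small table — no shuffle
    return [(code, amount, lookup.get(code, "Unknown")) for code, amount in partition]

joined = [row for part in partitions for row in join_partition(part, small_table)]
print(joined)
```

The key property: the cost is one copy of the small table per executor, instead of shuffling the large table across the cluster.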

SECTION 3: REAL-WORLD SCENARIOS — DAY 1

💡 Interview Tip
These are the questions that separate senior engineers from juniors. Interviewers want to see if you can diagnose and fix production problems.

Scenario 1: Executor OOM — what happened?

Situation: Your Spark job fails with ExecutorLostFailure + java.lang.OutOfMemoryError

How a senior engineer diagnoses this:

📋 Overview
STEP 1: Check Spark UI → Executors tab
Look at: GC Time column
If GC Time > 10-15% of total time → memory pressure
STEP 2: Check partition sizes
Spark UI → Stages tab → click on the failing stage → look at task metrics
If one task processed 2 GB while others processed 50 MB → DATA SKEW!
STEP 3: Check what's cached
Spark UI → Storage tab
Are you caching data that's filling up executor memory?
FIXES (try in this order)
a) Increase executor memory: --executor-memory 8g
b) Increase shuffle partitions (smaller partitions = less memory per task):
spark.conf.set("spark.sql.shuffle.partitions", "400")
c) Use serialized storage if caching:
StorageLevel.MEMORY_AND_DISK_SER
d) Fix data skew with salting or AQE:
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
e) Use off-heap memory:
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "2g")
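For reference, fixes (b), (d), and (e) above are session-level settings and can be grouped in one place — a config sketch that assumes an existing `SparkSession` named `spark`; fix (a) is a submit-time flag and fix (c) is applied per DataFrame:

python — editable

```python
# Config sketch — assumes an existing SparkSession named `spark`
# (a) is set at submit time: spark-submit --executor-memory 8g

spark.conf.set("spark.sql.shuffle.partitions", "400")          # (b) smaller partitions per task
spark.conf.set("spark.sql.adaptive.enabled", "true")           # AQE master switch (needed for d)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")  # (d) automatic skew splitting
spark.conf.set("spark.memory.offHeap.enabled", "true")         # (e) off-heap memory
spark.conf.set("spark.memory.offHeap.size", "2g")

# (c) serialized caching is chosen when you persist a specific DataFrame/RDD,
#     not via spark.conf — see StorageLevel.MEMORY_AND_DISK_SER above
```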

Scenario 2: Job slow — 199/200 tasks done, last task running forever

Situation: Stage progress shows 199/200 tasks completed in 2 minutes, but the last task has been running for 30 minutes.

Diagnosis: DATA SKEW! One key has far more values than others, so one partition is 10-100x larger.

python — editable
# FIX 1: Enable AQE skew join handling (Spark 3.0+)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

# FIX 2: Manual salting (for groupBy/join on a skewed key)
from pyspark.sql.functions import rand, concat_ws, col, split, sum as spark_sum
# ^ alias the import — a bare `sum` would shadow Python's builtin

# Add a random salt (0-9) to the skewed key
df_salted = df.withColumn("salted_key",
    concat_ws("_", col("key"), (rand() * 10).cast("int")))

# Group by salted key → 10x more groups, distributes the load
partial = df_salted.groupBy("salted_key").agg(spark_sum("value").alias("partial_sum"))

# Remove salt, then re-aggregate the partial sums into the final result
result = partial.withColumn("key", split(col("salted_key"), "_")[0]) \
                .groupBy("key").agg(spark_sum("partial_sum").alias("total"))

Interview Tip: "When I see one task taking much longer than others, my first thought is data skew. I check the Spark UI task metrics to confirm, then either enable AQE (Spark 3+) or manually salt the skewed key to distribute the load across more partitions."
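That confirmation step is easy to sketch: count rows per key and compare the heaviest key against the per-key average. (On a real DataFrame the same check is `df.groupBy("key").count().orderBy(col("count").desc()).show(5)`.) Illustrative data below:

python — editable

```python
from collections import Counter

# Illustrative rows: key "hot" dominates, mimicking a skewed groupBy/join key
rows = [("hot", 1)] * 900 + [("a", 1)] * 50 + [("b", 1)] * 50

counts = Counter(key for key, _ in rows)
avg = len(rows) / len(counts)                      # average rows per key
heaviest_key, heaviest = counts.most_common(1)[0]  # biggest key and its row count

print(f"heaviest key: {heaviest_key} ({heaviest} rows vs {avg:.0f} avg per key)")
# A key many times above the per-key average is a salting candidate
```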

Scenario 3: Driver OOM after collect()

Situation: Driver lost: java.lang.OutOfMemoryError right after a collect() call.

python — editable
# CAUSE: Someone called collect() on a DataFrame with millions of rows
result = huge_df.collect()   # ← sends ALL data to Driver memory → OOM!

# FIX — use alternatives:
huge_df.show(20)                          # show first 20 rows only
huge_df.take(100)                         # take 100 rows
huge_df.write.parquet("/output/path")     # write to storage (data stays on executors)
huge_df.toPandas()                        # ⚠️ also pulls ALL data to Driver — same risk!

# If you truly need data on Driver, increase memory:
# --driver-memory 16g
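
A middle ground worth mentioning: `DataFrame.toLocalIterator()` streams one partition at a time to the Driver instead of materializing everything at once. The memory difference is the same as list vs. generator in plain Python — a sketch with made-up numbers standing in for rows:

python — editable

```python
# Sketch: materializing everything vs streaming one item at a time
def all_rows():
    # like collect(): the whole result lives in Driver memory at once
    return [i * i for i in range(1_000_000)]

def row_stream():
    # like toLocalIterator(): one row in memory at a time
    for i in range(1_000_000):
        yield i * i

total = sum(row_stream())   # processes a million rows, holds only one at a time
print(total)
```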

SECTION 4: CLASSIC INTERVIEW CODE — Word Count

This is the single most asked RDD coding question. You MUST be able to write it from memory.

python — editable
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("WordCount").getOrCreate()
sc = spark.sparkContext

# Step 1: Read text file → one string per line
text_rdd = sc.textFile("/path/to/text.txt")

# Step 2: Word count using RDD operations
word_count = (
    text_rdd
    .flatMap(lambda line: line.split(" "))      # split each line into words
    .filter(lambda word: word.strip() != "")    # remove empty strings
    .map(lambda word: (word.lower(), 1))        # normalize + create (word, 1) pairs
    .reduceByKey(lambda a, b: a + b)            # sum counts per word (local pre-agg!)
    .sortBy(lambda x: x[1], ascending=False)    # sort by count (most frequent first)
)

# Action: get top 10 words
top_10 = word_count.take(10)
for word, count in top_10:
    print(f"{word}: {count}")

Follow-up they'll ask: "Why reduceByKey instead of groupByKey + sum?"

Answer: reduceByKey does local pre-aggregation on each partition BEFORE shuffle. If the word "the" appears 5000 times on one partition, reduceByKey sends just ("the", 5000) over the network. groupByKey would send all 5000 individual ("the", 1) values — 5000x more data shuffled!
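The difference can be sketched in plain Python by counting how many records each approach would send over the network (two fake partitions of (word, 1) pairs, data made up for illustration):

python — editable

```python
from collections import Counter

# Two fake partitions of (word, 1) pairs
partitions = [
    [("the", 1)] * 5000 + [("spark", 1)] * 10,
    [("the", 1)] * 3000 + [("rdd", 1)] * 5,
]

# groupByKey-style: every individual pair crosses the network
group_by_key_shuffled = sum(len(p) for p in partitions)

# reduceByKey-style: pre-aggregate per partition (every value is 1, so counting
# keys equals summing), then ship one record per key per partition
pre_aggregated = [Counter(k for k, _ in p) for p in partitions]
reduce_by_key_shuffled = sum(len(c) for c in pre_aggregated)

print(group_by_key_shuffled)   # 8015 records shuffled
print(reduce_by_key_shuffled)  # 4 records shuffled
```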

Follow-up they'll ask: "Now write this using DataFrames instead of RDDs."

python — editable
from pyspark.sql.functions import explode, split, lower, col

word_count_df = (
    spark.read.text("/path/to/text.txt")                    # DataFrame with one column 'value'
    .select(explode(split(col("value"), " ")).alias("word")) # split + flatten into rows
    .filter(col("word") != "")                              # remove empty strings
    .withColumn("word", lower(col("word")))                  # normalize to lowercase
    .groupBy("word")                                         # group by word
    .count()                                                 # count per group
    .orderBy(col("count").desc())                            # sort descending
)

word_count_df.show(10)

Interview Tip: "The DataFrame version is preferred in production because Catalyst can optimize it — for example, it can push down the filter to the data source and choose columnar execution. The RDD version gives you more control but misses these optimizations."