Databricks · Section 2 of 17

File 01: Spark Architecture & Internals
Level: Senior/Lead (10+ years) — No basic questions
Focus: Deep internals, debugging, trade-offs

SECTION 1: SPARK APPLICATION LIFECYCLE

Q1: Explain the complete lifecycle of a Spark application from spark-submit to job completion.

Answer:

  1. The driver process starts and creates a SparkContext/SparkSession
  2. SparkContext connects to the Cluster Manager (YARN/Mesos/K8s/Standalone)
  3. Cluster Manager allocates executor JVMs on worker nodes
  4. Driver converts user code into a logical plan (DAG of DataFrame operations)
  5. The Catalyst Optimizer optimizes the logical plan → physical plan
  6. The DAG Scheduler breaks the physical plan into stages at shuffle boundaries
  7. Each stage is broken into tasks (one per partition) by the Task Scheduler
  8. Tasks are serialized and sent to executors
  9. Executors run tasks, store results, and report back to the driver
  10. Results are collected or written to storage

Follow-up they'll ask: "What happens if the driver dies vs an executor dies?" → See Q5.

Q2: What is the difference between the DAG Scheduler and the Task Scheduler?

Answer:

| Aspect | DAG Scheduler | Task Scheduler |
| --- | --- | --- |
| Operates at | Stage level | Task level (within a stage) |
| Responsibility | Computes DAG of stages, identifies shuffle dependencies | Assigns tasks to executors with data locality |
| Handles | Stage retries on fetch failures | Individual task retries, speculative execution |
| Input | Logical plan / physical plan | TaskSet (set of tasks for one stage) |
| Locality | N/A | PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY |

Q3: Explain narrow vs wide dependencies with examples. Why does this distinction matter?

Answer:

  • Narrow dependency: Each parent partition is used by at most ONE child partition

    • Examples: map, filter, union, coalesce (only when reducing partition count)
    • Can be pipelined within a single stage (no shuffle)
  • Wide dependency: Each parent partition may be used by MULTIPLE child partitions

    • Examples: groupByKey, reduceByKey, join (non-co-partitioned), repartition
    • Requires a shuffle → creates a new stage boundary

Why it matters: Wide dependencies trigger the most expensive operation in Spark — shuffle. Every shuffle means data serialization → disk write → network transfer → disk read → deserialization. Understanding this helps you minimize shuffles in your pipelines.
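The fan-out behind a wide dependency can be sketched in plain Python (illustrative only, not Spark's internal code): a hash partitioner sends records from one parent partition to several child partitions, which is exactly what forces a shuffle.

```python
# Toy hash partitioner: one parent partition's records fan out to
# MULTIPLE child partitions, so the data must move across the network.

NUM_CHILD_PARTITIONS = 4

def child_partition(key: str) -> int:
    # Deterministic stand-in for Spark's HashPartitioner
    return ord(key[0]) % NUM_CHILD_PARTITIONS

parent_partition = [("a", 1), ("b", 2), ("c", 3), ("d", 4)]
targets = sorted({child_partition(k) for k, _ in parent_partition})

# A narrow op (map/filter) would leave every record in place; here the
# single parent partition feeds all four child partitions:
print(targets)  # [0, 1, 2, 3]
```

A narrow transformation never changes which partition a record lives in, so no such routing step exists.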

Q4: What is the Block Manager and how does it work?

Answer: BlockManager is the storage subsystem in each executor (and the driver). It manages:

  • Cached/persisted RDD/DataFrame partitions
  • Shuffle data (shuffle blocks)
  • Broadcast variable blocks
  • Task result blocks

Architecture:

  • MemoryStore: On-heap and off-heap memory
  • DiskStore: Local disk spillover
  • BlockManagerMaster (on driver): Tracks all block locations across the cluster
  • BlockTransferService (Netty-based): Fetches blocks from remote executors

When a shuffle reader needs a block from another executor, it queries BlockManagerMaster for the location, then uses BlockTransferService to fetch it.

Q5: What happens when a driver fails vs when an executor fails?

Answer:

Executor failure:

  • Driver detects via heartbeat timeout
  • Tasks on that executor are rescheduled on other executors
  • If the stage used shuffle output from the lost executor, those shuffle blocks must be recomputed
  • Cached RDD partitions on that executor are lost → recomputed on demand
  • The External Shuffle Service mitigates this (shuffle data survives executor death)

Driver failure:

  • Client mode: Entire application fails. No recovery.
  • Cluster mode: With spark.driver.supervise=true (Standalone) or YARN --max-app-attempts, the driver restarts, but ALL state is lost (SparkContext, accumulators, broadcast variables)
  • Structured Streaming: Can recover from driver failure using checkpointing (offsets + state are persisted)

SECTION 2: CATALYST OPTIMIZER

Q6: Explain the complete pipeline of Spark's Catalyst Optimizer.

Answer:

🧠 Memory Map

SQL / DataFrame API
    ↓
1. PARSING → Unresolved Logical Plan (AST)
    ↓
2. ANALYSIS → Resolved Logical Plan
   (Analyzer resolves table names, column names, data types using the Catalog)
    ↓
3. LOGICAL OPTIMIZATION → Optimized Logical Plan
   Rule-based optimizations:
   • Predicate pushdown
   • Constant folding
   • Column pruning (projection pushdown)
   • Boolean simplification
   • Null propagation
   • Filter/projection combining
    ↓
4. PHYSICAL PLANNING → Physical Plan(s)
   • Generates multiple candidate plans (e.g., SortMergeJoin vs BroadcastHashJoin)
   • Cost model selects the best plan
   • Uses table/column statistics if available (CBO)
    ↓
5. CODE GENERATION (Tungsten) → Optimized Java Bytecode
   • Whole-stage code generation
   • Fuses operators into tight loops
   • Avoids virtual function dispatch

Key insight for interviews: Catalyst is why DataFrame operations are faster than RDD operations — the optimizer can reason about the operations and optimize the entire plan globally.

Q7: What is Predicate Pushdown and Projection Pushdown? When do they NOT work?

Answer:

  • Predicate pushdown: Filters pushed as close to the data source as possible (into Parquet file metadata, JDBC WHERE clause). Reduces I/O dramatically.
  • Projection pushdown: Only required columns are read from source. Columnar formats (Parquet) benefit hugely — entire column chunks are skipped.

When they DON'T work:

  1. UDFs in filter conditions — Catalyst cannot reason about UDF internals, so predicates involving UDFs cannot be pushed down
  2. Complex nested column access — may not be pushed in all Spark versions
  3. Non-optimized data sources that don't support pushdown
  4. After a shuffle — predicates before the shuffle cannot be pushed past it

How to verify: Use df.explain(True) — look for PushedFilters in the scan node.

Q8: What is Adaptive Query Execution (AQE)? What problems does it solve?

Answer: AQE (enabled by default since Spark 3.2) re-optimizes the query plan at runtime based on actual shuffle statistics. It solves 3 problems:

1. Coalescing post-shuffle partitions:

  • If many shuffle partitions are tiny, AQE merges them
  • Config: spark.sql.adaptive.coalescePartitions.enabled=true
  • Eliminates the pain of tuning spark.sql.shuffle.partitions

2. Converting Sort-Merge Join → Broadcast Hash Join:

  • If one side of a join turns out to be small at runtime (< spark.sql.adaptive.autoBroadcastJoinThreshold)
  • Happens when compile-time statistics were wrong

3. Optimizing skew joins:

  • Detects skewed partitions at runtime
  • Splits the large partition and replicates the corresponding partition from the other side
  • Config: spark.sql.adaptive.skewJoin.enabled=true

Scenario question: "You set spark.sql.shuffle.partitions=200 but your 100 GB shuffle creates many tiny partitions. How does AQE help?" → AQE auto-coalesces the 200 partitions into fewer, larger partitions.
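The three features above map to these session settings — a sketch assuming a live SparkSession named `spark`; the values are illustrative, not prescriptive:

```python
# Assumes an existing SparkSession `spark`; thresholds are examples only.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.shuffle.partitions", "2000")  # set high; AQE coalesces down
```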

Q9: What is Whole-Stage Code Generation (CodeGen)?

Answer: Tungsten's whole-stage codegen collapses an entire stage of operators (filter → project → aggregate) into a single Java function.

Traditional Volcano/iterator model:

  • Each operator calls next() on its child
  • Virtual function dispatch per row
  • Poor CPU cache utilization
  • Branch prediction misses

With CodeGen:

  • Fuses operators into tight loops
  • No virtual function calls
  • Operates on raw memory (sun.misc.Unsafe)
  • Leverages CPU pipelining and L1/L2 cache
  • Can see generated code: df.queryExecution.debug.codegen()

What breaks codegen:

  • External sorts
  • Some joins with complex expressions
  • Python UDFs (completely bypass codegen)
  • Very large expressions (hit JVM method size limit)

Q10: What is Cost-Based Optimization (CBO)?

Answer: CBO uses table and column statistics to choose optimal plans.

Collect statistics:

```sql
ANALYZE TABLE t COMPUTE STATISTICS;
ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c1, c2;
```

Statistics collected:

  • Table: row count, size in bytes
  • Column: distinct count, min, max, avg length, null count, histogram

What CBO affects:

  • Join strategy selection (broadcast vs sort-merge)
  • Join ordering in multi-table joins
  • Filter selectivity estimation

Enable:

```
spark.sql.cbo.enabled=true
spark.sql.cbo.joinReorder.enabled=true
```

SECTION 3: MEMORY MANAGEMENT

Q11: Explain Spark's Unified Memory Management model in detail.

Answer:

📐 Architecture Diagram
Executor JVM Heap
┌────────────────────────────────────────────┐
│ Reserved Memory (300 MB, fixed)             │
├────────────────────────────────────────────┤
│ User Memory                                │
│ (1 - spark.memory.fraction) * (heap-300 MB) │
│ Default: 40% of (heap - 300 MB)             │
│ Used for: UDF variables, RDD metadata      │
├────────────────────────────────────────────┤
│ Spark (Unified) Memory                     │
│ spark.memory.fraction * (heap - 300 MB)     │
│ Default: 60% of (heap - 300 MB)             │
│ ┌──────────────┬─────────────────┐         │
│ │  Execution   │    Storage      │         │
│ │  (shuffles,  │    (cached      │         │
│ │   joins,     │     data,       │         │
│ │   sorts)     │     broadcast)  │         │
│ │              │                 │         │
│ │ ← soft boundary, can borrow → │         │
│ └──────────────┴─────────────────┘         │
└────────────────────────────────────────────┘

Key rule: Execution can evict Storage (cached data), but Storage cannot evict Execution. This is because execution memory is critical (can't pause mid-computation), while cached data can be recomputed.

Q12: Explain all memory-related configurations and their relationship.

Answer:

```
# Executor heap
spark.executor.memory = "8g"            # JVM heap

# Memory overhead (off-heap, for VM, NIO, etc.)
spark.executor.memoryOverhead = "2g"    # Default: max(384 MB, 10% of executor.memory)
# Container size = executor.memory + memoryOverhead = 10g

# Spark memory fraction
spark.memory.fraction = 0.6             # 60% of (heap - 300 MB) for Spark
spark.memory.storageFraction = 0.5      # Initial split between execution and storage

# Off-heap (optional, avoids GC)
spark.memory.offHeap.enabled = true
spark.memory.offHeap.size = "4g"

# Driver
spark.driver.memory = "4g"
spark.driver.maxResultSize = "1g"       # Max size of collect() results
```

Q13: What causes OOM errors? How do you debug driver OOM vs executor OOM?

Answer:

Driver OOM causes:

| Cause | Solution |
| --- | --- |
| collect() on large data | Use take(n), show(), or write to storage |
| Large broadcast variable | Check variable size, increase spark.driver.memory |
| Too many tasks (metadata) | Reduce number of partitions |
| Accumulator results | Limit accumulator usage |
| toPandas() on large DF | Use Arrow + batch conversion |

Executor OOM causes:

| Cause | Solution |
| --- | --- |
| Skewed partition (1 partition >> others) | Salt keys, use AQE, broadcast join |
| Insufficient memory for shuffle/join | Increase spark.executor.memory, increase partitions |
| UDF holding large objects | Move large objects to broadcast variables |
| Container killed by YARN | Increase spark.executor.memoryOverhead |

Debugging checklist:

  1. Check Spark UI → Executors tab → check which executor died
  2. Check logs for OutOfMemoryError vs Container killed by YARN
  3. Check Tasks tab → sort by duration → look for skew
  4. Check GC time → if GC time > 10% of task time, memory pressure exists

Q14: What is Tungsten's memory management? What is sun.misc.Unsafe?

Answer: Tungsten manages memory outside the JVM garbage collector using sun.misc.Unsafe:

  • Direct memory allocation/deallocation (like C malloc/free)
  • Read/write raw bytes at memory addresses
  • No GC overhead for managed data

Tungsten stores data in a compact binary format: rows are serialized into byte arrays with:

  • Null bitmap
  • Fixed-length values (int, long, double)
  • Variable-length region (strings, arrays)

Benefits: No GC pauses, better cache locality, explicit memory management, smaller memory footprint.
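A simplified sketch of that binary layout — an 8-byte null bitmap word followed by fixed 8-byte slots (the real UnsafeRow also has a variable-length region, omitted here for brevity):

```python
import struct

# Toy UnsafeRow-style encoder: one bitmap word, then one 8-byte slot
# per field. Null fields still occupy a slot; the bitmap marks them.

def encode_row(values):
    null_bits = 0
    slots = b""
    for i, v in enumerate(values):
        if v is None:
            null_bits |= 1 << i            # mark field i as null
            slots += struct.pack("<q", 0)  # slot still occupies 8 bytes
        else:
            slots += struct.pack("<q", v)
    return struct.pack("<q", null_bits) + slots

row = encode_row([42, None, 7])
print(len(row))  # 32 — one bitmap word + three fixed-width slots
```

Because every field sits at a known offset, Tungsten can read a value with a single address computation instead of chasing object pointers on the JVM heap.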

SECTION 4: SHUFFLE DEEP DIVE

Q15: Explain the complete shuffle process in Spark.

Answer:

Map side (Shuffle Write):

  1. Each map task computes which reducer partition each record belongs to (hash or range partitioner)
  2. Records written to SortShuffleWriter (default since Spark 2.0)
  3. Data sorted by partition ID → written to a single data file + index file per map task
  4. Number of shuffle files = number of map tasks (NOT map × reduce)

Reduce side (Shuffle Read):

  1. Each reduce task fetches its partition from ALL map tasks via BlockStoreShuffleReader
  2. Uses ShuffleClient (Netty) for remote block fetching
  3. Data deserialized and optionally sorted (for sort-merge operations)

Why shuffle is expensive:

  • Disk I/O (write on map side, read on reduce side)
  • Network I/O (cross-executor data transfer)
  • Serialization/deserialization overhead
  • Can cause spill to disk if memory insufficient

Q16: What is spark.sql.shuffle.partitions and how do you tune it?

Answer:

  • Default: 200 (often too low for large data or too high for small data)
  • This controls the number of partitions after a shuffle (groupBy, join, repartition by column)

Tuning formula:

num_partitions = total_shuffle_data_size / target_partition_size
target_partition_size = 128 MB to 200 MB

Example: 50 GB shuffle data → 50 GB / 200 MB = 250 partitions

With AQE: Set this high (e.g., 2000) and let coalescePartitions merge small partitions automatically. This is the modern best practice.

Common mistake: Setting it to 200 for both a 1 GB dataset and a 1 TB dataset.
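The tuning formula can be wrapped in a small helper (decimal units, matching the 50 GB example above):

```python
import math

def shuffle_partitions(total_shuffle_bytes: int,
                       target_partition_bytes: int = 200 * 10**6) -> int:
    """Round up so no post-shuffle partition exceeds the target size."""
    return math.ceil(total_shuffle_bytes / target_partition_bytes)

print(shuffle_partitions(50 * 10**9))  # 250  (50 GB / 200 MB)
print(shuffle_partitions(1 * 10**9))   # 5 — far below the default of 200
```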

Q17: What is the External Shuffle Service and why is it critical?

Answer: A long-running auxiliary service on each worker node that serves shuffle files independently of executors.

Why critical:

  1. Dynamic allocation: Executors can be removed (scaled down) without losing their shuffle files. Without it, removing an executor means recomputing its shuffle output.
  2. Fault tolerance: If executor crashes, shuffle data still available
  3. Resource efficiency: Executors released between stages while shuffle output remains accessible

Config:

spark.shuffle.service.enabled=true
spark.dynamicAllocation.enabled=true

Q18: How do you handle the "Fetch Failed" exception?

Answer: FetchFailedException = reduce task couldn't fetch shuffle data from a map task's executor.

Causes:

  • Executor OOM/crash during or after shuffle write
  • Network issues (timeout, connection refused)
  • Disk failures
  • Long GC pauses making executor unresponsive

Default behavior: Spark retries the entire stage (spark.stage.maxConsecutiveAttempts)

Fixes:

  1. Enable External Shuffle Service
  2. Increase executor memory (prevent OOM during shuffle write)
  3. Tune spark.shuffle.io.maxRetries (default 3) and spark.shuffle.io.retryWait (default 5s)
  4. Reduce shuffle data volume (filter/project early)
  5. Check for data skew causing single-task OOM

Q19: Explain shuffle spill. How do you detect and minimize it?

Answer: Shuffle spill occurs when execution memory is exhausted during shuffle operations. Data spills from memory to disk.

Detect in Spark UI:

  • Stage detail page → Spill (Memory) and Spill (Disk) columns
  • Spill (Memory) > 0 indicates memory pressure

Minimize:

  1. Increase spark.executor.memory
  2. Increase spark.memory.fraction (more memory for Spark operations)
  3. Increase parallelism (more partitions = smaller data per task)
  4. Filter early / select only needed columns (reduce shuffle volume)
  5. Use mapPartitions instead of map (reduce object overhead)
  6. Consider off-heap memory

SECTION 5: SERIALIZATION

Q20: Compare all Spark serialization options with trade-offs.

Answer:

| Serialization | Speed | Size | Use Case |
| --- | --- | --- | --- |
| Java (default for RDD) | Slow | Large | Any Serializable class. Avoid for performance. |
| Kryo | ~10x faster | Compact | RDD operations. Must register classes. |
| Tungsten binary | Fastest | Most compact | DataFrames/Datasets internally. Not configurable. Off-heap, no GC. |
| Apache Arrow | Very fast | Columnar | PySpark ↔ Pandas conversion. Zero-copy. Enable with spark.sql.execution.arrow.pyspark.enabled=true |

Kryo config:

```python
spark.conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.conf.set("spark.kryo.registrationRequired", "true")  # For best performance
```

Key interview point: DataFrames are generally faster than RDDs because they use the Tungsten binary format internally, bypassing Java/Kryo serialization entirely.

Q21: How does PySpark actually execute Python code? Explain the Py4J gateway architecture.

Answer:

📐 Architecture Diagram
┌─────────────────────┐      ┌─────────────────────┐
│   Python Driver     │      │    JVM Driver        │
│                     │ Py4J │                      │
│  PySpark API calls ─┼──────┼→ Java SparkContext   │
│                     │      │                      │
└─────────────────────┘      └──────────┬───────────┘
                                        │
                              ┌─────────┴──────────┐
                              │    Executors (JVM)  │
                              │  ┌─────────────────┐│
                              │  │ Python Worker    ││ ← Spawned for UDFs
                              │  │ (subprocess)     ││
                              │  └─────────────────┘│
                              └─────────────────────┘

How it works:

  1. Python SparkSession wraps a Java SparkSession via Py4J gateway
  2. DataFrame operations are translated to JVM calls → no Python on executors for native operations
  3. When a Python UDF is used:
    • Each executor spawns a Python worker process
    • Data serialized (pickle or Arrow) → sent via socket to Python worker
    • Python worker processes data → sends results back
    • This JVM↔Python boundary is the main performance overhead of PySpark vs Scala Spark

Key insight: Pure DataFrame/SQL operations in PySpark are just as fast as Scala because they run entirely in the JVM. The overhead only appears with Python UDFs.
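The cost of that boundary can be mimicked in plain Python — pickle standing in here for the framed socket protocol between the JVM and the Python worker:

```python
import pickle

# Simulating the per-batch round trip a Python UDF pays:
# JVM rows → serialize → Python worker → compute → serialize → JVM.

rows = list(range(1000))

wire_in = pickle.dumps(rows)                       # JVM → Python worker
udf_out = [x * 2 for x in pickle.loads(wire_in)]   # UDF work in the worker
wire_out = pickle.dumps(udf_out)                   # Python worker → JVM
result = pickle.loads(wire_out)

print(result[:3])  # [0, 2, 4] — correct, but every row crossed the boundary twice
```

Arrow-based pandas UDFs reduce this cost by transferring columnar batches instead of pickled rows, but the boundary itself never disappears.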

SECTION 6: FAULT TOLERANCE & SPECULATION

Q22: How does Spark achieve fault tolerance?

Answer:

  • RDD lineage: Each RDD knows how to reconstruct itself from its parent. If a partition is lost, Spark replays the transformations to recreate it.
  • Checkpointing: Breaks lineage by saving data to reliable storage. Two types:
    • Reliable checkpoint: Saves to HDFS/S3 (fault-tolerant)
    • Local checkpoint: Saves to executor local storage (not fault-tolerant, faster)
  • Stage retry: If a fetch failure occurs, the entire stage is recomputed
  • Task retry: Individual tasks are retried on failure (default: 4 attempts, spark.task.maxFailures)
  • Structured Streaming: Checkpoints track offsets + state for exactly-once recovery

Q23: What is speculative execution? When should you enable vs disable it?

Answer: Spark re-launches slow tasks ("stragglers") on other executors and takes the result from whichever finishes first.

Config:

```
spark.speculation=true
spark.speculation.multiplier=3      # Task must be 3x slower than median
spark.speculation.quantile=0.75     # 75% of tasks must complete before speculation starts
```

Enable when:

  • Tasks have variable duration due to hardware issues or data locality
  • You're running on heterogeneous hardware
  • Network storage has variable latency

Disable when:

  • Tasks have side effects (non-idempotent writes) — speculation would write duplicates
  • Slowness is due to data skew — the re-launched task processes the same skewed partition
  • You're already using most of cluster resources

Q24: What are Accumulators and Broadcast variables? What are the pitfalls?

Answer:

Broadcast variables:

  • Read-only variables cached on each executor (not shipped with every task)
  • Use for large lookup tables
```python
lookup = spark.sparkContext.broadcast(large_dict)
df.filter(col("key").isin(list(lookup.value.keys())))  # isin needs a list, not dict_keys
```
  • Pitfalls:
    • If too large → OOM on driver (collects before broadcasting)
    • Memory used = table_size × num_executors
    • Must .unpersist() or .destroy() manually

Accumulators:

  • Write-only variables "added" to by executors, readable only by driver
```python
counter = spark.sparkContext.accumulator(0)
rdd.foreach(lambda x: counter.add(1))  # foreach is an action → counted exactly once
print(counter.value)
```
  • Critical pitfall: In transformations (not actions), accumulators may be incremented more than once if tasks are retried or stages re-executed. Only use accumulators inside actions for guaranteed exactly-once semantics.
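The retry pitfall is easy to model without a cluster — a toy accumulator (hypothetical class, not Spark code) double-counts when the "task" runs twice:

```python
# Toy model: an accumulator updated inside a transformation is
# re-applied when the task is retried or speculatively re-executed.

class Accumulator:
    def __init__(self):
        self.value = 0
    def add(self, n):
        self.value += n

acc = Accumulator()

def run_task(partition):          # stands in for a task executing a map()
    for _ in partition:
        acc.add(1)

data = [1, 2, 3]
run_task(data)    # first attempt
run_task(data)    # retried/speculative attempt — updates are NOT deduplicated
print(acc.value)  # 6, even though only 3 records exist
```

Spark deduplicates accumulator updates per task only for actions, which is why the pitfall above is specific to transformations.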

SECTION 7: SCENARIO-BASED ARCHITECTURE QUESTIONS

Q25: Scenario — Your Spark job has 1000 tasks but only 100 executor cores. How does Spark schedule them?

Answer:

  • Spark doesn't run all 1000 tasks simultaneously
  • The Task Scheduler maintains a queue of pending tasks
  • It assigns tasks to available slots (one slot = one core) respecting data locality
  • When a task completes, the next pending task is scheduled on the freed slot
  • If spark.locality.wait (default 3s) expires and no local slot is available, the task is scheduled on a less-local slot
  • Throughput: ~100 tasks running at any time, with 10 waves of tasks
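The wave arithmetic above is simple to compute:

```python
import math

# 1000 tasks over 100 concurrently available slots → 10 sequential "waves".
tasks, slots = 1000, 100
waves = math.ceil(tasks / slots)
print(waves)  # 10

# Rule of thumb: stage time ≈ waves × average task time (ignoring skew).
```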

Q26: Scenario — You're running a join between a 500 GB and a 50 MB table. The job is doing a Sort-Merge Join. What's wrong?

Answer: The 50 MB table should trigger a Broadcast Hash Join (default threshold is 10 MB), but it's not happening. Possible reasons:

  1. Statistics are wrong/missing: Spark doesn't know the table is only 50 MB

    • Fix: ANALYZE TABLE small_table COMPUTE STATISTICS
    • Or: Set spark.sql.autoBroadcastJoinThreshold=52428800 (50 MB)
  2. The 50 MB is the size AFTER filters but Spark uses pre-filter size for planning

    • Fix: AQE will detect this at runtime and switch to broadcast
  3. Column statistics missing: Without CBO, Spark uses file size

    • Fix: Force broadcast: df1.join(broadcast(df2), "key")

Q27: Scenario — Your job has 200 tasks, 199 finish in 2 minutes, but 1 task takes 45 minutes. What's happening?

Answer: This is classic data skew — one partition has disproportionately more data.

How to confirm:

  • Spark UI → Stage detail → sort tasks by duration
  • Check Input Size/Records for the slow task vs others
  • Check Shuffle Read Size for the slow task

Fix options (in order of preference):

  1. Enable AQE skew join optimization: spark.sql.adaptive.skewJoin.enabled=true
  2. Salting: Add random salt to the skewed key, replicate the other side
  3. Broadcast join: If the other table fits in memory
  4. Isolate-and-union: Process skewed keys separately, union results
  5. Pre-aggregate: If possible, reduce data before the skewed operation
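Fix 2 (salting) can be sketched in plain Python — the hot key is split across sub-keys on the fact side, and the other side is replicated for every salt so the join still matches (`SALTS` and the key names are illustrative):

```python
import random

SALTS = 8  # number of sub-keys to spread one hot key across

def salt_key(key: str) -> str:
    # Fact side: append a random salt, splitting the hot key's rows
    return f"{key}#{random.randrange(SALTS)}"

def replicate_key(key: str) -> list[str]:
    # Dimension side: emit every salted variant so the join still matches
    return [f"{key}#{s}" for s in range(SALTS)]

hot_rows = ["user_42"] * 1000             # one key dominating a partition
salted = {salt_key(k) for k in hot_rows}  # now at most 8 distinct join keys

assert salted <= set(replicate_key("user_42"))  # every salted row finds a match
print(len(salted))  # ≤ 8 — the skew is spread over up to 8 tasks
```

The trade-off: the dimension side grows by a factor of SALTS, so salting only pays off when the skewed side dominates.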

Q28: Scenario — Your Spark job reads from S3 and is 3x slower than reading from HDFS. Why?

Answer: S3 is an object store, not a filesystem. Key differences:

  1. List operations are slow: S3 list is O(n) and requires multiple API calls. HDFS metadata is in-memory on NameNode.
  2. No data locality: With HDFS, tasks run on the node where data resides (PROCESS_LOCAL). With S3, all reads are remote (ANY).
  3. High latency per request: Each S3 GET has ~50-100 ms latency vs ~1 ms for HDFS
  4. No append: S3 doesn't support append; each write creates a new object
  5. Eventual consistency (historically): S3 has been strongly consistent for all operations since December 2020, but older tooling still carries workarounds

Optimizations for S3:

```python
spark.conf.set("spark.hadoop.fs.s3a.connection.maximum", "200")
spark.conf.set("spark.sql.files.maxPartitionBytes", "268435456")  # 256 MB
spark.conf.set("spark.sql.files.openCostInBytes", "0")  # Combine small files aggressively
```
  • Use Delta Lake (reduces list operations via transaction log)
  • Use Auto Loader (file notification mode avoids listing)

Q29: Scenario — You have an iterative ML algorithm that runs 100 iterations. After iteration 50, the job gets extremely slow and eventually fails with StackOverflow. Why?

Answer: The lineage graph is growing with each iteration. After 100 iterations, the DAG has 100 levels of dependencies. When Spark tries to compute the plan or recover a partition, it traverses this deep lineage → StackOverflowError.

Fix: Checkpoint every N iterations:

```python
spark.sparkContext.setCheckpointDir("/checkpoint/path")

for i in range(100):
    rdd = rdd.map(transform_function)
    if i % 10 == 0:
        rdd.checkpoint()
        rdd.count()  # Force materialization
```

Or for DataFrames:

```python
if i % 10 == 0:
    df = df.checkpoint()  # Truncates lineage, returns a checkpointed DataFrame
```

Q30: What is spark.sql.files.maxPartitionBytes and spark.sql.files.openCostInBytes?

Answer:

  • maxPartitionBytes (default 128 MB): Maximum size of a partition when reading files. Spark splits large files into partitions of this size.
  • openCostInBytes (default 4 MB): Estimated cost of opening a file. Used to decide when to combine small files into one partition. If you have many tiny files, reducing this to 0 forces more aggressive file combining.

Example: 10,000 files of 1 KB each:

  • With openCostInBytes=4 MB: Each file is treated as 4 MB → 10,000 partitions (too many!)
  • With openCostInBytes=0: Files are combined until reaching maxPartitionBytes → much fewer partitions
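A simplified greedy model of the packing decision (conceptual, not Spark's exact algorithm) makes the effect concrete: each file "costs" its size plus the open cost, and files are packed into a partition until the budget is exhausted.

```python
# Sketch of how openCostInBytes influences file→partition packing.

MB = 1024 ** 2

def plan_partitions(file_sizes, max_partition_bytes, open_cost):
    partitions, current, used = [], [], 0
    for size in file_sizes:
        cost = size + open_cost        # every file pays its size + open cost
        if current and used + cost > max_partition_bytes:
            partitions.append(current) # budget exhausted → start a new partition
            current, used = [], 0
        current.append(size)
        used += cost
    if current:
        partitions.append(current)
    return partitions

files = [1024] * 10_000  # 10,000 × 1 KB files

print(len(plan_partitions(files, 128 * MB, 4 * MB)))  # 323 — open cost dominates
print(len(plan_partitions(files, 128 * MB, 0)))       # 1 — files pack tightly
```

With a 4 MB open cost, each 1 KB file is treated as ~4 MB, so only ~31 files fit per 128 MB partition; with zero open cost, all 10,000 files fit into one.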

SECTION 8: QUICK-FIRE QUESTIONS (Common in Phone Screens)

Q31: Transformation vs Action?

Answer: Transformations are lazy (return new RDD/DF, not computed until action). Actions trigger computation (return value or write to storage). Examples: map/filter = transformation; collect/count/write = action.

Q32: reduceByKey vs groupByKey?

Answer: reduceByKey does a map-side combine (local aggregation before shuffle). groupByKey shuffles ALL data first, then aggregates. Prefer reduceByKey whenever the operation is reducible — far less data crosses the network.

Q33: DataFrame vs Dataset vs RDD?

Answer: RDD = low-level, no optimization. DataFrame = distributed table, Catalyst-optimized, schema-aware. Dataset = typed DataFrame (Scala/Java only). In Scala, DataFrame is simply Dataset[Row]; PySpark exposes only DataFrame. Always prefer DataFrame over RDD.

Q34: Why lazy evaluation?

Answer: Enables Catalyst to see the full plan before executing → allows global optimizations (predicate pushdown, join reordering, column pruning). Without lazy evaluation, each operation would execute independently.

Q35: What triggers a new stage?

Answer: A wide dependency (shuffle). Each shuffle boundary creates a new stage. All narrow transformations are pipelined within a single stage.

Q36: spark.sql.shuffle.partitions vs spark.default.parallelism?

Answer: shuffle.partitions (default 200) = for DataFrame shuffle operations. default.parallelism = for RDD operations (default = total cores). They are independent settings.

Q37: What is a task, stage, job?

Answer:

  • Job = one action (e.g., count(), write())
  • Stage = set of tasks that can run in parallel without shuffle
  • Task = one unit of work on one partition

Q38: Explain data locality levels.

Answer: PROCESS_LOCAL (data in same JVM) > NODE_LOCAL (same node, different JVM) > NO_PREF (no preference) > RACK_LOCAL (same rack) > ANY (any node). Spark waits spark.locality.wait (3s) before downgrading.

Q39: How does explain() help?

Answer: Shows logical and physical plans. df.explain(True) shows all 4 phases: Parsed → Analyzed → Optimized → Physical. df.explain("cost") includes CBO statistics. Look for: join strategy, predicate pushdown, partition pruning, codegen nodes.

Q40: What is Dynamic Partition Pruning (DPP)?

Answer: (Spark 3.0+) When a fact table joins with a filtered dimension table, DPP pushes the dimension filter result into the fact table scan at runtime.

Without DPP: Full scan of fact_sales → join → filter With DPP: Spark first computes filtered dim_date IDs → uses them to prune fact_sales partitions during scan

```sql
-- DPP kicks in automatically for this pattern:
SELECT * FROM fact_sales f
JOIN dim_date d ON f.date_id = d.id
WHERE d.year = 2024;
```

Config: spark.sql.optimizer.dynamicPartitionPruning.enabled=true (default in Spark 3.x)