🧱 Databricks · Section 7 of 18

Advanced Spark Operations & Coding

💡 Interview Tip
Focus: DataFrame API, Joins, Windows, Streaming, UDFs, Coding Challenges
Approach: Every topic starts with a simple explanation → then interview-level depth

MEMORY MAP: PYSPARK OPERATIONS → JAWS-UC

🧠 PYSPARK OPERATIONS → JAWS-UC
J — Joins (5 types + 4 physical strategies)
A — Aggregations (groupBy, window, pivot, cube)
W — Window Functions (row_number, rank, lag/lead)
S — Streaming (Structured Streaming + Auto Loader)
U — UDFs (Python UDF, Pandas UDF, why UDFs are slow)
C — Coding Patterns (dedup, SCD, top-N, gap detection)

SECTION 1: DATAFRAME API DEEP DIVE

Q1: What are the differences between select(), withColumn(), and selectExpr()? When is each appropriate?

Simple Explanation: Think of a spreadsheet. You have many columns and you want to transform or pick certain ones.

  • select() = "Give me ONLY these columns" — like highlighting specific columns and copying them to a new sheet
  • withColumn() = "Keep everything, but add/change ONE column" — like inserting a new column into the existing sheet
  • selectExpr() = "Give me these columns, but let me write SQL for them" — like using formulas in Excel

Technical details:

  • select(): Projects specific columns. Accepts Column objects or strings. Use when you want a subset of columns or need multiple transformations.
  • withColumn(): Adds or replaces a single column. Returns the full DataFrame with the new/modified column.
  • selectExpr(): Like select() but accepts SQL expression strings. Quick for ad-hoc: selectExpr("*", "col1 + col2 as sum_col").
python — editable
from pyspark.sql.functions import col

# select() — pick and transform columns
df.select("name", "age", (col("salary") * 1.1).alias("new_salary"))
# ← Returns ONLY name, age, new_salary (everything else is dropped)

# withColumn() — add/modify ONE column, keep everything
df.withColumn("new_salary", col("salary") * 1.1)
# ← Returns ALL original columns + new_salary

# selectExpr() — SQL expressions as strings
df.selectExpr("*", "salary * 1.1 as new_salary", "UPPER(name) as name_upper")
# ← Handy for quick SQL-style transforms without importing functions

Sample data flow:

Original: | name | age | salary |
| Alice | 30 | 50000 |
select(): | name | age | new_salary | ← only selected columns
| Alice | 30 | 55000 |
withColumn(): | name | age | salary | new_salary | ← ALL columns + new one
| Alice | 30 | 50000 | 55000 |

Interview Tip: If asked "when do you use each?", say: "select() for projections and multiple transforms in one shot, withColumn() for adding a single column while keeping everything, and selectExpr() when you want quick SQL expressions without importing functions."

What NOT to Say: "They're all the same." They have very different impacts on the logical plan and performance (see Q2).

Q2: Why is chaining multiple withColumn() calls a performance anti-pattern? What's the fix?

Simple Explanation: Imagine you're giving instructions to a builder. Instead of saying "Build a house with 3 rooms, 2 bathrooms, and a kitchen" (one instruction), you say "Build room 1. Now add room 2. Now add room 3. Now add bathroom 1..." — each instruction creates a new blueprint that wraps around the previous one. After 50 instructions, the builder is drowning in 50 nested blueprints.

That's exactly what happens inside Spark's query planner. Each withColumn() creates a new Project node in the logical plan. 50 calls = 50 nested Project nodes that Catalyst must crawl through.

Technical details:

Each withColumn() creates a new Project node in the logical plan. Chaining 50+ calls creates a deeply nested plan that Catalyst must analyze and optimize:

  • Extremely slow query planning (minutes)
  • Possible StackOverflowError during plan traversal

Visual: What the logical plan looks like

BAD — 4 withColumn() calls create 4 nested Project nodes:
Project [*, d = ...]          ← withColumn("d", ...)
  Project [*, c = ...]        ← withColumn("c", ...)
    Project [*, b = ...]      ← withColumn("b", ...)
      Project [*, a = ...]    ← withColumn("a", ...)
        Scan table

GOOD — 1 select() call creates 1 Project node:
Project [*, a = ..., b = ..., c = ..., d = ...]  ← single select()
  Scan table

Bad:

python — editable
df = df.withColumn("a", expr("..."))  # ← creates Project node 1
df = df.withColumn("b", expr("..."))  # ← creates Project node 2 wrapping node 1
df = df.withColumn("c", expr("..."))  # ← creates Project node 3 wrapping node 2
# ... 50 more times → StackOverflowError or minutes of planning time

Good:

python — editable
df = df.select(
    "*",
    expr("...").alias("a"),  # ← all transforms in ONE Project node
    expr("...").alias("b"),
    expr("...").alias("c"),
    # all at once
)

Or using functools.reduce:

python — editable
from functools import reduce
transforms = [("a", expr("...")), ("b", expr("...")), ("c", expr("..."))]
df = reduce(lambda d, t: d.withColumn(t[0], t[1]), transforms, df)
# ← Still creates nested nodes but cleaner code. For extreme cases, use select().

Interview Tip: This is a VERY common question. Show you know the internal reason (nested Project nodes in the logical plan), not just "it's slow." If you can say "I've seen this cause StackOverflowError in production with 100+ columns," that's even better.

What NOT to Say: "withColumn is always bad." It's fine for 1-5 columns. The anti-pattern is chaining 50+ calls.

Q3: Explain all types of joins in PySpark and their physical implementations.

Simple Explanation: A join combines two tables based on a matching key — like matching a guest list (Table A) with a seating chart (Table B) to figure out who sits where. Different join types answer different questions about what happens when someone is on one list but not the other.

Join Types — Visual with Actual Data:

Let's use two small tables:

employees:                          departments:
| emp_id | name    | dept_id |      | dept_id | dept_name   |
| 1      | Alice   | 10      |      | 10      | Engineering |
| 2      | Bob     | 20      |      | 20      | Marketing   |
| 3      | Charlie | 30      |      | 40      | Finance     |
| 4      | Diana   | NULL    |

Each join type — what comes out:

🧠 Memory Map
INNER JOIN (only matches)
| emp_id | name | dept_id | dept_name |
| 1 | Alice | 10 | Engineering | ← both tables have dept 10
| 2 | Bob | 20 | Marketing | ← both tables have dept 20
# Charlie (dept 30) dropped — no match in departments
# Diana (NULL) dropped — NULL never matches
# Finance (dept 40) dropped — no match in employees
LEFT OUTER JOIN (all from left, match from right)
| emp_id | name | dept_id | dept_name |
| 1 | Alice | 10 | Engineering |
| 2 | Bob | 20 | Marketing |
| 3 | Charlie | 30 | NULL | ← no dept 30 in right → NULL
| 4 | Diana | NULL | NULL | ← NULL key, no match → NULL
RIGHT OUTER JOIN (all from right, match from left)
| emp_id | name | dept_id | dept_name |
| 1 | Alice | 10 | Engineering |
| 2 | Bob | 20 | Marketing |
| NULL | NULL | 40 | Finance | ← no emp with dept 40 → NULLs
FULL OUTER JOIN (everything from both sides)
| emp_id | name | dept_id | dept_name |
| 1 | Alice | 10 | Engineering |
| 2 | Bob | 20 | Marketing |
| 3 | Charlie | 30 | NULL | ← left only
| 4 | Diana | NULL | NULL | ← left only (NULL key)
| NULL | NULL | 40 | Finance | ← right only
LEFT SEMI JOIN (left rows that HAVE a match — like SQL "IN")
| emp_id | name | dept_id |
| 1 | Alice | 10 | ← dept 10 exists in departments
| 2 | Bob | 20 | ← dept 20 exists in departments
# Notice: NO columns from right table appear
LEFT ANTI JOIN (left rows that have NO match — like SQL "NOT IN")
| emp_id | name | dept_id |
| 3 | Charlie | 30 | ← dept 30 NOT in departments
| 4 | Diana | NULL | ← NULL NOT in departments
CROSS JOIN (cartesian product — every row x every row)
| emp_id | name | dept_id | dept_name |
| 1 | Alice | 10 | Engineering |
| 1 | Alice | 10 | Marketing |
| 1 | Alice | 10 | Finance |
| 2 | Bob | 20 | Engineering |
... (4 employees × 3 departments = 12 rows total)
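For reference, a minimal sketch of the DataFrame-API calls behind each result above — assuming employees and departments DataFrames with the columns shown:

python — editable
inner = employees.join(departments, "dept_id", "inner")
left  = employees.join(departments, "dept_id", "left")        # ← left outer
right = employees.join(departments, "dept_id", "right")       # ← right outer
full  = employees.join(departments, "dept_id", "full")        # ← full outer
semi  = employees.join(departments, "dept_id", "left_semi")   # ← no right-side columns
anti  = employees.join(departments, "dept_id", "left_anti")   # ← left rows with no match
cross = employees.crossJoin(departments)                      # ← cartesian product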

Physical Implementations (How Spark Actually Executes the Join):

| Strategy | When Used | Shuffle? | Notes |
| Broadcast Hash Join (BHJ) | One side < 10 MB (default) | NO | Fastest. Small side broadcast to all executors. |
| Sort-Merge Join (SMJ) | Both sides large, equi-join | YES | Default for large-large. Both sides sorted by join key. |
| Shuffle Hash Join | One side significantly smaller | YES | Hash table built from smaller side per partition. |
| Broadcast Nested Loop (BNLJ) | Non-equi join, one side small | NO | Broadcast small side, nested loop. |
| Cartesian Product | Cross join or non-equi, both large | YES | Extremely expensive. Avoid if possible. |

Decision Tree — Which join strategy does Spark pick?

🗂️ Which join strategy does Spark pick?
One side < 10 MB? → BROADCAST HASH JOIN (fastest)
Equi-join, both sides large? → SORT MERGE JOIN (the default for large-large)
Equi-join, one side much smaller but too big to broadcast? → SHUFFLE HASH JOIN
Non-equi join? → BROADCAST NESTED LOOP or CARTESIAN
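To verify which strategy Spark actually picked, inspect the physical plan — a quick sketch:

python — editable
result = large_df.join(small_df, "key")
result.explain()
# ← The physical plan names the strategy: "BroadcastHashJoin", "SortMergeJoin", etc.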

Analogies to remember:

  • Broadcast join = Teacher distributing handouts to every student (small table sent everywhere). No students need to move — the handout comes to them.
  • Sort Merge join = Two people merging sorted card decks. Both decks are sorted by number, you walk through both simultaneously matching pairs. Fast, but you need to sort first.
  • Shuffle = Moving furniture between apartments (expensive!). Data has to physically move across the network to land on the right executor.

Force a broadcast:

python — editable
from pyspark.sql.functions import broadcast
result = large_df.join(broadcast(small_df), "key")
# ← Forces Spark to broadcast small_df even if statistics say otherwise

Interview Tip: Draw the decision tree on the whiteboard. Interviewers love seeing you reason about which physical strategy Spark will pick. Mention that you can check the strategy with df.explain() — the plan will say "BroadcastHashJoin" or "SortMergeJoin."

What NOT to Say: "I always use broadcast join because it's faster." Broadcast join can cause driver OOM if the table is too large. You need to understand WHEN each strategy applies.

Q4: What is the default broadcast join threshold? What are the pitfalls of broadcast joins?

Simple Explanation: Spark automatically broadcasts a table if it thinks the table is smaller than 10 MB. The idea is: "If this table is tiny, just send a copy to every executor — no need to shuffle the big table." But this can go wrong in several ways.

Think of it like sending a printed copy of a document to every employee in the building. If the document is 2 pages, great. If it turns out to be 200 pages, you've just overwhelmed the print room (the driver).

Technical details:

Default: spark.sql.autoBroadcastJoinThreshold = 10 MB (10485760 bytes)

Pitfalls:

  1. Statistics can be wrong — Spark uses file size, not post-filter size. A 1 GB table filtered down to 1 MB? Spark still sees 1 GB and won't broadcast.
  2. Driver OOM — Driver collects the broadcast table before sending. If the table is bigger than expected, the driver crashes.
  3. Memory per executor — Total memory = table_size x num_executors (broadcast replicated everywhere). A 9 MB table with 100 executors = 900 MB of cluster memory used.
  4. Dynamic size — Table that was 5 MB yesterday might be 500 MB tomorrow.

When NOT to broadcast:

  • When the "small" table size is unpredictable
  • When the table is actually large after transformations
  • When the driver has limited memory
python — editable
# Check what Spark thinks the table size is
df.explain(True)  # ← look for "Statistics(sizeInBytes=...)" in the plan

# Disable broadcast entirely if causing issues
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Or increase threshold if you have big executors
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "100m")

Interview Tip: If asked about broadcast joins, always mention the driver OOM risk. This shows production experience — it's a classic gotcha that only people who've debugged real pipelines know about.

What NOT to Say: "I set the broadcast threshold to 1 GB for everything." This is dangerous and shows you don't understand the driver memory implications.

Q5: How do you handle skewed data in a join? Explain ALL techniques.

Simple Explanation: Data skew means one key has MUCH more data than others. Imagine a post office where 90% of all mail goes to one zip code. That one mail carrier is overwhelmed while the others are idle.

In Spark, this means one partition has millions of rows while others have thousands. The one overloaded partition becomes the bottleneck — the entire job waits for it to finish.

Analogy: Shuffle = Moving furniture between apartments. Skew = 99% of the furniture goes to one apartment while 99 other apartments get almost nothing. That one apartment is overloaded and takes forever.

Technique 1: Salting (Most Common)

python — editable
from pyspark.sql.functions import lit, rand, floor, explode, array, col

salt_buckets = 10

# Step 1: Salt the large (skewed) side — add a random number 0-9 to each row
large_df = large_df.withColumn("salt", floor(rand() * salt_buckets).cast("int"))
# ← Rows for the hot key "USA" now carry salt 0-9, so they spread over 10 partitions

# Step 2: Replicate the small side for each salt value
small_df = small_df.withColumn(
    "salt", explode(array([lit(i) for i in range(salt_buckets)]))
)
# ← Each row in small_df is copied 10 times (one for each salt value)

# Step 3: Join on key + salt
result = large_df.join(small_df, ["key", "salt"]).drop("salt")
# ← The hot key is now spread across 10 partitions instead of 1

Before salting vs after:

🧠 Memory Map
BEFORE (skewed)
Partition 0: key="USA"10 million rows ← BOTTLENECK
Partition 1: key="UK"1,000 rows
Partition 2: key="JP"1,000 rows
AFTER (salted with 10 buckets)
Partition 0: key="USA", salt=01 million rows ← evenly split!
Partition 1: key="USA", salt=11 million rows
...
Partition 9: key="USA", salt=91 million rows
Partition 10: key="UK", salt=01,000 rows

Technique 2: AQE Skew Join (Easiest)

python — editable
256 MB">spark.conf.set("spark.sql.adaptive.enabled", "true")          # ← enable AQE
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")  # ← enable auto skew handling
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
# ← a partition is "skewed" if it's 5x larger than the median partition
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
# ← AND the partition is larger than 256 MB

Technique 3: Isolate-and-Union

python — editable
from pyspark.sql.functions import broadcast, col

# Step 1: Identify skewed keys (from analysis of your data)
skewed_keys = ["key1", "key2"]

# Step 2: Process skewed keys with broadcast join (fast, no shuffle)
skewed_result = large_df.filter(col("key").isin(skewed_keys)) \
    .join(broadcast(small_df), "key")

# Step 3: Process non-skewed keys normally (sort-merge join)
normal_result = large_df.filter(~col("key").isin(skewed_keys)) \
    .join(small_df, "key")

# Step 4: Combine both results
result = skewed_result.union(normal_result)

Technique 4: Two-Phase Aggregation (for groupBy skew)

python — editable
from pyspark.sql.functions import concat, lit, floor, rand, split, col, sum as _sum

# Phase 1: Partial aggregation with salt (splits hot keys)
salted = df.withColumn("salted_key", concat(col("key"), lit("_"), floor(rand() * 100).cast("string")))
partial = salted.groupBy("salted_key").agg(_sum("value").alias("partial_sum"))
# ← "USA" is now "USA_0" through "USA_99" — 100 partial sums

# Phase 2: Remove salt, final aggregation
result = partial.withColumn("key", split(col("salted_key"), "_")[0]) \
    .groupBy("key").agg(_sum("partial_sum").alias("total_sum"))
# ← Recombine "USA_0"..."USA_99" back into "USA" with final sum

Interview Tip: Start with "In Databricks, I'd first enable AQE which handles skew automatically. For extreme cases, I'd use salting." This shows you know the modern approach AND the manual technique.

What NOT to Say: "I'd just increase the number of partitions." Repartitioning doesn't fix skew — if one key has 90% of data, it still lands on one partition regardless of how many partitions you have.

SECTION 2: WINDOW FUNCTIONS

Q6: Explain window functions. What's the difference between row_number(), rank(), and dense_rank()?

Simple Explanation: Window functions let you do calculations WITHIN groups without collapsing the groups. Think of it as: "Within each department, rank employees by salary" — you still see every employee, but each one now has a rank number.

Analogy: Window functions = "Within each group, rank/number the rows." Like a class of students — you want to rank each student within their own class, not across the whole school.

Technical details:

python — editable
from pyspark.sql import Window
from pyspark.sql.functions import row_number, rank, dense_rank, col

w = Window.partitionBy("department").orderBy(col("salary").desc())
# ← "Within each department, order by salary highest first"

Step-by-step data walkthrough:

📋 Overview
INPUT DATA
| name | department | salary |
|---------|-----------|--------|
| Alice | Eng | 100 |
| Bob | Eng | 90 |
| Charlie | Eng | 90 |
| Diana | Eng | 80 |
| Eve | Sales | 95 |
| Frank | Sales | 85 |
STEP 1 — partitionBy("department"):
Group 1 (Eng): Alice(100), Bob(90), Charlie(90), Diana(80)
Group 2 (Sales): Eve(95), Frank(85)
STEP 2 — orderBy(salary.desc()) within each partition:
Group 1 (Eng): Alice(100) → Bob(90) → Charlie(90) → Diana(80)
Group 2 (Sales): Eve(95) → Frank(85)
STEP 3 — Apply the function:
| name    | department | salary | row_number | rank | dense_rank |
| Alice   | Eng        | 100    | 1          | 1    | 1          |
| Bob     | Eng        | 90     | 2          | 2    | 2          |
| Charlie | Eng        | 90     | 3          | 2    | 2          |
| Diana   | Eng        | 80     | 4          | 4    | 3          |
| Eve     | Sales      | 95     | 1          | 1    | 1          |
| Frank   | Sales      | 85     | 2          | 2    | 2          |

Key differences with ties (Bob and Charlie both have salary 90):

| Function | Result for ties | Gaps? | Description |
| row_number() | 2, 3 | N/A — always unique | Arbitrary tiebreaker — one gets 2, the other gets 3 |
| rank() | 2, 2 → skip to 4 | YES, gaps after ties | Like Olympic medals — two silvers, no bronze |
| dense_rank() | 2, 2 → next is 3 | NO gaps | Like counting distinct salary levels |
python — editable
# Code to produce the above:
result = df.select(
    "*",
    row_number().over(w).alias("row_number"),   # ← 1,2,3,4 (always unique)
    rank().over(w).alias("rank"),                # ← 1,2,2,4 (gaps after ties)
    dense_rank().over(w).alias("dense_rank"),    # ← 1,2,2,3 (no gaps)
)

Interview Tip: "Which one should I use for deduplication?" Always row_number() — it guarantees exactly one row per group (no ties). For "top N by category" where you want ties, use dense_rank().

What NOT to Say: "They're basically the same." Choosing the wrong one in a dedup query can give you duplicate results (rank/dense_rank don't guarantee uniqueness with ties).

Q7: Write PySpark code to compute running total, 7-day moving average, and percentage of total — all in one pass.

Simple Explanation: Imagine a sales dashboard. For each day, you want to see:

  1. Running total — "How much have we sold from day 1 until today?"
  2. 7-day moving average — "What's the average daily sales over the last 7 days?"
  3. Percentage of total — "What % of the entire year's sales happened today?"

Window functions let you compute ALL of these in a single query — no self-joins, no subqueries.

Technical details:

python — editable
from pyspark.sql import Window
from pyspark.sql.functions import sum as _sum, avg, col

# Window 1: Running total (all rows from start up to current row)
cumulative_w = Window.partitionBy("category").orderBy("date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
# ← "From the very first row to the current row"

# Window 2: 7-day moving average (current row + 6 prior rows = 7 rows)
moving_w = Window.partitionBy("category").orderBy("date") \
    .rowsBetween(-6, Window.currentRow)
# ← "From 6 rows back to the current row"

# Window 3: Total for percentage (ALL rows in the partition, no order)
total_w = Window.partitionBy("category")
# ← "Every single row in this category"

result = df.select(
    "*",
    _sum("revenue").over(cumulative_w).alias("running_total"),
    avg("revenue").over(moving_w).alias("moving_avg_7d"),
    (col("revenue") / _sum("revenue").over(total_w) * 100).alias("pct_of_total")
)

Sample data flow:

Input:
| category | date       | revenue |
| Books    | 2026-01-01 | 100     |
| Books    | 2026-01-02 | 150     |
| Books    | 2026-01-03 | 200     |
| Books    | 2026-01-04 | 50      |

Output (all three computed in one pass; total revenue = 500):
| category | date       | revenue | running_total | moving_avg_7d | pct_of_total |
| Books    | 2026-01-01 | 100     | 100           | 100.0         | 20.0         |
| Books    | 2026-01-02 | 150     | 250           | 125.0         | 30.0         |
| Books    | 2026-01-03 | 200     | 450           | 150.0         | 40.0         |
| Books    | 2026-01-04 | 50      | 500           | 125.0         | 10.0         |

Interview Tip: Mention that this runs in ONE pass over the data. Without window functions, you'd need multiple self-joins or subqueries — much slower and harder to read.

What NOT to Say: "I'd use a groupBy and then join back." That works but is far less efficient than window functions for this use case.

Q8: What is the difference between rowsBetween and rangeBetween?

Simple Explanation: Both define the "window frame" — which rows the function looks at. The difference is HOW they count:

  • rowsBetween = counts by physical position (row 1, row 2, row 3...)
  • rangeBetween = counts by value (all rows where the value is within a certain range)

Analogy: Imagine you're at a concert. rowsBetween(-2, 0) = "me and the 2 people directly in front of me" (physical seats). rangeBetween(-2, 0) = "me and everyone whose ticket price is within $2 of mine" (based on value, not position).

Technical details:

| date       | revenue |
| 2026-01-01 | 100     |
| 2026-01-02 | 150     |
| 2026-01-04 | 200     | ← note the gap: no 2026-01-03
| 2026-01-05 | 50      |
  • rowsBetween: Physical offset by row count. -6, 0 = current row and 6 rows before.
  • rangeBetween: Logical offset by value. -6, 0 = current value and values up to 6 less.

Critical difference: With rangeBetween, if your data has gaps (e.g., missing dates), the window adjusts logically. With rowsBetween, it always uses the physical row positions.

python — editable
from pyspark.sql import Window
from pyspark.sql.functions import unix_timestamp

# Convert date to seconds for rangeBetween
days_7 = 7 * 86400  # ← 7 days in seconds
w = Window.partitionBy("category") \
    .orderBy(unix_timestamp("date")) \
    .rangeBetween(-days_7, 0)
# ← "All rows within 7 calendar days before the current row"

Interview Tip: If asked "How do you compute a 7-day moving average when dates have gaps?", use rangeBetween with unix_timestamp. This is a common follow-up that trips people up.

What NOT to Say: "I'd use rowsBetween(-6, 0) for a 7-day window." That only works if you have data for every single day with no gaps.

Q9: Scenario — Find the first and last purchase per customer, plus the time between their first and second purchase.

Simple Explanation: For each customer, we want: when did they first buy, when did they last buy, and how many days between purchase #1 and purchase #2? This tells us about customer retention — a short gap means they came back quickly.

Technical details:

python — editable
from pyspark.sql import Window
from pyspark.sql.functions import first, last, lead, datediff, col, row_number

w = Window.partitionBy("customer_id").orderBy("purchase_date")
# ← "Within each customer, sort purchases chronologically"

result = df.withColumn("purchase_rank", row_number().over(w)) \
    .withColumn("first_purchase", first("purchase_date").over(
        w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )) \
    .withColumn("last_purchase", last("purchase_date").over(
        w.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )) \
    .withColumn("next_purchase", lead("purchase_date", 1).over(w)) \
    .filter(col("purchase_rank") == 1) \
    .withColumn("days_to_second_purchase",
        datediff(col("next_purchase"), col("purchase_date"))
    )

Sample data flow:

Input:
| customer_id | purchase_date |
| C1          | 2026-01-01    |
| C1          | 2026-01-15    |
| C1          | 2026-03-01    |

Output (one row per customer, after filtering to purchase_rank == 1):
| customer_id | first_purchase | last_purchase | days_to_second_purchase |
| C1          | 2026-01-01     | 2026-03-01    | 14                      |

Interview Tip: Mention that lead() looks at the NEXT row while lag() looks at the PREVIOUS row. For "time to second purchase," you need lead from the first row, not lag from the second.

What NOT to Say: "I'd do a self-join." Window functions are far more efficient than self-joins for this pattern.

SECTION 3: PARTITIONING & BUCKETING

Q10: Explain repartition() vs coalesce(). When do you use each?

Simple Explanation: Both change the number of partitions in your DataFrame. Think of partitions as boxes of data.

  • repartition() = Dump ALL boxes out, then redistribute evenly into new boxes. Expensive (requires a full shuffle) but guarantees even distribution.
  • coalesce() = Merge adjacent boxes together WITHOUT moving most of the data. Cheap but can create uneven boxes.

Analogy: Moving into a new house.

  • repartition(4) = Unpack everything, then repack into exactly 4 evenly-filled boxes. Takes time but organized.
  • coalesce(4) = Just combine nearby boxes. Box 1 stays as is, box 2 gets merged into box 3, etc. Fast but some boxes might be overstuffed.

Technical details:

| Aspect | repartition(n) | coalesce(n) |
| Shuffle | YES (full shuffle) | NO (narrow transformation) |
| Increase partitions? | Yes | No (only decrease) |
| Even distribution? | Yes | No (merges adjacent partitions, can be uneven) |
| By column? | Yes: repartition(n, "col") | No |
| Use case | Need even distribution, increase partitions, join optimization | Reduce partitions before write |

Repartition by column (hash-partitioned):

python — editable
# Co-locate all rows with the same user_id on the same partition
df = df.repartition(100, "user_id")
# ← Now a groupBy("user_id") or join on "user_id" won't need another shuffle

# Common pattern: reduce partitions before writing to avoid small files
df.coalesce(10).write.format("delta").save("/path")
# ← Merges down to 10 partitions → 10 output files (no shuffle!)

Interview Tip: The most common use of coalesce() is right before a write to reduce the number of small output files. Say: "After a filter that reduces data by 90%, I coalesce to avoid writing thousands of tiny files."

What NOT to Say: "I use repartition before every write." That triggers an unnecessary shuffle. Use coalesce when reducing partitions.

Q11: What is bucketing? How does it eliminate shuffles?

Simple Explanation: Bucketing is like pre-sorting your filing cabinet by category ONCE, so you never have to sort it again. When you save a table, you tell Spark: "Organize this data into N buckets by this column." From then on, any join or groupBy on that column is already organized — no shuffle needed.

Technical details:

Bucketing pre-partitions data into a fixed number of buckets by hash of specified columns, and optionally sorts within each bucket.

python — editable
df.write.bucketBy(256, "user_id").sortBy("user_id").saveAsTable("bucketed_users")
# ← Data is written into 256 buckets, each containing specific user_ids
# ← Bucket assignment: bucket_number = hash(user_id) % 256

How it eliminates shuffles: When two bucketed tables with the same bucket count and bucket column are joined, Spark performs a Sort-Merge Join WITHOUT shuffle — data with the same key is already co-located.

WITHOUT bucketing:
Table A (random order) ──shuffle──┐
                                  ├── Sort-Merge Join
Table B (random order) ──shuffle──┘
Time: shuffle A + shuffle B + join = SLOW
WITH bucketing (both tables bucketed by user_id, 256 buckets):
Table A bucket 0 ─┐
Table B bucket 0 ─┘── join (no shuffle, already co-located!)
Table A bucket 1 ─┐
Table B bucket 1 ─┘── join (no shuffle!)
... × 256 buckets
Time: just the join = FAST

Caveats:

  • Only works with Hive-managed tables (saveAsTable, not save)
  • Bucket count must match between tables
  • spark.sql.sources.bucketing.enabled must be true
  • In Databricks, consider Liquid Clustering as a modern alternative
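To confirm the shuffle elimination, join two tables bucketed the same way and check the plan — a sketch assuming a hypothetical bucketed_orders table written with the same bucketBy(256, "user_id"):

python — editable
users = spark.table("bucketed_users")
orders = spark.table("bucketed_orders")  # ← hypothetical, bucketed identically
users.join(orders, "user_id").explain()
# ← Plan shows SortMergeJoin with NO Exchange (shuffle) on either side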

Interview Tip: Mention Liquid Clustering as the Databricks-native replacement for bucketing + Z-ORDER. It's simpler and auto-tunes. Shows you know modern Databricks features.

What NOT to Say: "I always bucket every table." Bucketing adds overhead on write and only helps if you frequently join on the same column.

Q12: Compare Hash Partitioning vs Range Partitioning.

Simple Explanation: Two ways to decide which partition a row goes to:

  • Hash partitioning = Take the key, run it through a math function, and the result tells you which partition. Like assigning students to classes by last-name hash. Fast, but no ordering.
  • Range partitioning = Split data into ranges (A-F, G-L, M-R, S-Z). Ordered, but requires knowing the data distribution first.

Technical details:

| Aspect | Hash Partitioning | Range Partitioning |
| Algorithm | partition = hash(key) % numPartitions | Partitions by value ranges (requires sampling) |
| Use case | Equi-joins, groupBy | orderBy/sortBy, range queries |
| Skew risk | Yes, if hash distribution poor (e.g., many nulls) | Can be balanced with good sampling |
| Output | Unordered within partitions | Sorted partitions |
python — editable
# Hash partitioning (implicit in repartition by column)
df.repartition(100, "user_id")
# ← partition = hash(user_id) % 100

# Range partitioning (implicit in orderBy/sortBy)
df.repartitionByRange(100, "date")
# ← partition 0 gets Jan, partition 1 gets Feb, etc.

Interview Tip: Hash partitioning is what Spark uses internally for shuffles during joins and groupBy. Range partitioning is what Spark uses for global sorting (orderBy).

What NOT to Say: "Hash partitioning always gives even distribution." If most of your keys hash to the same bucket (e.g., lots of NULLs), you get skew.

SECTION 4: UDFs & PERFORMANCE

Q13: What are UDFs? Why should you avoid them? What are the alternatives?

Simple Explanation: A UDF (User-Defined Function) is custom Python code that you plug into Spark. The problem? Spark runs on the JVM (Java Virtual Machine), but your Python code runs in a separate Python process. Every row of data has to be shipped from the JVM to Python and back — like sending mail between two buildings.

Analogy:

  • Native Spark functions = Walking inside one building (JVM). Everything is fast, the optimizer knows every step you take.
  • Python UDF = Sending mail between two buildings (JVM to Python and back). You have to package each letter (serialize), walk it to the other building (socket transfer), unpackage it (deserialize), process it, then package the result and walk it back. For EVERY. SINGLE. ROW.

Serialization diagram — why Python UDFs are slow:

📐 Architecture Diagram
                    ROW-BY-ROW PYTHON UDF
                    =====================

JVM (Spark Executor)              Python Worker Process
┌──────────────────┐              ┌──────────────────┐
│                  │   serialize  │                  │
│  Row 1 data  ────┼──────────►──┼──► process row 1 │
│                  │   (pickle)  │     (your code)   │
│  ◄───────────────┼──────────◄──┼──── result 1     │
│                  │ deserialize │                  │
│  Row 2 data  ────┼──────────►──┼──► process row 2 │
│                  │             │                  │
│  ◄───────────────┼──────────◄──┼──── result 2     │
│                  │             │                  │
│  ... × millions  │             │  ... × millions  │
└──────────────────┘              └──────────────────┘

Overhead per row: serialize + socket transfer + deserialize + Python GIL
Total overhead: O(num_rows) × per-row cost = VERY SLOW

                    PANDAS UDF (VECTORIZED)
                    =======================

JVM (Spark Executor)              Python Worker Process
┌──────────────────┐              ┌──────────────────┐
│                  │  Arrow batch │                  │
│  10,000 rows ────┼──────────►──┼──► pd.Series     │
│  (one batch)     │  (zero-copy)│  (vectorized ops) │
│                  │             │                  │
│  ◄───────────────┼──────────◄──┼──── results batch│
│                  │             │                  │
│  next 10K rows ──┼──────────►──┼──► pd.Series     │
└──────────────────┘              └──────────────────┘

Overhead per batch: 1 Arrow transfer (near zero-copy)
Total overhead: O(num_batches) × per-batch cost = MUCH FASTER

Technical details:

Why Python UDFs are slow:

  1. Data serialized from JVM → Python process (via socket) → back to JVM
  2. Each row individually processed in Python (no vectorization)
  3. Catalyst cannot optimize through UDFs (no predicate pushdown, no codegen)
  4. Python GIL limits true parallelism within a worker

Performance hierarchy (fastest to slowest):

  1. Built-in Spark SQL functions — Catalyst-optimized, codegen, runs in JVM (1x baseline)
  2. Pandas UDF (vectorized) — Arrow serialization, batch processing with pandas/numpy (3-5x slower)
  3. mapInPandas — Similar to Pandas UDF, for partition-level processing (5-10x slower)
  4. Row-at-a-time Python UDF — Avoid if possible (10-100x slower)
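A minimal before/after sketch of the hierarchy's two extremes — a row-at-a-time UDF versus the equivalent built-in (df and its name column are placeholders):

python — editable
from pyspark.sql.functions import udf, upper, col
from pyspark.sql.types import StringType

# Row-at-a-time Python UDF — every row crosses the JVM↔Python boundary
@udf(StringType())
def upper_udf(s):
    return s.upper() if s else None

df.withColumn("u", upper_udf(col("name")))  # ← slow, opaque to Catalyst

# Same logic with a built-in — stays in the JVM, fully optimizable
df.withColumn("u", upper(col("name")))      # ← the 1x baseline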

Interview Tip: If asked "How do you optimize a Python UDF?", say: "First, I try to replace it with built-in functions. If not possible, I convert it to a Pandas UDF for vectorized processing with Apache Arrow. As a last resort, I'd use mapInPandas for partition-level processing."

What NOT to Say: "UDFs are fine, they're just Python functions." This shows you don't understand the JVM-Python serialization overhead that makes them 10-100x slower.

Q14: Write a Pandas UDF. When do you use SCALAR vs GROUPED_MAP vs GROUPED_AGG?

Simple Explanation: Pandas UDFs come in 3 flavors, depending on what goes IN and what comes OUT:

  • Scalar = Column in → Column out (like a regular Spark function but written in pandas)
  • Grouped Map = Group of rows in → Group of rows out (for per-group processing like ML)
  • Grouped Aggregate = Group of rows in → Single value out (for custom aggregations)

Technical details:

Scalar Pandas UDF (column → column):

python — editable
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import DoubleType
import pandas as pd

@pandas_udf(DoubleType())
def normalize(s: pd.Series) -> pd.Series:
    return (s - s.mean()) / s.std()
    # ← Receives a batch of rows as pd.Series, returns pd.Series
    # ← Vectorized: numpy operations on the whole batch, not row-by-row

df = df.withColumn("normalized_salary", normalize(col("salary")))

Grouped Map (group → DataFrame):

python — editable
from pyspark.sql.functions import pandas_udf, PandasUDFType
from sklearn.linear_model import LinearRegression  # ← assumption: a scikit-learn model per group
import pandas as pd

# output_schema, features, target: define these for your data
@pandas_udf(output_schema, PandasUDFType.GROUPED_MAP)
def train_model(pdf: pd.DataFrame) -> pd.DataFrame:
    # ← Receives ALL rows for one group as a pandas DataFrame
    model = LinearRegression().fit(pdf[features], pdf[target])
    pdf["prediction"] = model.predict(pdf[features])
    return pdf  # ← Returns a pandas DataFrame (can be a different shape)

result = df.groupBy("category").apply(train_model)
# ← Trains a SEPARATE model per category — powerful for ML!
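Note: the PandasUDFType.GROUPED_MAP style above is the legacy API. In Spark 3.0+, the same logic is written with applyInPandas, passing train_model as a plain function (no decorator) — a sketch reusing the names above:

python — editable
result = df.groupBy("category").applyInPandas(train_model, schema=output_schema)
# ← Same semantics as GROUPED_MAP apply(), modern API (Spark 3.0+)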

Grouped Aggregate (group → scalar):

python — editable
@pandas_udf(DoubleType(), PandasUDFType.GROUPED_AGG)
def weighted_mean(values: pd.Series, weights: pd.Series) -> float:
    return (values * weights).sum() / weights.sum()
    # ← Custom aggregation that Spark doesn't have built-in

result = df.groupBy("category").agg(weighted_mean(col("value"), col("weight")))

When to use which:

🧠 Memory Map
Need to transform a column? → SCALAR
Example: normalize values, parse strings, custom math
Need to process entire groups? → GROUPED_MAP
Example: train ML model per group, custom time-series per group
Need a custom aggregate? → GROUPED_AGG
Example: weighted mean, trimmed mean, custom statistical functions

Interview Tip: In modern PySpark (3.0+), the decorator syntax is preferred. Mention that Scalar Pandas UDFs are the most common and give the best performance improvement over regular UDFs (3-100x faster due to Arrow serialization).

What NOT to Say: "I'd write a regular Python UDF instead." Always prefer Pandas UDFs when you must use custom Python logic.

Q15: What is mapInPandas and when do you use it?

Simple Explanation: mapInPandas lets you process entire partitions of data using pandas. Instead of getting one column or one group, you get an iterator of pandas DataFrames — each representing a chunk of the partition. This is perfect for "load a model once, score every row" patterns.

Analogy: Instead of sending each letter individually (row-at-a-time UDF) or even in batches (Pandas UDF), you send the entire mailbag to the Python worker and let it process everything at once. The model is loaded ONCE per partition, not per row or per batch.

Technical details:

python — editable
def predict_batch(iterator):
    import pickle
    model = pickle.load(open("/dbfs/models/model.pkl", "rb"))
    # ← Model loaded ONCE per partition (not per row!)
    for batch_df in iterator:
        # ← Each batch_df is a pandas DataFrame chunk
        batch_df["prediction"] = model.predict(batch_df[feature_cols])
        yield batch_df  # ← Yield results back as pandas DataFrame

result = spark_df.mapInPandas(predict_batch, schema=output_schema)
# ← Each partition: load model once → score all rows → return results

Use when:

  • You need custom Python logic that can't be expressed with built-in functions
  • Processing entire partitions (load model once, apply to all rows)
  • Need pandas/numpy for complex computations
  • You want to avoid the overhead of loading resources per batch

Interview Tip: Mention that mapInPandas uses Apache Arrow for zero-copy transfer between JVM and Python, making it much faster than traditional mapPartitions with manual serialization.

What NOT to Say: "I'd use a regular UDF to load the model and score each row." Loading a model per row is catastrophically slow.

SECTION 5: CACHING & CHECKPOINTING

Q16: Explain cache(), persist(), and checkpoint(). When would you use each?

Simple Explanation: When you reuse a DataFrame multiple times, Spark normally recomputes it from scratch each time (lazy evaluation). Caching, persisting, and checkpointing are ways to say "save this result so we don't have to recompute it."

Analogy:

  • cache() = Bookmarking a page in a book. Quick to find again, but if the book gets damaged (executor fails), you lose the bookmark and must search again.
  • persist() = Same as cache, but you can choose WHERE to save (memory, disk, or both). Like choosing between a sticky note (memory), a physical bookmark (disk), or both.
  • checkpoint() = Photocopying the page and storing the copy in a safe (HDFS/S3). Even if the original book is destroyed, you still have the copy. Also, you don't need to remember which chapter you were in — the copy is standalone.

Technical details:

| Method | Storage | Lineage | Fault Tolerant | Use Case |
| cache() | MEMORY_AND_DISK | Preserved | No (recompute) | Reused DataFrame |
| persist(MEMORY_ONLY) | Memory only | Preserved | No | Fits in memory, reused often |
| persist(DISK_ONLY) | Disk only | Preserved | No | Large data, infrequent reuse |
| persist(MEMORY_AND_DISK_SER) | Memory (serialized) + disk | Preserved | No | Memory-constrained |
| checkpoint() | Reliable storage (HDFS/S3) | Truncated | Yes | Long lineage, iterative algorithms |
| localCheckpoint() | Executor local storage | Truncated | No | Fast lineage break, less reliable |

When to checkpoint vs cache:

  • Use cache() when the DataFrame is reused 2+ times and you want to avoid recomputation
  • Use checkpoint() when the lineage is very deep (iterative algorithms) to prevent StackOverflow
  • checkpoint() returns a NEW DataFrame and materializes eagerly by default — assign the result: df = df.checkpoint()
python — editable
# Common pattern: cache a filtered+transformed DataFrame used multiple times
filtered_df = raw_df.filter(col("status") == "active") \
    .select("user_id", "email", "created_at")
filtered_df.cache()       # ← mark for caching
filtered_df.count()       # ← trigger materialization (lazy!)

# Now these two operations read from cache, not from raw data:
filtered_df.groupBy("created_at").count().show()
filtered_df.join(other_df, "user_id").show()

# Don't forget to unpersist when done!
filtered_df.unpersist()   # ← free the memory
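For comparison, a minimal checkpoint sketch — the checkpoint directory path is an assumption:

python — editable
spark.sparkContext.setCheckpointDir("/checkpoints/my_job")  # ← reliable storage (DBFS/S3/HDFS)
df = df.checkpoint()  # ← eager by default: materializes now and truncates lineage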

Interview Tip: Always mention unpersist(). Forgetting to release cached DataFrames is a common memory leak in production jobs. Also mention that cache() is lazy — the data isn't actually cached until an action triggers it.

What NOT to Say: "I cache everything to make it faster." Over-caching wastes memory and can cause executors to spill to disk, making things SLOWER.

SECTION 6: STRUCTURED STREAMING

Q17: Explain the Structured Streaming execution model.

Simple Explanation: Structured Streaming treats real-time data as a table that keeps growing. Every few seconds (or whatever interval you set), Spark looks at "what new rows arrived?" and processes just those new rows using the exact same code you'd write for a batch query.

Analogy: Imagine a restaurant where orders come in continuously. Instead of waiting until the restaurant closes to count all orders, the manager checks the order list every 10 seconds and processes only the NEW orders since the last check. Same counting method, just applied incrementally.

Technical details:

  • The stream is treated as an unbounded table
  • Each trigger processes new rows appended to this table
  • Uses the same Catalyst optimizer as batch queries

Trigger modes:

| Mode | Behavior |
| trigger(processingTime="10 seconds") | Micro-batch every 10 seconds |
| trigger(once=True) | Process all available data in one batch, then stop (deprecated) |
| trigger(availableNow=True) | Process all available data in multiple micro-batches, then stop |
| Continuous (experimental) | Row-by-row, ~1 ms latency, at-least-once only |
python — editable
10 seconds") \
    .option("checkpointLocation", "/checkpoints/purchase_counts") \
    .toTable("silver_purchase_counts")
# ← Every 10 seconds: read new rows → filter → aggregate → write"># Basic streaming pipeline structure
result = spark.readStream \
    .format("delta") \
    .table("bronze_events") \
    .filter(col("event_type") == "purchase") \
    .groupBy("product_id").count()

result.writeStream \
    .format("delta") \
    .outputMode("complete") \
    .trigger(processingTime="10 seconds") \
    .option("checkpointLocation", "/checkpoints/purchase_counts") \
    .toTable("silver_purchase_counts")
# ← Every 10 seconds: read new rows → filter → aggregate → write

Interview Tip: Emphasize that the code is nearly identical to batch — "I can prototype in batch, then switch to streaming by changing read to readStream and write to writeStream." This is Structured Streaming's key design principle.

What NOT to Say: "Structured Streaming processes one row at a time." It uses micro-batches (unless using the experimental continuous mode).

Q18: What are output modes? When is each used?

Simple Explanation: After each micro-batch, Spark needs to know: "What results should I write to the sink?" The three output modes answer this differently:

  • Append = "Only write NEW rows that won't change" (like adding new entries to a log)
  • Complete = "Rewrite the ENTIRE result table every time" (like refreshing a dashboard)
  • Update = "Only write rows that CHANGED" (like updating a leaderboard)

Technical details:

| Mode | Behavior | Works With |
| Append (default) | Only new rows output | Non-aggregation queries, or aggregations with watermark |
| Complete | Entire result table output | Only with aggregations |
| Update | Only changed rows output | Aggregations (rows whose aggregate value changed) |
🧠 Memory Map
Example: counting events per category
After batch 1: {Electronics: 10, Books: 5}
After batch 2: {Electronics: 15, Books: 5, Clothing: 3}
Append mode: would write {Clothing: 3} (only truly new categories)
ERROR! Can't guarantee Electronics won't change.
Complete mode: would write {Electronics: 15, Books: 5, Clothing: 3}
Entire result table every time. Safe but expensive.
Update mode: would write {Electronics: 15, Clothing: 3}
Only the rows that changed. Books stayed at 5 so it's skipped.

Common mistake: Using append mode with aggregations without watermark → throws error because Spark can't guarantee old rows won't change.
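A minimal sketch of wiring an output mode to a streaming aggregation (table and column names are placeholders):

python — editable
counts = spark.readStream.table("bronze_events") \
    .withWatermark("event_time", "10 minutes") \
    .groupBy("category").count()

counts.writeStream \
    .outputMode("update") \
    .option("checkpointLocation", "/checkpoints/category_counts") \
    .toTable("category_counts")
# ← update mode: each micro-batch writes only the categories whose count changed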

Interview Tip: For most streaming-to-Delta pipelines without aggregation, use append. For aggregation dashboards, use update (more efficient than complete). Only use complete when the sink needs the full picture every time.

What NOT to Say: "I always use complete mode." Complete rewrites the entire result every micro-batch — extremely inefficient for large state.

Q19: Explain watermarking with a real scenario.

Simple Explanation: In the real world, data arrives late. A click that happened at 3:00 PM might only arrive at the server at 3:25 PM (due to network delays, offline devices, etc.). Watermarking tells Spark: "Wait up to X minutes for late data. After that, stop waiting and clean up."

Analogy: A professor has a homework deadline of Monday 5 PM. But they accept late submissions up to 30 minutes (the watermark). At 5:30 PM, they stop accepting papers and grade what they have. Without this cutoff, they'd wait forever and never grade anything.

Technical details:

Scenario: Clickstream sessionization. Events may arrive up to 30 minutes late.

python — editable
30 minutes") \
    .groupBy(
        col("user_id"),
        window("event_time", "1 hour")  # ← 1-hour tumbling window
    ).count()">from pyspark.sql.functions import window

clicks = spark.readStream.format("kafka").load().select(
    col("user_id"),
    col("event_time").cast("timestamp"),
    col("page_url")
)

# Define watermark: accept data up to 30 min late
sessionized = clicks \
    .withWatermark("event_time", "30 minutes") \
    .groupBy(
        col("user_id"),
        window("event_time", "1 hour")  # ← 1-hour tumbling window
    ).count()

What watermark does — step by step:

🧠 Memory Map
Time progresses:
3:00 PM — events arrive with event_time = 3:00 PM
max(event_time) = 3:00 PM
watermark = 3:00 - 30 min = 2:30 PM
→ Accept any event with event_time >= 2:30 PM
3:15 PM — events arrive with event_time = 3:15 PM
max(event_time) = 3:15 PM
watermark = 3:15 - 30 min = 2:45 PM
→ Accept any event with event_time >= 2:45 PM
→ Late event with event_time = 2:40 PM? DROPPED! (before watermark)
3:30 PM — max(event_time) = 3:30 PM
watermark = 3:00 PM
→ State for windows ending before 3:00 PM is CLEANED UP
→ This prevents unbounded state growth!

What watermark does (summary):

  1. Tracks max(event_time) seen so far
  2. watermark = max(event_time) - 30 minutes
  3. Events with event_time < watermark are dropped
  4. State older than watermark is cleaned up (prevents unbounded state growth)

Interview Tip: Always connect watermarks to STATE CLEANUP. The interviewer wants to hear: "Without watermarks, the state store grows forever. Watermarks let Spark know when it's safe to discard old state."

What NOT to Say: "Watermarks guarantee no data is ever lost." Late data arriving AFTER the watermark IS dropped. It's a trade-off between completeness and resource usage.

Q20: How do stream-stream joins work? What are the requirements?

Simple Explanation: Joining two streams is like matching real-time orders with real-time payments. Both arrive continuously, and you need to hold onto unmatched records from both sides until a match arrives (or until you give up waiting).

Technical details:

python — editable
2 hours")
# ← "Orders can arrive up to 2 hours late"
payments = payments_stream.withWatermark("payment_time", "3 hours")
# ← "Payments can arrive up to 3 hours late"

# Time-range condition limits state
joined = orders.join(
    payments,
    expr("""
        orders.order_id = payments.order_id AND
        payments.payment_time BETWEEN orders.order_time AND orders.order_time + interval 1 hour
    """),
    "left_outer"
)
# ← "Payment must arrive within 1 hour of order time""># Both streams must have watermarks
orders = orders_stream.withWatermark("order_time", "2 hours")
# ← "Orders can arrive up to 2 hours late"
payments = payments_stream.withWatermark("payment_time", "3 hours")
# ← "Payments can arrive up to 3 hours late"

# Time-range condition limits state
joined = orders.join(
    payments,
    expr("""
        orders.order_id = payments.order_id AND
        payments.payment_time BETWEEN orders.order_time AND orders.order_time + interval 1 hour
    """),
    "left_outer"
)
# ← "Payment must arrive within 1 hour of order time"

Requirements:

  1. Both sides must have watermarks defined
  2. Time-range conditions recommended to limit state
  3. For outer joins: a row is output with nulls once the watermark guarantees no future match is possible
  4. For inner joins: late data on either side is buffered until watermark allows cleanup
🧠 Memory Map
Without time-range condition:
Spark must buffer ALL unmatched orders and ALL unmatched payments FOREVER
→ State grows unbounded → OOM
With time-range condition (payment within 1 hour of order):
Spark knows: if order_time = 3:00 PM and it's now 4:00 PM,
no payment can possibly match → safe to discard that order from state
→ State is bounded → stable memory usage

Interview Tip: Stream-stream joins are a favorite advanced topic. Mention the state implications: without time bounds and watermarks, state grows forever and the job eventually OOMs.

What NOT to Say: "Stream-stream joins work just like batch joins." They require watermarks and time-range conditions that batch joins don't need.

Q21: Explain the foreachBatch pattern. When is it needed?

Simple Explanation: foreachBatch is a bridge between streaming and batch. It says: "For each micro-batch, give me the data as a regular DataFrame — then I'll decide what to do with it." This unlocks batch-only operations (like MERGE) inside a streaming pipeline.

Analogy: Imagine a conveyor belt (the stream) dropping boxes onto a table every 30 seconds. foreachBatch lets you pick up each batch of boxes and do whatever you want with them — sort them, compare with existing inventory, ship some back — things you can't do while they're on the moving belt.

Technical details:

python — editable
1 minute") \
    .start()">def upsert_to_delta(batch_df, batch_id):
    # ← batch_df: regular DataFrame with this micro-batch's data
    # ← batch_id: unique ID for this batch (for idempotency)
    target = DeltaTable.forName(spark, "silver_orders")

    target.alias("t").merge(
        batch_df.alias("s"),
        "t.order_id = s.order_id"
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()

spark.readStream.table("bronze_orders") \
    .writeStream \
    .foreachBatch(upsert_to_delta) \
    .option("checkpointLocation", "/checkpoints/silver_orders") \
    .trigger(processingTime="1 minute") \
    .start()

Use when:

  • MERGE into Delta Lake (can't do with regular streaming write)
  • Writing to multiple sinks in one pipeline
  • Calling external APIs per batch
  • Complex deduplication logic
  • Any operation that needs the full batch as a DataFrame

Interview Tip: foreachBatch is the answer to "How do you do streaming MERGE/upsert in Databricks?" — the most common streaming interview question. Always mention using batch_id for idempotency.

What NOT to Say: "I'd stop the stream, run a batch MERGE, then restart." That defeats the purpose of streaming.

Q22: How do you achieve exactly-once semantics in Structured Streaming?

Simple Explanation: "Exactly-once" means every record is processed exactly one time — not zero, not twice. This is surprisingly hard in distributed systems. Spark achieves it by combining three things: a replayable source, checkpointing, and an idempotent sink.

Analogy: Imagine processing bank transactions:

  1. Source must be replayable — like a numbered list where you can say "start from transaction #500" (Kafka offsets)
  2. Engine must remember where it stopped — "I finished up to transaction #499" (checkpointing)
  3. Sink must handle duplicates — "If I accidentally process #499 again, the result is the same" (idempotent writes)

Technical details:

Three requirements:

  1. Source: Must be replayable (Kafka with offsets, file source with checkpoints)
  2. Engine: Checkpointing tracks offsets and state. On restart, Spark replays from last committed offset.
  3. Sink: Must be idempotent (re-writing the same batch produces the same result)

Built-in exactly-once sinks:

  • Delta Lake (ACID transactions)
  • File sink (uses batch ID in file names)
  • Kafka sink (with idempotent producer)
python — editable
from pyspark.sql.functions import lit

def idempotent_write(batch_df, batch_id):
    # ← Tag rows with batch_id, then overwrite only that batch's slice
    # ← If the batch is replayed, the overwrite produces the same result
    batch_df.withColumn("batch_id", lit(batch_id)) \
        .write.format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"batch_id = {batch_id}") \
        .save("/path/to/output")

Interview Tip: The key insight is that exactly-once is achieved END-TO-END, not by any single component. Source + Engine + Sink must all cooperate. If any one fails (e.g., a non-idempotent sink), you lose the guarantee.

What NOT to Say: "Spark guarantees exactly-once automatically." Only true with the right source + sink combination. A write to a REST API without idempotency keys is NOT exactly-once.

Q23: Scenario — Your streaming pipeline's state store is growing unbounded. How do you fix it?

Simple Explanation: The state store holds intermediate data that the streaming query needs to remember (e.g., running counts, unmatched join records, dedup history). If it grows forever, your job will eventually run out of memory and crash.

This is like a to-do list that only adds items but never removes completed ones — eventually the list is so long you can't carry it.

Technical details:

  1. Add watermarks to bound the state
  2. Add time constraints on joins to limit buffered data
  3. Use RocksDB state store (disk-based, handles large state):
python — editable
spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
)
# ← Default state store is in-memory (HashMap). RocksDB uses disk + memory cache
# ← Can handle GBs of state that would OOM with the default store
  4. Set spark.sql.streaming.stateStore.minDeltasForSnapshot to control snapshot compaction
  5. Monitor state via StreamingQueryListener:
python — editable
from pyspark.sql.streaming import StreamingQueryListener

class StateMonitor(StreamingQueryListener):
    def onQueryStarted(self, event): pass
    def onQueryTerminated(self, event): pass
    def onQueryProgress(self, event):
        for op in event.progress.stateOperators:
            print(f"State rows: {op.numRowsTotal}, Memory: {op.memoryUsedBytes}")
            # ← Watch these numbers. If numRowsTotal only goes UP, you have a problem

spark.streams.addListener(StateMonitor())  # ← register the listener

Checklist for debugging unbounded state:

🗂️ State growing unbounded?
Are watermarks defined?
NO → Add withWatermark() on the event-time column
YES → Is the watermark delay too large? Reduce it.
Are join conditions time-bounded?
NO → Add "BETWEEN order_time AND order_time + interval X"
YES → Is the interval too large?
Are you using dropDuplicates without watermark?
YES → Add watermark so Spark can expire old dedup state
Is the state store in-memory?
YES → Switch to RocksDB state store for disk-based state

Interview Tip: Start with watermarks (the most common fix), then mention RocksDB (shows deep knowledge). If the interviewer asks about monitoring, mention StreamingQueryListener.

What NOT to Say: "I'd just increase executor memory." That's a band-aid, not a fix. The state will keep growing and eventually exceed any amount of memory.

Q24: What is the difference between trigger(once=True) and trigger(availableNow=True)?

Simple Explanation: Both are for "batch-style streaming" — process all available data, then stop. The difference is HOW they process:

  • trigger(once=True) = Shoves ALL data into ONE micro-batch. Can OOM if there's a lot of data.
  • trigger(availableNow=True) = Splits data into MULTIPLE micro-batches, processes them sequentially, then stops. Memory-friendly.

Analogy: You have 1000 emails to process. once=True = try to open all 1000 at once (your computer might crash). availableNow=True = open 100 at a time, 10 batches, then stop (much safer).

Technical details:

| Aspect | trigger(once=True) | trigger(availableNow=True) |
| Processing | One single micro-batch | Multiple micro-batches |
| Parallelism | All data in one batch | Spreads across multiple batches |
| Memory | Can OOM on large backlog | More memory-friendly |
| Status | Deprecated (Spark 3.3+) | Recommended replacement |
| Use case | Periodic batch-style runs | Periodic batch-style runs (better) |
python — editable
# Old way (deprecated):
df.writeStream.trigger(once=True).start()
# ← Processes ALL available data in a single micro-batch
# ← If 100 GB arrived since last run → tries to process 100 GB at once → OOM

# New way (recommended):
df.writeStream.trigger(availableNow=True).start()
# ← Processes all available data in multiple micro-batches
# ← 100 GB arrives → splits into 10 x 10 GB batches → processes sequentially → safe

Interview Tip: Always recommend availableNow=True over once=True. It shows you know the modern API and understand the memory implications. Common pattern: schedule this via Databricks Jobs to run every hour for cost-efficient near-real-time processing.

What NOT to Say: "They're the same thing." The memory behavior difference is significant for production workloads.

SECTION 7: CODING CHALLENGES

Q25: Deduplicate records keeping the most recent per key.

Simple Explanation: You have duplicate records for the same ID (e.g., multiple updates to the same user). You want to keep only the LATEST version of each record. This is the most common PySpark interview coding question.

Technical details:

python — editable
from pyspark.sql.functions import row_number, col
from pyspark.sql import Window

window = Window.partitionBy("id").orderBy(col("updated_at").desc())
# ← "Within each id, sort by updated_at with newest first"

deduped = df.withColumn("rn", row_number().over(window)) \
    .filter(col("rn") == 1) \
    .drop("rn")
# ← row_number gives 1 to the newest, 2 to the second newest, etc.
# ← Filter rn == 1 keeps ONLY the newest record per id

Sample data flow:

INPUT:
| id | name     | updated_at |
| 1  | Alice    | 2026-01-01 |
| 1  | Alice_v2 | 2026-01-15 |
| 2  | Bob      | 2026-01-10 |
STEP — row_number within each id, newest first:
| id | name     | updated_at | rn |
| 1  | Alice_v2 | 2026-01-15 | 1  |
| 1  | Alice    | 2026-01-01 | 2  |
| 2  | Bob      | 2026-01-10 | 1  |
OUTPUT — keep rn == 1:
| id | name     | updated_at |
| 1  | Alice_v2 | 2026-01-15 |
| 2  | Bob      | 2026-01-10 |

Interview Tip: Use row_number(), NOT rank() or dense_rank(). Those can give ties, meaning you'd keep multiple rows per key. row_number() guarantees exactly one row per partition.

What NOT to Say: "I'd use dropDuplicates(['id'])." That keeps an ARBITRARY record, not necessarily the most recent one. You can't control which record is kept.

Q26: Pivot a table — convert rows to columns.

Simple Explanation: Pivoting turns unique values in a column into separate columns. Like turning a vertical list of sales-by-category into a horizontal spreadsheet with one column per category.

Technical details:

python — editable
from pyspark.sql.functions import sum as _sum

# Always pass the distinct values list to avoid an extra job
pivoted = df.groupBy("date") \
    .pivot("category", ["Electronics", "Clothing", "Food"]) \
    .agg(_sum("amount"))
# ← Pass the list ["Electronics", "Clothing", "Food"] explicitly!
# ← Without it, Spark runs an extra job to discover all distinct values

Sample data flow:

INPUT:
| date       | category    | amount |
| 2026-01-01 | Electronics | 100    |
| 2026-01-01 | Clothing    | 50     |
| 2026-01-01 | Food        | 30     |
| 2026-01-02 | Electronics | 120    |
| 2026-01-02 | Food        | 40     |
OUTPUT — one column per category:
| date       | Electronics | Clothing | Food |
| 2026-01-01 | 100         | 50       | 30   |
| 2026-01-02 | 120         | NULL     | 40   |

Follow-up: How do you unpivot (melt)?

python — editable

# Spark 3.4+ native unpivot:
unpivoted = df.unpivot("date", ["Electronics", "Clothing", "Food"], "category", "amount")

# Before 3.4 — use stack:
unpivoted = df.selectExpr(
    "date",
    "stack(3, 'Electronics', Electronics, 'Clothing', Clothing, 'Food', Food) as (category, amount)"
)
# ← stack(N, ...) takes N key-value pairs and turns columns back into rows

Interview Tip: Always pass the explicit list of values to pivot(). Without it, Spark scans the entire dataset first to discover distinct values — an expensive extra job.

What NOT to Say: "I'd use a for-loop to create separate DataFrames and union them." That's the anti-pattern that pivot was designed to replace.

Q27: Find gaps in a sequential series.

Simple Explanation: Given a series of IDs (1, 2, 3, 5, 6, 10), find where the gaps are (4 is missing, 7-8-9 are missing). This is useful for audit logs, sequence validation, and data quality checks.

Technical details:

python — editable
from pyspark.sql.functions import lead, col
from pyspark.sql import Window

w = Window.orderBy("sequence_id")
gaps = df.withColumn("next_id", lead("sequence_id").over(w)) \
    .filter(col("next_id") - col("sequence_id") > 1) \
    .select(
        col("sequence_id").alias("gap_start"),
        col("next_id").alias("gap_end"),
        (col("next_id") - col("sequence_id") - 1).alias("gap_size")
    )
# ← lead() looks at the NEXT row's value
# ← If current=3 and next=5, the gap is 4 (size 1)

Sample data flow:

INPUT (sequence_id): 1, 2, 3, 5, 6, 10
  ← gaps: 4 is missing; 7, 8, 9 are missing
STEP — lead() pairs each row with the next value:
| sequence_id | next_id |
| 1           | 2       |
| 2           | 3       |
| 3           | 5       |
| 5           | 6       |
| 6           | 10      |
| 10          | NULL    |
OUTPUT — rows where next_id - sequence_id > 1:
| gap_start | gap_end | gap_size |
| 3         | 5       | 1        |
| 6         | 10      | 3        |

Interview Tip: This pattern (lead/lag + filter) is the standard approach for detecting gaps, islands, and consecutive sequences. Master it — it appears in many variations.

What NOT to Say: "I'd generate all numbers and do a left anti join." That works but is wasteful for sparse sequences with large ranges.

Q28: Sessionize clickstream data (gap-based sessions).

Simple Explanation: Sessionization groups user clicks into "sessions." A new session starts when the user has been inactive for more than X minutes (e.g., 30 minutes). If Alice clicks at 1:00, 1:05, 1:10, then 3:00 — the first three clicks are session 1, and 3:00 starts session 2.

Technical details:

python — editable
from pyspark.sql.functions import lag, when, unix_timestamp, sum as _sum, col
from pyspark.sql import Window

w = Window.partitionBy("user_id").orderBy("event_time")
session_timeout = 30 * 60  # ← 30 minutes in seconds

df = df.withColumn("prev_time", lag("event_time").over(w)) \
    .withColumn("new_session",
        when(
            (unix_timestamp("event_time") - unix_timestamp("prev_time")) > session_timeout, 1
        ).when(col("prev_time").isNull(), 1)  # ← first event is always a new session
        .otherwise(0)
    ) \
    .withColumn("session_id",
        _sum("new_session").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow))
    )
# ← Running sum of new_session flags gives each session a unique incrementing ID

Sample data flow:

📋 Overview
INPUT (user Alice)
| user_id | event_time | page |
|---------|-----------|-----------|
| Alice | 1:00 PM | /home |
| Alice | 1:05 PM | /products |
| Alice | 1:10 PM | /cart |
| Alice | 3:00 PM | /home | ← 110 min gap > 30 min timeout
| Alice | 3:02 PM | /checkout |
STEP 1 — lag() gets previous event time:
| event_time | prev_time | gap_minutes |
|-----------|-----------|-------------|
| 1:00 PM | NULL | N/A |
| 1:05 PM | 1:00 PM | 5 |
| 1:10 PM | 1:05 PM | 5 |
| 3:00 PM | 1:10 PM | 110 | ← > 30 min!
| 3:02 PM | 3:00 PM | 2 |
STEP 2 — flag new sessions:
| event_time | new_session |
|-----------|-------------|
| 1:00 PM | 1 | ← first event = new session
| 1:05 PM | 0 | ← 5 min gap < 30 min
| 1:10 PM | 0 |
| 3:00 PM | 1 | ← 110 min gap > 30 min = NEW SESSION
| 3:02 PM | 0 |
STEP 3 — running sum gives session IDs:
| event_time | new_session | session_id |
|-----------|-------------|------------|
| 1:00 PM | 1 | 1 |
| 1:05 PM | 0 | 1 |
| 1:10 PM | 0 | 1 |
| 3:00 PM | 1 | 2 | ← new session!
| 3:02 PM | 0 | 2 |

Interview Tip: The key insight is using a running sum over boolean flags. This "cumulative sum of flags" pattern appears in many problems beyond sessionization (islands-and-gaps, state changes, etc.).
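
Worth mentioning if the conversation turns to streaming: Spark 3.2+ has a native session_window for gap-based sessions. A sketch, assuming a streaming events DataFrame with user_id and event_time columns:

python — editable
from pyspark.sql.functions import session_window, count

sessions = events \
    .withWatermark("event_time", "1 hour") \
    .groupBy("user_id", session_window("event_time", "30 minutes")) \
    .agg(count("*").alias("events_in_session"))
# ← session_window closes a session after 30 minutes of inactivity,
#    mirroring the lag + running-sum logic above without manual flags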

What NOT to Say: "I'd use a UDF to loop through rows and assign session IDs." Window functions do this natively and much faster.

Q29: Flatten a deeply nested JSON structure.

Simple Explanation: JSON data often has nested objects and arrays. "Flattening" means turning the nested structure into a simple table with one row per leaf-level record. Each explode() turns an array into multiple rows.

Technical details:

python — editable
from pyspark.sql.functions import explode, col

raw = spark.read.json("/path/to/nested.json")

# Assume structure: {id, orders: [{order_id, amount, items: [{product, qty}]}]}
flat = raw.select("id", explode("orders").alias("order")) \
    .select(
        "id",
        col("order.order_id"),     # ← access struct fields with dot notation
        col("order.amount"),
        explode("order.items").alias("item")  # ← explode nested array
    ).select(
        "id",
        "order_id",
        "amount",
        col("item.product").alias("product_name"),
        col("item.qty").alias("quantity")
    )

Sample data flow:

AFTER FIRST EXPLODE — one row per order:
| id | order.order_id | order.amount | order.items            |
| C1 | O1             | 100          | [{Laptop,1},{Mouse,2}] |
| C1 | O2             | 50           | [{Book,3}]             |
OUTPUT — one row per item:
| id | order_id | amount | product_name | quantity |
| C1 | O1       | 100    | Laptop       | 1        |
| C1 | O1       | 100    | Mouse        | 2        |
| C1 | O2       | 50     | Book         | 3        |

Generic recursive flattener:

python — editable
from pyspark.sql.functions import col, explode
from pyspark.sql.types import StructType, ArrayType

def flatten_df(df):
    """Recursively flatten all nested structs and arrays."""
    flat_cols = []
    has_struct = False
    for field in df.schema.fields:
        if isinstance(field.dataType, StructType):
            has_struct = True
            for subfield in field.dataType.fields:
                flat_cols.append(
                    col(f"{field.name}.{subfield.name}").alias(f"{field.name}_{subfield.name}")
                )
        elif isinstance(field.dataType, ArrayType):
            df = df.withColumn(field.name, explode(col(field.name)))
            return flatten_df(df)  # ← Recurse after explode (schema changed)
        else:
            flat_cols.append(col(field.name))
    df = df.select(flat_cols)
    return flatten_df(df) if has_struct else df  # ← Recurse until no structs remain

Interview Tip: Mention that explode creates a new row per array element, which multiplies the row count. For large arrays, this can cause data explosion. Consider using posexplode if you need the array index too.
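
If the index matters, a quick posexplode variant of Step 1 above (same raw DataFrame):

python — editable
from pyspark.sql.functions import posexplode

with_index = raw.select("id", posexplode("orders").alias("order_pos", "order"))
# ← posexplode emits (position, element): order_pos is the 0-based index
#    of each order within the array — useful when array order is meaningful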

What NOT to Say: "I'd convert to pandas and flatten there." That defeats the purpose of distributed processing and will fail on large datasets.

Q30: Write a query to find the top 3 products by revenue in each category.

Simple Explanation: Classic "top-N per group" pattern. First aggregate revenue per product, then rank within each category, then filter to keep only the top 3.

Technical details:

python — editable
from pyspark.sql.functions import dense_rank, col, sum as _sum
from pyspark.sql import Window

# Step 1: Aggregate revenue per product per category
product_revenue = df.groupBy("category", "product_id") \
    .agg(_sum("revenue").alias("total_revenue"))

# Step 2: Rank within each category
w = Window.partitionBy("category").orderBy(col("total_revenue").desc())
top3 = product_revenue.withColumn("rnk", dense_rank().over(w)) \
    .filter(col("rnk") <= 3) \
    .drop("rnk")

Sample data flow:

INPUT — aggregated revenue per product:
| category    | product_id | total_revenue |
| Electronics | P1         | 5000          |
| Electronics | P2         | 3000          |
| Electronics | P3         | 3000          |
| Electronics | P4         | 1000          |
| Books       | P5         | 2000          |
| Books       | P6         | 1500          |
STEP — dense_rank within each category (the 3000 tie shares rank 2, so P4 lands at rank 3):
| category    | product_id | total_revenue | rnk |
| Electronics | P1         | 5000          | 1   |
| Electronics | P2         | 3000          | 2   |
| Electronics | P3         | 3000          | 2   |
| Electronics | P4         | 1000          | 3   |
| Books       | P5         | 2000          | 1   |
| Books       | P6         | 1500          | 2   |
OUTPUT — rnk <= 3 keeps every row in this example; with row_number(), P4 would be dropped.

Interview Tip: Use dense_rank() if you want ties included (both P2 and P3 at rank 2). Use row_number() if you want exactly 3 rows per category regardless of ties. Clarify with the interviewer which behavior they want.
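
For contrast, the strict top-3 variant only swaps the ranking function (reusing w and product_revenue from above):

python — editable
from pyspark.sql.functions import row_number

top3_strict = product_revenue.withColumn("rnk", row_number().over(w)) \
    .filter(col("rnk") <= 3) \
    .drop("rnk")
# ← Exactly 3 rows per category; with the sample data the P2/P3 tie is broken
#    arbitrarily and P4 is dropped (ranks run 1, 2, 3, 4 with no ties)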

What NOT to Say: "I'd sort each group and take the first 3." Without a window function, you'd need expensive groupBy + collect_list + slice patterns.

Q31: Compute the running difference between consecutive rows.

Simple Explanation: For time-series data (sensor readings, stock prices), you often want to know: "How much did the value change from the previous reading?" Use lag() to look back one row.

Technical details:

python — editable
from pyspark.sql.functions import lag, col
from pyspark.sql import Window

w = Window.partitionBy("sensor_id").orderBy("timestamp")
result = df.withColumn("prev_value", lag("value", 1).over(w)) \
    .withColumn("delta", col("value") - col("prev_value"))
# ← lag("value", 1) = "the value from 1 row ago"
# ← delta = current value - previous value

Sample data flow:

INPUT:
| sensor_id | timestamp | value |
| S1        | 1:00 PM   | 100   |
| S1        | 1:05 PM   | 105   |
| S1        | 1:10 PM   | 98    |
OUTPUT:
| sensor_id | timestamp | value | prev_value | delta |
| S1        | 1:00 PM   | 100   | NULL       | NULL  |
| S1        | 1:05 PM   | 105   | 100        | 5     |
| S1        | 1:10 PM   | 98    | 105        | -7    |

Interview Tip: lag(col, N) looks N rows back, lead(col, N) looks N rows forward. You can also provide a default value: lag("value", 1, 0) returns 0 instead of NULL for the first row.
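
The default-value variant from the tip is a one-line change (same imports and window as above):

python — editable
result = df.withColumn("prev_value", lag("value", 1, 0).over(w)) \
    .withColumn("delta", col("value") - col("prev_value"))
# ← The first row per sensor now gets delta = value - 0 instead of NULL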

What NOT to Say: "I'd do a self-join on row number." Window functions are the standard approach and far more efficient.

Q32: Find employees whose salary is above the department average.

Simple Explanation: For each employee, compare their salary against the average for their department. Keep only those who earn more than the average. Window functions let you compute the department average without a separate groupBy + join.

Technical details:

python — editable
from pyspark.sql.functions import avg, col
from pyspark.sql import Window

w = Window.partitionBy("department")
result = df.withColumn("dept_avg", avg("salary").over(w)) \
    .filter(col("salary") > col("dept_avg")) \
    .drop("dept_avg")
# ← avg("salary").over(w) adds a column with the department average to EVERY row
# ← Then we simply filter rows where salary > dept_avg

Sample data flow:

INPUT:
| name    | department | salary |
| Alice   | Eng        | 120K   |
| Bob     | Eng        | 80K    |
| Charlie | Eng        | 100K   |
| Diana   | Sales      | 90K    |
| Eve     | Sales      | 70K    |
STEP — dept_avg added to every row (Eng avg = 100K, Sales avg = 80K):
| name    | department | salary | dept_avg |
| Alice   | Eng        | 120K   | 100K     |
| Bob     | Eng        | 80K    | 100K     |
| Charlie | Eng        | 100K   | 100K     |
| Diana   | Sales      | 90K    | 80K      |
| Eve     | Sales      | 70K    | 80K      |
OUTPUT — salary > dept_avg:
| name  | department | salary |
| Alice | Eng        | 120K   |
| Diana | Sales      | 90K    |

Interview Tip: This shows the power of window functions over groupBy — you keep individual rows while computing group-level metrics. No self-join needed.

What NOT to Say: "I'd groupBy department, compute the average, then join back." That works but is less elegant and less efficient than a single window function.

Q33: Scenario — Given two DataFrames (orders and returns), find customers who placed orders but never returned anything.

Simple Explanation: "Find loyal customers who never return products." This is a classic "exists in A but not in B" pattern. The most efficient approach is a left_anti join.

Technical details:

python — editable
# Method 1: left_anti join (most efficient)
loyal_customers = orders_df.join(returns_df, "customer_id", "left_anti") \
    .select("customer_id").distinct()
# ← left_anti = "from left table, keep rows that DON'T match right table"
# ← Like SQL: WHERE customer_id NOT IN (SELECT customer_id FROM returns)

# Method 2: left_outer + filter (works but less efficient)
loyal_customers = orders_df.join(returns_df, "customer_id", "left_outer") \
    .filter(returns_df["return_id"].isNull()) \
    .select(orders_df["customer_id"]).distinct()
# ← Left join gives NULLs where no match exists, then filter for those NULLs

Sample data flow:

ORDERS:
| customer_id | order_id |
| C1          | O1       |
| C2          | O2       |
| C3          | O3       |
| C4          | O4       |
RETURNS:
| customer_id | return_id |
| C1          | R1        |
| C3          | R2        |
OUTPUT (left_anti):
| customer_id |
| C2          | ← has orders, no returns
| C4          | ← has orders, no returns

Interview Tip: Always mention left_anti first — it's the most efficient because Spark doesn't need to carry any columns from the right side. It just checks existence. Follow up with the left_outer + filter approach to show you know alternatives.

What NOT to Say: "I'd collect all return customer IDs into a list and use isin()." That collects data to the driver and fails with large datasets.

Q34: Calculate month-over-month growth rate per product.

Simple Explanation: For each product, compare this month's revenue to last month's. Growth rate = (this month - last month) / last month * 100%. Use lag() to look back one month.

Technical details:

python — editable
from pyspark.sql.functions import lag, col, round as _round
from pyspark.sql import Window

w = Window.partitionBy("product_id").orderBy("month")

growth = monthly_revenue.withColumn("prev_revenue", lag("revenue", 1).over(w)) \
    .withColumn("mom_growth_pct",
        _round(
            (col("revenue") - col("prev_revenue")) / col("prev_revenue") * 100, 2
        )
    )
# ← lag("revenue", 1) gets last month's revenue
# ← growth = (current - previous) / previous * 100

Sample data flow:

INPUT:
| product_id | month   | revenue |
| P1         | 2026-01 | 10000   |
| P1         | 2026-02 | 12000   |
| P1         | 2026-03 | 9000    |
OUTPUT:
| product_id | month   | revenue | prev_revenue | mom_growth_pct |
| P1         | 2026-01 | 10000   | NULL         | NULL           |
| P1         | 2026-02 | 12000   | 10000        | 20.0           |
| P1         | 2026-03 | 9000    | 12000        | -25.0          |

Interview Tip: Handle edge cases: what if prev_revenue is 0? Division by zero! Add a when clause: when(col("prev_revenue") > 0, growth_formula).otherwise(None).
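
The guarded version of the growth formula, sketched against the same DataFrame (reusing col and _round from above):

python — editable
from pyspark.sql.functions import when

growth = growth.withColumn("mom_growth_pct",
    when(col("prev_revenue") > 0,
         _round((col("revenue") - col("prev_revenue")) / col("prev_revenue") * 100, 2))
    .otherwise(None)
)
# ← NULL instead of a divide-by-zero error when the previous month is 0 or missing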

What NOT to Say: "I'd self-join the table on month = month - 1." That works but window functions are the standard, cleaner approach.

Q35: Find all pairs of products frequently bought together (market basket analysis).

Simple Explanation: Given order data (which products were in each order), find which product pairs appear together most often. This is the classic "Customers who bought X also bought Y" problem.

Technical details:

python — editable
from pyspark.sql.functions import collect_set, explode, col
from itertools import combinations

# Step 1: Get all products per order
order_products = df.groupBy("order_id") \
    .agg(collect_set("product_id").alias("products"))
# ← Each row: one order with a list of all products in it

# Step 2: Explode into pairs (using Pandas UDF since built-in is limited)
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType
import pandas as pd

@pandas_udf(ArrayType(StringType()))
def get_pairs(products: pd.Series) -> pd.Series:
    return products.apply(lambda x: [f"{a}|{b}" for a, b in combinations(sorted(x), 2)])
    # ← For each order, generate all unique pairs: {A,B,C} → "A|B", "A|C", "B|C"

# Step 3: Count pair frequencies
pairs = order_products.withColumn("pair", explode(get_pairs(col("products")))) \
    .groupBy("pair").count() \
    .orderBy(col("count").desc())
# ← Most frequent pair = most commonly bought together

Sample data flow:

INPUT:
| order_id | product_id |
| O1       | Laptop     |
| O1       | Mouse      |
| O1       | Keyboard   |
| O2       | Laptop     |
| O2       | Mouse      |
STEP — collect products per order:
| order_id | products                  |
| O1       | [Keyboard, Laptop, Mouse] |
| O2       | [Laptop, Mouse]           |
STEP — explode into pairs:
Keyboard|Laptop, Keyboard|Mouse, Laptop|Mouse   (from O1)
Laptop|Mouse                                    (from O2)
OUTPUT (pair → count):
Laptop|Mouse    → 2
Keyboard|Laptop → 1
Keyboard|Mouse  → 1

Interview Tip: For very large datasets, consider using Spark's FPGrowth algorithm from MLlib instead of this brute-force approach. Mention it to show ML awareness.
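
A minimal FPGrowth sketch, reusing order_products from Step 1 (the minSupport/minConfidence thresholds are illustrative — tune them to your data):

python — editable
from pyspark.ml.fpm import FPGrowth

# FPGrowth requires baskets with no duplicate items — collect_set guarantees that
fp = FPGrowth(itemsCol="products", minSupport=0.01, minConfidence=0.2)
model = fp.fit(order_products)     # ← distributed frequent-itemset mining
model.freqItemsets.show()          # ← itemsets with their frequencies
model.associationRules.show()      # ← "bought X → likely buys Y" rules with confidence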

What NOT to Say: "I'd use nested for-loops to compare every pair." That's O(n^2) and not distributed.

Q36: Scenario — Process a large CSV with bad records. Keep good records, quarantine bad ones.

Simple Explanation: Real-world data is messy. Some rows might have wrong data types, missing fields, or corrupted characters. Instead of failing the entire pipeline, we separate "good" records from "bad" records and process them differently.

Analogy: Like airport security — valid passengers go through to their gate, flagged items go to a separate inspection area. You don't shut down the whole airport because of one suspicious bag.

Technical details:

python — editable
from pyspark.sql.functions import col

# Use PERMISSIVE mode with a corrupt record column.
# NOTE: expected_schema must include a "_corrupt_record STRING" field,
# otherwise Spark has nowhere to put the raw text of unparseable rows.
df = spark.read.option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .schema(expected_schema) \
    .csv("/path/to/data.csv")
# ← PERMISSIVE mode: don't fail on bad rows, put them in _corrupt_record column
df.cache()  # ← Spark disallows queries referencing ONLY the corrupt column on raw files; caching first avoids that error

# Separate good and bad
good_records = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
# ← Rows where _corrupt_record is NULL = successfully parsed = good

bad_records = df.filter(col("_corrupt_record").isNotNull()) \
    .select("_corrupt_record")
# ← Rows where _corrupt_record has a value = failed to parse = bad

# Write both
good_records.write.format("delta").mode("append").saveAsTable("silver_data")
bad_records.write.format("delta").mode("append").saveAsTable("quarantine_data")
# ← Good data continues downstream, bad data goes to quarantine for investigation

The three read modes:

PERMISSIVE (default): Puts corrupt rows in a special column. Pipeline continues.
DROPMALFORMED: Silently drops bad rows. Dangerous — you lose data without knowing.
FAILFAST: Throws exception on first bad row. Good for testing, bad for production.
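
On Databricks specifically, the badRecordsPath option is a common alternative to the _corrupt_record pattern — it writes unparseable rows straight to storage (the quarantine path below is illustrative):

python — editable
df = spark.read.schema(expected_schema) \
    .option("badRecordsPath", "/quarantine/bad_csv") \
    .csv("/path/to/data.csv")
# ← Bad rows (plus the reason they failed) land under /quarantine/bad_csv
#    as JSON files, so the main DataFrame stays clean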

Interview Tip: Always mention the quarantine pattern — it shows production maturity. In Databricks, this is part of the Medallion Architecture: bad records go to a quarantine table in the Bronze layer for later investigation.

What NOT to Say: "I'd use FAILFAST mode in production." One bad row would crash your entire pipeline.

Q37: Implement a custom aggregation — median (not built into Spark SQL).

Simple Explanation: Classic Spark has no built-in exact median() function (Spark 3.4+ finally added one — see the note after the Interview Tip). The interview answer has two options: an approximate median (fast, good enough for most cases) or an exact median (uses window functions, more expensive).

Technical details:

python — editable
from pyspark.sql.functions import percentile_approx, expr

# Approximate median (fast, good enough for most cases)
result = df.groupBy("department") \
    .agg(percentile_approx("salary", 0.5).alias("median_salary"))
# ← percentile_approx gives ~99% accurate median with much less computation

# Exact median using window function
from pyspark.sql.functions import count, row_number, col, avg
from pyspark.sql import Window

w = Window.partitionBy("department").orderBy("salary")
total_w = Window.partitionBy("department")

result = df.withColumn("rn", row_number().over(w)) \
    .withColumn("cnt", count("*").over(total_w)) \
    .filter(
        (col("rn") == (col("cnt") / 2).cast("int") + 1) |
        ((col("cnt") % 2 == 0) & (col("rn") == (col("cnt") / 2).cast("int")))
    ) \
    .groupBy("department") \
    .agg(avg("salary").alias("median_salary"))
# ← For odd count: take the middle row
# ← For even count: take the two middle rows and average them

Sample data flow:

ODD COUNT (5 rows) — keep the middle row (rn == 3):
| salary | rn | cnt |
| 60K    | 1  | 5   |
| 70K    | 2  | 5   |
| 80K    | 3  | 5   | ← median = 80K
| 90K    | 4  | 5   |
| 100K   | 5  | 5   |
EVEN COUNT (4 rows) — average the two middle rows (rn == 2 and 3):
| salary | rn | cnt |
| 60K    | 1  | 4   |
| 70K    | 2  | 4   | ← median = (70K + 80K) / 2 = 75K
| 80K    | 3  | 4   | ←
| 90K    | 4  | 4   |

Interview Tip: Start with percentile_approx — it's the pragmatic answer. Mention the exact method as a follow-up. If the interviewer asks about accuracy, percentile_approx with default settings is accurate to within 0.01%.
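
One more point worth dropping in: Spark 3.4+ (and recent Databricks runtimes) ships a built-in exact median, if you can require that version:

python — editable
from pyspark.sql.functions import median  # ← available from Spark 3.4

result = df.groupBy("department").agg(median("salary").alias("median_salary"))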

What NOT to Say: "Spark can't compute median." It can — you just need to know the approach.

Q38: Write a streaming pipeline that reads from Kafka, deduplicates, and writes to Delta.

Simple Explanation: This is a complete end-to-end streaming pipeline. It reads JSON events from Kafka, removes duplicate events (using event_id), and writes clean data to a Delta table. This is the bread-and-butter of data engineering on Databricks.

Technical details:

python — editable
from pyspark.sql.functions import from_json, col

# Step 1: Define the expected schema
schema = "event_id STRING, user_id STRING, event_type STRING, event_time TIMESTAMP, payload STRING"

# Step 2: Read from Kafka
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()
# ← Kafka gives you key, value, topic, partition, offset, timestamp
# ← The actual event data is in the "value" column as binary

# Step 3: Parse JSON from Kafka's value column
parsed = raw.select(
    from_json(col("value").cast("string"), schema).alias("data")
    # ← cast binary to string, then parse JSON into struct
).select("data.*")
# ← Flatten the struct into individual columns

# Step 4: Deduplicate using a watermark
deduped = parsed \
    .withWatermark("event_time", "10 minutes") \
    .dropDuplicates(["event_id"])
# ← Within a 10-minute window, drop events with the same event_id
# ← Watermark bounds the dedup state (only remembers the last 10 min of event_ids)

# Step 5: Write to Delta with foreachBatch for MERGE-based dedup
def upsert_events(batch_df, batch_id):
    from delta.tables import DeltaTable
    if DeltaTable.isDeltaTable(spark, "/delta/events"):
        target = DeltaTable.forPath(spark, "/delta/events")
        target.alias("t").merge(
            batch_df.alias("s"), "t.event_id = s.event_id"
        ).whenNotMatchedInsertAll().execute()
        # ← MERGE ensures idempotency: replayed batches don't create duplicates
    else:
        batch_df.write.format("delta").save("/delta/events")
        # ← First batch creates the table

deduped.writeStream \
    .foreachBatch(upsert_events) \
    .option("checkpointLocation", "/checkpoints/events") \
    .trigger(processingTime="30 seconds") \
    .start()
# ← Every 30 seconds: read new Kafka messages → parse → dedup → MERGE into Delta

Interview Tip: This pipeline has TWO levels of deduplication: (1) dropDuplicates removes dupes within the stream, and (2) MERGE prevents dupes against the target table. Mention both — it shows thorough thinking.

What NOT to Say: "I'd just use append mode." Without deduplication, duplicate events from Kafka retries will create duplicate rows in your Delta table.