Question Bank

PySpark Interview Questions

SECTION 1: EASY QUESTIONS (E)

1 Filter employees earning > 50k
Problem: Given an employees table with (emp_id, name, dept, salary),
return all employees with salary > 50000, sorted by salary desc.
python — editable
from pyspark.sql.functions import col

employees = spark.read.parquet("employees/")

result = employees \
    .filter(col("salary") > 50000) \
    .select("emp_id", "name", "dept", "salary") \
    .orderBy(col("salary").desc())

result.show()
2 Count orders per customer
Problem: Given orders(order_id, customer_id, amount, order_date),
find total orders and total amount spent per customer.
python — editable
from pyspark.sql.functions import count, sum, col

result = orders.groupBy("customer_id") \
    .agg(
        count("order_id").alias("total_orders"),
        sum("amount").alias("total_spent")
    ) \
    .orderBy(col("total_spent").desc())

result.show()
3 Find duplicate rows by email
Problem: Given users(user_id, email, created_at),
find all emails that appear more than once.
python — editable
from pyspark.sql.functions import count, col

# Method 1: groupBy + filter
duplicates = users.groupBy("email") \
    .agg(count("*").alias("cnt")) \
    .filter(col("cnt") > 1)

# Method 2: join back to get full rows
result = users.join(duplicates, "email", "inner") \
    .select("user_id", "email", "created_at", "cnt") \
    .orderBy("email")

result.show()
4 Total sales by date
Problem: Given transactions(txn_id, date, store_id, amount),
compute daily total sales, ordered by date.
python — editable
from pyspark.sql.functions import sum, col, to_date

result = transactions \
    .withColumn("date", to_date(col("date"))) \
    .groupBy("date") \
    .agg(sum("amount").alias("daily_sales")) \
    .orderBy("date")

result.show()
5 Add derived column (categorize salary)
Problem: Given employees(emp_id, name, salary),
add a "salary_band" column: High(>100k), Mid(50k-100k), Low(<50k)
python — editable
from pyspark.sql.functions import col, when

result = employees.withColumn(
    "salary_band",
    when(col("salary") > 100000, "High")
    .when(col("salary") >= 50000, "Mid")
    .otherwise("Low")
)

result.show()
6 Read CSV + handle nulls
Problem: Read a CSV file with nulls. Fill numeric nulls with 0,
string nulls with "Unknown". Drop rows where emp_id is null.
python — editable
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("emp_id",  IntegerType(), nullable=True),
    StructField("name",    StringType(),  nullable=True),
    StructField("dept",    StringType(),  nullable=True),
    StructField("salary",  DoubleType(),  nullable=True)
])

df = spark.read \
    .schema(schema) \
    .option("header", True) \
    .csv("employees.csv")

result = (
    df
    .na.drop(subset=["emp_id"])                   # drop rows where emp_id is null
    .na.fill({"salary": 0.0, "dept": "Unknown"})  # fill remaining nulls
)

result.show()
7 Word count (RDD style)
Problem: Given a text file, count the frequency of each word.
Return top 10 most frequent words.
python — editable
# RDD approach (classic interview question)
rdd = sc.textFile("data/text_file.txt")

word_counts = rdd \
    .flatMap(lambda line: line.lower().split()) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .sortBy(lambda x: x[1], ascending=False)

word_counts.take(10)

# DataFrame approach (prefer in practice)
from pyspark.sql.functions import explode, split, lower, col

df = spark.read.text("data/text_file.txt")
result = df \
    .withColumn("word", explode(split(lower(col("value")), "\\s+"))) \
    .groupBy("word") \
    .count() \
    .orderBy(col("count").desc()) \
    .limit(10)

result.show()

SECTION 2: MEDIUM QUESTIONS (M)

8 Max salary per department
Problem: Given employees(emp_id, name, dept, salary),
find the highest salary in each department.
Company: Amazon, Google
python — editable
from pyspark.sql.functions import max, col

result = employees.groupBy("dept") \
    .agg(max("salary").alias("max_salary")) \
    .orderBy(col("max_salary").desc())

result.show()
9 Rank employees by salary per department
Problem: Rank employees within each department by salary (highest = rank 1).
Return emp_id, name, dept, salary, rank.
Company: Amazon
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

w = Window.partitionBy("dept").orderBy(col("salary").desc())

result = employees.withColumn("rank", dense_rank().over(w)) \
    .select("emp_id", "name", "dept", "salary", "rank") \
    .orderBy("dept", "rank")

result.show()
10 Top 3 salaries per department
Problem: Find the top 3 distinct salary levels per department.
If multiple employees share the 3rd highest salary, include all.
Company: Amazon (LeetCode #185)
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

w = Window.partitionBy("dept").orderBy(col("salary").desc())

result = employees \
    .withColumn("rnk", dense_rank().over(w)) \
    .filter(col("rnk") <= 3) \
    .select("dept", "name", "salary", "rnk") \
    .orderBy("dept", "rnk")

result.show()

# KEY INSIGHT: dense_rank → tied salaries get same rank, all appear
# If you used rank() → gaps: ranks 1,1,3 (skips 2)
# If you used row_number() → only 3 rows max, misses ties
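The contrast in the comment above is easy to verify without a cluster. Below is a plain-Python sketch of the three ranking semantics (hypothetical helper names, not the PySpark API) applied to one descending-sorted partition with a tie:

```python
# Plain-Python semantics of the three window ranking functions,
# applied to one partition already sorted descending by salary
salaries = [900, 900, 800, 700]  # two employees tied at 900

def row_number(values):
    return list(range(1, len(values) + 1))              # ties broken arbitrarily

def rank(values):
    return [1 + sum(v > x for v in values) for x in values]   # gaps after ties

def dense_rank(values):
    distinct = sorted(set(values), reverse=True)
    return [distinct.index(x) + 1 for x in values]      # no gaps

print(row_number(salaries))   # [1, 2, 3, 4]
print(rank(salaries))         # [1, 1, 3, 4]
print(dense_rank(salaries))   # [1, 1, 2, 3]
```

With `.filter(rnk <= 3)`, only `dense_rank` guarantees the top 3 distinct salary levels including all ties.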
11 Second highest salary
Problem: Find the second highest salary overall.
If no second highest exists, return null.
Company: Amazon (LeetCode #176)
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col

w = Window.orderBy(col("salary").desc())

result = employees \
    .withColumn("rnk", dense_rank().over(w)) \
    .filter(col("rnk") == 2) \
    .select("salary") \
    .distinct()

# Handle case where no second salary exists — give an explicit schema,
# since a single all-null row cannot be type-inferred
if result.count() == 0:
    result = spark.createDataFrame([(None,)], "salary double")

result.show()
12 Running total of revenue
Problem: Given revenue(date, amount), compute cumulative revenue
over time (running total from earliest date).
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col

w = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

result = revenue \
    .withColumn("running_total", sum("amount").over(w)) \
    .select("date", "amount", "running_total") \
    .orderBy("date")

result.show()
13 7-day rolling average
Problem: Given daily_sales(date, sales), compute 7-day rolling
average (current day + 6 previous days).
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col, round

w = Window.orderBy("date").rowsBetween(-6, Window.currentRow)  # 6 prior + current

result = daily_sales \
    .withColumn("rolling_7d_avg", round(avg("sales").over(w), 2)) \
    .select("date", "sales", "rolling_7d_avg") \
    .orderBy("date")

result.show()
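The `rowsBetween(-6, Window.currentRow)` frame can be sanity-checked in plain Python — a hypothetical `rolling_avg_7d` helper, no Spark involved, showing that early rows average over fewer than 7 values:

```python
# Plain-Python equivalent of rowsBetween(-6, Window.currentRow):
# for day i, average sales[max(0, i-6) .. i] — shorter window at the start
def rolling_avg_7d(sales):
    out = []
    for i in range(len(sales)):
        window = sales[max(0, i - 6): i + 1]
        out.append(round(sum(window) / len(window), 2))
    return out

daily = [10, 20, 30, 40, 50, 60, 70, 80]
print(rolling_avg_7d(daily))   # [10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 50.0]
```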
14 Employees earning more than their manager
Problem: Given employees(emp_id, name, salary, manager_id),
find all employees who earn more than their direct manager.
Company: Google (LeetCode #181)
python — editable
from pyspark.sql.functions import col

emp = employees.alias("e")
mgr = employees.alias("m")

result = emp.join(mgr, col("e.manager_id") == col("m.emp_id"), "inner") \
    .filter(col("e.salary") > col("m.salary")) \
    .select(
        col("e.emp_id"),
        col("e.name").alias("employee"),
        col("e.salary").alias("emp_salary"),
        col("m.name").alias("manager"),
        col("m.salary").alias("mgr_salary")
    )

result.show()
15 Customers with no orders
Problem: Given customers(customer_id, name) and orders(order_id, customer_id),
find all customers who have never placed an order.
python — editable
from pyspark.sql.functions import col

# Method 1: left_anti join (cleanest, most efficient)
result = customers.join(orders, "customer_id", "left_anti") \
    .select("customer_id", "name")

# Method 2: left join + filter null
result = customers.join(orders, "customer_id", "left") \
    .filter(col("order_id").isNull()) \
    .select(customers.customer_id, customers.name)

result.show()

# ⚡ Prefer left_anti — cleaner, Catalyst handles it well
16 Deduplicate — keep latest record
Problem: Given events(event_id, user_id, event_type, created_at) with
duplicate rows (same user_id + event_type), keep only the latest.
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

w = Window.partitionBy("user_id", "event_type").orderBy(col("created_at").desc())

result = events \
    .withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")

result.show()

# ⚡ Use row_number (not rank/dense_rank) — guarantees exactly 1 row per group
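The "keep latest per key" logic is the same as a last-write-wins scan — sketched here in plain Python (hypothetical `keep_latest` helper) to show why exactly one row survives per key:

```python
# Plain-Python "keep latest per (user_id, event_type)": scan in ascending time
# order and overwrite — the last write wins, like filtering row_number == 1
# over a descending-time window
def keep_latest(rows):
    latest = {}
    for user_id, event_type, created_at in sorted(rows, key=lambda r: r[2]):
        latest[(user_id, event_type)] = created_at
    return latest

rows = [("u1", "click", 1), ("u1", "click", 5), ("u2", "view", 2)]
print(keep_latest(rows))   # {('u1', 'click'): 5, ('u2', 'view'): 2}
```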
17 Month-over-Month revenue change
Problem: Given monthly_revenue(year_month, revenue), compute MoM %
change: (current - previous) / previous × 100
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, round

w = Window.orderBy("year_month")

result = monthly_revenue \
    .withColumn("prev_revenue", lag("revenue", 1).over(w)) \
    .withColumn(
        "mom_pct_change",
        round(
            (col("revenue") - col("prev_revenue")) / col("prev_revenue") * 100,
            2
        )
    ) \
    .select("year_month", "revenue", "prev_revenue", "mom_pct_change") \
    .orderBy("year_month")

result.show()

# ⚠️ First row: prev_revenue = null → mom_pct_change = null (expected)
# ⚠️ If prev_revenue can be 0: wrap denominator in nullif logic manually
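The null-first-row and zero-denominator cases called out above can be made concrete in plain Python (a hypothetical `mom_pct_change` helper); the same guard translates to a `when(col("prev_revenue") != 0, ...)` wrapper in Spark:

```python
# MoM % change with guards: first month (prev is None) and a zero denominator
# both yield None instead of an error
def mom_pct_change(revenues):
    out, prev = [], None
    for cur in revenues:
        if prev is None or prev == 0:
            out.append(None)     # undefined: no previous month, or division by zero
        else:
            out.append(round((cur - prev) / prev * 100, 2))
        prev = cur
    return out

print(mom_pct_change([100, 110, 0, 50]))   # [None, 10.0, -100.0, None]
```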
18 Pivot: rows to columns (long → wide)
Problem: Given sales(product, month, revenue) in long format,
pivot to wide format with each month as a column.
python — editable
from pyspark.sql.functions import sum

result = sales.groupBy("product").pivot("month").agg(sum("revenue"))
result.show()

# Output:
# product | Jan  | Feb  | Mar
# --------|------|------|-----
# iPhone  | 1000 | 1200 | 900

# Optimization: provide list of pivot values to avoid extra scan
months = ["Jan", "Feb", "Mar", "Apr", "May"]
result = sales.groupBy("product").pivot("month", months).agg(sum("revenue"))
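The shape transformation that `groupBy().pivot().agg(sum())` performs can be sketched in plain Python (hypothetical `pivot_sum` helper) — each (product, month) cell gets the summed revenue, and unseen months stay null:

```python
from collections import defaultdict

# Plain-Python sketch of groupBy("product").pivot("month").agg(sum("revenue")):
# long rows (product, month, revenue) → {product: {month: summed revenue}}
def pivot_sum(rows, months):
    wide = defaultdict(lambda: {m: None for m in months})
    for product, month, revenue in rows:
        cell = wide[product][month]
        wide[product][month] = revenue if cell is None else cell + revenue
    return dict(wide)

long_rows = [("iPhone", "Jan", 600), ("iPhone", "Jan", 400), ("iPhone", "Feb", 1200)]
print(pivot_sum(long_rows, ["Jan", "Feb", "Mar"]))
# {'iPhone': {'Jan': 1000, 'Feb': 1200, 'Mar': None}}
```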
19 Explode array column + count tags
Problem: Given posts(post_id, user_id, tags) where tags is an array
like ["sports","tech","news"], find the top 5 most used tags.
python — editable
from pyspark.sql.functions import explode, col, count

result = (
    posts
    .withColumn("tag", explode(col("tags")))   # one row per tag
    .groupBy("tag")
    .agg(count("*").alias("usage_count"))
    .orderBy(col("usage_count").desc())
    .limit(5)
)

result.show()

SECTION 3: HARD QUESTIONS (H)

20 Find consecutive purchase days
Problem: Given orders(order_id, customer_id, order_date),
find customers who placed orders on at least 3 consecutive days.
Company: Amazon
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, count

# Step 1: Deduplicate (1 row per customer per date)
deduped = orders.select("customer_id", "order_date").distinct()

# Step 2: Assign row number per customer ordered by date
w = Window.partitionBy("customer_id").orderBy("order_date")
numbered = deduped.withColumn("rn", row_number().over(w))

# Step 3: Island key = date - rn (constant for consecutive dates)
from pyspark.sql.functions import date_sub, expr
islands = numbered.withColumn(
    "island_key",
    expr("date_sub(order_date, rn)")   # Spark SQL: date - rn = island constant
)

# Step 4: Group by island, find streaks >= 3
result = islands.groupBy("customer_id", "island_key") \
    .agg(count("*").alias("streak_len")) \
    .filter(col("streak_len") >= 3) \
    .select("customer_id") \
    .distinct()

result.show()
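The `date - rn` island trick is easiest to see with ordinal dates in plain Python — for consecutive dates the difference stays constant, so grouping by it isolates each streak:

```python
from datetime import date
from itertools import groupby

# Gap-and-islands in plain Python: for consecutive dates, ordinal - row_number
# is constant, so grouping by that difference groups each streak together
order_dates = [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 3), date(2024, 1, 7)]

keyed = [(d.toordinal() - rn, d) for rn, d in enumerate(sorted(order_dates), start=1)]
streaks = [len(list(g)) for _, g in groupby(keyed, key=lambda kv: kv[0])]

print(streaks)             # [3, 1] — one 3-day streak, then an isolated day
print(max(streaks) >= 3)   # True → this customer qualifies
```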
21 Session ID assignment (gap > 30 minutes = new session)
Problem: Given events(user_id, event_time), assign a session_id to
each event. A new session starts when gap > 30 minutes.
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, sum as _sum, unix_timestamp, when

w_order = Window.partitionBy("user_id").orderBy("event_time")

# Step 1: Flag new session start (gap > 30 min or first event)
flagged = events.withColumn(
    "prev_time", lag("event_time", 1).over(w_order)
).withColumn(
    "new_session",
    when(
        col("prev_time").isNull() |
        ((unix_timestamp("event_time") - unix_timestamp("prev_time")) > 1800),
        1
    ).otherwise(0)
)

# Step 2: Cumulative sum of new_session flags = session_id
w_cum = Window.partitionBy("user_id").orderBy("event_time") \
              .rowsBetween(Window.unboundedPreceding, Window.currentRow)

result = flagged.withColumn(
    "session_id",
    _sum("new_session").over(w_cum)
).select("user_id", "event_time", "session_id")

result.show()
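The flag-then-cumulative-sum pattern is worth internalizing; here it is as a plain-Python sessionizer (hypothetical `assign_sessions` helper, epoch seconds instead of timestamps):

```python
# Plain-Python sessionizer: flag a new session when the gap exceeds 30 min
# (1800 s), then a running sum of the flags yields the session_id — the same
# two-step logic as the Spark window version above
def assign_sessions(epoch_seconds, gap=1800):
    session_ids, session, prev = [], 0, None
    for t in sorted(epoch_seconds):
        if prev is None or t - prev > gap:
            session += 1           # new-session flag accumulated into the id
        session_ids.append(session)
        prev = t
    return session_ids

events = [0, 600, 3000, 3300, 10000]
print(assign_sessions(events))   # [1, 1, 2, 2, 3]
```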
22 Days with temperature higher than previous day
Problem: Given weather(id, record_date, temperature), find all dates
where temperature was higher than the previous day.
Company: Amazon, Google (LeetCode #197)
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, datediff

w = Window.orderBy("record_date")

result = weather \
    .withColumn("prev_temp", lag("temperature", 1).over(w)) \
    .withColumn("prev_date", lag("record_date", 1).over(w)) \
    .filter(
        (col("temperature") > col("prev_temp")) &
        (datediff(col("record_date"), col("prev_date")) == 1)  # must be consecutive day
    ) \
    .select("id", "record_date", "temperature")

result.show()

# ⚠️ KEY: must check datediff == 1 — data may have gaps (missing dates)
#         Without this check, you'd compare non-adjacent days
23 Longest streak of consecutive active days per user
Problem: Given logins(user_id, login_date), find the longest
consecutive daily login streak for each user.
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, count, max as _max
from pyspark.sql.functions import expr

# Step 1: Deduplicate (1 row per user per date)
deduped = logins.select("user_id", "login_date").distinct()

# Step 2: Row number per user ordered by date
w = Window.partitionBy("user_id").orderBy("login_date")
numbered = deduped.withColumn("rn", row_number().over(w))

# Step 3: island_key = date - rn (constant for consecutive sequences)
islands = numbered.withColumn(
    "island_key",
    expr("date_sub(login_date, rn)")
)

# Step 4: Count streak length per island
streak_lengths = islands.groupBy("user_id", "island_key") \
    .agg(count("*").alias("streak_len"))

# Step 5: Max streak per user
result = streak_lengths.groupBy("user_id") \
    .agg(_max("streak_len").alias("longest_streak")) \
    .orderBy(col("longest_streak").desc())

result.show()
24 Products bought together (market basket)
Problem: Given order_items(order_id, product_id), find all pairs of
products that appear in the same order. Show top 5 co-purchased pairs.
Company: Amazon
python — editable
from pyspark.sql.functions import col, count

o1 = order_items.alias("o1")
o2 = order_items.alias("o2")

result = o1.join(
    o2,
    (col("o1.order_id") == col("o2.order_id")) &
    (col("o1.product_id") < col("o2.product_id")),   # avoid duplicates + self-pairs
    "inner"
) \
.groupBy(col("o1.product_id").alias("product_a"),
         col("o2.product_id").alias("product_b")) \
.agg(count("*").alias("co_purchase_count")) \
.orderBy(col("co_purchase_count").desc()) \
.limit(5)

result.show()

# ⚡ KEY: o1.product_id < o2.product_id ensures each pair appears once
#         Without this: (A,B) and (B,A) both appear = duplicates
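The `product_id < product_id` self-join condition is equivalent to taking unordered pairs per order — in plain Python, `itertools.combinations` does exactly that:

```python
from itertools import combinations
from collections import Counter

# Plain-Python equivalent of the self-join with o1.product_id < o2.product_id:
# combinations() emits each unordered pair exactly once per order
orders = {1: ["A", "B", "C"], 2: ["A", "B"]}

pair_counts = Counter()
for items in orders.values():
    pair_counts.update(combinations(sorted(set(items)), 2))

print(pair_counts.most_common(1))   # [(('A', 'B'), 2)]
```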
25 Funnel conversion rates
Problem: Given events(user_id, event_type) where event_type is one of:
'view', 'add_to_cart', 'purchase'
Compute conversion rate at each funnel stage.
Company: Meta, TikTok
python — editable
from pyspark.sql.functions import col, when, max as _max

# Method: MAX(CASE WHEN) per user — count users, not events
funnel = events.groupBy("user_id").agg(
    _max(when(col("event_type") == "view",         1).otherwise(0)).alias("viewed"),
    _max(when(col("event_type") == "add_to_cart",  1).otherwise(0)).alias("carted"),
    _max(when(col("event_type") == "purchase",     1).otherwise(0)).alias("purchased")
)

from pyspark.sql.functions import sum as _sum

summary = funnel.agg(
    _sum("viewed").alias("total_views"),
    _sum("carted").alias("total_carted"),
    _sum("purchased").alias("total_purchased")
)

# Compute conversion rates — collect once (each collect() triggers a Spark job)
totals = summary.collect()[0]
total_views  = totals["total_views"]
total_carted = totals["total_carted"]
total_purch  = totals["total_purchased"]

print(f"View → Cart:     {total_carted/total_views*100:.1f}%")
print(f"Cart → Purchase: {total_purch/total_carted*100:.1f}%")
print(f"Overall:         {total_purch/total_views*100:.1f}%")

# ⚡ KEY: MAX(CASE WHEN) per user ensures each user counted once per stage
#         Even if user viewed 10 times, they count as 1 viewer
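The MAX(CASE WHEN) trick reduces each user to a set of stages they ever reached; in plain Python (hypothetical `funnel_counts` helper) the same idea looks like this:

```python
# Plain-Python version of the MAX(CASE WHEN) trick: each user contributes at
# most one count per stage, no matter how many events of that type they fired
def funnel_counts(events):
    stages = {}
    for user, event_type in events:
        stages.setdefault(user, set()).add(event_type)
    return {
        "viewed":    sum("view" in s for s in stages.values()),
        "carted":    sum("add_to_cart" in s for s in stages.values()),
        "purchased": sum("purchase" in s for s in stages.values()),
    }

events = [("u1", "view"), ("u1", "view"), ("u1", "add_to_cart"),
          ("u2", "view"), ("u2", "add_to_cart"), ("u2", "purchase")]
print(funnel_counts(events))   # {'viewed': 2, 'carted': 2, 'purchased': 1}
```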
26 Dedup with composite key (keep most recent per key combo)
Problem: Given cdc_events(id, name, dept, salary, updated_at) representing
CDC stream with multiple updates per employee, keep the latest
record per (id, dept) combination.
python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

w = Window.partitionBy("id", "dept").orderBy(col("updated_at").desc())

result = cdc_events \
    .withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1) \
    .drop("rn")

result.show()

# ⚡ row_number guarantees exactly 1 row per partition (unlike rank/dense_rank)
# Always use row_number for deduplication, not rank

SECTION 4: SCENARIO QUESTIONS (S)

27 Handle data skew with salting
Problem: You have orders(order_id, country_code, amount) and a countries
lookup table. 80% of orders are from "US". The join on country_code
is very slow. How do you fix it?
python — editable
from pyspark.sql.functions import rand, concat, lit, col, explode, array

SALT_FACTOR = 10

# APPROACH 1: Broadcast (if countries table is small — BEST approach)
from pyspark.sql.functions import broadcast
result = orders.join(broadcast(countries), "country_code")

# APPROACH 2: Salting (when both tables are large and skewed)

# Step 1: Salt the large orders table
orders_salted = orders.withColumn(
    "salted_key",
    concat(col("country_code"), lit("_"),
           (rand() * SALT_FACTOR).cast("int").cast("string"))
)

# Step 2: Explode countries to match all salt values (0 to SALT_FACTOR-1)
countries_exploded = countries.withColumn(
    "salt", explode(array([lit(i) for i in range(SALT_FACTOR)]))
).withColumn(
    "salted_key",
    concat(col("country_code"), lit("_"), col("salt").cast("string"))
)

# Step 3: Join on salted key
result = orders_salted.join(countries_exploded, "salted_key", "inner") \
    .drop("salted_key", "salt")

result.show()

# APPROACH 3: Enable AQE (automatic, no code change)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE detects hot partitions at runtime and splits them
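Why salting works can be shown without Spark: appending a random suffix fans a hot key out over SALT_FACTOR distinct join keys, so no single partition carries the whole skewed key (plain-Python sketch, seeded for determinism):

```python
import random

# Plain-Python sketch of salting: one hot key ("US", 80% of rows) fans out
# over SALT_FACTOR buckets, so no single join partition processes all of it
SALT_FACTOR = 10
random.seed(42)  # deterministic for the example

rows = ["US"] * 80 + ["DE"] * 10 + ["FR"] * 10
salted = [f"{k}_{random.randrange(SALT_FACTOR)}" for k in rows]

us_buckets = {s for s in salted if s.startswith("US_")}
print(len(us_buckets) > 1)                             # hot key spans many buckets
print(max(salted.count(b) for b in us_buckets) < 80)   # no bucket holds all US rows
```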
28 Optimize slow join with broadcast hint
Problem: An ETL job joining transactions(100M rows) with
store_metadata(500 rows) is taking 2 hours due to Sort-Merge Join.
How do you fix it?
python — editable
from pyspark.sql.functions import broadcast

# Problem: Catalyst doesn't auto-broadcast (store_metadata stats not computed)
# Default autoBroadcastJoinThreshold = 10 MB

# SOLUTION 1: Manual broadcast hint (works immediately)
result = transactions.join(
    broadcast(store_metadata),  # forces BroadcastHashJoin, no shuffle on store_metadata
    "store_id"
)

# SOLUTION 2: Increase threshold (if you want auto-broadcast for similar tables)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800")  # 50 MB

# SOLUTION 3: Cache small table + broadcast
store_metadata.cache()
store_metadata.count()  # trigger cache
result = transactions.join(broadcast(store_metadata), "store_id")

# VERIFICATION: Check join type in plan
result.explain("formatted")
# Look for: BroadcastHashJoin (not SortMergeJoin)

# IMPACT: Sort-Merge Join = 2 shuffles + 2 sorts + merge per 100M rows
#         Broadcast Hash Join = 0 shuffles + hash lookup per 100M rows
#         Speedup: often 10-50x for large fact + small dim joins
29 Read multiple sources + track file origin
Problem: You have daily Parquet files in S3 for Jan, Feb, Mar 2024.
Read them all, add a "source_file" column, and combine.
Some files have extra columns not present in others.
python — editable
from pyspark.sql.functions import input_file_name, col, lit

# APPROACH 1: Glob pattern (all files match same schema)
df = spark.read.parquet("s3://bucket/data/year=2024/month=*/") \
    .withColumn("source_file", input_file_name())

# APPROACH 2: Read separately + unionByName (different schemas)
jan = spark.read.parquet("s3://bucket/data/year=2024/month=01/") \
    .withColumn("source_month", lit("Jan"))
feb = spark.read.parquet("s3://bucket/data/year=2024/month=02/") \
    .withColumn("source_month", lit("Feb"))
mar = spark.read.parquet("s3://bucket/data/year=2024/month=03/") \
    .withColumn("source_month", lit("Mar"))

# allowMissingColumns=True fills missing cols with null instead of failing
combined = jan.unionByName(feb, allowMissingColumns=True) \
              .unionByName(mar, allowMissingColumns=True)

combined.show()

# APPROACH 3: Python list of paths
paths = [
    "s3://bucket/2024-01/",
    "s3://bucket/2024-02/",
    "s3://bucket/2024-03/"
]
df = spark.read.parquet(*paths).withColumn("source_file", input_file_name())
30 Flatten nested JSON to flat table
Problem: You receive JSON with nested structure:
{
"order_id": 1,
"customer": {"id": 101, "name": "Alice", "city": "NYC"},
"items": [
{"sku": "A1", "qty": 2, "price": 10.0},
{"sku": "B2", "qty": 1, "price": 25.0}
]
}
Flatten to: order_id, customer_id, customer_name, city, sku, qty, price
python — editable
from pyspark.sql.functions import col, explode_outer

# Read raw JSON (schema inferred or explicit)
raw = spark.read.option("multiLine", True).json("orders/*.json")

# raw schema:
# order_id: long
# customer: struct<id: long, name: string, city: string>
# items: array<struct<sku: string, qty: int, price: double>>

# Step 1: Explode items array (one row per item)
exploded = raw.withColumn("item", explode_outer("items"))  # outer = keep null items

# Step 2: Select and flatten nested fields with dot notation
result = exploded.select(
    col("order_id"),
    col("customer.id").alias("customer_id"),
    col("customer.name").alias("customer_name"),
    col("customer.city").alias("city"),
    col("item.sku").alias("sku"),
    col("item.qty").alias("qty"),
    col("item.price").alias("price")
)

result.show()

# ⚡ explode_outer vs explode:
#   explode → drops orders with null/empty items array
#   explode_outer → keeps the order row with null item fields (safer)
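The explode vs explode_outer distinction, mimicked in plain Python on (order_id, items) rows — the functions here shadow the PySpark names only for illustration:

```python
# Plain-Python semantics of explode vs explode_outer on an array column
def explode(rows):            # drops rows whose array is empty or None
    return [(oid, item) for oid, items in rows for item in (items or [])]

def explode_outer(rows):      # keeps such rows, with a null item
    out = []
    for oid, items in rows:
        if items:
            out.extend((oid, item) for item in items)
        else:
            out.append((oid, None))
    return out

orders = [(1, ["A1", "B2"]), (2, []), (3, None)]
print(explode(orders))        # [(1, 'A1'), (1, 'B2')]
print(explode_outer(orders))  # [(1, 'A1'), (1, 'B2'), (2, None), (3, None)]
```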

SECTION 5: LARGE DATA + MEMORY + INFRASTRUCTURE SCENARIOS

31 Process 200 GB data with only 16 GB executor memory
Problem: You have 200 GB of data to process but each executor has only
16 GB of memory. The job keeps crashing with OOM errors.
Walk through how you would handle this end to end.
Company: Asked at Amazon, Google, Databricks
ROOT CAUSE ANALYSIS
The mistake is trying to hold too much data in memory at once.
Spark does NOT need to load 200 GB into RAM simultaneously.
Spark processes data in PARTITIONS — one partition per task per core.
Goal: keep each partition small enough to fit in one executor core's memory share.
python — editable
# ── STEP 1: ESTIMATE RIGHT PARTITION COUNT ──────────────────────────────────
# Rule: each partition should be ~100-200 MB in memory
# 200 GB data → 200,000 MB / 128 MB per partition = ~1,600 partitions

data_size_gb = 200
target_partition_mb = 128
n_partitions = int((data_size_gb * 1024) / target_partition_mb)  # = 1600

spark.conf.set("spark.sql.shuffle.partitions", str(n_partitions))

# ── STEP 2: TUNE EXECUTOR MEMORY ─────────────────────────────────────────────
# With 16 GB executor, 4 cores → each core gets ~4 GB working memory
# Memory layout:
#   spark.executor.memory = 14g  (leave 2g for OS + overhead)
#   spark.executor.memoryOverhead = 2g  (native/Python overhead)
#   spark.memory.fraction = 0.6  → 8.4g for Spark execution + storage
#   Each core → ~2g execution memory for shuffle/joins

# ── STEP 3: AVOID CACHING 200 GB ──────────────────────────────────────────────
# DO NOT: df.cache()  ← 200 GB won't fit, causes eviction thrashing
# DO: Process in a streaming fashion (pipeline partitions through)

# ── STEP 4: FILTER EARLY — reduce data before joins ──────────────────────────
df = (
    spark.read.parquet("s3://bucket/data/")
    .filter(col("date") >= "2024-01-01")    # push filter to Parquet reader
    .filter(col("region") == "US")          # reduce data ASAP
    .select("id", "amount", "date")         # column pruning (read only needed cols)
)

# ── STEP 5: AVOID WIDE OPERATIONS ON FULL 200 GB ──────────────────────────────
# Bad: join 200 GB table with another 200 GB table (400 GB shuffle)
# Good: pre-aggregate one side first, then join reduced result
pre_agg = large_df.groupBy("customer_id") \
    .agg(sum("amount").alias("total"))   # 200 GB → maybe 10 GB after aggregation
result = pre_agg.join(customers, "customer_id")  # join 10 GB, not 200 GB

# ── STEP 6: ENABLE AQE ───────────────────────────────────────────────────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE will auto-split/merge partitions based on actual sizes

# ── STEP 7: WRITE IN PARTITIONS, NOT COLLECT ─────────────────────────────────
# NEVER: result.collect()  ← pulls 200 GB to Driver → Driver OOM
# DO: result.write.parquet("s3://output/")  ← each executor writes its partitions

# ── STEP 8: USE DISK SPILL (not OOM) ─────────────────────────────────────────
# If a shuffle really must exceed memory, configure spill to disk:
spark.conf.set("spark.executor.memory", "14g")
spark.conf.set("spark.memory.fraction", "0.6")
# Execution pool will spill to disk automatically (slowdown vs OOM)

# ── FULL CONFIG FOR THIS SCENARIO ────────────────────────────────────────────
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.executor.memory", "14g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memoryOverhead", "2g")
    .config("spark.sql.shuffle.partitions", "1600")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728")  # 128 MB
    .config("spark.sql.files.maxPartitionBytes", "134217728")  # 128 MB per input partition
    .getOrCreate()
)
INTERVIEW ANSWER FRAMEWORK
1. "Spark doesn't load 200 GB at once — it processes in partitions"
2. "Target 100-200 MB per partition → ~1,600 partitions for 200 GB"
3. "Filter + column prune early to reduce data volume"
4. "Pre-aggregate before joins to reduce shuffle size"
5. "Enable AQE for runtime adaptation"
6. "Write to disk, never collect() large results"
7. "Configure disk spill as safety net (not OOM, just slower)"
32 Reading bad/corrupt data — 3 modes + handling
Problem: You are reading a CSV file from an external vendor. Some rows
have corrupt data (wrong column count, bad types, malformed JSON).
How do you handle bad records without failing the whole job?
python — editable
# ── THE 3 READ MODES ─────────────────────────────────────────────────────────
#
# PERMISSIVE  (default): Parse what you can, set corrupt record to null
#                        Adds _corrupt_record column with the bad raw line
# DROPMALFORMED:         Silently drop bad rows, keep good ones
# FAILFAST:             Throw exception on first bad record (fail the job)

# ── MODE 1: PERMISSIVE — capture bad rows for inspection ─────────────────────
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

schema = StructType([
    StructField("id",     IntegerType(),  nullable=True),
    StructField("name",   StringType(),   nullable=True),
    StructField("amount", DoubleType(),   nullable=True),
    StructField("_corrupt_record", StringType(), nullable=True)  # captures bad rows
])

df = spark.read \
    .schema(schema) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .option("header", True) \
    .csv("data/vendor_feed.csv")

# Separate good and bad records
good_records = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
bad_records  = df.filter(col("_corrupt_record").isNotNull())

print(f"Good rows: {good_records.count()}")
print(f"Bad rows:  {bad_records.count()}")

# Save bad records for investigation / reprocessing
bad_records.write.mode("overwrite").json("s3://bucket/bad-records/")

# ── MODE 2: DROPMALFORMED — silently drop bad rows ────────────────────────────
df_clean = spark.read \
    .schema(schema) \
    .option("mode", "DROPMALFORMED") \
    .option("header", True) \
    .csv("data/vendor_feed.csv")
# Good for: non-critical data where a few bad rows are acceptable

# ── MODE 3: FAILFAST — crash on any bad row ───────────────────────────────────
df_strict = spark.read \
    .schema(schema) \
    .option("mode", "FAILFAST") \
    .option("header", True) \
    .csv("data/vendor_feed.csv")
# Good for: financial/compliance data where ANY bad row = stop everything

# ── BADRECORDSPATH — Databricks Runtime: auto-save bad records ───────────────
df = spark.read \
    .schema(schema) \
    .option("badRecordsPath", "s3://bucket/bad-records/") \
    .option("header", True) \
    .csv("data/vendor_feed.csv")
# Spark automatically writes all bad records + reasons to badRecordsPath
# Each bad record file: { "path": "...", "reason": "...", "record": "..." }

# ── JSON-SPECIFIC: multiline + corrupt handling ───────────────────────────────
df_json = spark.read \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .option("multiLine", True) \
    .json("data/*.json")
# multiLine: for JSON arrays/objects spanning multiple lines

# ── PARQUET corrupt file handling ─────────────────────────────────────────────
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")   # skip corrupt files
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")   # skip missing files
df_parquet = spark.read.parquet("s3://bucket/data/")
# Great for: reading historical S3 data where some files may be deleted/corrupt

# ── NULL HANDLING AFTER READ ──────────────────────────────────────────────────
# After PERMISSIVE read: types that failed cast → null
df_clean = df \
    .filter(col("id").isNotNull()) \
    .na.fill({"amount": 0.0, "name": "UNKNOWN"})
# isNotNull drops rows where id failed to parse; na.fill handles the other nulls
🧠 PERMISSIVE → default, investigate bad data, save separately
MODE DECISION
PERMISSIVE         → default; investigate bad data, save separately
DROPMALFORMED      → data quality issues are expected, non-critical pipeline
FAILFAST           → financial/audit pipelines where bad data → stop everything
badRecordsPath     → production pipelines (auto-quarantine bad records)
ignoreCorruptFiles → reading from S3/HDFS where files may disappear
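A common production refinement on top of PERMISSIVE (a sketch — the 1% threshold and the function name are illustrative assumptions, not Spark API): tolerate a small share of bad rows, but fail loudly when the feed is broken:

```python
def check_bad_ratio(good_count, bad_count, max_bad_ratio=0.01):
    """Raise if corrupt records exceed the tolerated share of the feed."""
    total = good_count + bad_count
    if total == 0:
        raise ValueError("no records read")
    ratio = bad_count / total
    if ratio > max_bad_ratio:
        raise RuntimeError(
            f"bad-record ratio {ratio:.1%} exceeds threshold {max_bad_ratio:.1%}"
        )
    return ratio

# e.g. after a PERMISSIVE read that split good/bad records:
# check_bad_ratio(good_records.count(), bad_records.count())
```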
33 Recursively read all files from a directory tree
Problem: You have files organized like:
/data/2024/01/01/part-001.parquet
/data/2024/01/02/part-001.parquet
/data/2024/02/01/part-001.parquet
How do you read ALL files recursively in one DataFrame?
python — editable
# ── METHOD 1: recursiveFileLookup (Spark 3.0+) ────────────────────────────────
# Reads ALL files recursively under the root path — ignores partition structure
df = spark.read \
    .option("recursiveFileLookup", "true") \
    .parquet("s3://bucket/data/")
# Reads every .parquet file in the entire /data/ tree recursively

# ── METHOD 2: Glob patterns (most flexible) ───────────────────────────────────
# All files 2 levels deep:
df = spark.read.parquet("s3://bucket/data/*/*/")

# All files at any depth (globs don't recurse arbitrarily — use recursiveFileLookup):
df = spark.read.option("recursiveFileLookup", "true").parquet("s3://bucket/data/")

# Specific year range:
df = spark.read.parquet("s3://bucket/data/2024/*/")

# Multiple specific months:
df = spark.read.parquet(
    "s3://bucket/data/2024/01/",
    "s3://bucket/data/2024/02/",
    "s3://bucket/data/2024/03/"
)

# ── METHOD 3: pathGlobFilter — read only specific file types ──────────────────
# In a mixed directory (CSVs + Parquets + JSONs), read only CSVs:
df = spark.read \
    .option("pathGlobFilter", "*.csv") \
    .option("recursiveFileLookup", "true") \
    .csv("s3://bucket/mixed-data/")

# Read only files matching a date pattern:
df = spark.read \
    .option("pathGlobFilter", "2024-01-*.parquet") \
    .parquet("s3://bucket/data/")

# ── METHOD 4: modifiedAfter / modifiedBefore — time-based filtering ───────────
# Read only files modified within a given window (Spark 3.1+):
df = spark.read \
    .option("modifiedAfter", "2024-01-01T00:00:00") \
    .option("modifiedBefore", "2024-01-02T00:00:00") \
    .option("recursiveFileLookup", "true") \
    .parquet("s3://bucket/data/")

# ── METHOD 5: Python glob → collect paths → pass to Spark ────────────────────
import boto3, os

# For S3: list all parquet files recursively
s3 = boto3.client("s3")
bucket, prefix = "my-bucket", "data/2024/"
all_paths = [
    f"s3://{bucket}/{obj['Key']}"
    for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
    if obj["Key"].endswith(".parquet")
]

df = spark.read.parquet(*all_paths)

# For local filesystem:
import glob
all_paths = glob.glob("/data/**/*.parquet", recursive=True)  # recursive=True is key!
df = spark.read.parquet(*all_paths)

# ── ADD SOURCE FILE TRACKING ──────────────────────────────────────────────────
from pyspark.sql.functions import input_file_name

df = spark.read \
    .option("recursiveFileLookup", "true") \
    .parquet("s3://bucket/data/") \
    .withColumn("source_file", input_file_name())

df.show(truncate=False)
# Each row knows exactly which file it came from

# ── VALIDATE WHAT WAS READ ────────────────────────────────────────────────────
df.select("source_file").distinct().show(100, truncate=False)
# Shows all unique file paths that contributed to the DataFrame
🧠 Memory Map
DECISION TABLE
Scenario                               → Method
────────────────────────────────────────────────────────────────────
All files under root (same schema)     → recursiveFileLookup=true
All files matching date pattern        → glob: "path/2024/*/"
Only .csv files in mixed directory     → pathGlobFilter="*.csv"
Files from last N hours only           → modifiedAfter option
Know exact file paths (dynamic list)   → spark.read.parquet(*paths)
Track which file each row came from    → input_file_name()
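With `source_file` attached, the `/data/YYYY/MM/DD/` layout from the problem can be parsed back into date parts. A plain-Python sketch of the regex (in Spark the same pattern would go into `regexp_extract`):

```python
import re

# Matches the /data/YYYY/MM/DD/ layout from the problem statement
PATH_RE = re.compile(r"/data/(\d{4})/(\d{2})/(\d{2})/")

def date_parts(path):
    """Return (year, month, day) strings from a path, or None if no match."""
    m = PATH_RE.search(path)
    return m.groups() if m else None

print(date_parts("/data/2024/01/02/part-001.parquet"))  # ('2024', '01', '02')
```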
34 OOM during groupBy on 500M rows
Problem: groupBy("country").agg(collect_list("event")) on 500M rows
causes executor OOM. How do you fix it?
python — editable
# ── DIAGNOSE THE PROBLEM ──────────────────────────────────────────────────────
# collect_list() = collects ALL values per key into memory on ONE executor
# If "US" has 400M events → collect_list creates 400M element list in 1 executor
# That's the OOM culprit, not groupBy itself

# ── SOLUTION 1: Don't use collect_list — aggregate instead ────────────────────
# Bad (OOM):
df.groupBy("country").agg(collect_list("event"))

# Good (if you need count/sum/stats, not the full list):
from pyspark.sql.functions import count, countDistinct, sum
df.groupBy("country").agg(
    count("event").alias("event_count"),
    countDistinct("event").alias("unique_events")
)

# ── SOLUTION 2: Cap the number of values collected per key ────────────────────
from pyspark.sql.functions import collect_list, col

# Sample first N values per key (avoid unbounded collection)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.partitionBy("country").orderBy("event_time")
df_limited = df.withColumn("rn", row_number().over(w)) \
               .filter(col("rn") <= 1000)   # max 1000 events per country

result = df_limited.groupBy("country").agg(collect_list("event").alias("top_events"))

# ── SOLUTION 3: Increase shuffle partitions to reduce per-partition size ───────
spark.conf.set("spark.sql.shuffle.partitions", "2000")  # more, smaller partitions
spark.conf.set("spark.executor.memory", "16g")
# More partitions = less data per task = less memory per task

# ── SOLUTION 4: Pre-filter to reduce data volume ─────────────────────────────
# Before the groupBy, eliminate data you don't need
df_filtered = df.filter(col("date") >= "2024-01-01") \
                .filter(col("is_valid") == True) \
                .select("country", "event", "event_time")   # column pruning
result = df_filtered.groupBy("country").agg(count("event"))

# ── SOLUTION 5: Use aggregateByKey (RDD) for full control ─────────────────────
# When DataFrame API can't express what you need
rdd = df.select("country", "event").rdd.map(lambda r: (r.country, r.event))

# Pre-aggregate locally: build set of unique events per partition

result_rdd = rdd.aggregateByKey(
    set(),                                           # initial accumulator = empty set
    lambda acc, val: acc | {val},                    # within partition: union set
    lambda acc1, acc2: acc1 | acc2                   # cross partition: union sets
)
result = spark.createDataFrame(
    result_rdd.map(lambda x: (x[0], list(x[1]))),
    ["country", "unique_events"]
)

# ── SOLUTION 6: Enable AQE skew handling ──────────────────────────────────────
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3")
# Note: AQE skew handling splits skewed JOIN partitions; for a skewed groupBy
# key like "US", rely on Solutions 1-4 (one key can't be split across reducers)
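The two merge functions passed to `aggregateByKey` in Solution 5 have simple set semantics; a plain-Python check (sets stand in for the per-partition accumulators):

```python
# seqOp: fold one value into a partition-local accumulator
seq_op = lambda acc, val: acc | {val}
# combOp: merge accumulators coming from different partitions
comb_op = lambda acc1, acc2: acc1 | acc2

# Two partitions' worth of events for the same key "US":
part1, part2 = ["click", "view", "click"], ["view", "purchase"]

acc1, acc2 = set(), set()
for v in part1:
    acc1 = seq_op(acc1, v)
for v in part2:
    acc2 = seq_op(acc2, v)

print(sorted(comb_op(acc1, acc2)))  # ['click', 'purchase', 'view']
```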
35 Job works on 10GB weekdays, fails on 500GB Mondays
Problem: Daily ETL runs fine Mon-Fri on ~10 GB. Every Monday after the
weekend accumulation it processes ~500 GB and crashes.
How do you design this to handle both cases?
python — editable
# ── ROOT CAUSE: Static configs tuned for 10 GB, fail at 500 GB ────────────────

# ── SOLUTION 1: Enable AQE (auto-adapts at runtime) ───────────────────────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# AQE merges tiny partitions on small days, splits large ones on big days
# shuffle.partitions can be set to 2000 and AQE will coalesce on small days

# ── SOLUTION 2: Dynamic Allocation (scale executors to data) ───────────────────
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "5")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "200")   # scale up for Monday
spark.conf.set("spark.shuffle.service.enabled", "true")
# On 10 GB day: ~5 executors. On 500 GB Monday: auto-scales to 100+ executors

# ── SOLUTION 3: Data-driven shuffle partition calculation ─────────────────────
# Before running the main job, estimate data size and set partitions
def estimate_partitions(spark, path, target_mb=128):
    """Estimate optimal partition count from input data size."""
    file_sizes = spark.sparkContext._jvm.org.apache.hadoop.fs \
        .FileSystem.get(spark.sparkContext._jvm.java.net.URI.create(path),
                        spark.sparkContext._jsc.hadoopConfiguration()) \
        .getContentSummary(spark.sparkContext._jvm.org.apache.hadoop.fs.Path(path)) \
        .getLength()
    size_mb = file_sizes / (1024 * 1024)
    partitions = max(200, int(size_mb / target_mb))
    return partitions

n_parts = estimate_partitions(spark, "s3://bucket/data/")
spark.conf.set("spark.sql.shuffle.partitions", str(n_parts))

# ── SOLUTION 4: Add checkpoint mid-job (survive retries on large runs) ─────────
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")

df = spark.read.parquet("s3://bucket/data/")

# After expensive operation: checkpoint to cut lineage + save progress
after_join = df.join(reference, "id").filter(...)
after_join.cache()
after_join.checkpoint()  # writes to S3 — if job fails, restart from here
after_join.unpersist()

# ── SOLUTION 5: Partition the input table by date for pruning ─────────────────
# Write data partitioned by date so Monday read only reads weekend partition
df.write \
    .partitionBy("year", "month", "day") \
    .mode("append") \
    .parquet("s3://bucket/data-partitioned/")

# Monday read: only reads Sat+Sun partitions (not entire history)
df_weekend = spark.read.parquet("s3://bucket/data-partitioned/") \
    .filter(col("year") == 2024) \
    .filter(col("month") == 1) \
    .filter(col("day").isin(6, 7))   # Saturday + Sunday only

# ── COMPLETE ROBUST ETL PATTERN ───────────────────────────────────────────────
from datetime import datetime, timedelta
from pyspark.sql.functions import col, sum

def run_etl(spark, date_str):
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.maxExecutors", "200")
    spark.conf.set("spark.sql.shuffle.partitions", "2000")  # AQE coalesces down

    df = spark.read.parquet(f"s3://bucket/data/date={date_str}/") \
        .filter(col("is_valid") == True) \
        .select("id", "amount", "category")

    result = df.groupBy("category") \
               .agg(sum("amount").alias("total"))

    result.write.mode("overwrite") \
          .parquet(f"s3://bucket/output/date={date_str}/")
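The value of Solution 3 shows up in the numbers: the same formula yields a small partition count on a 10 GB weekday and a large one on a 500 GB Monday. A plain-Python check of the math (the floor of 200 mirrors `estimate_partitions` above):

```python
def partitions_for(size_gb, target_mb=128, floor=200):
    """Data-driven shuffle partition count with a minimum floor."""
    return max(floor, int((size_gb * 1024) / target_mb))

print(partitions_for(10))   # 200  — weekday: 10 GB needs only 80, floor applies
print(partitions_for(500))  # 4000 — Monday
```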
36 Schema evolution — new column added to source
Problem: Your daily Parquet data source added a new column "discount_pct"
starting 2024-03-01. Older files don't have it. Reading all
historical data fails with schema mismatch. How to handle?
python — editable
# ── PROBLEM: Reading old + new files fails with schema mismatch ───────────────
# old files: (id, amount, category)
# new files: (id, amount, category, discount_pct)   ← new column
# spark.read.parquet("all/") → AnalysisException: schema mismatch

# ── SOLUTION 1: mergeSchema option (Parquet + Delta native) ───────────────────
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("s3://bucket/data/")
# mergeSchema unions all file schemas; columns missing in a file read as null
# Old file rows: discount_pct = null
# New file rows: discount_pct = actual value

# ── SOLUTION 2: unionByName with allowMissingColumns ──────────────────────────
old_df = spark.read.parquet("s3://bucket/data/before-2024-03-01/")
new_df = spark.read.parquet("s3://bucket/data/from-2024-03-01/")

combined = old_df.unionByName(new_df, allowMissingColumns=True)
# old_df rows: discount_pct = null (added automatically)
# new_df rows: discount_pct = actual value

# ── SOLUTION 3: Define explicit schema that includes all columns ───────────────
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

full_schema = StructType([
    StructField("id",            LongType(),   nullable=True),
    StructField("amount",        DoubleType(), nullable=True),
    StructField("category",      StringType(), nullable=True),
    StructField("discount_pct",  DoubleType(), nullable=True)  # new col, nullable
])

# Read old files with explicit schema → missing columns filled with null
old_df = spark.read.schema(full_schema).parquet("s3://bucket/data/before-2024-03-01/")
# No error — schema is applied, discount_pct = null for all old rows

# ── SOLUTION 4: Delta Lake (best for production schema evolution) ─────────────
# When writing:
new_df.write \
    .format("delta") \
    .option("mergeSchema", "true") \
    .mode("append") \
    .save("s3://bucket/delta-table/")
# mergeSchema on write lets the append add new columns to the table schema

# Or set globally:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# ── HANDLE NULL in new column after merge ─────────────────────────────────────
from pyspark.sql.functions import coalesce, lit, col

combined = combined.withColumn(
    "discount_pct",
    coalesce(col("discount_pct"), lit(0.0))   # treat missing as 0% discount
)
37 How to estimate the right number of partitions for a job
Problem: You're given a 500 GB dataset to aggregate. How do you decide
how many partitions to use? Walk through the math.
Company: Asked at Databricks, Google
python — editable
# ── THE FORMULA ───────────────────────────────────────────────────────────────
# Target partition size: 100-200 MB per partition (sweet spot)
# Too small: task scheduling overhead + small file problem
# Too large: OOM risk, low parallelism

# FORMULA: n_partitions = ceil(data_size_MB / target_partition_MB)
# For 500 GB: 500,000 MB / 128 MB = ~3,900 partitions → set to 4000

data_size_gb    = 500
target_part_mb  = 128
n_partitions    = int((data_size_gb * 1024) / target_part_mb)   # 4000

spark.conf.set("spark.sql.shuffle.partitions", str(n_partitions))

# ── ALSO: SET INPUT PARTITION SIZE ────────────────────────────────────────────
# Controls how Spark splits input files into tasks
spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))  # 128 MB

# ── CHECK CURRENT PARTITION COUNT ─────────────────────────────────────────────
df = spark.read.parquet("s3://bucket/500gb-data/")
print(f"Input partitions: {df.rdd.getNumPartitions()}")
# Should be ~3900-4000 for 500 GB at 128 MB per partition

# ── VERIFY PARTITION BALANCE ──────────────────────────────────────────────────
from pyspark.sql.functions import spark_partition_id, count

df.withColumn("pid", spark_partition_id()) \
  .groupBy("pid") \
  .agg(count("*").alias("rows_in_partition")) \
  .describe("rows_in_partition") \
  .show()
# describe() reports mean, stddev, min, max of rows per partition
# Healthy: stddev/mean < 0.3 (relatively even)
# Skewed:  max >> mean (one partition has many more rows)

# ── CORES vs PARTITIONS RELATIONSHIP ─────────────────────────────────────────
# Rule: partitions should be a MULTIPLE of total cores
# 50 executors × 4 cores = 200 total cores
# Good partition counts: 200, 400, 800, 1600, 4000 (multiples of 200)
# This ensures all cores stay busy (no idle cores waiting)

total_cores = 50 * 4   # 200
# Round partition count to nearest multiple of total_cores
import math
n_partitions_adjusted = math.ceil(n_partitions / total_cores) * total_cores  # 4000
spark.conf.set("spark.sql.shuffle.partitions", str(n_partitions_adjusted))

# ── LET AQE DO FINAL TUNING ───────────────────────────────────────────────────
# Set partitions to a safe HIGH number → AQE coalesces down to right size
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728")  # 128 MB
spark.conf.set("spark.sql.shuffle.partitions", "8000")  # AQE will reduce this

# INTERVIEW SUMMARY:
# 1. Formula: data_MB / 128 MB = partition count
# 2. Round to multiple of total executor cores
# 3. Set input maxPartitionBytes = same target (128 MB)
# 4. Enable AQE to auto-tune at runtime
# 5. Verify with describe() on partition sizes
38 Avoid recomputing an expensive DataFrame 3 times
Problem: You build a DataFrame with an expensive join + aggregation.
You then use it for: (1) write to Parquet, (2) send to a report,
(3) compute anomalies. How to avoid computing it 3 times?
python — editable
# ── PROBLEM: Without caching ─────────────────────────────────────────────────
# expensive join + aggregation:
expensive_df = raw.join(reference, "id") \
                  .groupBy("region") \
                  .agg(sum("amount").alias("total"))

# Each action below triggers a FULL recomputation of expensive_df:
expensive_df.write.parquet("s3://output/")                # compute #1
expensive_df.filter(col("total") > 1e6).show()            # compute #2
anomalies = expensive_df.filter(col("total") > avg_total) # compute #3

# ── SOLUTION: Cache before first use ──────────────────────────────────────────
expensive_df = raw.join(reference, "id") \
                  .groupBy("region") \
                  .agg(sum("amount").alias("total"))

# Cache (materialize into executor memory/disk)
expensive_df.cache()
expensive_df.count()   # TRIGGER the cache — forces computation NOW, stores result

# Now all 3 uses hit the cache, not recompute:
expensive_df.write.parquet("s3://output/")                # reads from cache
expensive_df.filter(col("total") > 1e6).show()            # reads from cache
anomalies = expensive_df.filter(col("total") > 1000000)   # reads from cache

# Release cache when done
expensive_df.unpersist()

# ── CHOOSE STORAGE LEVEL ──────────────────────────────────────────────────────
from pyspark.storagelevel import StorageLevel

# If expensive_df fits in memory:
expensive_df.persist(StorageLevel.MEMORY_ONLY)        # fastest reads

# If it's large or memory is limited:
expensive_df.persist(StorageLevel.MEMORY_AND_DISK)    # spills to disk if needed

# If you want to be safe on a shared cluster:
expensive_df.persist(StorageLevel.DISK_ONLY)          # always on disk (slowest)

# ── ALTERNATIVE: CHECKPOINT (if lineage is long) ──────────────────────────────
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")

expensive_df.cache()
expensive_df.count()       # trigger cache
expensive_df.checkpoint()  # write to S3, cut lineage (extra safety)
expensive_df.unpersist()   # release cache (checkpoint is the source now)

# Now: expensive_df reads from S3 checkpoint on reuse
# Advantage: survives executor failures (no recompute from original data)

# ── WRITE ONCE, READ MULTIPLE TIMES PATTERN (most reliable) ────────────────────
# For production: materialize to Parquet, read it back multiple times
expensive_df.write.mode("overwrite").parquet("s3://tmp/expensive-result/")
materialized = spark.read.parquet("s3://tmp/expensive-result/")

materialized.write.parquet("s3://output/")
materialized.filter(col("total") > 1e6).show()
anomalies = materialized.filter(col("total") > 1000000)
# Trade-off: extra write cost → but most resilient, works across sessions
39 Read only specific file types from a mixed directory
Problem: A directory contains .csv, .parquet, .json and .tmp files mixed.
Read ONLY the .csv files recursively without reading the others.
python — editable
# ── METHOD 1: pathGlobFilter (Spark 3.0+) ─────────────────────────────────────
df = spark.read \
    .option("pathGlobFilter", "*.csv") \
    .option("recursiveFileLookup", "true") \
    .option("header", "true") \
    .csv("s3://bucket/mixed-directory/")
# pathGlobFilter keeps only .csv files; recursiveFileLookup walks all subdirectories

# Multiple extensions (Hadoop glob syntax):
df = spark.read \
    .option("pathGlobFilter", "*.{csv,tsv}") \
    .option("recursiveFileLookup", "true") \
    .csv("s3://bucket/mixed-directory/")

# ── METHOD 2: Glob pattern in path ────────────────────────────────────────────
# All csv files any depth:
df = spark.read.csv("s3://bucket/mixed-directory/**/*.csv",
                    header=True)

# All csvs matching date pattern:
df = spark.read.csv("s3://bucket/data/2024-01-*.csv", header=True)

# ── METHOD 3: Manually list + filter using boto3 (S3) ────────────────────────
import boto3

s3 = boto3.client("s3")
bucket, prefix = "my-bucket", "mixed-directory/"

response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
csv_paths = [
    f"s3://{bucket}/{obj['Key']}"
    for obj in response.get("Contents", [])
    if obj["Key"].endswith(".csv") and not obj["Key"].endswith(".tmp")
]

print(f"Found {len(csv_paths)} CSV files")
df = spark.read.option("header", True).csv(csv_paths)   # csv() takes a list of paths

# ── METHOD 4: Exclude patterns using Python filter ────────────────────────────
import glob, os

all_files = glob.glob("/data/**/*", recursive=True)
# Keep only .parquet files, exclude hidden files and .tmp
parquet_files = [
    f for f in all_files
    if f.endswith(".parquet")
    and not os.path.basename(f).startswith("_")   # exclude _SUCCESS, _metadata
    and not os.path.basename(f).startswith(".")    # exclude hidden files
]

df = spark.read.parquet(*parquet_files)

# ── VERIFY WHAT WAS READ ──────────────────────────────────────────────────────
from pyspark.sql.functions import input_file_name
df_with_source = df.withColumn("src", input_file_name())
df_with_source.select("src").distinct().show(20, truncate=False)
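The exclusion rules in Method 4 are easy to get subtly wrong, so it helps to factor them into one predicate that can be unit-tested (a sketch mirroring the comments above):

```python
import os

def keep_parquet(path):
    """Keep real .parquet data files; drop _SUCCESS/_metadata, hidden files, other types."""
    name = os.path.basename(path)
    return (path.endswith(".parquet")
            and not name.startswith("_")
            and not name.startswith("."))

files = [
    "/data/a/part-0.parquet",   # keep
    "/data/a/_SUCCESS",         # drop: metadata marker
    "/data/a/.part-0.parquet",  # drop: hidden
    "/data/a/log.json",         # drop: wrong type
]
print([f for f in files if keep_parquet(f)])  # ['/data/a/part-0.parquet']
```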
40 Handle late-arriving data in daily ETL
Problem: Your daily ETL runs at midnight. Occasionally, events from
the previous day arrive 2-3 hours late (after midnight).
These late records are missed by the daily partition load.
How do you handle this in a batch ETL pipeline?
python — editable
# ── STRATEGY 1: Reprocess last N days (simple, reliable) ──────────────────────
# Instead of processing only "today", always reprocess last 3 days
# Late data from yesterday will be picked up in today's run of "yesterday"

from datetime import datetime, timedelta
from pyspark.sql.functions import col

def run_etl_with_late_data(spark, run_date, lookback_days=3):
    """Process the last N days so late arrivals are picked up."""
    dates = [
        (run_date - timedelta(days=i)).strftime("%Y-%m-%d")
        for i in range(lookback_days)
    ]

    df = spark.read.parquet("s3://bucket/raw/") \
        .filter(col("event_date").isin(dates))   # read only the last N days

    # Overwrite ONLY the touched partitions so re-runs are idempotent.
    # Without dynamic partition overwrite, mode("overwrite") replaces the
    # whole table, not just these partitions:
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write \
        .partitionBy("event_date") \
        .mode("overwrite") \
        .parquet("s3://bucket/processed/")
    # e.g. Monday's run rewrites Sat+Sun+Mon partitions, fixing late Sat/Sun data
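To make the lookback window concrete, here is what the date list works out to for a hypothetical run date (pure datetime, no Spark needed):

```python
from datetime import date, timedelta

run_date = date(2024, 1, 15)   # hypothetical run date
lookback_days = 3

# Same comprehension as in the ETL function above
dates = [
    (run_date - timedelta(days=i)).strftime("%Y-%m-%d")
    for i in range(lookback_days)
]
print(dates)   # ['2024-01-15', '2024-01-14', '2024-01-13']
```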

# ── STRATEGY 2: Delta Lake MERGE (upsert late records) ────────────────────────
from delta.tables import DeltaTable

def upsert_late_records(spark, new_records_df):
    """Upsert new/late records into existing Delta table."""
    delta_table = DeltaTable.forPath(spark, "s3://bucket/delta/events/")

    delta_table.alias("target").merge(
        new_records_df.alias("source"),
        "target.event_id = source.event_id"   # match on unique event ID
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()
    # whenMatchedUpdateAll: update if the event already exists (idempotent)
    # whenNotMatchedInsertAll: insert if it is new

# ── STRATEGY 3: Separate late arrival table ────────────────────────────────────
# Detect late records at write time
from pyspark.sql.functions import current_timestamp, datediff, to_date, col

df = spark.read.parquet("s3://bucket/raw/today/") \
    .withColumn("processing_ts", current_timestamp()) \
    .withColumn("days_late",
        datediff(col("processing_ts"), col("event_date"))
    )

# Separate on-time vs late
on_time = df.filter(col("days_late") == 0)
late     = df.filter(col("days_late") > 0)

# Write on-time to regular partition
on_time.write.partitionBy("event_date").mode("append") \
       .parquet("s3://bucket/events/")

# Write late records to separate location for auditing + reprocessing
late.write.mode("append") \
    .parquet(f"s3://bucket/late-arrivals/{datetime.today().strftime('%Y-%m-%d')}/")

# ── STRATEGY 4: Watermark filter — ignore very old late data ──────────────────
from pyspark.sql.functions import current_date, date_sub

MAX_LATE_DAYS = 7   # SLA: data older than 7 days is rejected

df_filtered = df.filter(
    col("event_date") >= date_sub(current_date(), MAX_LATE_DAYS)
)

# Log how much data was rejected (cache first; each count() triggers a full job)
df.cache()
total    = df.count()
accepted = df_filtered.count()
rejected = total - accepted
print(f"Rejected {rejected} records older than {MAX_LATE_DAYS} days")

SECTION 6: AZURE CLOUD STORAGE + REAL-WORLD READING

💡 Interview Tip
Interviewers ask this to verify you've actually used PySpark in production on Azure. They don't expect you to memorize SAS tokens — they want to see you know the concepts.
41 Read from Azure Blob Storage (old storage / wasbs://)
Problem: You have data in Azure Blob Storage (classic, NOT Data Lake).
Container: "raw-data", Storage Account: "mystorageacct"
File: parquet/orders/2024/*.parquet
How do you read it in PySpark?
Company: Any company using Azure
python — editable
# Azure Blob Storage uses the wasbs:// (secure) or wasb:// protocol
# Format: wasbs://<container>@<storage-account>.blob.core.windows.net/<path>

# ── METHOD 1: Account Key (simplest, not for production) ─────────────────────
storage_account = "mystorageacct"
account_key     = "your_account_key_here"   # from Azure Portal → Access Keys

spark.conf.set(
    f"fs.azure.account.key.{storage_account}.blob.core.windows.net",
    account_key
)

df = spark.read.parquet(
    "wasbs://raw-data@mystorageacct.blob.core.windows.net/parquet/orders/2024/"
)
df.show()

# ── METHOD 2: SAS Token (scoped access, time-limited) ────────────────────────
sas_token = "sv=2023-01-01&ss=b&srt=sco&sp=rl&..."   # from Azure Portal

spark.conf.set(
    f"fs.azure.sas.raw-data.{storage_account}.blob.core.windows.net",
    sas_token
)

df = spark.read.csv(
    "wasbs://raw-data@mystorageacct.blob.core.windows.net/csv/customers/",
    header=True,
    inferSchema=True
)

# ── METHOD 3: Service Principal / App Registration (production standard) ──────
# Used in Databricks / ADF / Synapse pipelines.
# NOTE: OAuth service-principal auth is implemented by the ABFS driver
# (abfss:// against the dfs endpoint; see Q42). The legacy wasb driver only
# supports account keys and SAS tokens. Modern Hadoop builds can read
# classic (non-hierarchical) accounts through the dfs endpoint too:
tenant_id     = "your-tenant-id"
client_id     = "your-app-client-id"
client_secret = "your-app-client-secret"

spark.conf.set(
    f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net",
    "OAuth"
)
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
    client_id
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
    client_secret
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
)

df = spark.read.parquet(
    "abfss://raw-data@mystorageacct.dfs.core.windows.net/parquet/orders/"
)
🧠 Memory Map
WASBS URL FORMAT
wasbs://<container-name>@<storage-account>.blob.core.windows.net/<path>
wasb:// = non-SSL (avoid in production)
wasbs:// = SSL encrypted (always use this)
AUTH OPTIONS (worst → best for production)
Account Key:       full access; a leaked key = disaster
SAS Token:         scoped + time-limited; better
Service Principal: App Registration in AAD; production standard
Managed Identity:  no secrets at all (Databricks/Synapse only)
42 Read from Azure Data Lake Storage Gen2 (ADLS Gen2 / abfss://)
Problem: Your company moved from Blob Storage to ADLS Gen2.
Storage Account: "datalakeprod", Container (filesystem): "silver"
Path: /processed/transactions/year=2024/
How do you read it? What changed from Blob Storage?
Company: Any company on Azure with modern data lake setup
python — editable
# ADLS Gen2 uses abfss:// (hierarchical namespace enabled on storage account)
# Format: abfss://<filesystem>@<storage-account>.dfs.core.windows.net/<path>
#
# KEY DIFFERENCE from Blob Storage:
#   Blob:  wasbs://<container>@<account>.blob.core.windows.net/
#   ADLS2: abfss://<filesystem>@<account>.dfs.core.windows.net/
#   ↑ "dfs" endpoint vs "blob" endpoint
#   ↑ abfss:// vs wasbs://

storage_account = "datalakeprod"
filesystem      = "silver"          # like a container in Blob, but hierarchical

# ── METHOD 1: Account Key ─────────────────────────────────────────────────────
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    "your-account-key"
)

df = spark.read.parquet(
    f"abfss://{filesystem}@{storage_account}.dfs.core.windows.net/processed/transactions/year=2024/"
)
df.show()

# ── METHOD 2: Service Principal (production) ──────────────────────────────────
tenant_id     = "your-tenant-id"
client_id     = "your-service-principal-client-id"
client_secret = "your-service-principal-secret"

spark.conf.set(
    f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net",
    "OAuth"
)
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
    client_id
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
    client_secret
)
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
    f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
)

# Read Parquet from ADLS Gen2
orders = spark.read.parquet(
    "abfss://silver@datalakeprod.dfs.core.windows.net/processed/transactions/"
)

# Read Delta table from ADLS Gen2
delta_df = spark.read.format("delta").load(
    "abfss://gold@datalakeprod.dfs.core.windows.net/curated/orders_final/"
)

# ── METHOD 3: Managed Identity (Databricks / Synapse — NO SECRETS) ─────────────
# With managed-identity access configured at the workspace/cluster level
# (by an admin), no spark.conf is needed in the notebook; Azure handles auth:

df = spark.read.parquet(
    "abfss://silver@datalakeprod.dfs.core.windows.net/processed/transactions/"
)
# Works automatically when Managed Identity is assigned to Databricks workspace

# ── READ MULTIPLE FORMATS FROM ADLS GEN2 ─────────────────────────────────────
base = "abfss://silver@datalakeprod.dfs.core.windows.net"

# CSV
customers = spark.read.option("header", True).csv(f"{base}/raw/customers/")

# JSON
events = spark.read.option("multiLine", True).json(f"{base}/raw/events/")

# Parquet (partitioned)
transactions = spark.read.parquet(f"{base}/processed/transactions/year=2024/month=*/")

# Delta
gold_layer = spark.read.format("delta").load(f"{base}/gold/fact_orders/")

# ORC
hive_data = spark.read.orc(f"{base}/hive-warehouse/sales/")
Feature                | Blob Storage           | ADLS Gen2
Protocol               | wasbs://               | abfss://
Endpoint               | .blob.core.windows.net | .dfs.core.windows.net
Hierarchical namespace | No (flat)              | Yes (folder structure)
ACLs (fine-grained)    | No                     | Yes (POSIX-style ACLs)
Big data performance   | Lower                  | Higher (optimized I/O)
Use for                | General blob store     | Data Lake / Delta Lake
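The two URL schemes in the table differ only in scheme and endpoint, so a small helper (hypothetical, for illustration) can build either:

```python
def azure_path(account, container, path, gen2=True):
    """Build a wasbs:// (classic Blob) or abfss:// (ADLS Gen2) URL."""
    scheme, endpoint = ("abfss", "dfs") if gen2 else ("wasbs", "blob")
    return (f"{scheme}://{container}@{account}"
            f".{endpoint}.core.windows.net/{path.lstrip('/')}")

print(azure_path("datalakeprod", "silver", "/processed/transactions/"))
# abfss://silver@datalakeprod.dfs.core.windows.net/processed/transactions/
```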
43 Databricks: Mount ADLS Gen2 and read without long paths
Problem: Your team keeps repeating the full abfss:// path everywhere.
How do you mount Azure storage in Databricks so notebooks can
use simple paths like /mnt/silver/processed/ instead?
Company: Any Databricks on Azure shop
python — editable
# ── STEP 1: MOUNT THE STORAGE (run once, persists across cluster restarts) ────
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id":     dbutils.secrets.get("kv-scope", "sp-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get("kv-scope", "sp-secret"),
    "fs.azure.account.oauth2.client.endpoint":
        f"https://login.microsoftonline.com/{dbutils.secrets.get('kv-scope','tenant-id')}/oauth2/token"
}

# Mount the "silver" filesystem to /mnt/silver
dbutils.fs.mount(
    source      = "abfss://silver@datalakeprod.dfs.core.windows.net/",
    mount_point = "/mnt/silver",
    extra_configs = configs
)

# Mount the "gold" filesystem to /mnt/gold
dbutils.fs.mount(
    source      = "abfss://gold@datalakeprod.dfs.core.windows.net/",
    mount_point = "/mnt/gold",
    extra_configs = configs
)

# ── STEP 2: READ USING SIMPLE /mnt/ PATHS (in any notebook) ──────────────────
# Instead of: abfss://silver@datalakeprod.dfs.core.windows.net/processed/orders/
# Just use:
df = spark.read.parquet("/mnt/silver/processed/orders/")
customers = spark.read.option("header", True).csv("/mnt/silver/raw/customers/")
gold_df   = spark.read.format("delta").load("/mnt/gold/curated/fact_sales/")

# ── STEP 3: LIST AND MANAGE MOUNTS ───────────────────────────────────────────
# List all current mounts
display(dbutils.fs.mounts())

# Check if mount exists before mounting (avoid error on duplicate)
mounted = any(m.mountPoint == "/mnt/silver" for m in dbutils.fs.mounts())
if not mounted:
    dbutils.fs.mount(source="abfss://silver@...", mount_point="/mnt/silver",
                     extra_configs=configs)

# List files in mounted path
dbutils.fs.ls("/mnt/silver/processed/orders/")

# Unmount when needed
dbutils.fs.unmount("/mnt/silver")
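The check-before-mount pattern above can be factored into a small helper (hypothetical name; it takes the result of `dbutils.fs.mounts()` plus a zero-argument mount function, so it stays testable outside Databricks):

```python
def ensure_mounted(mount_point, mounts, mount_fn):
    """Run mount_fn() only if mount_point is not already mounted."""
    if any(m.mountPoint == mount_point for m in mounts):
        return False          # already mounted, nothing to do
    mount_fn()
    return True
```

In a notebook you would call it as `ensure_mounted("/mnt/silver", dbutils.fs.mounts(), mount_fn)` where `mount_fn` is a lambda wrapping the `dbutils.fs.mount` call.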

# ── USE SECRETS (NEVER HARDCODE CREDENTIALS) ──────────────────────────────────
# In Databricks, always use dbutils.secrets — never paste keys in notebooks
# Create secret scope linked to Azure Key Vault:
# dbutils.secrets.get(scope="my-kv-scope", key="storage-account-key")

# ── ALTERNATIVE: Unity Catalog External Location (modern Databricks) ──────────
# Instead of mounts, Unity Catalog manages storage access centrally:
# CREATE EXTERNAL LOCATION silver_lake
#   URL 'abfss://silver@datalakeprod.dfs.core.windows.net/'
#   WITH (STORAGE CREDENTIAL my_credential);
#
# Then read directly:
df = spark.read.parquet("abfss://silver@datalakeprod.dfs.core.windows.net/data/")
# Access controlled by Unity Catalog permissions, not per-notebook config
44 Read from Azure Synapse Analytics (SQL Pool) via PySpark
Problem: You need to join Spark data with a large table sitting in
Azure Synapse dedicated SQL pool. How do you read it into
a Spark DataFrame efficiently?
python — editable
# ── METHOD 1: JDBC (simple but slow — single thread, no parallelism) ──────────
synapse_url = (
    "jdbc:sqlserver://myworkspace.sql.azuresynapse.net:1433;"
    "database=mydb;encrypt=true;trustServerCertificate=false;"
    "hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30"
)

df = spark.read.format("jdbc") \
    .option("url", synapse_url) \
    .option("dbtable", "dbo.FactSales") \
    .option("user", "sqladminuser") \
    .option("password", dbutils.secrets.get("scope", "synapse-password")) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

# ⚠️ TRAP: Default JDBC = 1 partition = 1 thread = very slow for large tables
# Fix: Add parallel read config:
df = (spark.read.format("jdbc")
      .option("url", synapse_url)
      .option("dbtable", "dbo.FactSales")
      .option("user", "sqladminuser")
      .option("password", dbutils.secrets.get("scope", "synapse-password"))
      .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
      .option("numPartitions", "20")           # parallel reads
      .option("partitionColumn", "SaleID")     # split by this column
      .option("lowerBound", "1")
      .option("upperBound", "10000000")        # estimated max SaleID
      .load())
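What numPartitions/lowerBound/upperBound actually do: Spark turns the bounds into one WHERE clause per partition and runs each as a separate query. A simplified sketch (Spark's real column-partitioning logic also handles stride rounding; this is illustrative only):

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Simplified version of Spark's JDBC range partitioning:
    stride-based WHERE clauses, with open-ended first/last partitions
    so rows outside [lower, upper) and NULLs are not lost."""
    stride = (upper - lower) // num_partitions
    preds = []
    for i in range(num_partitions):
        lo = lower + i * stride
        hi = lower + (i + 1) * stride
        if i == 0:
            preds.append(f"{column} < {hi} OR {column} IS NULL")
        elif i == num_partitions - 1:
            preds.append(f"{column} >= {lo}")
        else:
            preds.append(f"{column} >= {lo} AND {column} < {hi}")
    return preds

for p in jdbc_partition_predicates("SaleID", 1, 9, 4):
    print(p)
```

A bad upperBound does not drop rows; it only skews the partition sizes, because the edge partitions are open-ended.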

# ── METHOD 2: Synapse Connector for Databricks (PolyBase — fast!) ──────────────
# Azure Synapse Analytics connector uses PolyBase/COPY INTO via ADLS staging
# Much faster than JDBC for large tables (parallel bulk export)

df = spark.read \
    .format("com.databricks.spark.sqldw") \
    .option("url", synapse_url) \
    .option("tempDir", "abfss://temp@datalakeprod.dfs.core.windows.net/staging/") \
    .option("forwardSparkAzureStorageCredentials", "true") \
    .option("dbTable", "dbo.FactSales") \
    .load()

# ── METHOD 3: Push query down (read only what you need) ───────────────────────
query = """
    (SELECT SaleID, CustomerID, Amount, SaleDate
     FROM dbo.FactSales
     WHERE SaleDate >= '2024-01-01'
     AND Region = 'APAC') AS filtered_sales
"""

df = (spark.read.format("jdbc")
      .option("url", synapse_url)
      .option("dbtable", query)     # push the filter to Synapse, don't pull all data
      .option("user", "sqladminuser")
      .option("password", dbutils.secrets.get("scope", "synapse-password"))
      .load())
45 Read from Azure Event Hubs / Kafka into PySpark (Structured Streaming)
Problem: You need to read real-time events from Azure Event Hubs
into a PySpark DataFrame. How do you set this up?
(They ask this to check if you know Event Hubs ≈ Kafka protocol)
python — editable
# Azure Event Hubs is Kafka-compatible → use Spark Kafka connector
# Event Hubs Kafka endpoint: <namespace>.servicebus.windows.net:9093

# ── CONNECTION STRING from Azure Portal → Event Hubs → Shared Access Policies
connection_string = dbutils.secrets.get("scope", "eventhub-connection-string")
# Format: "Endpoint=sb://ns.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."

# Build Kafka SASL config from Event Hubs connection string
SASL_CONFIG = (
    f'org.apache.kafka.common.security.plain.PlainLoginModule required '
    f'username="$ConnectionString" '
    f'password="{connection_string}";'
)
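The connection string is just semicolon-separated Key=Value pairs; a tiny (hypothetical) parser makes the pieces explicit, e.g. to pull the namespace out for the bootstrap server:

```python
def parse_eh_conn_string(conn):
    """Split an Event Hubs connection string into its Key=Value parts."""
    return dict(part.split("=", 1) for part in conn.rstrip(";").split(";") if part)

sample = ("Endpoint=sb://mynamespace.servicebus.windows.net/;"
          "SharedAccessKeyName=RootManageSharedAccessKey;"
          "SharedAccessKey=abc123=")
parts = parse_eh_conn_string(sample)
print(parts["Endpoint"])   # sb://mynamespace.servicebus.windows.net/
```

Note the `split("=", 1)`: access keys often end in `=` padding, so only the first `=` separates key from value.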

# ── READ AS STREAMING DATAFRAME ───────────────────────────────────────────────
stream_df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers",
            "mynamespace.servicebus.windows.net:9093")
    .option("subscribe", "my-event-hub-name")      # topic = event hub name
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", SASL_CONFIG)
    .option("startingOffsets", "latest")           # or "earliest"
    .load())

# ── PARSE THE MESSAGE (value is binary → deserialize) ────────────────────────
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

event_schema = StructType([
    StructField("event_id",   StringType(),    True),
    StructField("user_id",    StringType(),    True),
    StructField("amount",     DoubleType(),    True),
    StructField("event_time", TimestampType(), True)
])

parsed_df = stream_df \
    .select(
        col("key").cast("string").alias("partition_key"),
        from_json(col("value").cast("string"), event_schema).alias("data"),
        col("timestamp").alias("ingest_time")
    ) \
    .select("partition_key", "data.*", "ingest_time")

# ── WRITE STREAM TO ADLS GEN2 / DELTA ────────────────────────────────────────
query = parsed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation",
            "abfss://checkpoints@datalake.dfs.core.windows.net/eventhub-stream/") \
    .start("abfss://silver@datalakeprod.dfs.core.windows.net/streaming/events/")

query.awaitTermination()

# ── READ AS BATCH (for one-time historical read) ──────────────────────────────
batch_df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers",
            "mynamespace.servicebus.windows.net:9093")
    .option("subscribe", "my-event-hub-name")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", SASL_CONFIG)
    .option("startingOffsets", "earliest")    # read ALL historical events
    .option("endingOffsets", "latest")
    .load())
46 Full Azure Data Lake Architecture + PySpark read pattern (Medallion)
Problem: Describe how data flows in a typical Azure data lake setup
and how PySpark reads from each layer.
(High-level design question — tests real-world Azure experience)
📐 Architecture Diagram
MEDALLION ARCHITECTURE ON AZURE:

  External Sources
  (Blob, RDBMS, APIs, Event Hubs)
         ↓  [ADF / Databricks ingest]
  ┌─────────────────────────────────────┐
  │  BRONZE Layer (raw, immutable)      │  ← abfss://bronze@datalake.dfs...
  │  Format: raw CSV/JSON/Parquet       │  ← partition by ingest_date
  │  No transformation, as-is           │
  └─────────────────────────────────────┘
         ↓  [Databricks / Synapse Spark]
  ┌─────────────────────────────────────┐
  │  SILVER Layer (cleaned, conformed)  │  ← abfss://silver@datalake.dfs...
  │  Format: Delta Lake                 │  ← partition by event_date
  │  Deduped, nulls handled, typed      │
  └─────────────────────────────────────┘
         ↓  [Databricks / Synapse Spark]
  ┌─────────────────────────────────────┐
  │  GOLD Layer (business aggregates)   │  ← abfss://gold@datalake.dfs...
  │  Format: Delta Lake                 │  ← partition by region/product
  │  Fact + Dimension tables, KPIs      │
  └─────────────────────────────────────┘
         ↓  [Synapse SQL Pool / Power BI]
  Dashboards / Reports / ML Models
python — editable
# ── READ PATTERN FOR EACH LAYER ───────────────────────────────────────────────

# Setup (done once per session in production via cluster config or Managed Identity)
spark.conf.set(
    "fs.azure.account.auth.type.datalakeprod.dfs.core.windows.net", "OAuth"
)
spark.conf.set(
    "fs.azure.account.oauth.provider.type.datalakeprod.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
)
spark.conf.set(
    "fs.azure.account.oauth2.client.id.datalakeprod.dfs.core.windows.net",
    dbutils.secrets.get("kv-scope", "sp-client-id")
)
spark.conf.set(
    "fs.azure.account.oauth2.client.secret.datalakeprod.dfs.core.windows.net",
    dbutils.secrets.get("kv-scope", "sp-secret")
)
spark.conf.set(
    "fs.azure.account.oauth2.client.endpoint.datalakeprod.dfs.core.windows.net",
    "https://login.microsoftonline.com/<tenant-id>/oauth2/token"
)

BASE = "abfss://{layer}@datalakeprod.dfs.core.windows.net"

# ── BRONZE: read raw files ─────────────────────────────────────────────────────
# PERMISSIVE mode keeps malformed rows; note the _corrupt column is only
# populated when you pass an explicit schema that includes that field
bronze_csv = spark.read \
    .option("header", True) \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt") \
    .csv(f"{BASE.format(layer='bronze')}/raw/orders/ingest_date=2024-01-15/")

# ── SILVER: read cleaned Delta ─────────────────────────────────────────────────
from pyspark.sql.functions import col

silver_orders = spark.read \
    .format("delta") \
    .load(f"{BASE.format(layer='silver')}/orders/") \
    .filter(col("event_date") >= "2024-01-01")  # partition pruning

# Time travel: audit/debugging
silver_yesterday = spark.read.format("delta") \
    .option("versionAsOf", 10) \
    .load(f"{BASE.format(layer='silver')}/orders/")

# ── GOLD: read aggregated Delta for reporting ──────────────────────────────────
gold_kpis = spark.read \
    .format("delta") \
    .load(f"{BASE.format(layer='gold')}/fact_daily_revenue/") \
    .filter(col("region") == "APAC")

# ── WRITE BACK TO SILVER (after transformation) ────────────────────────────────
from delta.tables import DeltaTable

DeltaTable.forPath(
    spark,
    f"{BASE.format(layer='silver')}/orders/"
).alias("target").merge(
    bronze_csv.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()
Storage Type          | Protocol  | Endpoint suffix
Azure Blob (classic)  | wasbs://  | .blob.core.windows.net
ADLS Gen1             | adl://    | .azuredatalakestore.net
ADLS Gen2 (modern)    | abfss://  | .dfs.core.windows.net
Azure Files           | (SMB/NFS) | .file.core.windows.net
47 Add to tracking table

Update the master tracking table at the top of this file:

| Q41 | Read from Azure Blob Storage (wasbs://) | Azure | S | wasbs/account-key/SAS/SP | ✓ |
| Q42 | Read from ADLS Gen2 (abfss://) | Azure | S | abfss/OAuth/SP/MI | ✓ |
| Q43 | Mount ADLS Gen2 in Databricks | Databricks | S | dbutils.fs.mount/secrets | ✓ |
| Q44 | Read from Azure Synapse SQL Pool | Azure | S | JDBC/Synapse connector | ✓ |
| Q45 | Read from Azure Event Hubs (Kafka) | Azure | S | Kafka/Structured Streaming | ✓ |
| Q46 | Medallion architecture + full read pattern | Databricks | S | Bronze/Silver/Gold layers | ✓ |

HOW TO ADD YOUR OWN QUESTIONS

1. Add a row to the master tracking table (increment Q#, never reuse)
2. Add solved question in the relevant section
3. Tag with: company (if known), level B/M/H/S, topic
4. Share new questions; I'll classify and add full solutions
QUESTIONS TO CLASSIFY (add below, I'll solve + categorize)
→ (paste your questions here)