PySpark Interview — Question Bank
Structured coding questions with full PySpark solutions.
Pattern: B=Basic | M=Medium | H=Hard | S=Scenario
Add new questions at the bottom — never renumber existing ones.
MASTER TRACKING TABLE
| Q# | Question | Company | Level | Topic | Solved |
|---|---|---|---|---|---|
| Q01 | Filter employees earning > 50k | General | B | filter/select | ✓ |
| Q02 | Count orders per customer | General | B | groupBy/agg | ✓ |
| Q03 | Find duplicate rows by email | General | B | dropDuplicates/groupBy | ✓ |
| Q04 | Total sales by date | General | B | groupBy/sum | ✓ |
| Q05 | Add a new derived column | General | B | withColumn | ✓ |
| Q06 | Read CSV + handle nulls | General | B | na.fill/isNull | ✓ |
| Q07 | Word count in text column | General | B | flatMap/reduceByKey | ✓ |
| Q08 | Find max salary per department | Amazon/Google | M | groupBy/max | ✓ |
| Q09 | Rank employees by salary per dept | Amazon | M | window/dense_rank | ✓ |
| Q10 | Top 3 salaries per department | Amazon | M | window/dense_rank | ✓ |
| Q11 | Second highest salary | Amazon | M | window/dense_rank | ✓ |
| Q12 | Running total of revenue | General | M | window/sum | ✓ |
| Q13 | 7-day rolling average | General | M | window/avg | ✓ |
| Q14 | Employees earning more than their manager | General | M | self-join | ✓ |
| Q15 | Customers with no orders (NOT IN) | General | M | left_anti join | ✓ |
| Q16 | Deduplicate — keep latest record | General | M | window/row_number | ✓ |
| Q17 | MoM revenue change (%) | General | M | window/lag | ✓ |
| Q18 | Pivot: rows to columns | General | M | pivot | ✓ |
| Q19 | Explode array column + count tags | General | M | explode/groupBy | ✓ |
| Q20 | Find consecutive purchase days | Amazon | H | window/lag+filter | ✓ |
| Q21 | Session ID assignment (30-min gap) | General | H | window/lag+sum | ✓ |
| Q22 | Temperature rise from previous day | Amazon/Google | H | window/lag | ✓ |
| Q23 | Longest streak of active days | General | H | gaps & islands | ✓ |
| Q24 | Products bought together (market basket) | Amazon | H | self-join | ✓ |
| Q25 | Funnel conversion rates | Meta/TikTok | H | pivot/conditional agg | ✓ |
| Q26 | Deduplicate with complex multi-key logic | General | H | window/row_number | ✓ |
| Q27 | Handle data skew with salting | Uber/Google | S | repartition/salting | ✓ |
| Q28 | Optimize slow join with broadcast | General | S | broadcast hint | ✓ |
| Q29 | Read multiple sources + track origin | General | S | union/input_file_name | ✓ |
| Q30 | Flatten nested JSON to flat table | General | S | struct/explode | ✓ |
| Q31 | Process 200 GB data with 16 GB executor memory | FAANG | S | partitioning/streaming | ✓ |
| Q32 | Read bad/corrupt data — 3 modes | General | S | badRecordsPath/modes | ✓ |
| Q33 | Recursively read all files in directory tree | General | S | recursiveFileLookup | ✓ |
| Q34 | OOM during groupBy on 500M rows | General | S | reduceByKey/AQE | ✓ |
| Q35 | Job succeeds on 10 GB, fails on 500 GB on Monday | General | S | dynamic alloc/AQE | ✓ |
| Q36 | Schema evolution — new column added to source | General | S | mergeSchema/unionByName | ✓ |
| Q37 | Estimate partitions needed for a job | General | S | partition math | ✓ |
| Q38 | Avoid recomputing expensive DataFrame 3 times | General | S | cache/persist strategy | ✓ |
| Q39 | Read only specific file types from mixed directory | General | S | pathGlobFilter | ✓ |
| Q40 | Handle late-arriving data in daily ETL | General | S | watermark/filter/upsert | ✓ |
SECTION 1: BASIC QUESTIONS (B)
Q01 — Filter employees earning > 50k
from pyspark.sql.functions import col
employees = spark.read.parquet("employees/")
result = employees \
.filter(col("salary") > 50000) \
.select("emp_id", "name", "dept", "salary") \
.orderBy(col("salary").desc())
result.show()
Q02 — Count orders per customer
from pyspark.sql.functions import count, sum, col
result = orders.groupBy("customer_id") \
.agg(
count("order_id").alias("total_orders"),
sum("amount").alias("total_spent")
) \
.orderBy(col("total_spent").desc())
result.show()
Q03 — Find duplicate rows by email
from pyspark.sql.functions import count, col
# Method 1: groupBy + filter
duplicates = users.groupBy("email") \
.agg(count("*").alias("cnt")) \
.filter(col("cnt") > 1)
# Method 2: join back to get full rows
result = users.join(duplicates, "email", "inner") \
.select("user_id", "email", "created_at", "cnt") \
.orderBy("email")
result.show()
Q04 — Total sales by date
from pyspark.sql.functions import sum, col, to_date
result = transactions \
.withColumn("date", to_date(col("date"))) \
.groupBy("date") \
.agg(sum("amount").alias("daily_sales")) \
.orderBy("date")
result.show()
Q05 — Add derived column (categorize salary)
from pyspark.sql.functions import col, when
result = employees.withColumn(
"salary_band",
when(col("salary") > 100000, "High")
.when(col("salary") >= 50000, "Mid")
.otherwise("Low")
)
result.show()
Q06 — Read CSV + handle nulls
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
schema = StructType([
StructField("emp_id", IntegerType(), nullable=True),
StructField("name", StringType(), nullable=True),
StructField("dept", StringType(), nullable=True),
StructField("salary", DoubleType(), nullable=True)
])
df = spark.read \
.schema(schema) \
.option("header", True) \
.csv("employees.csv")
result = df \
.na.drop(subset=["emp_id"]) \
.na.fill({"salary": 0.0, "dept": "Unknown"})
# drops rows where emp_id is null, then fills remaining nulls
# (comments cannot follow a line-continuation backslash — keep them on their own lines)
result.show()
Q07 — Word count (RDD style)
# RDD approach (classic interview question)
rdd = sc.textFile("data/text_file.txt")
word_counts = rdd \
.flatMap(lambda line: line.lower().split()) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a + b) \
.sortBy(lambda x: x[1], ascending=False)
word_counts.take(10)
# DataFrame approach (prefer in practice)
from pyspark.sql.functions import explode, split, lower, col
df = spark.read.text("data/text_file.txt")
result = df \
.withColumn("word", explode(split(lower(col("value")), "\\s+"))) \
.groupBy("word") \
.count() \
.orderBy(col("count").desc()) \
.limit(10)
result.show()
SECTION 2: MEDIUM QUESTIONS (M)
Q08 — Max salary per department
from pyspark.sql.functions import max, col
result = employees.groupBy("dept") \
.agg(max("salary").alias("max_salary")) \
.orderBy(col("max_salary").desc())
result.show()
Q09 — Rank employees by salary per department
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col
w = Window.partitionBy("dept").orderBy(col("salary").desc())
result = employees.withColumn("rank", dense_rank().over(w)) \
.select("emp_id", "name", "dept", "salary", "rank") \
.orderBy("dept", "rank")
result.show()
Q10 — Top 3 salaries per department
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col
w = Window.partitionBy("dept").orderBy(col("salary").desc())
result = employees \
.withColumn("rnk", dense_rank().over(w)) \
.filter(col("rnk") <= 3) \
.select("dept", "name", "salary", "rnk") \
.orderBy("dept", "rnk")
result.show()
# KEY INSIGHT: dense_rank → tied salaries get same rank, all appear
# If you used rank() → gaps: ranks 1,1,3 (skips 2)
# If you used row_number() → only 3 rows max, misses ties
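The three ranking semantics can be sanity-checked without Spark; a plain-Python sketch (illustration only, not Spark code) of what each function returns on tied values:

```python
# Salaries pre-sorted descending, with a tie at 90
salaries = [100, 90, 90, 80]

row_number = list(range(1, len(salaries) + 1))                            # ties broken arbitrarily
rank = [1 + sum(s > x for s in salaries) for x in salaries]               # ties share a rank, next rank skips
dense_rank = [1 + len({s for s in salaries if s > x}) for x in salaries]  # ties share a rank, no gaps

# row_number → [1, 2, 3, 4]
# rank       → [1, 2, 2, 4]
# dense_rank → [1, 2, 2, 3]
```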
Q11 — Second highest salary
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, col
w = Window.orderBy(col("salary").desc())
result = employees \
.withColumn("rnk", dense_rank().over(w)) \
.filter(col("rnk") == 2) \
.select("salary") \
.distinct()
# Handle case where no second salary exists (count() is an action, fine for a small check)
if result.count() == 0:
    # explicit DDL schema needed — a lone None cannot be type-inferred
    result = spark.createDataFrame([(None,)], "salary double")
result.show()
Q12 — Running total of revenue
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, col
w = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)
result = revenue \
.withColumn("running_total", sum("amount").over(w)) \
.select("date", "amount", "running_total") \
.orderBy("date")
result.show()
Q13 — 7-day rolling average
from pyspark.sql.window import Window
from pyspark.sql.functions import avg, col, round
w = Window.orderBy("date").rowsBetween(-6, Window.currentRow) # 6 prior + current
result = daily_sales \
.withColumn("rolling_7d_avg", round(avg("sales").over(w), 2)) \
.select("date", "sales", "rolling_7d_avg") \
.orderBy("date")
result.show()
Q14 — Employees earning more than their manager
from pyspark.sql.functions import col
emp = employees.alias("e")
mgr = employees.alias("m")
result = emp.join(mgr, col("e.manager_id") == col("m.emp_id"), "inner") \
.filter(col("e.salary") > col("m.salary")) \
.select(
col("e.emp_id"),
col("e.name").alias("employee"),
col("e.salary").alias("emp_salary"),
col("m.name").alias("manager"),
col("m.salary").alias("mgr_salary")
)
result.show()
Q15 — Customers with no orders
from pyspark.sql.functions import col
# Method 1: left_anti join (cleanest, most efficient)
result = customers.join(orders, "customer_id", "left_anti") \
.select("customer_id", "name")
# Method 2: left join + filter null
result = customers.join(orders, "customer_id", "left") \
.filter(col("order_id").isNull()) \
.select(customers.customer_id, customers.name)
result.show()
# ⚡ Prefer left_anti — cleaner, Catalyst handles it well
Q16 — Deduplicate — keep latest record
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
w = Window.partitionBy("user_id", "event_type").orderBy(col("created_at").desc())
result = events \
.withColumn("rn", row_number().over(w)) \
.filter(col("rn") == 1) \
.drop("rn")
result.show()
# ⚡ Use row_number (not rank/dense_rank) — guarantees exactly 1 row per group
Q17 — Month-over-Month revenue change
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, round
w = Window.orderBy("year_month")
result = monthly_revenue \
.withColumn("prev_revenue", lag("revenue", 1).over(w)) \
.withColumn(
"mom_pct_change",
round(
(col("revenue") - col("prev_revenue")) / col("prev_revenue") * 100,
2
)
) \
.select("year_month", "revenue", "prev_revenue", "mom_pct_change") \
.orderBy("year_month")
result.show()
# ⚠️ First row: prev_revenue = null → mom_pct_change = null (expected)
# ⚠️ If prev_revenue can be 0: wrap denominator in nullif logic manually
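The zero-denominator guard can be sketched as a plain function (hypothetical helper name; in the column expression the same guard is `when(col("prev_revenue") != 0, ...)`):

```python
def mom_pct_change(prev, curr):
    # None for the first month (no prior row) or a zero base, mirroring null semantics
    if prev is None or prev == 0:
        return None
    return round((curr - prev) / prev * 100, 2)
```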
Q18 — Pivot: rows to columns (long → wide)
from pyspark.sql.functions import sum
result = sales.groupBy("product").pivot("month").agg(sum("revenue"))
result.show()
# Output:
# product | Jan | Feb | Mar
# --------|------|------|-----
# iPhone | 1000 | 1200 | 900
# Optimization: provide list of pivot values to avoid extra scan
months = ["Jan", "Feb", "Mar", "Apr", "May"]
result = sales.groupBy("product").pivot("month", months).agg(sum("revenue"))
Q19 — Explode array column + count tags
from pyspark.sql.functions import explode, col, count
# explode creates one row per tag
result = posts \
.withColumn("tag", explode(col("tags"))) \
.groupBy("tag") \
.agg(count("*").alias("usage_count")) \
.orderBy(col("usage_count").desc()) \
.limit(5)
result.show()
SECTION 3: HARD QUESTIONS (H)
Q20 — Find consecutive purchase days
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, datediff, count
from pyspark.sql.functions import row_number
# Step 1: Deduplicate (1 row per customer per date)
deduped = orders.select("customer_id", "order_date").distinct()
# Step 2: Assign row number per customer ordered by date
w = Window.partitionBy("customer_id").orderBy("order_date")
numbered = deduped.withColumn("rn", row_number().over(w))
# Step 3: Island key = date - rn (constant for consecutive dates)
from pyspark.sql.functions import date_sub, expr
islands = numbered.withColumn(
"island_key",
expr("date_sub(order_date, rn)") # Spark SQL: date - rn = island constant
)
# Step 4: Group by island, find streaks >= 3
result = islands.groupBy("customer_id", "island_key") \
.agg(count("*").alias("streak_len")) \
.filter(col("streak_len") >= 3) \
.select("customer_id") \
.distinct()
result.show()
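The island-key trick above can be seen in miniature without Spark (plain-Python sketch):

```python
from datetime import date, timedelta

# Consecutive dates minus their row number collapse to a single constant;
# any gap shifts the constant and starts a new island.
order_dates = [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 3), date(2024, 1, 7)]
island_keys = [d - timedelta(days=rn) for rn, d in enumerate(order_dates, start=1)]
# the first three dates share one key (a 3-day streak); Jan 7 begins a new island
```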
Q21 — Session ID assignment (gap > 30 minutes = new session)
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, sum as _sum, unix_timestamp, when
w_order = Window.partitionBy("user_id").orderBy("event_time")
# Step 1: Flag new session start (gap > 30 min or first event)
flagged = events.withColumn(
"prev_time", lag("event_time", 1).over(w_order)
).withColumn(
"new_session",
when(
col("prev_time").isNull() |
((unix_timestamp("event_time") - unix_timestamp("prev_time")) > 1800),
1
).otherwise(0)
)
# Step 2: Cumulative sum of new_session flags = session_id
w_cum = Window.partitionBy("user_id").orderBy("event_time") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
result = flagged.withColumn(
"session_id",
_sum("new_session").over(w_cum)
).select("user_id", "event_time", "session_id")
result.show()
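The flag-then-cumulative-sum pattern is easiest to verify in miniature (plain-Python sketch; event times in minutes, one user, already sorted):

```python
from itertools import accumulate

# gap > 30 minutes (or the first event) opens a new session
event_minutes = [0, 10, 50, 55, 120]
new_session = [1] + [1 if cur - prev > 30 else 0
                     for prev, cur in zip(event_minutes, event_minutes[1:])]
session_ids = list(accumulate(new_session))
# the running sum of flags labels each event with its session number
```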
Q22 — Days with temperature higher than previous day
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, col, datediff
w = Window.orderBy("record_date")
result = weather \
.withColumn("prev_temp", lag("temperature", 1).over(w)) \
.withColumn("prev_date", lag("record_date", 1).over(w)) \
.filter(
(col("temperature") > col("prev_temp")) &
(datediff(col("record_date"), col("prev_date")) == 1) # must be consecutive day
) \
.select("id", "record_date", "temperature")
result.show()
# ⚠️ KEY: must check datediff == 1 — data may have gaps (missing dates)
# Without this check, you'd compare non-adjacent days
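The caveat is easy to demonstrate in plain Python: lag compares adjacent rows, not adjacent days.

```python
from datetime import date

readings = [(date(2024, 1, 1), 10), (date(2024, 1, 2), 12), (date(2024, 1, 5), 15)]

# naive: previous-row comparison wrongly includes Jan 5 (its previous row is Jan 2)
naive = [d for (pd_, pt), (d, t) in zip(readings, readings[1:]) if t > pt]
# strict: also require the previous row to be exactly one day earlier
strict = [d for (pd_, pt), (d, t) in zip(readings, readings[1:])
          if t > pt and (d - pd_).days == 1]
```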
Q23 — Longest streak of consecutive active days per user
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col, count, max as _max
from pyspark.sql.functions import expr
# Step 1: Deduplicate (1 row per user per date)
deduped = logins.select("user_id", "login_date").distinct()
# Step 2: Row number per user ordered by date
w = Window.partitionBy("user_id").orderBy("login_date")
numbered = deduped.withColumn("rn", row_number().over(w))
# Step 3: island_key = date - rn (constant for consecutive sequences)
islands = numbered.withColumn(
"island_key",
expr("date_sub(login_date, rn)")
)
# Step 4: Count streak length per island
streak_lengths = islands.groupBy("user_id", "island_key") \
.agg(count("*").alias("streak_len"))
# Step 5: Max streak per user
result = streak_lengths.groupBy("user_id") \
.agg(_max("streak_len").alias("longest_streak")) \
.orderBy(col("longest_streak").desc())
result.show()
Q24 — Products bought together (market basket)
from pyspark.sql.functions import col, count
o1 = order_items.alias("o1")
o2 = order_items.alias("o2")
result = o1.join(
o2,
(col("o1.order_id") == col("o2.order_id")) &
(col("o1.product_id") < col("o2.product_id")), # avoid duplicates + self-pairs
"inner"
) \
.groupBy(col("o1.product_id").alias("product_a"),
col("o2.product_id").alias("product_b")) \
.agg(count("*").alias("co_purchase_count")) \
.orderBy(col("co_purchase_count").desc()) \
.limit(5)
result.show()
# ⚡ KEY: o1.product_id < o2.product_id ensures each pair appears once
# Without this: (A,B) and (B,A) both appear = duplicates
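The "<" join condition is the DataFrame equivalent of unordered combinations, which plain Python makes explicit:

```python
from itertools import combinations

basket = ["B", "A", "C"]  # products in one order
pairs = list(combinations(sorted(basket), 2))
# each pair appears exactly once, smaller id first, no self-pairs
```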
Q25 — Funnel conversion rates
from pyspark.sql.functions import col, when, max as _max
# Method: MAX(CASE WHEN) per user — count users, not events
funnel = events.groupBy("user_id").agg(
_max(when(col("event_type") == "view", 1).otherwise(0)).alias("viewed"),
_max(when(col("event_type") == "add_to_cart", 1).otherwise(0)).alias("carted"),
_max(when(col("event_type") == "purchase", 1).otherwise(0)).alias("purchased")
)
from pyspark.sql.functions import sum as _sum, lit
summary = funnel.agg(
_sum("viewed").alias("total_views"),
_sum("carted").alias("total_carted"),
_sum("purchased").alias("total_purchased")
)
# Compute conversion rates — collect once (each collect() triggers a full job)
totals = summary.collect()[0]
total_views = totals["total_views"]
total_carted = totals["total_carted"]
total_purch = totals["total_purchased"]
print(f"View → Cart: {total_carted/total_views*100:.1f}%")
print(f"Cart → Purchase: {total_purch/total_carted*100:.1f}%")
print(f"Overall: {total_purch/total_views*100:.1f}%")
# ⚡ KEY: MAX(CASE WHEN) per user ensures each user counted once per stage
# Even if user viewed 10 times, they count as 1 viewer
Q26 — Dedup with composite key (keep most recent per key combo)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
w = Window.partitionBy("id", "dept").orderBy(col("updated_at").desc())
result = cdc_events \
.withColumn("rn", row_number().over(w)) \
.filter(col("rn") == 1) \
.drop("rn")
result.show()
# ⚡ row_number guarantees exactly 1 row per partition (unlike rank/dense_rank)
# Always use row_number for deduplication, not rank
SECTION 4: SCENARIO QUESTIONS (S)
Q27 — Handle data skew with salting
from pyspark.sql.functions import rand, concat, lit, col, explode, array
SALT_FACTOR = 10
# APPROACH 1: Broadcast (if countries table is small — BEST approach)
from pyspark.sql.functions import broadcast
result = orders.join(broadcast(countries), "country_code")
# APPROACH 2: Salting (when both tables are large and skewed)
# Step 1: Salt the large orders table
orders_salted = orders.withColumn(
"salted_key",
concat(col("country_code"), lit("_"),
(rand() * SALT_FACTOR).cast("int").cast("string"))
)
# Step 2: Explode countries to match all salt values (0 to SALT_FACTOR-1)
countries_exploded = countries.withColumn(
"salt", explode(array([lit(i) for i in range(SALT_FACTOR)]))
).withColumn(
"salted_key",
concat(col("country_code"), lit("_"), col("salt").cast("string"))
)
# Step 3: Join on salted key; drop the salt helpers and the duplicate country_code
result = orders_salted.join(countries_exploded, "salted_key", "inner") \
.drop("salted_key", "salt") \
.drop(countries_exploded["country_code"])
result.show()
# APPROACH 3: Enable AQE (automatic, no code change)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE detects hot partitions at runtime and splits them
Q28 — Optimize slow join with broadcast hint
from pyspark.sql.functions import broadcast
# Problem: Catalyst doesn't auto-broadcast (store_metadata stats not computed)
# Default autoBroadcastJoinThreshold = 10 MB
# SOLUTION 1: Manual broadcast hint (works immediately)
result = transactions.join(
broadcast(store_metadata), # forces BroadcastHashJoin, no shuffle on store_metadata
"store_id"
)
# SOLUTION 2: Increase threshold (if you want auto-broadcast for similar tables)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "52428800") # 50 MB
# SOLUTION 3: Cache small table + broadcast
store_metadata.cache()
store_metadata.count() # trigger cache
result = transactions.join(broadcast(store_metadata), "store_id")
# VERIFICATION: Check join type in plan
result.explain("formatted")
# Look for: BroadcastHashJoin (not SortMergeJoin)
# IMPACT: Sort-Merge Join = 2 shuffles + 2 sorts + merge per 100M rows
# Broadcast Hash Join = 0 shuffles + hash lookup per 100M rows
# Speedup: often 10-50x for large fact + small dim joins
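A tiny helper (hypothetical name, plain Python) keeps the bytes arithmetic for the threshold honest:

```python
def mb_to_bytes(mb: int) -> str:
    # Spark confs accept the value in bytes, passed as a string
    return str(mb * 1024 * 1024)

# e.g. spark.conf.set("spark.sql.autoBroadcastJoinThreshold", mb_to_bytes(50))
```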
Q29 — Read multiple sources + track file origin
from pyspark.sql.functions import input_file_name, col, lit
# APPROACH 1: Glob pattern (all files match same schema)
df = spark.read.parquet("s3://bucket/data/year=2024/month=*/") \
.withColumn("source_file", input_file_name())
# APPROACH 2: Read separately + unionByName (different schemas)
jan = spark.read.parquet("s3://bucket/data/year=2024/month=01/") \
.withColumn("source_month", lit("Jan"))
feb = spark.read.parquet("s3://bucket/data/year=2024/month=02/") \
.withColumn("source_month", lit("Feb"))
mar = spark.read.parquet("s3://bucket/data/year=2024/month=03/") \
.withColumn("source_month", lit("Mar"))
# allowMissingColumns=True fills missing cols with null instead of failing
combined = jan.unionByName(feb, allowMissingColumns=True) \
.unionByName(mar, allowMissingColumns=True)
combined.show()
# APPROACH 3: Python list of paths
paths = [
"s3://bucket/2024-01/",
"s3://bucket/2024-02/",
"s3://bucket/2024-03/"
]
df = spark.read.parquet(*paths).withColumn("source_file", input_file_name())
Q30 — Flatten nested JSON to flat table
from pyspark.sql.functions import col, explode_outer
# Read raw JSON (schema inferred or explicit)
raw = spark.read.option("multiLine", True).json("orders/*.json")
# raw schema:
# order_id: long
# customer: struct<id: long, name: string, city: string>
# items: array<struct<sku: string, qty: int, price: double>>
# Step 1: Explode items array (one row per item)
exploded = raw.withColumn("item", explode_outer("items")) # outer = keep null items
# Step 2: Select and flatten nested fields with dot notation
result = exploded.select(
col("order_id"),
col("customer.id").alias("customer_id"),
col("customer.name").alias("customer_name"),
col("customer.city").alias("city"),
col("item.sku").alias("sku"),
col("item.qty").alias("qty"),
col("item.price").alias("price")
)
result.show()
# ⚡ explode_outer vs explode:
# explode → drops orders with null/empty items array
# explode_outer → keeps the order row with null item fields (safer)
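The two semantics can be modeled in plain Python on (order_id, items) rows (illustration only, not Spark itself):

```python
rows = [(1, ["a", "b"]), (2, []), (3, None)]

def explode_rows(rows):
    # inner explode: orders with null/empty items vanish from the output
    return [(oid, item) for oid, items in rows for item in (items or [])]

def explode_outer_rows(rows):
    # outer explode: those orders survive with a null item
    out = []
    for oid, items in rows:
        if items:
            out.extend((oid, item) for item in items)
        else:
            out.append((oid, None))
    return out
```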
SECTION 5: LARGE DATA + MEMORY + INFRASTRUCTURE SCENARIOS
Q31 — Process 200 GB data with only 16 GB executor memory
# ── STEP 1: ESTIMATE RIGHT PARTITION COUNT ──────────────────────────────────
# Rule: each partition should be ~100-200 MB in memory
# 200 GB data → 200,000 MB / 128 MB per partition = ~1,600 partitions
data_size_gb = 200
target_partition_mb = 128
n_partitions = int((data_size_gb * 1024) / target_partition_mb) # = 1600
spark.conf.set("spark.sql.shuffle.partitions", str(n_partitions))
# ── STEP 2: TUNE EXECUTOR MEMORY ─────────────────────────────────────────────
# With 16 GB executor, 4 cores → each core gets ~4 GB working memory
# Memory layout:
# spark.executor.memory = 14g (leave 2g for OS + overhead)
# spark.executor.memoryOverhead = 2g (native/Python overhead)
# spark.memory.fraction = 0.6 → 8.4g for Spark execution + storage
# Each core → ~2g execution memory for shuffle/joins
# ── STEP 3: AVOID CACHING 200 GB ──────────────────────────────────────────────
# DO NOT: df.cache() ← 200 GB won't fit, causes eviction thrashing
# DO: Process in a streaming fashion (pipeline partitions through)
# ── STEP 4: FILTER EARLY — reduce data before joins ──────────────────────────
from pyspark.sql.functions import col
df = spark.read.parquet("s3://bucket/data/") \
.filter(col("date") >= "2024-01-01") \
.filter(col("region") == "US") \
.select("id", "amount", "date")
# filters are pushed down to the Parquet reader; select prunes to only the needed columns
# ── STEP 5: AVOID WIDE OPERATIONS ON FULL 200 GB ──────────────────────────────
# Bad: join 200 GB table with another 200 GB table (400 GB shuffle)
# Good: pre-aggregate one side first, then join reduced result
pre_agg = large_df.groupBy("customer_id") \
.agg(sum("amount").alias("total")) # 200 GB → maybe 10 GB after aggregation
result = pre_agg.join(customers, "customer_id") # join 10 GB, not 200 GB
# ── STEP 6: ENABLE AQE ───────────────────────────────────────────────────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
# AQE will auto-split/merge partitions based on actual sizes
# ── STEP 7: WRITE IN PARTITIONS, NOT COLLECT ─────────────────────────────────
# NEVER: result.collect() ← pulls 200 GB to Driver → Driver OOM
# DO: result.write.parquet("s3://output/") ← each executor writes its partitions
# ── STEP 8: USE DISK SPILL (not OOM) ─────────────────────────────────────────
# If a shuffle really must exceed memory, configure spill to disk:
spark.conf.set("spark.executor.memory", "14g")
spark.conf.set("spark.memory.fraction", "0.6")
# Execution pool will spill to disk automatically (slowdown vs OOM)
# ── FULL CONFIG FOR THIS SCENARIO ────────────────────────────────────────────
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.executor.memory", "14g") \
.config("spark.executor.cores", "4") \
.config("spark.executor.memoryOverhead", "2g") \
.config("spark.sql.shuffle.partitions", "1600") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
.config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728") \
.config("spark.sql.files.maxPartitionBytes", "134217728") \
.getOrCreate()
# both size settings above are 134217728 bytes = 128 MB
Q32 — Reading bad/corrupt data — 3 modes + handling
# ── THE 3 READ MODES ─────────────────────────────────────────────────────────
#
# PERMISSIVE (default): Parse what you can, set corrupt record to null
# Adds _corrupt_record column with the bad raw line
# DROPMALFORMED: Silently drop bad rows, keep good ones
# FAILFAST: Throw exception on first bad record (fail the job)
# ── MODE 1: PERMISSIVE — capture bad rows for inspection ─────────────────────
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
schema = StructType([
StructField("id", IntegerType(), nullable=True),
StructField("name", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("_corrupt_record", StringType(), nullable=True) # captures bad rows
])
df = spark.read \
.schema(schema) \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.option("header", True) \
.csv("data/vendor_feed.csv")
# Separate good and bad records
good_records = df.filter(col("_corrupt_record").isNull()).drop("_corrupt_record")
bad_records = df.filter(col("_corrupt_record").isNotNull())
print(f"Good rows: {good_records.count()}")
print(f"Bad rows: {bad_records.count()}")
# Save bad records for investigation / reprocessing
bad_records.write.mode("overwrite").json("s3://bucket/bad-records/")
# ── MODE 2: DROPMALFORMED — silently drop bad rows ────────────────────────────
df_clean = spark.read \
.schema(schema) \
.option("mode", "DROPMALFORMED") \
.option("header", True) \
.csv("data/vendor_feed.csv")
# Good for: non-critical data where a few bad rows are acceptable
# ── MODE 3: FAILFAST — crash on any bad row ───────────────────────────────────
df_strict = spark.read \
.schema(schema) \
.option("mode", "FAILFAST") \
.option("header", True) \
.csv("data/vendor_feed.csv")
# Good for: financial/compliance data where ANY bad row = stop everything
# ── BADRECORDSPATH — Spark 2.2+ auto-save bad records ────────────────────────
df = spark.read \
.schema(schema) \
.option("badRecordsPath", "s3://bucket/bad-records/") \
.option("header", True) \
.csv("data/vendor_feed.csv")
# Spark automatically writes all bad records + reasons to badRecordsPath
# Each bad record file: { "path": "...", "reason": "...", "record": "..." }
# ── JSON-SPECIFIC: multiline + corrupt handling ───────────────────────────────
# multiLine is needed when JSON arrays/objects span multiple lines
df_json = spark.read \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt_record") \
.option("multiLine", True) \
.json("data/*.json")
# ── PARQUET corrupt file handling ─────────────────────────────────────────────
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true") # skip corrupt files
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true") # skip missing files
df_parquet = spark.read.parquet("s3://bucket/data/")
# Great for: reading historical S3 data where some files may be deleted/corrupt
# ── NULL HANDLING AFTER READ ──────────────────────────────────────────────────
# After a PERMISSIVE read, values that failed to cast become null
df_clean = df \
.filter(col("id").isNotNull()) \
.na.fill({"amount": 0.0, "name": "UNKNOWN"})
# drops rows where id failed to parse, then fills the other nulls
Q33 — Recursively read all files from a directory tree
# ── METHOD 1: recursiveFileLookup (Spark 3.0+) ────────────────────────────────
# Reads ALL files recursively under the root path — ignores partition structure
df = spark.read \
.option("recursiveFileLookup", "true") \
.parquet("s3://bucket/data/")
# Reads every .parquet file in the entire /data/ tree recursively
# ── METHOD 2: Glob patterns (most flexible) ───────────────────────────────────
# All files 2 levels deep:
df = spark.read.parquet("s3://bucket/data/*/*/")
# Base path of a partitioned table (partition discovery handles key=value subdirs):
df = spark.read.parquet("s3://bucket/data/")
# Specific year range:
df = spark.read.parquet("s3://bucket/data/2024/*/")
# Multiple specific months:
df = spark.read.parquet(
"s3://bucket/data/2024/01/",
"s3://bucket/data/2024/02/",
"s3://bucket/data/2024/03/"
)
# ── METHOD 3: pathGlobFilter — read only specific file types ──────────────────
# In a mixed directory (CSVs + Parquets + JSONs), read only CSVs:
df = spark.read \
.option("pathGlobFilter", "*.csv") \
.option("recursiveFileLookup", "true") \
.csv("s3://bucket/mixed-data/")
# Read only files matching a date pattern:
df = spark.read \
.option("pathGlobFilter", "2024-01-*.parquet") \
.parquet("s3://bucket/data/")
# ── METHOD 4: modifiedAfter / modifiedBefore — time-based filtering ───────────
# Read only files modified within a given time window (Spark 3.1+):
df = spark.read \
.option("modifiedAfter", "2024-01-01T00:00:00") \
.option("modifiedBefore", "2024-01-02T00:00:00") \
.option("recursiveFileLookup", "true") \
.parquet("s3://bucket/data/")
# ── METHOD 5: Python glob → collect paths → pass to Spark ────────────────────
import boto3
# For S3: list parquet files under a prefix
# (list_objects_v2 returns at most 1000 keys — use get_paginator for larger prefixes)
s3 = boto3.client("s3")
bucket, prefix = "my-bucket", "data/2024/"
all_paths = [
f"s3://{bucket}/{obj['Key']}"
for obj in s3.list_objects_v2(Bucket=bucket, Prefix=prefix)["Contents"]
if obj["Key"].endswith(".parquet")
]
df = spark.read.parquet(*all_paths)
# For local filesystem:
import glob
all_paths = glob.glob("/data/**/*.parquet", recursive=True) # recursive=True is key!
df = spark.read.parquet(*all_paths)
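The key-filtering step in METHOD 5 is plain Python and easy to verify in isolation — a minimal sketch (bucket and key names here are illustrative, not from a real listing):

```python
def parquet_paths(bucket, keys):
    """Turn raw S3 object keys into s3:// paths, keeping only .parquet files."""
    return [f"s3://{bucket}/{k}" for k in keys if k.endswith(".parquet")]

keys = [
    "data/2024/01/part-0.parquet",
    "data/2024/01/_SUCCESS",          # marker file — dropped by the extension filter
    "data/2024/02/part-1.parquet",
]
paths = parquet_paths("my-bucket", keys)
# → only the two part-*.parquet keys survive, as full s3:// paths
```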
# ── ADD SOURCE FILE TRACKING ──────────────────────────────────────────────────
from pyspark.sql.functions import input_file_name
df = spark.read \
.option("recursiveFileLookup", "true") \
.parquet("s3://bucket/data/") \
.withColumn("source_file", input_file_name())
df.show(truncate=False)
# Each row knows exactly which file it came from
# ── VALIDATE WHAT WAS READ ────────────────────────────────────────────────────
df.select("source_file").distinct().show(100, truncate=False)
# Shows all unique file paths that contributed to the DataFrame
Q34 — OOM during groupBy on 500M rows
# ── DIAGNOSE THE PROBLEM ──────────────────────────────────────────────────────
# collect_list() = collects ALL values per key into memory on ONE executor
# If "US" has 400M events → collect_list creates 400M element list in 1 executor
# That's the OOM culprit, not groupBy itself
# ── SOLUTION 1: Don't use collect_list — aggregate instead ────────────────────
# Bad (OOM):
df.groupBy("country").agg(collect_list("event"))
# Good (if you need count/sum/stats, not the full list):
from pyspark.sql.functions import count, countDistinct, sum
df.groupBy("country").agg(
count("event").alias("event_count"),
countDistinct("event").alias("unique_events")
)
# ── SOLUTION 2: Use collect_set with limit ────────────────────────────────────
from pyspark.sql.functions import slice, collect_list, col
# Sample first N values per key (avoid unbounded collection)
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
w = Window.partitionBy("country").orderBy("event_time")
df_limited = df.withColumn("rn", row_number().over(w)) \
.filter(col("rn") <= 1000) # max 1000 events per country
result = df_limited.groupBy("country").agg(collect_list("event").alias("top_events"))
# ── SOLUTION 3: Increase shuffle partitions to reduce per-partition size ───────
spark.conf.set("spark.sql.shuffle.partitions", "2000") # more, smaller partitions
# NOTE: executor memory is fixed at launch — set it via spark-submit (--executor-memory 16g), not spark.conf.set
# More partitions = less data per task = less memory per task
# ── SOLUTION 4: Pre-filter to reduce data volume ─────────────────────────────
# Before the groupBy, eliminate data you don't need
df_filtered = df.filter(col("date") >= "2024-01-01") \
.filter(col("is_valid") == True) \
.select("country", "event", "event_time") # column pruning
result = df_filtered.groupBy("country").agg(count("event"))
# ── SOLUTION 5: Use aggregateByKey (RDD) for full control ─────────────────────
# When DataFrame API can't express what you need
rdd = df.select("country", "event").rdd.map(lambda r: (r.country, r.event))
# Pre-aggregate locally: build set of unique events per partition
result_rdd = rdd.aggregateByKey(
set(), # initial accumulator = empty set
lambda acc, val: acc | {val}, # within partition: union set
lambda acc1, acc2: acc1 | acc2 # cross partition: union sets
)
result = spark.createDataFrame(
result_rdd.map(lambda x: (x[0], list(x[1]))),
["country", "unique_events"]
)
# ── SOLUTION 6: Enable AQE skew handling ──────────────────────────────────────
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "3")
# AQE splits the "US" partition into smaller sub-partitions automatically
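The set-union fold that `aggregateByKey` performs in Solution 5 can be sanity-checked in plain Python — this mirrors the per-key merge logic on toy data (not the real 500M-row input):

```python
def union_fold(pairs):
    """Build {key: set(values)} the way aggregateByKey's set-union accumulator does."""
    acc = {}
    for country, event in pairs:
        acc.setdefault(country, set()).add(event)  # within-partition: add to set
    return acc  # cross-partition merge would just union these dicts' sets

pairs = [("US", "click"), ("US", "view"), ("US", "click"), ("IN", "view")]
result = union_fold(pairs)
# → {"US": {"click", "view"}, "IN": {"view"}} — duplicates collapse per key
```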
Q35 — Job works on 10 GB weekdays, fails on 500 GB Mondays
# ── ROOT CAUSE: Static configs tuned for 10 GB, fail at 500 GB ─────────────────
# ── SOLUTION 1: Enable AQE (auto-adapts at runtime) ───────────────────────────
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# AQE merges tiny partitions on small days, splits large ones on big days
# shuffle.partitions can be set to 2000 and AQE will coalesce on small days
# ── SOLUTION 2: Dynamic Allocation (scale executors to data) ───────────────────
spark.conf.set("spark.dynamicAllocation.enabled", "true")
spark.conf.set("spark.dynamicAllocation.minExecutors", "5")
spark.conf.set("spark.dynamicAllocation.maxExecutors", "200") # scale up for Monday
spark.conf.set("spark.shuffle.service.enabled", "true")
# NOTE: dynamic allocation settings take effect at session/cluster launch, not mid-job
# On 10 GB day: ~5 executors. On 500 GB Monday: auto-scales to 100+ executors
# ── SOLUTION 3: Data-driven shuffle partition calculation ─────────────────────
# Before running the main job, estimate data size and set partitions
def estimate_partitions(spark, path, target_mb=128):
    """Estimate optimal partition count from input data size."""
    jvm = spark.sparkContext._jvm
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(jvm.java.net.URI.create(path), hadoop_conf)
    size_bytes = fs.getContentSummary(jvm.org.apache.hadoop.fs.Path(path)).getLength()
    size_mb = size_bytes / (1024 * 1024)
    return max(200, int(size_mb / target_mb))
n_parts = estimate_partitions(spark, "s3://bucket/data/")
spark.conf.set("spark.sql.shuffle.partitions", str(n_parts))
# ── SOLUTION 4: Add checkpoint mid-job (survive retries on large runs) ─────────
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df = spark.read.parquet("s3://bucket/data/")
# After expensive operation: checkpoint to cut lineage + save progress
after_join = df.join(reference, "id").filter(...)
after_join.cache()
checkpointed = after_join.checkpoint()  # checkpoint() returns a NEW DataFrame — keep the return value
after_join.unpersist()  # release cache; reuse `checkpointed` from here on
# ── SOLUTION 5: Partition the input table by date for pruning ─────────────────
# Write data partitioned by date so Monday read only reads weekend partition
df.write \
.partitionBy("year", "month", "day") \
.mode("append") \
.parquet("s3://bucket/data-partitioned/")
# Monday read: only reads Sat+Sun partitions (not entire history)
df_weekend = spark.read.parquet("s3://bucket/data-partitioned/") \
.filter(col("year") == 2024) \
.filter(col("month") == 1) \
.filter(col("day").isin(6, 7)) # Saturday + Sunday only
# ── COMPLETE ROBUST ETL PATTERN ───────────────────────────────────────────────
from datetime import datetime, timedelta
def run_etl(spark, date_str):
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.maxExecutors", "200")
    spark.conf.set("spark.sql.shuffle.partitions", "2000")  # AQE coalesces down
    df = spark.read.parquet(f"s3://bucket/data/date={date_str}/") \
        .filter(col("is_valid") == True) \
        .select("id", "amount", "category")
    result = df.groupBy("category") \
        .agg(sum("amount").alias("total"))
    result.write.mode("overwrite") \
        .parquet(f"s3://bucket/output/date={date_str}/")
Q36 — Schema evolution — new column added to source
# ── PROBLEM: Reading old + new files fails with schema mismatch ───────────────
# old files: (id, amount, category)
# new files: (id, amount, category, discount_pct) ← new column
# spark.read.parquet("all/") → AnalysisException: schema mismatch
# ── SOLUTION 1: mergeSchema option (Parquet + Delta native) ───────────────────
df = spark.read \
.option("mergeSchema", "true") \
.parquet("s3://bucket/data/")  # mergeSchema unions all file schemas; missing cols = null
# Old file rows: discount_pct = null
# New file rows: discount_pct = actual value
# ── SOLUTION 2: unionByName with allowMissingColumns ──────────────────────────
old_df = spark.read.parquet("s3://bucket/data/before-2024-03-01/")
new_df = spark.read.parquet("s3://bucket/data/from-2024-03-01/")
combined = old_df.unionByName(new_df, allowMissingColumns=True)
# old_df rows: discount_pct = null (added automatically)
# new_df rows: discount_pct = actual value
# ── SOLUTION 3: Define explicit schema that includes all columns ───────────────
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType
full_schema = StructType([
StructField("id", LongType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("category", StringType(), nullable=True),
StructField("discount_pct", DoubleType(), nullable=True) # new col, nullable
])
# Read old files with explicit schema → missing columns filled with null
old_df = spark.read.schema(full_schema).parquet("s3://bucket/data/before-2024-03-01/")
# No error — schema is applied, discount_pct = null for all old rows
# ── SOLUTION 4: Delta Lake (best for production schema evolution) ─────────────
# When writing:
new_df.write \
.format("delta") \
.option("mergeSchema", "true") \
.mode("append") \
.save("s3://bucket/delta-table/")  # mergeSchema allows new columns on append
# Or set globally:
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# ── HANDLE NULL in new column after merge ─────────────────────────────────────
from pyspark.sql.functions import coalesce, lit
combined = combined.withColumn(
"discount_pct",
coalesce(col("discount_pct"), lit(0.0)) # treat missing as 0% discount
)
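Conceptually, `unionByName(..., allowMissingColumns=True)` pads each side to the union of column names, filling the gaps with null. A pure-Python sketch of that padding, using dict-rows as a stand-in for DataFrame rows:

```python
def pad_rows(rows, all_cols):
    """Fill columns a row lacks with None, like allowMissingColumns=True does."""
    return [{c: row.get(c) for c in all_cols} for row in rows]

old = [{"id": 1, "amount": 9.5, "category": "a"}]          # pre-evolution schema
cols = ["id", "amount", "category", "discount_pct"]        # union of old + new schemas
padded = pad_rows(old, cols)
# → old rows now carry discount_pct=None, matching the merged schema
```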
Q37 — How to estimate the right number of partitions for a job
# ── THE FORMULA ───────────────────────────────────────────────────────────────
# Target partition size: 100-200 MB per partition (sweet spot)
# Too small: task scheduling overhead + small file problem
# Too large: OOM risk, low parallelism
# FORMULA: n_partitions = ceil(data_size_MB / target_partition_MB)
# For 500 GB: 500,000 MB / 128 MB = ~3,900 partitions → set to 4000
data_size_gb = 500
target_part_mb = 128
n_partitions = int((data_size_gb * 1024) / target_part_mb) # 4000
spark.conf.set("spark.sql.shuffle.partitions", str(n_partitions))
# ── ALSO: SET INPUT PARTITION SIZE ────────────────────────────────────────────
# Controls how Spark splits input files into tasks
spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024)) # 128 MB
# ── CHECK CURRENT PARTITION COUNT ─────────────────────────────────────────────
df = spark.read.parquet("s3://bucket/500gb-data/")
print(f"Input partitions: {df.rdd.getNumPartitions()}")
# Should be ~3900-4000 for 500 GB at 128 MB per partition
# ── VERIFY PARTITION BALANCE ──────────────────────────────────────────────────
from pyspark.sql.functions import spark_partition_id, count
df.withColumn("pid", spark_partition_id()) \
.groupBy("pid") \
.agg(count("*").alias("rows_in_partition")) \
.describe("rows_in_partition") \
.show()  # check mean, stddev, min, max
# Healthy: stddev/mean < 0.3 (relatively even)
# Skewed: max >> mean (one partition has many more rows)
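The stddev/mean rule of thumb above is just the coefficient of variation — a pure-Python version of the same check (illustrative helper, not a Spark API):

```python
import statistics

def is_balanced(row_counts, threshold=0.3):
    """True if partition row counts are relatively even (CV = stddev/mean below threshold)."""
    cv = statistics.pstdev(row_counts) / statistics.mean(row_counts)
    return cv < threshold

# Even partitions pass; one giant partition fails the check
print(is_balanced([100, 105, 95]))        # → True  (CV ≈ 0.04)
print(is_balanced([100, 100, 10000]))     # → False (CV ≈ 1.37 — skewed)
```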
# ── CORES vs PARTITIONS RELATIONSHIP ─────────────────────────────────────────
# Rule: partitions should be a MULTIPLE of total cores
# 50 executors × 4 cores = 200 total cores
# Good partition counts: 200, 400, 800, 1600, 4000 (multiples of 200)
# This ensures all cores stay busy (no idle cores waiting)
total_cores = 50 * 4 # 200
# Round partition count to nearest multiple of total_cores
import math
n_partitions_adjusted = math.ceil(n_partitions / total_cores) * total_cores # 4000
spark.conf.set("spark.sql.shuffle.partitions", str(n_partitions_adjusted))
# ── LET AQE DO FINAL TUNING ───────────────────────────────────────────────────
# Set partitions to a safe HIGH number → AQE coalesces down to right size
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "134217728") # 128 MB
spark.conf.set("spark.sql.shuffle.partitions", "8000") # AQE will reduce this
# INTERVIEW SUMMARY:
# 1. Formula: data_MB / 128 MB = partition count
# 2. Round to multiple of total executor cores
# 3. Set input maxPartitionBytes = same target (128 MB)
# 4. Enable AQE to auto-tune at runtime
# 5. Verify with describe() on partition sizes
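The whole recipe — size formula plus rounding up to a core multiple — fits in one helper; the defaults below match the 500 GB / 200-core example in this answer:

```python
import math

def shuffle_partitions(data_gb, target_mb=128, total_cores=200):
    """data_MB / target_MB, rounded UP to the nearest multiple of total cores."""
    raw = math.ceil(data_gb * 1024 / target_mb)         # 500 GB → 4000 raw partitions
    return math.ceil(raw / total_cores) * total_cores   # round to a multiple of 200

print(shuffle_partitions(500))   # → 4000
print(shuffle_partitions(10))    # → 200 (floor of one full wave of cores)
```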
Q38 — Avoid recomputing an expensive DataFrame 3 times
# ── PROBLEM: Without caching ─────────────────────────────────────────────────
expensive_df = raw.join(reference, "id") \
.groupBy("region") \
.agg(sum("amount").alias("total"))  # expensive join + aggregation
# Each action below triggers a FULL recomputation of expensive_df:
expensive_df.write.parquet("s3://output/") # compute #1
expensive_df.filter(col("total") > 1e6).show() # compute #2
anomalies = expensive_df.filter(col("total") > avg_total)  # compute #3 (avg_total computed separately)
# ── SOLUTION: Cache before first use ──────────────────────────────────────────
expensive_df = raw.join(reference, "id") \
.groupBy("region") \
.agg(sum("amount").alias("total"))
# Cache (materialize into executor memory/disk)
expensive_df.cache()
expensive_df.count() # TRIGGER the cache — forces computation NOW, stores result
# Now all 3 uses hit the cache, not recompute:
expensive_df.write.parquet("s3://output/") # reads from cache
expensive_df.filter(col("total") > 1e6).show() # reads from cache
anomalies = expensive_df.filter(col("total") > 1000000) # reads from cache
# Release cache when done
expensive_df.unpersist()
# ── CHOOSE STORAGE LEVEL ──────────────────────────────────────────────────────
from pyspark.storagelevel import StorageLevel
# If expensive_df fits in memory:
expensive_df.persist(StorageLevel.MEMORY_ONLY) # fastest reads
# If it's large or memory is limited:
expensive_df.persist(StorageLevel.MEMORY_AND_DISK) # spills to disk if needed
# If you want to be safe on a shared cluster:
expensive_df.persist(StorageLevel.DISK_ONLY) # always on disk (slowest)
# ── ALTERNATIVE: CHECKPOINT (if lineage is long) ──────────────────────────────
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
expensive_df.cache()
expensive_df.count() # trigger cache
checkpointed = expensive_df.checkpoint()  # checkpoint() returns a NEW DataFrame — keep the return value
expensive_df.unpersist()  # release cache (the S3 checkpoint backs `checkpointed` now)
# Now: reuse `checkpointed` — it reads from the S3 checkpoint, not the original lineage
# Advantage: survives executor failures (no recompute from original data)
# ── WRITE ONCE, READ MULTIPLE TIMES PATTERN (most reliable) ────────────────────
# For production: materialize to Parquet, read it back multiple times
expensive_df.write.mode("overwrite").parquet("s3://tmp/expensive-result/")
materialized = spark.read.parquet("s3://tmp/expensive-result/")
materialized.write.parquet("s3://output/")
materialized.filter(col("total") > 1e6).show()
anomalies = materialized.filter(col("total") > 1000000)
# Trade-off: extra write cost → but most resilient, works across sessions
Q39 — Read only specific file types from a mixed directory
# ── METHOD 1: pathGlobFilter (Spark 3.0+) ─────────────────────────────────────
df = spark.read \
.option("pathGlobFilter", "*.csv") \
.option("recursiveFileLookup", "true") \
.option("header", "true") \
.csv("s3://bucket/mixed-directory/")  # only .csv files, searched recursively
# Multiple extensions (Hadoop glob syntax):
df = spark.read \
.option("pathGlobFilter", "*.{csv,tsv}") \
.option("recursiveFileLookup", "true") \
.csv("s3://bucket/mixed-directory/")  # csv OR tsv
# ── METHOD 2: Glob pattern in path ────────────────────────────────────────────
# CSVs one level down (note: Hadoop globs have no recursive ** — use recursiveFileLookup for any depth):
df = spark.read.csv("s3://bucket/mixed-directory/*/*.csv", header=True)
# All csvs matching date pattern:
df = spark.read.csv("s3://bucket/data/2024-01-*.csv", header=True)
# ── METHOD 3: Manually list + filter using boto3 (S3) ────────────────────────
import boto3
s3 = boto3.client("s3")
bucket, prefix = "my-bucket", "mixed-directory/"
response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)  # returns at most 1000 keys — paginate for more
csv_paths = [
f"s3://{bucket}/{obj['Key']}"
for obj in response.get("Contents", [])
if obj["Key"].endswith(".csv") and not obj["Key"].endswith(".tmp")
]
print(f"Found {len(csv_paths)} CSV files")
df = spark.read.option("header", True).csv(csv_paths)  # csv() takes a LIST of paths (not *args — the 2nd positional arg is schema)
# ── METHOD 4: Exclude patterns using Python filter ────────────────────────────
import glob, os
all_files = glob.glob("/data/**/*", recursive=True)
# Keep only .parquet files, exclude hidden files and .tmp
parquet_files = [
f for f in all_files
if f.endswith(".parquet")
and not os.path.basename(f).startswith("_") # exclude _SUCCESS, _metadata
and not os.path.basename(f).startswith(".") # exclude hidden files
]
df = spark.read.parquet(*parquet_files)
# ── VERIFY WHAT WAS READ ──────────────────────────────────────────────────────
from pyspark.sql.functions import input_file_name
df_with_source = df.withColumn("src", input_file_name())
df_with_source.select("src").distinct().show(20, truncate=False)
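Method 4's exclusion rules are worth factoring into a predicate you can test standalone — the same logic as the list comprehension above:

```python
import os

def is_data_parquet(path):
    """Keep .parquet data files; skip Spark markers (_SUCCESS, _metadata) and hidden files."""
    name = os.path.basename(path)
    return path.endswith(".parquet") and not name.startswith(("_", "."))

print(is_data_parquet("/data/2024/part-0.parquet"))   # → True
print(is_data_parquet("/data/2024/_SUCCESS"))         # → False (Spark marker)
print(is_data_parquet("/data/2024/.tmp.parquet"))     # → False (hidden file)
```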
Q40 — Handle late-arriving data in daily ETL
# ── STRATEGY 1: Reprocess last N days (simple, reliable) ──────────────────────
# Instead of processing only "today", always reprocess last 3 days
# Late data from yesterday will be picked up in today's run of "yesterday"
from datetime import datetime, timedelta
def run_etl_with_late_data(spark, run_date, lookback_days=3):
    """Process last N days to catch late arrivals."""
    dates = [
        (run_date - timedelta(days=i)).strftime("%Y-%m-%d")
        for i in range(lookback_days)
    ]
    df = spark.read.parquet("s3://bucket/raw/") \
        .filter(col("event_date").isin(dates))  # read last 3 days
    # Dynamic partition overwrite: only the touched partitions are replaced (idempotent)
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write \
        .partitionBy("event_date") \
        .mode("overwrite") \
        .parquet("s3://bucket/processed/")
# Overwrites Sat+Sun+Mon partitions — Mon run fixes any late Sat/Sun data
# ── STRATEGY 2: Delta Lake MERGE (upsert late records) ────────────────────────
from delta.tables import DeltaTable
def upsert_late_records(spark, new_records_df):
    """Upsert new/late records into existing Delta table."""
    delta_table = DeltaTable.forPath(spark, "s3://bucket/delta/events/")
    delta_table.alias("target").merge(
        new_records_df.alias("source"),
        "target.event_id = source.event_id"  # match on unique event ID
    ).whenMatchedUpdateAll() \
     .whenNotMatchedInsertAll() \
     .execute()  # update if exists (idempotent), insert if new
# ── STRATEGY 3: Separate late arrival table ────────────────────────────────────
# Detect late records at write time
from pyspark.sql.functions import current_timestamp, datediff, to_date, col
df = spark.read.parquet("s3://bucket/raw/today/") \
.withColumn("processing_ts", current_timestamp()) \
.withColumn("days_late",
datediff(col("processing_ts"), col("event_date"))
)
# Separate on-time vs late
on_time = df.filter(col("days_late") == 0)
late = df.filter(col("days_late") > 0)
# Write on-time to regular partition
on_time.write.partitionBy("event_date").mode("append") \
.parquet("s3://bucket/events/")
# Write late records to separate location for auditing + reprocessing
late.write.mode("append") \
.parquet(f"s3://bucket/late-arrivals/{datetime.today().strftime('%Y-%m-%d')}/")
# ── STRATEGY 4: Watermark filter — ignore very old late data ──────────────────
MAX_LATE_DAYS = 7 # SLA: data older than 7 days is rejected
from pyspark.sql.functions import current_date, date_sub
df_filtered = df.filter(
col("event_date") >= date_sub(current_date(), MAX_LATE_DAYS)
)
# Log how much data was rejected
total = df.count()
accepted = df_filtered.count()
rejected = total - accepted
print(f"Rejected {rejected} records older than {MAX_LATE_DAYS} days")
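Strategy 1's date window is easy to unit-test in isolation — a standalone version of the lookback computation used in `run_etl_with_late_data`:

```python
from datetime import date, timedelta

def lookback_dates(run_date, lookback_days=3):
    """ISO date strings a run on run_date should reprocess (today backwards)."""
    return [(run_date - timedelta(days=i)).isoformat() for i in range(lookback_days)]

# A Monday run reprocesses Mon + Sun + Sat, catching weekend late arrivals
print(lookback_dates(date(2024, 1, 8)))
# → ['2024-01-08', '2024-01-07', '2024-01-06']
```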
SECTION 6: AZURE CLOUD STORAGE + REAL-WORLD READING
Q41 — Read from Azure Blob Storage (old storage / wasbs://)
# Azure Blob Storage uses the wasbs:// (secure) or wasb:// protocol
# Format: wasbs://<container>@<storage-account>.blob.core.windows.net/<path>
# ── METHOD 1: Account Key (simplest, not for production) ─────────────────────
storage_account = "mystorageacct"
account_key = "your_account_key_here" # from Azure Portal → Access Keys
spark.conf.set(
f"fs.azure.account.key.{storage_account}.blob.core.windows.net",
account_key
)
df = spark.read.parquet(
"wasbs://raw-data@mystorageacct.blob.core.windows.net/parquet/orders/2024/"
)
df.show()
# ── METHOD 2: SAS Token (scoped access, time-limited) ────────────────────────
sas_token = "sv=2023-01-01&ss=b&srt=sco&sp=rl&..." # from Azure Portal
spark.conf.set(
f"fs.azure.sas.raw-data.{storage_account}.blob.core.windows.net",
sas_token
)
df = spark.read.csv(
"wasbs://raw-data@mystorageacct.blob.core.windows.net/csv/customers/",
header=True,
inferSchema=True
)
# ── METHOD 3: Service Principal / App Registration (production standard) ──────
# Used in Databricks / ADF / Synapse pipelines
tenant_id = "your-tenant-id"
client_id = "your-app-client-id"
client_secret = "your-app-client-secret"
spark.conf.set(
f"fs.azure.account.auth.type.{storage_account}.blob.core.windows.net",
"OAuth"
)
spark.conf.set(
f"fs.azure.account.oauth.provider.type.{storage_account}.blob.core.windows.net",
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
)
spark.conf.set(
f"fs.azure.account.oauth2.client.id.{storage_account}.blob.core.windows.net",
client_id
)
spark.conf.set(
f"fs.azure.account.oauth2.client.secret.{storage_account}.blob.core.windows.net",
client_secret
)
spark.conf.set(
f"fs.azure.account.oauth2.client.endpoint.{storage_account}.blob.core.windows.net",
f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
)
df = spark.read.parquet(
"wasbs://raw-data@mystorageacct.blob.core.windows.net/parquet/orders/"
)
Q42 — Read from Azure Data Lake Storage Gen2 (ADLS Gen2 / abfss://)
# ADLS Gen2 uses abfss:// (hierarchical namespace enabled on storage account)
# Format: abfss://<filesystem>@<storage-account>.dfs.core.windows.net/<path>
#
# KEY DIFFERENCE from Blob Storage:
# Blob: wasbs://<container>@<account>.blob.core.windows.net/
# ADLS2: abfss://<filesystem>@<account>.dfs.core.windows.net/
# ↑ "dfs" endpoint vs "blob" endpoint
# ↑ abfss:// vs wasbs://
storage_account = "datalakeprod"
filesystem = "silver" # like a container in Blob, but hierarchical
# ── METHOD 1: Account Key ─────────────────────────────────────────────────────
spark.conf.set(
f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
"your-account-key"
)
df = spark.read.parquet(
f"abfss://{filesystem}@{storage_account}.dfs.core.windows.net/processed/transactions/year=2024/"
)
df.show()
# ── METHOD 2: Service Principal (production) ──────────────────────────────────
tenant_id = "your-tenant-id"
client_id = "your-service-principal-client-id"
client_secret = "your-service-principal-secret"
spark.conf.set(
f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net",
"OAuth"
)
spark.conf.set(
f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
)
spark.conf.set(
f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net",
client_id
)
spark.conf.set(
f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net",
client_secret
)
spark.conf.set(
f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
)
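The five OAuth settings differ only in the account/endpoint suffix, so they can be generated instead of written out five times. A sketch of a helper (the function name and parameters are illustrative, not a Spark or Azure API) that builds the conf dict for either the blob or dfs endpoint:

```python
def oauth_confs(account, tenant_id, client_id, client_secret, endpoint="dfs"):
    """Build the fs.azure OAuth conf entries for one storage account."""
    host = f"{account}.{endpoint}.core.windows.net"
    return {
        f"fs.azure.account.auth.type.{host}": "OAuth",
        f"fs.azure.account.oauth.provider.type.{host}":
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        f"fs.azure.account.oauth2.client.id.{host}": client_id,
        f"fs.azure.account.oauth2.client.secret.{host}": client_secret,
        f"fs.azure.account.oauth2.client.endpoint.{host}":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }

# Usage: for k, v in oauth_confs("datalakeprod", tid, cid, secret).items():
#            spark.conf.set(k, v)
```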
# Read Parquet from ADLS Gen2
orders = spark.read.parquet(
"abfss://silver@datalakeprod.dfs.core.windows.net/processed/transactions/"
)
# Read Delta table from ADLS Gen2
delta_df = spark.read.format("delta").load(
"abfss://gold@datalakeprod.dfs.core.windows.net/curated/orders_final/"
)
# ── METHOD 3: Managed Identity (Databricks / Synapse — NO SECRETS) ─────────────
# In Azure Databricks with cluster-level Managed Identity configured:
# No spark.conf needed at all! Azure handles auth transparently.
df = spark.read.parquet(
"abfss://silver@datalakeprod.dfs.core.windows.net/processed/transactions/"
)
# Works automatically when Managed Identity is assigned to Databricks workspace
# ── READ MULTIPLE FORMATS FROM ADLS GEN2 ─────────────────────────────────────
base = "abfss://silver@datalakeprod.dfs.core.windows.net"
# CSV
customers = spark.read.option("header", True).csv(f"{base}/raw/customers/")
# JSON
events = spark.read.option("multiLine", True).json(f"{base}/raw/events/")
# Parquet (partitioned)
transactions = spark.read.parquet(f"{base}/processed/transactions/year=2024/month=*/")
# Delta
gold_layer = spark.read.format("delta").load(f"{base}/gold/fact_orders/")
# ORC
hive_data = spark.read.orc(f"{base}/hive-warehouse/sales/")
| Feature | Blob Storage | ADLS Gen2 |
|---|---|---|
| Protocol | wasbs:// | abfss:// |
| Endpoint | .blob.core.windows.net | .dfs.core.windows.net |
| Hierarchical namespace | NO (flat) | YES (folder structure) |
| ACLs (fine-grained) | NO | YES (POSIX-style ACLs) |
| Big data performance | Lower | Higher (optimized I/O) |
| Use for | General blob store | Data Lake / Delta Lake |
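The two protocols address the same storage account through different endpoints. A minimal sketch (hypothetical account/container names) of a helper that builds either URI, to make the scheme/endpoint difference from the table above concrete:

```python
def storage_path(protocol: str, container: str, account: str, path: str) -> str:
    """Build a Spark-readable URI for Blob (wasbs) or ADLS Gen2 (abfss)."""
    suffix = {"wasbs": "blob.core.windows.net", "abfss": "dfs.core.windows.net"}[protocol]
    return f"{protocol}://{container}@{account}.{suffix}/{path.lstrip('/')}"

# Same data, two protocols — only the scheme and endpoint suffix differ
blob_uri = storage_path("wasbs", "silver", "datalakeprod", "raw/customers/")
adls_uri = storage_path("abfss", "silver", "datalakeprod", "raw/customers/")
```

Only the `abfss://` form goes through the ADLS Gen2 driver and benefits from the hierarchical namespace.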
Q43 — Databricks: Mount ADLS Gen2 and read without long paths
# ── STEP 1: MOUNT THE STORAGE (run once, persists across cluster restarts) ────
configs = {
"fs.azure.account.auth.type": "OAuth",
"fs.azure.account.oauth.provider.type":
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
"fs.azure.account.oauth2.client.id": dbutils.secrets.get("kv-scope", "sp-client-id"),
"fs.azure.account.oauth2.client.secret": dbutils.secrets.get("kv-scope", "sp-secret"),
"fs.azure.account.oauth2.client.endpoint":
f"https://login.microsoftonline.com/{dbutils.secrets.get('kv-scope','tenant-id')}/oauth2/token"
}
# Mount the "silver" filesystem to /mnt/silver
dbutils.fs.mount(
source = "abfss://silver@datalakeprod.dfs.core.windows.net/",
mount_point = "/mnt/silver",
extra_configs = configs
)
# Mount the "gold" filesystem to /mnt/gold
dbutils.fs.mount(
source = "abfss://gold@datalakeprod.dfs.core.windows.net/",
mount_point = "/mnt/gold",
extra_configs = configs
)
# ── STEP 2: READ USING SIMPLE /mnt/ PATHS (in any notebook) ──────────────────
# Instead of: abfss://silver@datalakeprod.dfs.core.windows.net/processed/orders/
# Just use:
df = spark.read.parquet("/mnt/silver/processed/orders/")
customers = spark.read.option("header", True).csv("/mnt/silver/raw/customers/")
gold_df = spark.read.format("delta").load("/mnt/gold/curated/fact_sales/")
# ── STEP 3: LIST AND MANAGE MOUNTS ───────────────────────────────────────────
# List all current mounts
display(dbutils.fs.mounts())
# Check if mount exists before mounting (avoid error on duplicate)
mounted = any(m.mountPoint == "/mnt/silver" for m in dbutils.fs.mounts())
if not mounted:
dbutils.fs.mount(source="abfss://silver@...", mount_point="/mnt/silver",
extra_configs=configs)
# List files in mounted path
dbutils.fs.ls("/mnt/silver/processed/orders/")
# Unmount when needed
dbutils.fs.unmount("/mnt/silver")
# ── USE SECRETS (NEVER HARDCODE CREDENTIALS) ──────────────────────────────────
# In Databricks, always use dbutils.secrets — never paste keys in notebooks
# Create secret scope linked to Azure Key Vault:
# dbutils.secrets.get(scope="my-kv-scope", key="storage-account-key")
# ── ALTERNATIVE: Unity Catalog External Location (modern Databricks) ──────────
# Instead of mounts, Unity Catalog manages storage access centrally:
# CREATE EXTERNAL LOCATION silver_lake
# URL 'abfss://silver@datalakeprod.dfs.core.windows.net/'
# WITH (STORAGE CREDENTIAL my_credential);
#
# Then read directly:
df = spark.read.parquet("abfss://silver@datalakeprod.dfs.core.windows.net/data/")
# Access controlled by Unity Catalog permissions, not per-notebook config
Q44 — Read from Azure Synapse Analytics (SQL Pool) via PySpark
# ── METHOD 1: JDBC (simple but slow — single thread, no parallelism) ──────────
synapse_url = (
"jdbc:sqlserver://myworkspace.sql.azuresynapse.net:1433;"
"database=mydb;encrypt=true;trustServerCertificate=false;"
"hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30"
)
df = spark.read.format("jdbc") \
.option("url", synapse_url) \
.option("dbtable", "dbo.FactSales") \
.option("user", "sqladminuser") \
.option("password", dbutils.secrets.get("scope", "synapse-password")) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
# ⚠️ TRAP: Default JDBC = 1 partition = 1 thread = very slow for large tables
# Fix: Add parallel read config:
df = spark.read.format("jdbc") \
.option("url", synapse_url) \
.option("dbtable", "dbo.FactSales") \
.option("user", "sqladminuser") \
.option("password", dbutils.secrets.get("scope", "synapse-password")) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.option("numPartitions", "20") \ # parallel reads
.option("partitionColumn", "SaleID") \ # split by this column
.option("lowerBound", "1") \
.option("upperBound", "10000000") \ # estimated max SaleID
.load()
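Under the hood, Spark turns `lowerBound`/`upperBound`/`numPartitions` into one WHERE clause per partition, each scanning a disjoint ID range. A rough pure-Python sketch of that splitting logic (approximate — Spark's internal stride arithmetic differs slightly), useful for reasoning about skew when bounds are badly estimated:

```python
def jdbc_partition_predicates(column, lower, upper, num_partitions):
    """Approximate the per-partition WHERE clauses Spark generates
    for a partitioned JDBC read (first range also catches NULLs)."""
    stride = (upper - lower) // num_partitions
    bounds = [lower + stride * i for i in range(1, num_partitions)]
    preds = []
    for i in range(num_partitions):
        lo = bounds[i - 1] if i > 0 else None
        hi = bounds[i] if i < num_partitions - 1 else None
        if lo is None:
            preds.append(f"{column} < {hi} OR {column} IS NULL")
        elif hi is None:
            preds.append(f"{column} >= {lo}")
        else:
            preds.append(f"{column} >= {lo} AND {column} < {hi}")
    return preds

# e.g. jdbc_partition_predicates("SaleID", 1, 10_000_000, 4) yields 4 disjoint ranges
```

If real SaleIDs cluster near one end of the bounds, most partitions come back empty and one does all the work — estimate bounds from MIN/MAX on the source table.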
# ── METHOD 2: Synapse Connector for Databricks (PolyBase — fast!) ──────────────
# Azure Synapse Analytics connector uses PolyBase/COPY INTO via ADLS staging
# Much faster than JDBC for large tables (parallel bulk export)
df = spark.read \
.format("com.databricks.spark.sqldw") \
.option("url", synapse_url) \
.option("tempDir", "abfss://temp@datalakeprod.dfs.core.windows.net/staging/") \
.option("forwardSparkAzureStorageCredentials", "true") \
.option("dbTable", "dbo.FactSales") \
.load()
# ── METHOD 3: Push query down (read only what you need) ───────────────────────
query = """
(SELECT SaleID, CustomerID, Amount, SaleDate
FROM dbo.FactSales
WHERE SaleDate >= '2024-01-01'
AND Region = 'APAC') AS filtered_sales
"""
df = spark.read.format("jdbc") \
.option("url", synapse_url) \
.option("dbtable", query) \ # push filter to Synapse, not pull all data
.option("user", "sqladminuser") \
.option("password", dbutils.secrets.get("scope", "synapse-password")) \
.load()
Q45 — Read from Azure Event Hubs / Kafka into PySpark (Structured Streaming)
# Azure Event Hubs is Kafka-compatible → use Spark Kafka connector
# Event Hubs Kafka endpoint: <namespace>.servicebus.windows.net:9093
# ── CONNECTION STRING from Azure Portal → Event Hubs → Shared Access Policies
connection_string = dbutils.secrets.get("scope", "eventhub-connection-string")
# Format: "Endpoint=sb://ns.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=..."
# Build Kafka SASL config from Event Hubs connection string
SASL_CONFIG = (
f'org.apache.kafka.common.security.plain.PlainLoginModule required '
f'username="$ConnectionString" '
f'password="{connection_string}";'
)
# ── READ AS STREAMING DATAFRAME ───────────────────────────────────────────────
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers",
"mynamespace.servicebus.windows.net:9093") \
.option("subscribe", "my-event-hub-name") \ # topic = event hub name
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.sasl.jaas.config", SASL_CONFIG) \
.option("startingOffsets", "latest") \ # or "earliest"
.load()
# ── PARSE THE MESSAGE (value is binary → deserialize) ────────────────────────
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
event_schema = StructType([
StructField("event_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("event_time", TimestampType(), True)
])
parsed_df = stream_df \
.select(
col("key").cast("string").alias("partition_key"),
from_json(col("value").cast("string"), event_schema).alias("data"),
col("timestamp").alias("ingest_time")
) \
.select("partition_key", "data.*", "ingest_time")
# ── WRITE STREAM TO ADLS GEN2 / DELTA ────────────────────────────────────────
query = parsed_df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation",
"abfss://checkpoints@datalake.dfs.core.windows.net/eventhub-stream/") \
.start("abfss://silver@datalakeprod.dfs.core.windows.net/streaming/events/")
query.awaitTermination()
# ── READ AS BATCH (for one-time historical read) ──────────────────────────────
batch_df = spark.read \
.format("kafka") \
.option("kafka.bootstrap.servers",
"mynamespace.servicebus.windows.net:9093") \
.option("subscribe", "my-event-hub-name") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.sasl.jaas.config", SASL_CONFIG) \
.option("startingOffsets", "earliest") \ # read ALL historical events
.option("endingOffsets", "latest") \
.load()
Q46 — Full Azure Data Lake Architecture + PySpark read pattern (Medallion)
MEDALLION ARCHITECTURE ON AZURE:
External Sources
(Blob, RDBMS, APIs, Event Hubs)
↓ [ADF / Databricks ingest]
┌─────────────────────────────────────┐
│ BRONZE Layer (raw, immutable) │ ← abfss://bronze@datalake.dfs...
│ Format: raw CSV/JSON/Parquet │ ← partition by ingest_date
│ No transformation, as-is │
└─────────────────────────────────────┘
↓ [Databricks / Synapse Spark]
┌─────────────────────────────────────┐
│ SILVER Layer (cleaned, conformed) │ ← abfss://silver@datalake.dfs...
│ Format: Delta Lake │ ← partition by event_date
│ Deduped, nulls handled, typed │
└─────────────────────────────────────┘
↓ [Databricks / Synapse Spark]
┌─────────────────────────────────────┐
│ GOLD Layer (business aggregates) │ ← abfss://gold@datalake.dfs...
│ Format: Delta Lake │ ← partition by region/product
│ Fact + Dimension tables, KPIs │
└─────────────────────────────────────┘
↓ [Synapse SQL Pool / Power BI]
Dashboards / Reports / ML Models
# ── READ PATTERN FOR EACH LAYER ───────────────────────────────────────────────
# Setup (done once per session in production via cluster config or Managed Identity)
spark.conf.set(
"fs.azure.account.auth.type.datalakeprod.dfs.core.windows.net", "OAuth"
)
spark.conf.set(
"fs.azure.account.oauth.provider.type.datalakeprod.dfs.core.windows.net",
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider"
)
spark.conf.set(
"fs.azure.account.oauth2.client.id.datalakeprod.dfs.core.windows.net",
dbutils.secrets.get("kv-scope", "sp-client-id")
)
spark.conf.set(
"fs.azure.account.oauth2.client.secret.datalakeprod.dfs.core.windows.net",
dbutils.secrets.get("kv-scope", "sp-secret")
)
spark.conf.set(
"fs.azure.account.oauth2.client.endpoint.datalakeprod.dfs.core.windows.net",
"https://login.microsoftonline.com/<tenant-id>/oauth2/token"
)
from pyspark.sql.functions import col

BASE = "abfss://{layer}@datalakeprod.dfs.core.windows.net"
# ── BRONZE: read raw files ─────────────────────────────────────────────────────
bronze_csv = spark.read \
.option("header", True) \
.option("mode", "PERMISSIVE") \
.option("columnNameOfCorruptRecord", "_corrupt") \
.csv(f"{BASE.format(layer='bronze')}/raw/orders/ingest_date=2024-01-15/")
# ── SILVER: read cleaned Delta ─────────────────────────────────────────────────
silver_orders = spark.read \
.format("delta") \
.load(f"{BASE.format(layer='silver')}/orders/") \
.filter(col("event_date") >= "2024-01-01") # partition pruning
# Time travel: audit/debugging
silver_yesterday = spark.read.format("delta") \
.option("versionAsOf", 10) \
.load(f"{BASE.format(layer='silver')}/orders/")
# ── GOLD: read aggregated Delta for reporting ──────────────────────────────────
gold_kpis = spark.read \
.format("delta") \
.load(f"{BASE.format(layer='gold')}/fact_daily_revenue/") \
.filter(col("region") == "APAC")
# ── WRITE BACK TO SILVER (after transformation) ────────────────────────────────
from delta.tables import DeltaTable
DeltaTable.forPath(
spark,
f"{BASE.format(layer='silver')}/orders/"
).alias("target").merge(
bronze_csv.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
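Conceptually, the MERGE above is a keyed upsert: a source row replaces the matching target row on `order_id`, and unmatched source rows are inserted. A tiny pure-Python model of those semantics (illustration only — not Delta's implementation):

```python
def upsert(target: dict, source_rows: list, key: str) -> dict:
    """Model of MERGE whenMatchedUpdateAll / whenNotMatchedInsertAll:
    matching keys are overwritten, new keys are inserted."""
    merged = dict(target)          # copy; target table itself is immutable here
    for row in source_rows:
        merged[row[key]] = row     # matched → update all; not matched → insert all
    return merged

target = {1: {"order_id": 1, "amount": 10.0}}
source = [{"order_id": 1, "amount": 12.5}, {"order_id": 2, "amount": 7.0}]
result = upsert(target, source, "order_id")
```

One implication worth knowing: if the source contains duplicate keys, Delta's MERGE fails with a multiple-match error, whereas this sketch silently keeps the last one — dedupe the source first.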
| Storage Type | Protocol | Endpoint suffix |
|---|---|---|
| Azure Blob (classic) | wasbs:// | .blob.core.windows.net |
| ADLS Gen1 | adl:// | .azuredatalakestore.net |
| ADLS Gen2 (modern) | abfss:// | .dfs.core.windows.net |
| Azure Files | (SMB/NFS) | .file.core.windows.net |
Q47 — Add to tracking table
Update the master tracking table at the top of this file:
| Q41 | Read from Azure Blob Storage (wasbs://) | Azure | S | wasbs/account-key/SAS/SP | ✓ |
| Q42 | Read from ADLS Gen2 (abfss://) | Azure | S | abfss/OAuth/SP/MI | ✓ |
| Q43 | Mount ADLS Gen2 in Databricks | Databricks | S | dbutils.fs.mount/secrets | ✓ |
| Q44 | Read from Azure Synapse SQL Pool | Azure | S | JDBC/Synapse connector | ✓ |
| Q45 | Read from Azure Event Hubs (Kafka) | Azure | S | Kafka/Structured Streaming | ✓ |
| Q46 | Medallion architecture + full read pattern | Databricks | S | Bronze/Silver/Gold layers | ✓ |