File 04: ETL Scenarios & Pipeline Design
Level: Senior/Lead (10+ years) — Real-world production scenarios
Focus: SCD implementations, CDC, Medallion, DLT, end-to-end pipeline design
SECTION 1: SCD (SLOWLY CHANGING DIMENSION) IMPLEMENTATIONS
Q1: Explain all SCD Types. When would you use each?
Answer:
| SCD Type | Strategy | History? | Use Case |
|---|---|---|---|
| Type 0 | Never update | No | Static reference data (country codes) |
| Type 1 | Overwrite | No | Corrections, non-historical attributes |
| Type 2 | Add new row + close old | Full history | Customer address, pricing tiers |
| Type 3 | Previous value column | Limited (1 prior) | When only the previous value matters |
| Type 4 | Separate history table | Full history | Frequently changing attributes; keeps the current table small and fast |
| Type 6 | Hybrid (1+2+3) | Full + current flag + previous | Complex dimensional modeling |
Q2: Implement SCD Type 1 (Overwrite) in Databricks.
Answer:
MERGE INTO dim_customer t
USING staging_customer s
ON t.customer_id = s.customer_id
WHEN MATCHED THEN
UPDATE SET
t.name = s.name,
t.email = s.email,
t.address = s.address,
t.phone = s.phone,
t.updated_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (customer_id, name, email, address, phone, created_at, updated_at)
VALUES (s.customer_id, s.name, s.email, s.address, s.phone,
current_timestamp(), current_timestamp())
Q3: Implement SCD Type 2 (Full History) — This is the #1 most asked question.
Answer:
Table structure:
CREATE TABLE dim_customer (
surrogate_key BIGINT GENERATED ALWAYS AS IDENTITY,
customer_id STRING, -- Business key
name STRING,
email STRING,
address STRING,
hash_value STRING, -- Hash of tracked columns for change detection
effective_start TIMESTAMP,
effective_end TIMESTAMP,
is_current BOOLEAN
) USING DELTA;
Full PySpark implementation:
from delta.tables import DeltaTable
from pyspark.sql.functions import *
# Step 1: Prepare source with hash for change detection
source_df = spark.table("staging_customer").withColumn(
"hash_value", md5(concat_ws("||", col("name"), col("email"), col("address")))
)
# Step 2: Get current target records
target = DeltaTable.forName(spark, "dim_customer")
target_df = target.toDF().filter("is_current = true")
# Step 3: Identify changes (new + changed records)
changes = source_df.alias("s").join(
target_df.alias("t"),
col("s.customer_id") == col("t.customer_id"),
"left"
).filter(
col("t.customer_id").isNull() | # New records
(col("s.hash_value") != col("t.hash_value")) # Changed records
).select("s.*")
# Step 4: THE MERGE KEY TRICK
# For closing old records: use real customer_id as merge_key (matches target)
# For inserting new current records: set merge_key = NULL (never matches → goes to NOT MATCHED)
# Rows that need to CLOSE existing records (update is_current = false)
# (left_semi join keeps changed rows whose key already exists in the target,
#  avoiding a collect() of keys to the driver — important at scale)
close_records = changes.join(
    target_df.select("customer_id"), "customer_id", "left_semi"
).withColumn("merge_key", col("customer_id"))
# Rows that need to INSERT as new current records
insert_records = changes.withColumn("merge_key", lit(None).cast("string"))
# Combine: close_records will MATCH (update), insert_records will NOT MATCH (insert)
staged = close_records.unionByName(insert_records)
# Step 5: Execute MERGE
target.alias("t").merge(
staged.alias("s"),
"t.customer_id = s.merge_key AND t.is_current = true"
).whenMatchedUpdate(set={
"is_current": lit(False),
"effective_end": current_timestamp()
}).whenNotMatchedInsert(values={
"customer_id": col("s.customer_id"),
"name": col("s.name"),
"email": col("s.email"),
"address": col("s.address"),
"hash_value": col("s.hash_value"),
"effective_start": current_timestamp(),
"effective_end": lit("9999-12-31 23:59:59").cast("timestamp"),
"is_current": lit(True)
}).execute()
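The hash-based change detection above can be illustrated without Spark. A minimal pure-Python sketch (hashlib standing in for Spark's `md5(concat_ws(...))`; the column values are hypothetical):

```python
import hashlib

def row_hash(*tracked_cols):
    # The "||" separator mirrors concat_ws("||", ...) above; without it,
    # ("ab", "c") and ("a", "bc") would produce identical hashes.
    return hashlib.md5("||".join(tracked_cols).encode("utf-8")).hexdigest()

old = row_hash("Alice", "alice@example.com", "1 Old St")
new = row_hash("Alice", "alice@example.com", "2 New St")
assert old != new  # changed address is detected
assert old == row_hash("Alice", "alice@example.com", "1 Old St")  # unchanged rows hash stably
```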
Q4: Explain the "Merge Key Trick" for SCD Type 2. Why is it needed?
Answer: The trick solves a fundamental problem: for a changed record, you need to BOTH:
- UPDATE the existing row (set is_current = false, effective_end = now)
- INSERT a new row (the new current version)
But MERGE can only do one action per matched source row.
The trick:
- Create TWO rows in the staged source for each change:
  - Row 1: merge_key = customer_id → this will MATCH the target → UPDATE (close old record)
  - Row 2: merge_key = NULL → NULL never matches → goes to NOT MATCHED → INSERT (new current record)
- Union both and execute a single MERGE
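The staging logic can be sketched in plain Python (hypothetical dict rows, no Spark) to show why each changed record of an existing customer becomes two source rows:

```python
def stage_for_merge(changes, existing_keys):
    staged = []
    for row in changes:
        if row["customer_id"] in existing_keys:
            # merge_key = business key -> MATCHES target -> closes the old current row
            staged.append({**row, "merge_key": row["customer_id"]})
        # merge_key = NULL -> never matches -> NOT MATCHED -> inserted as new current row
        staged.append({**row, "merge_key": None})
    return staged

changes = [
    {"customer_id": "C1", "address": "2 New St"},    # changed existing customer
    {"customer_id": "C9", "address": "5 First St"},  # brand-new customer
]
staged = stage_for_merge(changes, existing_keys={"C1"})
assert len(staged) == 3  # C1 -> close + insert; C9 -> insert only
```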
Alternative approach — Two-step MERGE (simpler, but not atomic: a failure between the steps can leave rows closed with no replacement inserted):
# Step 1: Close old records
target.alias("t").merge(
changes.alias("s"),
"t.customer_id = s.customer_id AND t.is_current = true"
).whenMatchedUpdate(set={
"is_current": lit(False),
"effective_end": current_timestamp()
}).execute()
# Step 2: Insert new current records
changes.withColumn("effective_start", current_timestamp()) \
.withColumn("effective_end", lit("9999-12-31").cast("timestamp")) \
.withColumn("is_current", lit(True)) \
.write.format("delta").mode("append").saveAsTable("dim_customer")
Q5: Implement SCD Type 2 using Delta Live Tables (DLT) — the modern approach.
Answer:
import dlt
from pyspark.sql.functions import col, expr
# Bronze: raw CDC events
@dlt.table(comment="Raw customer CDC events")
def bronze_customer_cdc():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/schema/customer_cdc")
        .load("/landing/customer_cdc/")
    )
# Silver: SCD Type 2 using apply_changes
dlt.create_streaming_table("dim_customer_scd2")
dlt.apply_changes(
target="dim_customer_scd2",
source="bronze_customer_cdc",
keys=["customer_id"],
sequence_by=col("updated_at"),
apply_as_deletes=expr("operation = 'DELETE'"),
except_column_list=["operation", "_rescued_data"],
stored_as_scd_type=2 # <-- This is the magic
)
# DLT automatically manages the __START_AT and __END_AT columns
# (a row is current when __END_AT IS NULL)
Why this is superior:
- No manual merge-key trick
- Handles out-of-order events via sequence_by
- Handles deletes natively
- Automatic state management
- Built-in data quality with expectations
Q6: Implement SCD Type 3 (Previous Value Column).
Answer:
MERGE INTO dim_customer t
USING staging_customer s
ON t.customer_id = s.customer_id
WHEN MATCHED AND t.address != s.address THEN
UPDATE SET
t.prev_address = t.address, -- Save current as previous
t.address = s.address, -- Update with new value
t.address_changed_date = current_timestamp(),
t.updated_at = current_timestamp()
WHEN MATCHED AND t.address = s.address THEN
UPDATE SET -- Other fields might change
t.name = s.name,
t.email = s.email,
t.updated_at = current_timestamp()
WHEN NOT MATCHED THEN
INSERT (customer_id, name, email, address, prev_address, created_at, updated_at)
VALUES (s.customer_id, s.name, s.email, s.address, NULL,
current_timestamp(), current_timestamp())
Q7: Scenario — Implement SCD Type 2 for a customer dimension with 500M records, receiving 2M daily updates. Design the solution at scale.
Answer:
Architecture:

┌─────────────┐     ┌───────────────┐     ┌──────────────────┐
│  Source DB  │────▶│ CDC Pipeline  │────▶│  dim_customer    │
│ (500M rows) │     │ (2M changes)  │     │  (500M+ rows)    │
└─────────────┘     └───────────────┘     └──────────────────┘

Key Design Decisions:
1. PARTITIONING: Partition dim_customer by customer_id hash range
   - 500M rows / 1 GB per partition ≈ 500 partitions
   - Ensures MERGE only scans relevant partitions
2. Z-ORDERING: ZORDER BY (customer_id) for data skipping during MERGE
3. CHANGE DETECTION: Use hash comparison
   - Hash only tracked columns (name, email, address)
   - Reduces comparison cost
4. SOURCE DEDUPLICATION:
   - 2M daily changes may have duplicates
   - Deduplicate BEFORE merge (keep latest per customer_id)
5. MERGE OPTIMIZATION:
   - Add partition column to MERGE condition
   - Broadcast the 2M source (it's small relative to 500M)
   - Use Photon runtime
6. COMPACTION: Run OPTIMIZE weekly
   - Z-ORDER BY (customer_id, is_current)
7. MONITORING:
   - Track merge duration, rows affected
   - Alert if > 10M changes (unusual spike)
   - Data quality: null checks, hash validation
# Optimized implementation
from delta.tables import DeltaTable
from pyspark.sql.functions import *
from pyspark.sql.window import Window
# Deduplicate source (keep latest)
w = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())
source = spark.table("staging_customer") \
.withColumn("rn", row_number().over(w)) \
.filter(col("rn") == 1).drop("rn") \
.withColumn("hash_value", md5(concat_ws("||", col("name"), col("email"), col("address"))))
# Get only changed records (compare hashes)
target_current = spark.table("dim_customer").filter("is_current = true") \
.select("customer_id", col("hash_value").alias("target_hash"))
changes = source.join(target_current, "customer_id", "left") \
.filter(col("target_hash").isNull() | (col("hash_value") != col("target_hash"))) \
.drop("target_hash")
# Execute SCD Type 2 merge (using the merge-key trick)
# ... (same pattern as Q3)
SECTION 2: CHANGE DATA CAPTURE (CDC)
Q8: Design a CDC pipeline from Oracle/MySQL to Delta Lake.
Answer:
┌──────────┐ ┌──────────┐ ┌───────┐ ┌──────────────┐ ┌──────────────┐
│ Oracle │────▶│ Debezium │────▶│ Kafka │────▶│ Auto Loader │────▶│ Bronze │
│ (source) │ │ (CDC) │ │ │ │ (ingestion) │ │ (raw events) │
└──────────┘ └──────────┘ └───────┘ └──────────────┘ └──────┬───────┘
│
┌───────▼───────┐
│ Silver │
│ (MERGE/SCD) │
└───────┬───────┘
│
┌───────▼───────┐
│ Gold │
│ (aggregated) │
└───────────────┘
# Bronze: Ingest raw CDC events from Kafka
raw_cdc = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "oracle.schema.customers") \
.load() \
.select(
from_json(col("value").cast("string"), cdc_schema).alias("data"),
col("timestamp").alias("kafka_timestamp")
).select("data.*", "kafka_timestamp")
raw_cdc.writeStream.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoints/bronze_cdc") \
.table("bronze_customer_cdc")
# Silver: Apply CDC using foreachBatch
def apply_cdc_changes(batch_df, batch_id):
    from delta.tables import DeltaTable
    from pyspark.sql.window import Window
    # Deduplicate within batch (keep latest per key).
    # Delete events carry the key in "before" (after is null), so coalesce both.
    keyed = batch_df.withColumn("pk", coalesce(col("after.id"), col("before.id")))
    w = Window.partitionBy("pk").orderBy(col("ts_ms").desc())
    deduped = keyed.withColumn("rn", row_number().over(w)) \
        .filter("rn = 1").drop("rn")
    target = DeltaTable.forName(spark, "silver_customers")
    target.alias("t").merge(
        deduped.alias("s"),
        "t.customer_id = s.pk"
).whenMatchedDelete(
condition="s.op = 'd'" # Delete
).whenMatchedUpdate(
condition="s.op = 'u'", # Update
set={
"name": "s.after.name",
"email": "s.after.email",
"address": "s.after.address",
"updated_at": "s.ts_ms"
}
).whenNotMatchedInsert(
condition="s.op IN ('c', 'r')", # Create / Read (snapshot)
values={
"customer_id": "s.after.id",
"name": "s.after.name",
"email": "s.after.email",
"address": "s.after.address",
"created_at": "s.ts_ms",
"updated_at": "s.ts_ms"
}
).execute()
spark.readStream.table("bronze_customer_cdc") \
.writeStream.foreachBatch(apply_cdc_changes) \
.option("checkpointLocation", "/checkpoints/silver_customers") \
.trigger(processingTime="1 minute") \
.start()
Q9: What is Delta Lake Change Data Feed (CDF)? How is it different from CDC?
Answer:
| Aspect | CDC (from source DB) | CDF (from Delta Lake) |
|---|---|---|
| Source | External database | Delta Lake table |
| Captures | Changes at source DB | Changes at Delta table |
| Use case | Ingesting external changes | Propagating Delta changes downstream |
| Mechanism | Debezium, DMS, etc. | Built into Delta Lake |
Enable CDF:
ALTER TABLE my_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
Read changes:
# By version range
changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 5) \
.option("endingVersion", 10) \
.table("my_table")
# By timestamp range
changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2025-01-01") \
.option("endingTimestamp", "2025-01-31") \
.table("my_table")
# Streaming (incremental)
changes = spark.readStream.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 5) \
.table("my_table")
CDF columns:
| Column | Values |
|---|---|
| _change_type | insert, update_preimage, update_postimage, delete |
| _commit_version | Delta table version |
| _commit_timestamp | When the change was committed |
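A downstream consumer typically ignores update_preimage rows (old values) and acts on the rest. A pure-Python illustration of that routing, using hypothetical change rows:

```python
def route_cdf_rows(rows):
    # Keep new/changed state for upserts; route deletes separately.
    upserts = [r for r in rows if r["_change_type"] in ("insert", "update_postimage")]
    deletes = [r for r in rows if r["_change_type"] == "delete"]
    # update_preimage rows carry the pre-update values and are usually skipped
    return upserts, deletes

rows = [
    {"id": 1, "_change_type": "insert"},
    {"id": 2, "_change_type": "update_preimage"},
    {"id": 2, "_change_type": "update_postimage"},
    {"id": 3, "_change_type": "delete"},
]
upserts, deletes = route_cdf_rows(rows)
assert [r["id"] for r in upserts] == [1, 2]
assert [r["id"] for r in deletes] == [3]
```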
Q10: Debezium CDC event types — what do they mean?
Answer:
| Operation (op) | Meaning | When |
|---|---|---|
| r | Read (snapshot) | Initial full load |
| c | Create | New row inserted |
| u | Update | Row updated |
| d | Delete | Row deleted |
Each event has:
- before: Row state before the change (null for inserts)
- after: Row state after the change (null for deletes)
- ts_ms: Timestamp of the change
- source: Source metadata (database, table, position)
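To make the envelope concrete, here is a sketch of parsing one Debezium-style update event (field names follow the standard envelope; the payload values are made up):

```python
import json

event = json.loads("""{
  "op": "u",
  "ts_ms": 1736899200000,
  "before": {"id": 42, "email": "old@example.com"},
  "after":  {"id": 42, "email": "new@example.com"},
  "source": {"db": "sales", "table": "customers"}
}""")

# Deletes put the key in "before" (after is null), so resolve it from either side
key = (event["after"] or event["before"])["id"]
assert event["op"] == "u" and key == 42
assert event["before"]["email"] != event["after"]["email"]  # the change itself
```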
SECTION 3: MEDALLION ARCHITECTURE
Q11: Design a medallion architecture. What are the design decisions for each layer?
Answer:
BRONZE (Raw Ingestion):
# Design decisions:
# - Append-only (NEVER update or delete raw data)
# - Include ingestion metadata
# - Partition by ingestion date (not business date)
# - Schema: flexible (can store as strings or use schema inference)
# - Retention: long (compliance, reprocessing)
bronze_df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", schema_loc) \
.load(landing_path) \
.withColumn("_ingested_at", current_timestamp()) \
.withColumn("_ingested_date", current_date()) \
.withColumn("_source_file", input_file_name()) \
.withColumn("_batch_id", lit(batch_id))
bronze_df.writeStream.format("delta") \
.partitionBy("_ingested_date") \
.option("checkpointLocation", checkpoint) \
.table("bronze_orders")
SILVER (Cleansed, Conformed):
# Design decisions:
# - Apply data quality checks
# - Deduplicate
# - Standardize data types and values
# - Join reference data
# - Apply SCD logic here
# - Enable CDF for downstream
# - Partition by business date
silver_df = spark.readStream.table("bronze_orders") \
.withWatermark("_ingested_at", "1 day") \
.dropDuplicates(["order_id"]) \
.filter(col("order_id").isNotNull()) \
.filter(col("amount") > 0) \
.withColumn("amount", col("amount").cast("decimal(10,2)")) \
.withColumn("order_date", to_date(col("order_date_str"), "yyyy-MM-dd"))
GOLD (Business-Level Aggregates):
# Design decisions:
# - Pre-aggregated for specific use cases
# - Denormalized (star schema or wide tables)
# - Heavily optimized (Z-ORDER, OPTIMIZE, caching)
# - Lower volume, higher query performance
gold_df = spark.table("silver_orders") \
.groupBy("order_date", "category", "region") \
.agg(
count("*").alias("order_count"),
sum("amount").alias("total_revenue"),
avg("amount").alias("avg_order_value"),
countDistinct("customer_id").alias("unique_customers")
)
Q12: When would you deviate from the standard medallion pattern?
Answer:
- Skip Silver: Simple use cases with minimal transformation (raw → gold)
- Real-time "hot path": Streaming direct to serving layer, bypassing medallion
- ML Feature Layer: Separate "feature store" alongside gold
- Reverse ETL: Gold feeds back to operational systems (CRM, marketing tools)
- "Platinum" layer: Some orgs add a presentation/BI-specific layer
- Domain-specific layers: In data mesh, each domain owns its own medallion stack
Q13: How do you handle data quality between medallion layers?
Answer:
# Pattern: Quarantine bad records, pass good records
from functools import reduce
from pyspark.sql.functions import col, when
bronze_df = spark.read.table("bronze_orders")
# Define quality rules
quality_rules = {
"valid_order_id": col("order_id").isNotNull(),
"positive_amount": col("amount") > 0,
"valid_date": col("order_date").isNotNull() & (col("order_date") >= "2020-01-01"),
"valid_status": col("status").isin("pending", "completed", "cancelled", "shipped"),
}
# Apply all rules
quality_check = bronze_df
for rule_name, condition in quality_rules.items():
quality_check = quality_check.withColumn(
f"_rule_{rule_name}", when(condition, True).otherwise(False)
)
# All rules must pass
all_rules = [f"_rule_{name}" for name in quality_rules]
quality_check = quality_check.withColumn(
"_is_valid",
reduce(lambda a, b: a & b, [col(r) for r in all_rules])
)
# Route records
good_records = quality_check.filter("_is_valid = true").drop(*all_rules, "_is_valid")
bad_records = quality_check.filter("_is_valid = false")
good_records.write.format("delta").mode("append").saveAsTable("silver_orders")
bad_records.write.format("delta").mode("append").saveAsTable("quarantine_orders")
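The reduce-based rule combination generalizes beyond Spark columns. A pure-Python analogue (hypothetical rules and rows) showing how the same fold collapses all rules into one validity flag:

```python
from functools import reduce

rules = {
    "valid_order_id": lambda r: r["order_id"] is not None,
    "positive_amount": lambda r: r["amount"] > 0,
}

def is_valid(row):
    # Fold every rule result into a single boolean, like reduce(lambda a, b: a & b, ...)
    return reduce(lambda acc, rule: acc and rule(row), rules.values(), True)

assert is_valid({"order_id": 1, "amount": 9.99})
assert not is_valid({"order_id": None, "amount": 9.99})  # would be routed to quarantine
```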
SECTION 4: AUTO LOADER
Q14: What is Auto Loader? How does it work?
Answer: Auto Loader incrementally processes new data files as they arrive in cloud storage.
Two modes:
| Aspect | Directory Listing | File Notification |
|---|---|---|
| How | Lists directory for new files | Cloud events (S3 SNS/SQS, ADLS Event Grid) |
| Setup | Zero configuration | Requires cloud infrastructure |
| Latency | Higher (polling interval) | Near real-time |
| Cost | Can be expensive for large directories | Minimal listing costs |
| Scale | Slower for millions of files | Handles billions of files |
| Recommendation | Dev/test, small directories | Production, high-volume |
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/schema/orders")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.useNotifications", "true")  # File notification mode
    .load("/landing/orders/"))
Q15: How does Auto Loader handle schema evolution?
Answer:
# Schema evolution modes
df = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/schema/orders")
    # or "rescue", "failOnNewColumns", "none"
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("/landing/orders/"))
| Mode | Behavior | Use Case |
|---|---|---|
| addNewColumns | Adds new columns, restarts stream | Schema evolves frequently |
| rescue | New/unexpected columns go to _rescued_data | Don't want restarts |
| failOnNewColumns | Pipeline fails when schema changes | Strict schema control |
| none | Ignore new columns | Fixed schema |
Schema hints (force specific types):
.option("cloudFiles.schemaHints", "order_id LONG, amount DECIMAL(10,2)")
Q16: Auto Loader vs COPY INTO — when do you use each?
Answer:
| Aspect | Auto Loader | COPY INTO |
|---|---|---|
| Processing | Streaming (incremental) | Batch (one-time or scheduled) |
| State tracking | Checkpoint-based | Table metadata |
| File discovery | Efficient (notification/listing) | Full directory listing every time |
| Schema evolution | Built-in | Manual |
| Scale | Better for large directories | Slower for millions of files |
| Idempotency | Exactly-once via checkpoints | Exactly-once via file tracking |
| Recommendation | Preferred for most use cases | Simple one-time loads |
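For the simple-load side of the comparison, a COPY INTO statement looks like the following (the table name and landing path are placeholders; on Databricks it would run via spark.sql):

```python
# Hypothetical target table and landing path
copy_into_sql = """
COPY INTO silver.orders
FROM '/landing/orders/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
"""
# spark.sql(copy_into_sql)  # re-running skips files already loaded (idempotent)
```

COPY INTO tracks loaded files in table metadata, so re-running the same statement is safe — but it re-lists the directory every run, which is why Auto Loader wins at high file counts.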
SECTION 5: DELTA LIVE TABLES (DLT)
Q17: What are the three DLT expectation levels?
Answer:
import dlt
@dlt.table
@dlt.expect("valid_id", "order_id IS NOT NULL") # Log violation, keep row
@dlt.expect_or_drop("positive_amount", "amount > 0") # Drop violating rows
@dlt.expect_or_fail("valid_status", "status IN ('A','B','C')") # Fail entire pipeline
def silver_orders():
    return dlt.read_stream("bronze_orders")
| Decorator | On Violation | Use Case |
|---|---|---|
| @dlt.expect | Records in metrics, keeps the row | Monitoring, soft quality |
| @dlt.expect_or_drop | Drops the row silently | Filter bad data |
| @dlt.expect_or_fail | Fails the entire pipeline | Critical data quality |
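The three behaviors can be mimicked in plain Python (a simplified sketch; real DLT additionally records the violation counts in its event log):

```python
def apply_expectation(rows, predicate, action):
    violations = [r for r in rows if not predicate(r)]
    if action == "fail" and violations:
        # expect_or_fail: any violation stops the pipeline
        raise ValueError(f"{len(violations)} rows violate a critical expectation")
    if action == "drop":
        # expect_or_drop: violating rows are removed from the output
        return [r for r in rows if predicate(r)], violations
    # expect ("warn"): every row is kept, violations are only counted
    return rows, violations

rows = [{"amount": 10}, {"amount": -5}]
kept, bad = apply_expectation(rows, lambda r: r["amount"] > 0, "drop")
assert len(kept) == 1 and len(bad) == 1
kept, bad = apply_expectation(rows, lambda r: r["amount"] > 0, "warn")
assert len(kept) == 2  # rows kept, violation only recorded
```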
Q18: What is the difference between a streaming live table, a materialized view, and a view in DLT?
Answer:
| Concept | Processing | Use Case |
|---|---|---|
| Streaming Live Table | Append-only, incremental | New data ingestion (Bronze, CDC) |
| Materialized View | Precomputed, incrementally updated | Aggregations, enriched data (Silver, Gold) |
| View | Virtual, computed on read | Intermediate transformations, no storage |
# Streaming Live Table (Bronze — append-only)
@dlt.table
def bronze_events():
    return spark.readStream.format("cloudFiles").load(path)
# Materialized View (Gold — automatically incremental)
@dlt.table
def gold_daily_metrics():
    return dlt.read("silver_events") \
        .groupBy("date").agg(count("*").alias("event_count"))
# View (intermediate, not materialized)
@dlt.view
def enriched_events():
    # dim_table: a reference dimension defined elsewhere in the pipeline
    return dlt.read("bronze_events").join(dim_table, "key")
Q19: How do you implement SCD Type 2 in DLT? (Compare with manual approach)
Answer:
# DLT approach — 5 lines vs 50+ lines manually
dlt.create_streaming_table("dim_customer_scd2")
dlt.apply_changes(
target="dim_customer_scd2",
source="bronze_customer_cdc",
keys=["customer_id"], # Business key
sequence_by=col("updated_at"), # Ordering column
apply_as_deletes=expr("operation = 'DELETE'"), # Delete condition
except_column_list=["operation", "_rescued_data"],
stored_as_scd_type=2 # SCD Type 2
)
# DLT auto-manages __START_AT and __END_AT (current row: __END_AT IS NULL)
Why DLT is better for SCD Type 2:
- No merge-key trick needed
- Handles out-of-order events automatically
- Handles deletes natively
- Incremental processing built-in
- Data quality expectations on the same pipeline
SECTION 6: SCENARIO-BASED PIPELINE DESIGN
Q20: Scenario — Design a pipeline for 10 billion clickstream events/day from Kafka.
Answer:
Q21: Scenario — Your daily batch pipeline takes 8 hours. Business wants it under 2 hours. How do you optimize?
Answer:
Q22: Scenario — Design a data mesh on Databricks.
Answer:
Q23: Scenario — Design a real-time fraud detection pipeline.
Answer:
# Architecture:
# Transaction stream (Kafka) → Bronze → Feature Engineering → ML Scoring → Alert
# 1. Bronze: Raw transactions
transactions = spark.readStream.format("kafka") \
.option("subscribe", "transactions") \
.load() \
.select(from_json(col("value").cast("string"), txn_schema).alias("txn")) \
.select("txn.*")
# 2. Feature Engineering: Rolling aggregates
# (Note: window functions can't run directly on a streaming DataFrame;
#  in practice, compute these inside foreachBatch or via stateful aggregation.)
from pyspark.sql.window import Window
w_1h = Window.partitionBy("card_id").orderBy("event_time").rangeBetween(-3600, 0)
w_24h = Window.partitionBy("card_id").orderBy("event_time").rangeBetween(-86400, 0)
features = transactions \
.withWatermark("event_time", "5 minutes") \
.withColumn("txn_count_1h", count("*").over(w_1h)) \
.withColumn("total_amount_1h", sum("amount").over(w_1h)) \
.withColumn("txn_count_24h", count("*").over(w_24h)) \
.withColumn("avg_amount_24h", avg("amount").over(w_24h)) \
.withColumn("amount_deviation", (col("amount") - col("avg_amount_24h")) / col("avg_amount_24h"))
# 3. ML Scoring (using foreachBatch for model inference)
def score_and_alert(batch_df, batch_id):
    import mlflow
    # feature_cols and send_webhook are defined elsewhere in the job
    model = mlflow.pyfunc.load_model("models:/fraud_model/Production")
    # Score
    scored = batch_df.toPandas()
    scored["fraud_score"] = model.predict(scored[feature_cols])
    scored_df = spark.createDataFrame(scored)
    # Write all scored transactions
    scored_df.write.format("delta").mode("append").saveAsTable("silver_scored_transactions")
    # Alert on high-risk
    alerts = scored_df.filter(col("fraud_score") > 0.8)
    alerts.write.format("delta").mode("append").saveAsTable("gold_fraud_alerts")
    # Trigger webhook for real-time blocking
    if alerts.count() > 0:
        send_webhook(alerts.collect())
features.writeStream.foreachBatch(score_and_alert) \
.option("checkpointLocation", "/checkpoints/fraud") \
.trigger(processingTime="10 seconds") \
.start()
Q24: Scenario — You need to backfill 2 years of historical data, then switch to incremental. Design this.
Answer:
# Phase 1: Historical Backfill (batch)
# Process month by month to manage memory
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
start_date = datetime(2023, 1, 1)
end_date = datetime(2025, 1, 1)
current = start_date
while current < end_date:
    next_month = current + relativedelta(months=1)
    # Read one month at a time
    month_df = spark.read.parquet(f"/raw/data/year={current.year}/month={current.month:02d}/")
    # Transform
    transformed = apply_transformations(month_df)
    # Write idempotently using replaceWhere
    transformed.write.format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"year = {current.year} AND month = {current.month}") \
        .saveAsTable("silver_table")
    current = next_month
# Optimize after backfill
spark.sql("OPTIMIZE silver_table ZORDER BY (customer_id)")
# Phase 2: Switch to Incremental (streaming)
(spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .load("/raw/data/")
    .writeStream.format("delta")
    .option("checkpointLocation", "/checkpoints/silver")
    .trigger(availableNow=True)  # Run as scheduled batch
    .table("silver_table"))
Q25: Scenario — Your MERGE statement takes 3 hours. How do you optimize it?
Answer:
Q26: Scenario — Multiple teams need access to the same data with different views. How?
Answer: Using Unity Catalog's governance features:
-- Base table accessible to data engineering
CREATE TABLE catalog.schema.customer_data (...);
-- Marketing team: sees masked PII
CREATE FUNCTION mask_email(email STRING)
RETURNS STRING
RETURN CASE
WHEN is_member('data_engineering') THEN email
ELSE regexp_replace(email, '(.).+(@.+)', '$1***$2')
END;
ALTER TABLE customer_data ALTER COLUMN email SET MASK mask_email;
-- Sales team: only sees their region
-- (current_user_region() is an illustrative helper, e.g. a lookup on current_user())
CREATE FUNCTION region_filter(region STRING)
RETURNS BOOLEAN
RETURN region = current_user_region() OR is_member('admin');
ALTER TABLE customer_data SET ROW FILTER region_filter ON (region);
-- Analytics team: aggregated gold view (no PII)
CREATE VIEW gold_customer_metrics AS
SELECT region, COUNT(*) as customers, AVG(lifetime_value) as avg_ltv
FROM customer_data
GROUP BY region;
Q27: How do you design an idempotent pipeline?
Answer:
# Principle: Re-running the pipeline produces the SAME result
# 1. Use MERGE instead of INSERT (handles reruns)
target.alias("t").merge(source.alias("s"), "t.id = s.id") \
.whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
# 2. Use replaceWhere for partition-level overwrites
df.write.format("delta").mode("overwrite") \
.option("replaceWhere", "date = '2025-01-15'") \
.saveAsTable("orders")
# Re-running replaces the same partition — idempotent
# 3. Streaming: checkpoints ensure exactly-once
stream.writeStream.option("checkpointLocation", "/checkpoints/job1").start()
# Re-starting reads from last committed offset
# 4. Auto Loader: tracks processed files
spark.readStream.format("cloudFiles").load(path)
# Re-starting skips already-processed files
# 5. Batch jobs: use run_date parameter
def process_day(run_date):
    df = spark.read.parquet(f"/raw/{run_date}/")
    df.write.format("delta").mode("overwrite") \
        .option("replaceWhere", f"process_date = '{run_date}'") \
        .saveAsTable("silver_table")
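The idempotency principle is easy to demonstrate in miniature: an upsert keyed on the business key converges to the same state no matter how many times the same batch runs (plain-Python sketch, with a dict standing in for the Delta target):

```python
table = {}  # stands in for the Delta target, keyed by business key

def merge_batch(batch):
    # Upsert semantics: matched keys are updated, new keys inserted (like MERGE)
    for row in batch:
        table[row["id"]] = row

batch = [{"id": 1, "v": "a"}, {"id": 2, "v": "b"}]
merge_batch(batch)
state_after_first_run = dict(table)
merge_batch(batch)  # rerun the same batch, e.g. after a job retry
assert table == state_after_first_run  # idempotent: no duplicates, same state
```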
Q28: How do you monitor pipeline health in production?
Answer:
# 1. Row count reconciliation
source_count = spark.read.table("bronze_orders").filter("date = '2025-01-15'").count()
silver_count = spark.read.table("silver_orders").filter("date = '2025-01-15'").count()
gold_count = spark.read.table("gold_daily_orders").filter("date = '2025-01-15'").count()
assert abs(source_count - silver_count) / source_count < 0.01, "Silver count drift > 1%"
# 2. Data freshness SLA
from pyspark.sql.functions import max as _max, current_timestamp, unix_timestamp
freshness = spark.table("silver_orders") \
.select(
(unix_timestamp(current_timestamp()) - unix_timestamp(_max("updated_at"))) / 60
).collect()[0][0] # minutes since last update
assert freshness < 60, f"Data is {freshness} minutes stale!"
# 3. Streaming metrics
for query in spark.streams.active:
    progress = query.lastProgress
    if progress:
        print(f"Input rate: {progress['inputRowsPerSecond']}")
        print(f"Processing rate: {progress['processedRowsPerSecond']}")
        print(f"Trigger duration (ms): {progress['durationMs']['triggerExecution']}")
# Alert if processing rate < input rate (falling behind)
# 4. DLT expectation metrics (available in DLT event log)
expectations = spark.read.table("event_log") \
.filter("event_type = 'flow_progress'") \
.selectExpr("timestamp", "details:flow_progress.data_quality.expectations")