Databricks · Section 5 of 17

File 04: ETL Scenarios & Pipeline Design
Level: Senior/Lead (10+ years) — real-world production scenarios
Focus: SCD implementations, CDC, Medallion, DLT, end-to-end pipeline design

SECTION 1: SCD (SLOWLY CHANGING DIMENSION) IMPLEMENTATIONS

Q1: Explain all SCD Types. When would you use each?

Answer:

| SCD Type | Strategy                | History?                       | Use Case                                |
|----------|-------------------------|--------------------------------|-----------------------------------------|
| Type 0   | Never update            | No                             | Static reference data (country codes)   |
| Type 1   | Overwrite               | No                             | Corrections, non-historical attributes  |
| Type 2   | Add new row + close old | Full history                   | Customer address, pricing tiers         |
| Type 3   | Previous value column   | Limited (1 prior)              | When only the previous value matters    |
| Type 4   | Separate history table  | Full history                   | Separate current + history tables       |
| Type 6   | Hybrid (1+2+3)          | Full + current flag + previous | Complex dimensional modeling            |
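
The behavioral difference between Type 1 and Type 2 can be sketched in plain Python (a toy in-memory dimension, not Spark; the helper names are illustrative):

```python
from datetime import date

def scd1_upsert(dim, row):
    # Type 1: overwrite in place -- history is lost
    dim[row["customer_id"]] = dict(row)

def scd2_upsert(dim, row, today):
    # Type 2: close the current version, append a new one -- full history kept
    versions = dim.setdefault(row["customer_id"], [])
    for v in versions:
        if v["is_current"]:
            v["is_current"] = False
            v["effective_end"] = today
    versions.append({**row, "is_current": True,
                     "effective_start": today, "effective_end": None})

dim1, dim2 = {}, {}
old_row = {"customer_id": "C1", "address": "Old St"}
new_row = {"customer_id": "C1", "address": "New Ave"}

scd1_upsert(dim1, old_row)
scd1_upsert(dim1, new_row)
scd2_upsert(dim2, old_row, date(2025, 1, 1))
scd2_upsert(dim2, new_row, date(2025, 2, 1))
```

After both upserts, the Type 1 dimension holds a single row with the new address, while the Type 2 dimension holds two versions: the closed one and the current one.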

Q2: Implement SCD Type 1 (Overwrite) in Databricks.

Answer:

sql
MERGE INTO dim_customer t
USING staging_customer s
ON t.customer_id = s.customer_id

WHEN MATCHED THEN
    UPDATE SET
        t.name = s.name,
        t.email = s.email,
        t.address = s.address,
        t.phone = s.phone,
        t.updated_at = current_timestamp()

WHEN NOT MATCHED THEN
    INSERT (customer_id, name, email, address, phone, created_at, updated_at)
    VALUES (s.customer_id, s.name, s.email, s.address, s.phone,
            current_timestamp(), current_timestamp())

Q3: Implement SCD Type 2 (Full History) — This is the #1 most asked question.

Answer:

Table structure:

sql
CREATE TABLE dim_customer (
    surrogate_key BIGINT GENERATED ALWAYS AS IDENTITY,
    customer_id STRING,          -- Business key
    name STRING,
    email STRING,
    address STRING,
    hash_value STRING,           -- Hash of tracked columns for change detection
    effective_start TIMESTAMP,
    effective_end TIMESTAMP,
    is_current BOOLEAN
) USING DELTA;

Full PySpark implementation:

python — editable
from delta.tables import DeltaTable
from pyspark.sql.functions import *

# Step 1: Prepare source with hash for change detection
source_df = spark.table("staging_customer").withColumn(
    "hash_value", md5(concat_ws("||", col("name"), col("email"), col("address")))
)

# Step 2: Get current target records
target = DeltaTable.forName(spark, "dim_customer")
target_df = target.toDF().filter("is_current = true")

# Step 3: Identify changes (new + changed records)
changes = source_df.alias("s").join(
    target_df.alias("t"),
    col("s.customer_id") == col("t.customer_id"),
    "left"
).filter(
    col("t.customer_id").isNull() |           # New records
    (col("s.hash_value") != col("t.hash_value"))  # Changed records
).select("s.*")

# Step 4: THE MERGE KEY TRICK
# For closing old records: use real customer_id as merge_key (matches target)
# For inserting new current records: set merge_key = NULL (never matches → goes to NOT MATCHED)

# Rows that need to CLOSE existing records (update is_current = false)
close_records = changes.join(
    target_df.select("customer_id"), "customer_id", "left_semi"
).withColumn("merge_key", col("customer_id"))
# (left-semi join avoids collecting keys to the driver)

# Rows that need to INSERT as new current records
insert_records = changes.withColumn("merge_key", lit(None).cast("string"))

# Combine: close_records will MATCH (update), insert_records will NOT MATCH (insert)
staged = close_records.unionByName(insert_records)

# Step 5: Execute MERGE
target.alias("t").merge(
    staged.alias("s"),
    "t.customer_id = s.merge_key AND t.is_current = true"
).whenMatchedUpdate(set={
    "is_current": lit(False),
    "effective_end": current_timestamp()
}).whenNotMatchedInsert(values={
    "customer_id": col("s.customer_id"),
    "name": col("s.name"),
    "email": col("s.email"),
    "address": col("s.address"),
    "hash_value": col("s.hash_value"),
    "effective_start": current_timestamp(),
    "effective_end": lit("9999-12-31 23:59:59").cast("timestamp"),
    "is_current": lit(True)
}).execute()

Q4: Explain the "Merge Key Trick" for SCD Type 2. Why is it needed?

Answer: The trick solves a fundamental problem: for a changed record, you need to BOTH:

  1. UPDATE the existing row (set is_current=false, effective_end=now)
  2. INSERT a new row (the new current version)

But MERGE can only do one action per matched source row.

The trick:

  • Create TWO rows in the staged source for each change:
    • Row 1: merge_key = customer_id → this will MATCH the target → UPDATE (close old record)
    • Row 2: merge_key = NULL → NULL never matches → goes to NOT MATCHED → INSERT (new current record)
  • Union both and execute a single MERGE
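
The trick can be simulated without Spark. This toy merge (plain dicts standing in for Delta rows, illustrative names) shows why the NULL merge_key row always falls through to the insert branch:

```python
# Target: current dimension rows
target = [
    {"customer_id": "C1", "address": "Old St", "is_current": True},
]

# One changed record, staged twice per the merge-key trick
staged = [
    {"merge_key": "C1", "customer_id": "C1", "address": "New Ave"},  # will MATCH -> close old row
    {"merge_key": None, "customer_id": "C1", "address": "New Ave"},  # NULL never matches -> INSERT
]

def merge(target, staged):
    for s in staged:
        matched = [t for t in target
                   if t["is_current"] and t["customer_id"] == s["merge_key"]]
        if matched:               # WHEN MATCHED: close the old version
            for t in matched:
                t["is_current"] = False
        else:                     # WHEN NOT MATCHED: insert the new current version
            target.append({"customer_id": s["customer_id"],
                           "address": s["address"], "is_current": True})

merge(target, staged)
```

A single pass over the staged rows leaves exactly one current row (the new address) and one closed historical row.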

Alternative approach — Two-step MERGE:

python — editable
# Step 1: Close old records
target.alias("t").merge(
    changes.alias("s"),
    "t.customer_id = s.customer_id AND t.is_current = true"
).whenMatchedUpdate(set={
    "is_current": lit(False),
    "effective_end": current_timestamp()
}).execute()

# Step 2: Insert new current records
changes.withColumn("effective_start", current_timestamp()) \
    .withColumn("effective_end", lit("9999-12-31").cast("timestamp")) \
    .withColumn("is_current", lit(True)) \
    .write.format("delta").mode("append").saveAsTable("dim_customer")

Q5: Implement SCD Type 2 using Delta Live Tables (DLT) — the modern approach.

Answer:

python — editable
import dlt
from pyspark.sql.functions import col, expr

# Bronze: raw CDC events
@dlt.table(comment="Raw customer CDC events")
def bronze_customer_cdc():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/schema/customer_cdc")
        .load("/landing/customer_cdc/")
    )

# Silver: SCD Type 2 using apply_changes
dlt.create_streaming_table("dim_customer_scd2")

dlt.apply_changes(
    target="dim_customer_scd2",
    source="bronze_customer_cdc",
    keys=["customer_id"],
    sequence_by=col("updated_at"),
    apply_as_deletes=expr("operation = 'DELETE'"),
    except_column_list=["operation", "_rescued_data"],
    stored_as_scd_type=2                 # <-- This is the magic
)
# DLT automatically manages the __START_AT and __END_AT columns (current rows have __END_AT = NULL)

Why this is superior:

  • No manual merge-key trick
  • Handles out-of-order events via sequence_by
  • Handles deletes natively
  • Automatic state management
  • Built-in data quality with expectations

Q6: Implement SCD Type 3 (Previous Value Column).

Answer:

sql
MERGE INTO dim_customer t
USING staging_customer s
ON t.customer_id = s.customer_id

WHEN MATCHED AND t.address != s.address THEN
    UPDATE SET
        t.prev_address = t.address,        -- Save current as previous
        t.address = s.address,              -- Update with new value
        t.address_changed_date = current_timestamp(),
        t.updated_at = current_timestamp()

WHEN MATCHED AND t.address = s.address THEN
    UPDATE SET                              -- Other fields might change
        t.name = s.name,
        t.email = s.email,
        t.updated_at = current_timestamp()

WHEN NOT MATCHED THEN
    INSERT (customer_id, name, email, address, prev_address, created_at, updated_at)
    VALUES (s.customer_id, s.name, s.email, s.address, NULL,
            current_timestamp(), current_timestamp())

Q7: Scenario — Implement SCD Type 2 for a customer dimension with 500M records, receiving 2M daily updates. Design the solution at scale.

Answer:

📐 Architecture Diagram
Architecture:
┌─────────────┐     ┌───────────────┐     ┌──────────────────┐
│ Source DB   │────▶│ CDC Pipeline  │────▶│ dim_customer     │
│ (500M rows) │     │ (2M changes)  │     │ (500M+ rows)     │
└─────────────┘     └───────────────┘     └──────────────────┘

Key Design Decisions:

1. PARTITIONING: Partition dim_customer by customer_id hash range
   - 500M rows / 1 GB per partition ≈ 500 partitions
   - Ensures MERGE only scans relevant partitions

2. Z-ORDERING: ZORDER BY (customer_id) for data skipping during MERGE

3. CHANGE DETECTION: Use hash comparison
   - Hash only tracked columns (name, email, address)
   - Reduces comparison cost

4. SOURCE DEDUPLICATION:
   - 2M daily changes may have duplicates
   - Deduplicate BEFORE merge (keep latest per customer_id)

5. MERGE OPTIMIZATION:
   - Add partition column to MERGE condition
   - Broadcast the 2M source (it's small relative to 500M)
   - Use Photon runtime

6. COMPACTION: Run OPTIMIZE weekly
   - Z-ORDER BY (customer_id, is_current)

7. MONITORING:
   - Track merge duration, rows affected
   - Alert if > 10M changes (unusual spike)
   - Data quality: null checks, hash validation
python — editable
# Optimized implementation
from delta.tables import DeltaTable
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Deduplicate source (keep latest)
w = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())
source = spark.table("staging_customer") \
    .withColumn("rn", row_number().over(w)) \
    .filter(col("rn") == 1).drop("rn") \
    .withColumn("hash_value", md5(concat_ws("||", col("name"), col("email"), col("address"))))

# Get only changed records (compare hashes)
target_current = spark.table("dim_customer").filter("is_current = true") \
    .select("customer_id", col("hash_value").alias("target_hash"))

changes = source.join(target_current, "customer_id", "left") \
    .filter(col("target_hash").isNull() | (col("hash_value") != col("target_hash"))) \
    .drop("target_hash")

# Execute SCD Type 2 merge (using the merge-key trick)
# ... (same pattern as Q3)
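
The hash-based change detection above is easy to reason about outside Spark; this sketch uses hashlib.md5 the same way the md5(concat_ws("||", ...)) expression does (the tracked-column list is an assumption for illustration):

```python
import hashlib

def row_hash(row, tracked=("name", "email", "address")):
    # Same idea as md5(concat_ws("||", name, email, address)) in Spark
    payload = "||".join(str(row[c]) for c in tracked)
    return hashlib.md5(payload.encode("utf-8")).hexdigest()

old = {"name": "Ann", "email": "a@x.com", "address": "Old St", "last_login": "2025-01-01"}
new = {"name": "Ann", "email": "a@x.com", "address": "New Ave", "last_login": "2025-01-02"}
same = {"name": "Ann", "email": "a@x.com", "address": "Old St", "last_login": "2025-01-03"}

# Only tracked columns participate: last_login changing alone does NOT trigger SCD2
changed = row_hash(new) != row_hash(old)        # new version needed
unchanged = row_hash(same) == row_hash(old)     # no-op
```

This is why hashing only the tracked columns reduces comparison cost: untracked churn never produces a new dimension version.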

SECTION 2: CHANGE DATA CAPTURE (CDC)

Q8: Design a CDC pipeline from Oracle/MySQL to Delta Lake.

Answer:

📐 Architecture Diagram
┌──────────┐     ┌──────────┐     ┌───────┐     ┌──────────────┐     ┌──────────────┐
│ Oracle   │────▶│ Debezium │────▶│ Kafka │────▶│ Auto Loader  │────▶│ Bronze       │
│ (source) │     │ (CDC)    │     │       │     │ (ingestion)  │     │ (raw events) │
└──────────┘     └──────────┘     └───────┘     └──────────────┘     └──────┬───────┘
                                                                            │
                                                                    ┌───────▼───────┐
                                                                    │ Silver        │
                                                                    │ (MERGE/SCD)   │
                                                                    └───────┬───────┘
                                                                            │
                                                                    ┌───────▼───────┐
                                                                    │ Gold          │
                                                                    │ (aggregated)  │
                                                                    └───────────────┘
python — editable
# Bronze: Ingest raw CDC events from Kafka
raw_cdc = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "oracle.schema.customers") \
    .load() \
    .select(
        from_json(col("value").cast("string"), cdc_schema).alias("data"),
        col("timestamp").alias("kafka_timestamp")
    ).select("data.*", "kafka_timestamp")

raw_cdc.writeStream.format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/bronze_cdc") \
    .table("bronze_customer_cdc")

# Silver: Apply CDC using foreachBatch
def apply_cdc_changes(batch_df, batch_id):
    from delta.tables import DeltaTable
    from pyspark.sql.window import Window

    # Deduplicate within batch (keep latest per key)
    w = Window.partitionBy("after.id").orderBy(col("ts_ms").desc())
    deduped = batch_df.withColumn("rn", row_number().over(w)) \
        .filter("rn = 1").drop("rn")

    target = DeltaTable.forName(spark, "silver_customers")

    target.alias("t").merge(
        deduped.alias("s"),
        "t.customer_id = s.after.id"
    ).whenMatchedDelete(
        condition="s.op = 'd'"                    # Delete
    ).whenMatchedUpdate(
        condition="s.op = 'u'",                   # Update
        set={
            "name": "s.after.name",
            "email": "s.after.email",
            "address": "s.after.address",
            "updated_at": "s.ts_ms"
        }
    ).whenNotMatchedInsert(
        condition="s.op IN ('c', 'r')",           # Create / Read (snapshot)
        values={
            "customer_id": "s.after.id",
            "name": "s.after.name",
            "email": "s.after.email",
            "address": "s.after.address",
            "created_at": "s.ts_ms",
            "updated_at": "s.ts_ms"
        }
    ).execute()

spark.readStream.table("bronze_customer_cdc") \
    .writeStream.foreachBatch(apply_cdc_changes) \
    .option("checkpointLocation", "/checkpoints/silver_customers") \
    .trigger(processingTime="1 minute") \
    .start()

Q9: What is Delta Lake Change Data Feed (CDF)? How is it different from CDC?

Answer:

| Aspect    | CDC (from source DB)       | CDF (from Delta Lake)                |
|-----------|----------------------------|--------------------------------------|
| Source    | External database          | Delta Lake table                     |
| Captures  | Changes at the source DB   | Changes at the Delta table           |
| Use case  | Ingesting external changes | Propagating Delta changes downstream |
| Mechanism | Debezium, DMS, etc.        | Built into Delta Lake                |

Enable CDF:

sql
ALTER TABLE my_table SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

Read changes:

python — editable
# By version range
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .option("endingVersion", 10) \
    .table("my_table")

# By timestamp range
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", "2025-01-01") \
    .option("endingTimestamp", "2025-01-31") \
    .table("my_table")

# Streaming (incremental)
changes = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 5) \
    .table("my_table")

CDF columns:

| Column            | Values                                            |
|-------------------|---------------------------------------------------|
| _change_type      | insert, update_preimage, update_postimage, delete |
| _commit_version   | Delta table version                               |
| _commit_timestamp | When the change was committed                     |
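
A downstream consumer usually drops update_preimage rows and applies the rest in commit order. A minimal pure-Python replay over hypothetical CDF rows (dicts standing in for the DataFrame):

```python
events = [
    {"id": 1, "val": "a", "_change_type": "insert",           "_commit_version": 1},
    {"id": 1, "val": "a", "_change_type": "update_preimage",  "_commit_version": 2},
    {"id": 1, "val": "b", "_change_type": "update_postimage", "_commit_version": 2},
    {"id": 2, "val": "x", "_change_type": "insert",           "_commit_version": 3},
    {"id": 2, "val": "x", "_change_type": "delete",           "_commit_version": 4},
]

def replay(events):
    state = {}
    for e in sorted(events, key=lambda e: e["_commit_version"]):
        if e["_change_type"] in ("insert", "update_postimage"):
            state[e["id"]] = e["val"]
        elif e["_change_type"] == "delete":
            state.pop(e["id"], None)
        # update_preimage carries the old value -- ignored when rebuilding state
    return state

state = replay(events)
```

Replaying all five events leaves only key 1 with its post-update value, matching what a MERGE driven by CDF would produce downstream.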

Q10: Debezium CDC event types — what do they mean?

Answer:

| Operation (op) | Meaning         | When              |
|----------------|-----------------|-------------------|
| r              | Read (snapshot) | Initial full load |
| c              | Create          | New row inserted  |
| u              | Update          | Row updated       |
| d              | Delete          | Row deleted       |

Each event has:

  • before: Row state before the change (null for inserts)
  • after: Row state after the change (null for deletes)
  • ts_ms: Timestamp of the change
  • source: Source metadata (database, table, position)
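
Routing on op mirrors the MERGE conditions in Q8. A pure-Python dispatch over hypothetical Debezium envelopes (field names follow the event anatomy described above):

```python
def apply_event(table, event):
    op, before, after = event["op"], event.get("before"), event.get("after")
    if op in ("c", "r"):            # create / snapshot read -> insert
        table[after["id"]] = after
    elif op == "u":                 # update -> overwrite by key
        table[after["id"]] = after
    elif op == "d":                 # delete -> remove by key (key lives in 'before')
        table.pop(before["id"], None)
    return table

table = {}
apply_event(table, {"op": "r", "before": None, "after": {"id": 1, "name": "Ann"}})
apply_event(table, {"op": "u", "before": {"id": 1, "name": "Ann"}, "after": {"id": 1, "name": "Anne"}})
apply_event(table, {"op": "c", "before": None, "after": {"id": 2, "name": "Bob"}})
apply_event(table, {"op": "d", "before": {"id": 2, "name": "Bob"}, "after": None})
```

Note that a delete event has after = null, so the business key must be read from before, exactly as the `s.op = 'd'` branch of the MERGE does.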

SECTION 3: MEDALLION ARCHITECTURE

Q11: Design a medallion architecture. What are the design decisions for each layer?

Answer:

BRONZE (Raw Ingestion):

python — editable
# Design decisions:
# - Append-only (NEVER update or delete raw data)
# - Include ingestion metadata
# - Partition by ingestion date (not business date)
# - Schema: flexible (can store as strings or use schema inference)
# - Retention: long (compliance, reprocessing)

bronze_df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", schema_loc) \
    .load(landing_path) \
    .withColumn("_ingested_at", current_timestamp()) \
    .withColumn("_ingested_date", to_date(current_timestamp())) \
    .withColumn("_source_file", input_file_name()) \
    .withColumn("_batch_id", lit(batch_id))

bronze_df.writeStream.format("delta") \
    .partitionBy("_ingested_date") \
    .option("checkpointLocation", checkpoint) \
    .table("bronze_orders")

SILVER (Cleansed, Conformed):

python — editable
# Design decisions:
# - Apply data quality checks
# - Deduplicate
# - Standardize data types and values
# - Join reference data
# - Apply SCD logic here
# - Enable CDF for downstream
# - Partition by business date

silver_df = spark.readStream.table("bronze_orders") \
    .dropDuplicates(["order_id"]) \
    .filter(col("order_id").isNotNull()) \
    .filter(col("amount") > 0) \
    .withColumn("amount", col("amount").cast("decimal(10,2)")) \
    .withColumn("order_date", to_date(col("order_date_str"), "yyyy-MM-dd"))

GOLD (Business-Level Aggregates):

python — editable
# Design decisions:
# - Pre-aggregated for specific use cases
# - Denormalized (star schema or wide tables)
# - Heavily optimized (Z-ORDER, OPTIMIZE, caching)
# - Lower volume, higher query performance

gold_df = spark.table("silver_orders") \
    .groupBy("order_date", "category", "region") \
    .agg(
        count("*").alias("order_count"),
        sum("amount").alias("total_revenue"),
        avg("amount").alias("avg_order_value"),
        countDistinct("customer_id").alias("unique_customers")
    )

Q12: When would you deviate from the standard medallion pattern?

Answer:

  1. Skip Silver: Simple use cases with minimal transformation (raw → gold)
  2. Real-time "hot path": Streaming direct to serving layer, bypassing medallion
  3. ML Feature Layer: Separate "feature store" alongside gold
  4. Reverse ETL: Gold feeds back to operational systems (CRM, marketing tools)
  5. "Platinum" layer: Some orgs add a presentation/BI-specific layer
  6. Domain-specific layers: In data mesh, each domain owns its own medallion stack

Q13: How do you handle data quality between medallion layers?

Answer:

python — editable
# Pattern: Quarantine bad records, pass good records

from functools import reduce
from pyspark.sql.functions import col, when

bronze_df = spark.read.table("bronze_orders")

# Define quality rules
quality_rules = {
    "valid_order_id": col("order_id").isNotNull(),
    "positive_amount": col("amount") > 0,
    "valid_date": col("order_date").isNotNull() & (col("order_date") >= "2020-01-01"),
    "valid_status": col("status").isin("pending", "completed", "cancelled", "shipped"),
}

# Apply all rules
quality_check = bronze_df
for rule_name, condition in quality_rules.items():
    quality_check = quality_check.withColumn(
        f"_rule_{rule_name}", when(condition, True).otherwise(False)
    )

# All rules must pass
all_rules = [f"_rule_{name}" for name in quality_rules]
quality_check = quality_check.withColumn(
    "_is_valid",
    reduce(lambda a, b: a & b, [col(r) for r in all_rules])
)

# Route records
good_records = quality_check.filter("_is_valid = true").drop(*all_rules, "_is_valid")
bad_records = quality_check.filter("_is_valid = false")

good_records.write.format("delta").mode("append").saveAsTable("silver_orders")
bad_records.write.format("delta").mode("append").saveAsTable("quarantine_orders")
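
The same pass/quarantine routing in miniature, in plain Python; toy predicate rules stand in for the Spark Column expressions above:

```python
from functools import reduce

rules = {
    "valid_order_id": lambda r: r.get("order_id") is not None,
    "positive_amount": lambda r: r.get("amount", 0) > 0,
}

def route(rows):
    good, bad = [], []
    for r in rows:
        # All rules must pass, mirroring reduce(lambda a, b: a & b, ...) over rule columns
        is_valid = reduce(lambda a, b: a and b, (check(r) for check in rules.values()), True)
        (good if is_valid else bad).append(r)
    return good, bad

good, bad = route([
    {"order_id": 1, "amount": 10.0},
    {"order_id": None, "amount": 5.0},
    {"order_id": 2, "amount": -3.0},
])
```

Bad rows are quarantined rather than dropped, so they can be inspected and replayed after the source issue is fixed.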

SECTION 4: AUTO LOADER

Q14: What is Auto Loader? How does it work?

Answer: Auto Loader incrementally processes new data files as they arrive in cloud storage.

Two modes:

| Aspect         | Directory Listing                      | File Notification                          |
|----------------|----------------------------------------|--------------------------------------------|
| How            | Lists directory for new files          | Cloud events (S3 SNS/SQS, ADLS Event Grid) |
| Setup          | Zero configuration                     | Requires cloud infrastructure              |
| Latency        | Higher (polling interval)              | Near real-time                             |
| Cost           | Can be expensive for large directories | Minimal listing costs                      |
| Scale          | Slower for millions of files           | Handles billions of files                  |
| Recommendation | Dev/test, small directories            | Production, high-volume                    |
python — editable
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/schema/orders") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.useNotifications", "true") \
    .load("/landing/orders/")  # useNotifications=true -> file notification mode

Q15: How does Auto Loader handle schema evolution?

Answer:

python — editable
# Schema evolution modes
df = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/schema/orders") \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .load("/landing/orders/")  # or: rescue, failOnNewColumns, none

| Mode             | Behavior                                   | Use Case                  |
|------------------|--------------------------------------------|---------------------------|
| addNewColumns    | Adds new columns, restarts stream          | Schema evolves frequently |
| rescue           | New/unexpected columns go to _rescued_data | Don't want restarts       |
| failOnNewColumns | Pipeline fails when schema changes         | Strict schema control     |
| none             | Ignore new columns                         | Fixed schema              |

Schema hints (force specific types):

python — editable
.option("cloudFiles.schemaHints", "order_id LONG, amount DECIMAL(10,2)")

Q16: Auto Loader vs COPY INTO — when do you use each?

Answer:

| Aspect           | Auto Loader                      | COPY INTO                         |
|------------------|----------------------------------|-----------------------------------|
| Processing       | Streaming (incremental)          | Batch (one-time or scheduled)     |
| State tracking   | Checkpoint-based                 | Table metadata                    |
| File discovery   | Efficient (notification/listing) | Full directory listing every time |
| Schema evolution | Built-in                         | Manual                            |
| Scale            | Better for large directories     | Slower for millions of files      |
| Idempotency      | Exactly-once via checkpoints     | Exactly-once via file tracking    |
| Recommendation   | Preferred for most use cases     | Simple one-time loads             |

SECTION 5: DELTA LIVE TABLES (DLT)

Q17: What are the three DLT expectation levels?

Answer:

python — editable
import dlt

@dlt.table
@dlt.expect("valid_id", "order_id IS NOT NULL")                    # Log violation, keep row
@dlt.expect_or_drop("positive_amount", "amount > 0")               # Drop violating rows
@dlt.expect_or_fail("valid_status", "status IN ('A','B','C')")     # Fail entire pipeline
def silver_orders():
    return dlt.read_stream("bronze_orders")

| Decorator           | On Violation                      | Use Case                 |
|---------------------|-----------------------------------|--------------------------|
| @dlt.expect         | Records in metrics, keeps the row | Monitoring, soft quality |
| @dlt.expect_or_drop | Drops the row silently            | Filter bad data          |
| @dlt.expect_or_fail | Fails the entire pipeline         | Critical data quality    |

Q18: What is the difference between a streaming live table, a materialized view, and a view in DLT?

Answer:

| Concept              | Processing                         | Use Case                                   |
|----------------------|------------------------------------|--------------------------------------------|
| Streaming Live Table | Append-only, incremental           | New data ingestion (Bronze, CDC)           |
| Materialized View    | Precomputed, incrementally updated | Aggregations, enriched data (Silver, Gold) |
| View                 | Virtual, computed on read          | Intermediate transformations, no storage   |
python — editable
# Streaming Live Table (Bronze — append-only)
@dlt.table
def bronze_events():
    return spark.readStream.format("cloudFiles").load(path)

# Materialized View (Gold — automatically incremental)
@dlt.table
def gold_daily_metrics():
    return dlt.read("silver_events") \
        .groupBy("date").agg(count("*").alias("event_count"))

# View (intermediate, not materialized)
@dlt.view
def enriched_events():
    return dlt.read("bronze_events").join(dim_table, "key")

Q19: How do you implement SCD Type 2 in DLT? (Compare with manual approach)

Answer:

python — editable
# DLT approach — 5 lines vs 50+ lines manually
dlt.create_streaming_table("dim_customer_scd2")

dlt.apply_changes(
    target="dim_customer_scd2",
    source="bronze_customer_cdc",
    keys=["customer_id"],                          # Business key
    sequence_by=col("updated_at"),                 # Ordering column
    apply_as_deletes=expr("operation = 'DELETE'"),  # Delete condition
    except_column_list=["operation", "_rescued_data"],
    stored_as_scd_type=2                           # SCD Type 2
)
# DLT auto-manages the __START_AT and __END_AT columns (current rows have __END_AT = NULL)

Why DLT is better for SCD Type 2:

  • No merge-key trick needed
  • Handles out-of-order events automatically
  • Handles deletes natively
  • Incremental processing built-in
  • Data quality expectations on the same pipeline

SECTION 6: SCENARIO-BASED PIPELINE DESIGN

Q20: Scenario — Design a pipeline for 10 billion clickstream events/day from Kafka.

Answer:

🗂️ Design:

Event rate: 10B/day ≈ 115K events/second
Source: Kafka (multiple topics, partitioned by user_id hash)

Bronze:
  • Structured Streaming from Kafka
  • Parse JSON, add ingestion metadata
  • Append-only, partitioned by ingestion_hour
  • Trigger: processingTime="30 seconds"
  • Cluster: 16 workers, Photon enabled
  • Expected volume: ~2 TB/day compressed

Silver:
  • Read from Bronze with CDF or streaming
  • Deduplicate (dropDuplicates with watermark)
  • Parse user agent, extract session info
  • Join with dim_user (~10 GB; broadcast only if it fits, otherwise a shuffle join)
  • Sessionization (gap-based, 30-min timeout)
  • Write to silver_sessions table
  • CLUSTER BY (user_id)

Gold:
  • gold_hourly_metrics: page views, unique users per hour
  • gold_session_metrics: avg session duration, pages per session
  • gold_funnel_analysis: conversion funnel drop-offs
  • Trigger: availableNow (scheduled every hour)

Monitoring:
  • Streaming query listener for lag alerts
  • Row count reconciliation: Kafka offsets vs Bronze count
  • Processing rate dashboard
  • Alert if processing time > 2x average
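
The 30-minute gap-based sessionization in the Silver step can be sketched per user in plain Python (the real job would compute this with window functions over event_time):

```python
def sessionize(event_times, gap_minutes=30):
    # Assign a session id per event; a new session starts whenever the gap
    # since the previous event exceeds the timeout (gap-based sessionization)
    gap = gap_minutes * 60
    sessions, current, prev = [], 0, None
    for t in sorted(event_times):
        if prev is not None and t - prev > gap:
            current += 1
        sessions.append((t, current))
        prev = t
    return sessions

# Epoch seconds for one user: three events close together, then one 45 minutes later
out = sessionize([0, 600, 1200, 1200 + 45 * 60])
```

The fourth event arrives 45 minutes after the third, so it opens a second session; the first three share session 0.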

Q21: Scenario — Your daily batch pipeline takes 8 hours. Business wants it under 2 hours. How do you optimize?

Answer:

🗂️ Debugging Methodology:

1. PROFILE: check the Spark UI for every stage
  • Which stage is slowest? (joins, aggregations, writes)
  • Is there data skew? (one task takes far longer than the others)
  • Is there spill to disk?
  • Which join strategies are used?

2. IDENTIFY BOTTLENECKS (common findings):
  • Shuffle: too many or too few partitions
  • Skew: uneven data distribution
  • Small files: reading 100K tiny files
  • Full table scans: no partition pruning
  • Python UDFs: serialization overhead

3. OPTIMIZATION STRATEGIES:
  a. Switch to incremental processing: process only changed data (use CDF, Auto Loader)
  b. Optimize joins: broadcast small tables, salt skewed keys, bucket tables that join repeatedly
  c. Enable AQE for automatic optimizations
  d. Use the Photon runtime (often several times faster on SQL-heavy workloads)
  e. OPTIMIZE + ZORDER target tables
  f. Increase cluster size (scale out)
  g. Replace Python UDFs with built-in functions or Pandas UDFs
  h. Parallelize independent stages (run the DAG, not a sequence)
  i. Use replaceWhere for partition-level overwrites
  j. Enable optimized writes and auto-compaction
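
Strategy (b)'s key salting in miniature: the hot key is exploded into N synthetic keys on the large (fact) side and replicated on the small (dimension) side, so every salted key still joins. Pure-Python sketch with hypothetical helper names; in Spark this would be a withColumn plus a crossJoin against the salt range:

```python
import random

def salt_key(key, n_salts=8):
    # Fact side: spread one hot key across n_salts synthetic keys,
    # so its rows land in n_salts tasks instead of one
    return f"{key}_{random.randrange(n_salts)}"

def explode_dim_key(key, n_salts=8):
    # Dimension side: replicate the row once per salt so every
    # salted fact key still finds its match
    return [f"{key}_{i}" for i in range(n_salts)]

random.seed(42)
fact_keys = [salt_key("HOT_CUSTOMER") for _ in range(1000)]
dim_keys = set(explode_dim_key("HOT_CUSTOMER"))
```

The join on the salted key is now spread over 8 partitions instead of one, at the cost of an 8x replication of the single hot dimension row.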

Q22: Scenario — Design a data mesh on Databricks.

Answer:

🗂️ Principles:
  1. Domain-oriented data ownership
  2. Data as a product
  3. Self-serve data platform
  4. Federated computational governance

Implementation:

Unity Catalog structure:
  • catalog: marketing (owned by the marketing team)
    • schema: bronze (raw marketing data)
    • schema: silver (cleansed)
    • schema: gold (marketing metrics: the "product")
  • catalog: sales (owned by the sales team, same pattern)
  • catalog: shared (cross-domain data products)
    • schema: customer_360 (shared customer view)

Governance:
  • Unity Catalog for centralized access control
  • Each domain defines its own data quality SLAs
  • Shared data contracts (schema + SLA + ownership)
  • Audit logging for compliance

Self-serve platform:
  • Databricks Asset Bundles for CI/CD templates
  • Cluster policies per domain (cost control)
  • Shared notebook templates and libraries
  • Data quality framework (DLT expectations)

Data products:
  • Each domain publishes curated Gold tables
  • Cross-domain access via Unity Catalog GRANT
  • Delta Sharing for external consumers
  • SLA monitoring with alerts

Q23: Scenario — Design a real-time fraud detection pipeline.

Answer:

python — editable
# Architecture:
# Transaction stream (Kafka) → Bronze → Feature Engineering → ML Scoring → Alert

# 1. Bronze: Raw transactions
transactions = spark.readStream.format("kafka") \
    .option("subscribe", "transactions") \
    .load() \
    .select(from_json(col("value").cast("string"), txn_schema).alias("txn")) \
    .select("txn.*")

# 2. Feature Engineering: Rolling aggregates
# NOTE: arbitrary window functions are not supported directly on streaming
# DataFrames; in practice compute these inside foreachBatch or serve them
# from a feature store. Shown here for clarity.
from pyspark.sql.window import Window

w_1h = Window.partitionBy("card_id").orderBy(col("event_time").cast("long")).rangeBetween(-3600, 0)
w_24h = Window.partitionBy("card_id").orderBy(col("event_time").cast("long")).rangeBetween(-86400, 0)

features = transactions \
    .withWatermark("event_time", "5 minutes") \
    .withColumn("txn_count_1h", count("*").over(w_1h)) \
    .withColumn("total_amount_1h", sum("amount").over(w_1h)) \
    .withColumn("txn_count_24h", count("*").over(w_24h)) \
    .withColumn("avg_amount_24h", avg("amount").over(w_24h)) \
    .withColumn("amount_deviation", (col("amount") - col("avg_amount_24h")) / col("avg_amount_24h"))

# 3. ML Scoring (using foreachBatch for model inference)
def score_and_alert(batch_df, batch_id):
    import mlflow
    model = mlflow.pyfunc.load_model("models:/fraud_model/Production")

    # Score
    scored = batch_df.toPandas()
    scored["fraud_score"] = model.predict(scored[feature_cols])

    scored_df = spark.createDataFrame(scored)

    # Write all scored transactions
    scored_df.write.format("delta").mode("append").saveAsTable("silver_scored_transactions")

    # Alert on high-risk
    alerts = scored_df.filter(col("fraud_score") > 0.8)
    alerts.write.format("delta").mode("append").saveAsTable("gold_fraud_alerts")

    # Trigger webhook for real-time blocking
    if alerts.count() > 0:
        send_webhook(alerts.collect())

features.writeStream.foreachBatch(score_and_alert) \
    .option("checkpointLocation", "/checkpoints/fraud") \
    .trigger(processingTime="10 seconds") \
    .start()

Q24: Scenario — You need to backfill 2 years of historical data, then switch to incremental. Design this.

Answer:

python — editable
# Phase 1: Historical Backfill (batch)
# Process month by month to manage memory

from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

start_date = datetime(2023, 1, 1)
end_date = datetime(2025, 1, 1)

current = start_date
while current < end_date:
    next_month = current + relativedelta(months=1)

    # Read one month at a time
    month_df = spark.read.parquet(f"/raw/data/year={current.year}/month={current.month:02d}/")

    # Transform
    transformed = apply_transformations(month_df)

    # Write idempotently using replaceWhere
    transformed.write.format("delta") \
        .mode("overwrite") \
        .option("replaceWhere", f"year = {current.year} AND month = {current.month}") \
        .saveAsTable("silver_table")

    current = next_month

# Optimize after backfill
spark.sql("OPTIMIZE silver_table ZORDER BY (customer_id)")

# Phase 2: Switch to Incremental (streaming)
spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "parquet") \
    .load("/raw/data/") \
    .writeStream.format("delta") \
    .option("checkpointLocation", "/checkpoints/silver") \
    .trigger(availableNow=True) \
    .table("silver_table")  # availableNow: run as a scheduled batch

Q25: Scenario — Your MERGE statement takes 3 hours. How do you optimize it?

Answer:

Diagnosis Checklist:
┌─ Check Spark UI for the MERGE job
│ ├── Which stage is slowest?
│ ├── Is there a broadcast happening (or should there be)?
│ ├── Data skew in join keys?
│ └── How many files in target table?
├── Common Causes & Fixes:
│ ├── Too many small files in target
│ │ └── Fix: OPTIMIZE target table
│ │
│ ├── No partition pruning
│ │ └── Fix: Add partition column to MERGE condition
│ │ MERGE INTO target t USING source s
│ │ ON t.id = s.id AND t.date = s.date -- date is partition column
│ │
│ ├── No data skipping on merge key
│ │ └── Fix: OPTIMIZE target ZORDER BY (merge_key)
│ │
│ ├── Source too large (merging more than needed)
│ │ └── Fix: Pre-filter source to only changed records
│ │
│ ├── Both tables are large (no broadcast)
│ │ └── Fix: If source < 1 GB, force broadcast(source)
│ │
│ ├── Not using Photon
│ │ └── Fix: Switch to Photon runtime (can be 3-5x faster for MERGE)
│ │
│ └── Data skew in merge key
│       └── Fix: Salt the merge key (complex but effective)
└── Expected result: 3 hours → 20-40 minutes

Q26: Scenario — Multiple teams need access to the same data with different views. How?

Answer: Using Unity Catalog's governance features:

sql
-- Base table accessible to data engineering
CREATE TABLE catalog.schema.customer_data (...);

-- Marketing team: sees masked PII
CREATE FUNCTION mask_email(email STRING)
RETURNS STRING
RETURN CASE
    WHEN is_member('data_engineering') THEN email
    ELSE regexp_replace(email, '(.).+(@.+)', '$1***$2')
END;

ALTER TABLE customer_data ALTER COLUMN email SET MASK mask_email;

-- Sales team: only sees their region
CREATE FUNCTION region_filter(region STRING)
RETURNS BOOLEAN
RETURN region = current_user_region() OR is_member('admin');  -- current_user_region(): assumed helper UDF

ALTER TABLE customer_data SET ROW FILTER region_filter ON (region);

-- Analytics team: aggregated gold view (no PII)
CREATE VIEW gold_customer_metrics AS
SELECT region, COUNT(*) as customers, AVG(lifetime_value) as avg_ltv
FROM customer_data
GROUP BY region;
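
The masking expression is easy to sanity-check outside SQL; Python's re module accepts the same pattern, with backreferences written \1 instead of $1:

```python
import re

def mask_email(email: str) -> str:
    # Same pattern as the SQL regexp_replace above:
    # keep the first character and the domain, mask the rest of the local part
    return re.sub(r"(.).+(@.+)", r"\1***\2", email)

masked = mask_email("alice@example.com")
```

So "alice@example.com" is shown to non-privileged readers as "a***@example.com", preserving enough shape for debugging without exposing the address.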

Q27: How do you design an idempotent pipeline?

Answer:

python — editable
# Principle: Re-running the pipeline produces the SAME result

# 1. Use MERGE instead of INSERT (handles reruns)
target.alias("t").merge(source.alias("s"), "t.id = s.id") \
    .whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# 2. Use replaceWhere for partition-level overwrites
df.write.format("delta").mode("overwrite") \
    .option("replaceWhere", "date = '2025-01-15'") \
    .saveAsTable("orders")
# Re-running replaces the same partition — idempotent

# 3. Streaming: checkpoints ensure exactly-once
stream.writeStream.option("checkpointLocation", "/checkpoints/job1").start()
# Re-starting reads from last committed offset

# 4. Auto Loader: tracks processed files
spark.readStream.format("cloudFiles").load(path)
# Re-starting skips already-processed files

# 5. Batch jobs: use run_date parameter
def process_day(run_date):
    df = spark.read.parquet(f"/raw/{run_date}/")
    df.write.format("delta").mode("overwrite") \
        .option("replaceWhere", f"process_date = '{run_date}'") \
        .saveAsTable("silver_table")
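
Pattern 5's replaceWhere-style idempotency in one toy loop, with plain dicts standing in for Delta partitions (illustrative, not the Delta API):

```python
# Toy model of replaceWhere: the table is a dict keyed by partition value,
# and rerunning a day overwrites only that day's partition
table = {}

def process_day(table, run_date, raw_rows):
    transformed = [{**r, "process_date": run_date} for r in raw_rows]
    table[run_date] = transformed      # overwrite just this partition
    return table

process_day(table, "2025-01-14", [{"id": 1}])
process_day(table, "2025-01-15", [{"id": 2}, {"id": 3}])
snapshot = {d: list(rows) for d, rows in table.items()}

# Rerun of the same day with the same input: the table is unchanged -> idempotent
process_day(table, "2025-01-15", [{"id": 2}, {"id": 3}])
```

A plain append instead of the overwrite would duplicate the day's rows on every retry; the partition-scoped overwrite is what makes reruns safe.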

Q28: How do you monitor pipeline health in production?

Answer:

python — editable
# 1. Row count reconciliation
source_count = spark.read.table("bronze_orders").filter("date = '2025-01-15'").count()
silver_count = spark.read.table("silver_orders").filter("date = '2025-01-15'").count()
gold_count = spark.read.table("gold_daily_orders").filter("date = '2025-01-15'").count()

assert abs(source_count - silver_count) / source_count < 0.01, "Silver count drift > 1%"

# 2. Data freshness SLA
from pyspark.sql.functions import max as _max, current_timestamp, unix_timestamp

freshness = spark.table("silver_orders") \
    .select(
        (unix_timestamp(current_timestamp()) - unix_timestamp(_max("updated_at"))) / 60
    ).collect()[0][0]  # minutes since last update

assert freshness < 60, f"Data is {freshness} minutes stale!"

# 3. Streaming metrics
for query in spark.streams.active:
    progress = query.lastProgress
    print(f"Input rate: {progress['inputRowsPerSecond']}")
    print(f"Processing rate: {progress['processedRowsPerSecond']}")
    print(f"Batch duration: {progress['batchDuration']}")
    # Alert if processing rate < input rate (falling behind)

# 4. DLT expectation metrics (available in DLT event log)
expectations = spark.read.table("event_log") \
    .filter("event_type = 'flow_progress'") \
    .select("timestamp", "details:flow_progress.data_quality.expectations")