File 03: Delta Lake & Lakehouse Architecture
Level: Senior/Lead (10+ years) — Deep internals and optimization
Focus: Transaction log, MERGE scenarios, optimization, architecture decisions
SECTION 1: DELTA LAKE INTERNALS
Q1: What is the Delta Lake transaction log (_delta_log)? Explain how it ensures ACID transactions.
Answer:
The _delta_log/ directory is an ordered record of every transaction performed on a Delta table.
Structure:
Each JSON commit file contains:
- add actions: New Parquet files added
- remove actions: Files logically deleted (still physically present until VACUUM)
- metaData: Schema changes, table properties
- protocol: Minimum reader/writer protocol version
- commitInfo: Timestamp, operation, user, metrics
How ACID is ensured:
- Atomicity: Each commit writes a single JSON file atomically using put-if-absent (or an atomic filesystem rename)
- Consistency: Schema enforcement rejects mismatched writes
- Isolation: Snapshot isolation — readers see a consistent snapshot
- Durability: Data persisted as Parquet files on durable cloud storage
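The put-if-absent commit step can be sketched in plain Python. This is illustrative only, not Delta's actual implementation (on object stores Delta relies on the store's conditional-put semantics or a commit coordinator); the point is that exactly one writer can claim a given version number:

```python
import json
import os
import tempfile

def try_commit(log_dir: str, version: int, actions: list) -> bool:
    """Attempt to commit `actions` as commit file `version`.
    Returns False if another writer already claimed this version."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # O_CREAT | O_EXCL fails if the file already exists -> put-if-absent
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False  # lost the race; caller must re-read the log and retry
    with os.fdopen(fd, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")  # one action per line, as in Delta
    return True

log_dir = tempfile.mkdtemp()
assert try_commit(log_dir, 0, [{"add": {"path": "part-0.parquet"}}]) is True
assert try_commit(log_dir, 0, [{"add": {"path": "part-1.parquet"}}]) is False  # conflict
```

Because the commit file either appears in full or not at all, readers never observe a half-finished transaction.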
Q2: What are checkpoint files? Why are they critical?
Answer: Every 10 commits (default), Delta creates a Parquet checkpoint file that consolidates the full table state at that point.
Why needed:
- Without checkpoints: Reading version 10,000 requires replaying 10,000 JSON files
- With checkpoints: Read the latest checkpoint + only subsequent JSON files
The _last_checkpoint file stores the latest checkpoint version
Configurable: delta.checkpointInterval (default: 10)
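The cost model behind checkpoints can be sketched in plain Python: a reader reconstructing version N loads the newest checkpoint at or below N, then replays only the JSON commits after it (an illustrative sketch, not Delta's actual reader code):

```python
def files_to_replay(version, last_checkpoint=None):
    """Log files a reader loads to reconstruct `version`: the latest
    checkpoint at or below it, plus all subsequent JSON commits."""
    files = []
    start = 0
    if last_checkpoint is not None and last_checkpoint <= version:
        files.append(f"{last_checkpoint:020d}.checkpoint.parquet")
        start = last_checkpoint + 1
    files += [f"{v:020d}.json" for v in range(start, version + 1)]
    return files

# Without a checkpoint: version 10,000 means replaying 10,001 JSON files
assert len(files_to_replay(10_000, None)) == 10_001
# With a checkpoint at 9,990: one checkpoint + 10 JSON commits
assert len(files_to_replay(10_000, 9_990)) == 11
```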
Q3: Explain optimistic concurrency control in Delta Lake. What happens when two writers conflict?
Answer:
Delta uses optimistic concurrency control — writers take no locks. Each writer (1) reads the latest table version, (2) stages its new data files, then (3) attempts to commit the next version number via put-if-absent. If another writer committed first, the commit attempt fails and Delta checks whether the concurrent commit actually conflicts (e.g., read or modified the same files/partitions). Non-conflicting operations are automatically retried against the new snapshot; true conflicts surface as a ConcurrentModificationException and the losing writer must rerun its operation.
Isolation levels:
| Level | Default? | Behavior |
|---|---|---|
| WriteSerializable | Yes | Writers see consistent snapshots; non-conflicting concurrent writes succeed |
| Serializable | No | Strictest; even reads during writes can cause conflicts |
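The conflict-resolution logic above can be sketched in pure Python (a simplified model, not Delta's implementation): a commit based on an old snapshot succeeds only if no intervening commit touched overlapping files.

```python
class ConcurrentModificationError(Exception):
    pass

class Table:
    def __init__(self):
        self.version = 0
        self.commits = {}  # version -> set of files touched by that commit

    def commit(self, read_version: int, files: set) -> int:
        """Optimistic commit: succeed only if no conflicting commit landed
        since `read_version`; overlapping files mean the writer must retry."""
        for v in range(read_version + 1, self.version + 1):
            if self.commits[v] & files:  # overlapping files -> real conflict
                raise ConcurrentModificationError(f"conflict at version {v}")
        self.version += 1
        self.commits[self.version] = files
        return self.version

t = Table()
v = t.version                        # both writers read the snapshot at version 0
t.commit(v, {"part-a.parquet"})      # writer 1 lands as version 1
t.commit(v, {"part-b.parquet"})      # writer 2: disjoint files -> auto-resolves, version 2
try:
    t.commit(v, {"part-a.parquet"})  # writer 3: same file as version 1 -> conflict
except ConcurrentModificationError:
    pass
```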
Q4: Explain file-level statistics and data skipping.
Answer: Delta stores min/max statistics for the first 32 columns (by default) in the transaction log.
How data skipping works:
SELECT * FROM orders WHERE order_date = '2025-01-15'
- Delta reads file statistics from the transaction log
- For each file, checks: min(order_date) <= '2025-01-15' AND max(order_date) >= '2025-01-15'
- Files where the predicate cannot possibly match are skipped entirely
- Only matching files are read
Configuration: delta.dataSkippingNumIndexedCols (default 32)
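The skip decision above reduces to a pure function of the file statistics, sketched here in plain Python (illustrative structure; the real stats live in the transaction log as JSON per add action):

```python
def can_skip(file_stats: dict, column: str, value) -> bool:
    """A file is skipped when the predicate `column = value` cannot
    possibly match any row, given the file's min/max statistics."""
    stats = file_stats[column]
    return not (stats["min"] <= value <= stats["max"])

files = [
    {"order_date": {"min": "2025-01-01", "max": "2025-01-10"}},
    {"order_date": {"min": "2025-01-11", "max": "2025-01-20"}},
]
# WHERE order_date = '2025-01-15' -> first file skipped, second file read
assert can_skip(files[0], "order_date", "2025-01-15") is True
assert can_skip(files[1], "order_date", "2025-01-15") is False
```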
Q5: What is the difference between OPTIMIZE and VACUUM?
Answer:
| Aspect | OPTIMIZE | VACUUM |
|---|---|---|
| Purpose | Compacts small files into larger ones | Physically deletes old unreferenced files |
| Performance impact | Yes — improves reads | No direct performance gain — frees storage |
| Data safety | Non-destructive (old files still exist) | DESTRUCTIVE — files removed permanently |
| Default retention | N/A | 7 days (delta.deletedFileRetentionDuration) |
| Time travel impact | None | Breaks time travel before vacuum threshold |
-- Compact files (target ~1 GB per file)
OPTIMIZE orders;
-- Compact + co-locate by column
OPTIMIZE orders ZORDER BY (customer_id);
-- Clean up old files (7-day retention)
VACUUM orders;
-- Clean up with custom retention
VACUUM orders RETAIN 168 HOURS; -- 7 days
Q6: Can you run VACUUM with retention of 0 hours? What are the risks?
Answer: Yes, but you must disable the safety check:
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM my_table RETAIN 0 HOURS;
Risks:
- Breaks ALL time travel — can't query any previous version
- Concurrent readers may fail — readers that started before VACUUM may reference deleted files
- No recovery — deleted data is gone permanently
Rule: NEVER do this in production. Use default 7-day retention.
Q7: What happens if a write fails midway in Delta Lake?
Answer:
- Data files (Parquet) may be partially written to storage
- But the transaction log entry is never committed (atomic operation)
- On next read, Delta only considers files referenced in committed log entries
- The orphaned Parquet files are cleaned up by VACUUM
- This is the key benefit of ACID — partial writes don't corrupt the table
SECTION 2: MERGE INTO — ALL SCENARIOS
Q8: Explain the MERGE INTO syntax. Write a basic upsert.
Answer:
MERGE INTO target_table AS t
USING source_table AS s
ON t.id = s.id -- Match condition
WHEN MATCHED AND s.op = 'DELETE' THEN
DELETE -- Delete matching rows
WHEN MATCHED AND s.updated_at > t.updated_at THEN
UPDATE SET * -- Update with all source columns
WHEN NOT MATCHED THEN
INSERT * -- Insert all source columns
WHEN NOT MATCHED BY SOURCE THEN
DELETE -- Delete target rows not in source (Databricks extension)
PySpark equivalent:
from delta.tables import DeltaTable
target = DeltaTable.forName(spark, "target_table")
target.alias("t").merge(
source_df.alias("s"),
"t.id = s.id"
).whenMatchedUpdateAll(                 # the UPDATE SET * equivalent
    condition="s.updated_at > t.updated_at"
    # for explicit columns use .whenMatchedUpdate(set={"name": "s.name", "email": "s.email"})
).whenNotMatchedInsertAll() \
.execute()
Q9: How do you handle duplicate keys in the source during MERGE?
Answer:
If source has duplicate keys matching the same target row, MERGE throws an error: "Cannot perform Merge as multiple source rows matched...".
Solution: Deduplicate source first:
from pyspark.sql.functions import row_number, col
from pyspark.sql import Window
w = Window.partitionBy("id").orderBy(col("updated_at").desc())
deduped_source = source_df \
.withColumn("rn", row_number().over(w)) \
.filter(col("rn") == 1) \
.drop("rn")
# Now merge with deduped source
target.alias("t").merge(deduped_source.alias("s"), "t.id = s.id") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
Q10: Design a MERGE for an e-commerce order system (create, update, cancel).
Answer:
MERGE INTO orders_fact t
USING orders_staging s
ON t.order_id = s.order_id
-- Cancel: mark as deleted
WHEN MATCHED AND s.status = 'CANCELLED' THEN DELETE
-- Update: only if newer
WHEN MATCHED AND s.updated_at > t.updated_at THEN
UPDATE SET
t.status = s.status,
t.amount = s.amount,
t.shipping_address = s.shipping_address,
t.updated_at = s.updated_at
-- New order
WHEN NOT MATCHED THEN
INSERT (order_id, customer_id, status, amount, shipping_address, created_at, updated_at)
VALUES (s.order_id, s.customer_id, s.status, s.amount, s.shipping_address, s.created_at, s.updated_at)
Q11: What is the performance concern with MERGE? How do you optimize it?
Answer: MERGE scans the entire target table to find matches. For large tables, this is the bottleneck.
Optimization techniques:
1. Partition pruning — add partition columns to ON clause:
MERGE INTO target t
USING source s
ON t.id = s.id AND t.date = s.date -- date is partition column → prunes partitions
2. Z-ORDER on merge key:
OPTIMIZE target ZORDER BY (id); -- Data skipping on the merge key
3. Reduce source data before merge:
from pyspark.sql.functions import col

# Don't merge 100M rows if only 1M changed
new_records = source_df.join(target_df, "id", "left_anti")     # keys not yet in target
changed_records = (
    source_df.alias("s")
    .join(target_df.alias("t"), col("s.id") == col("t.id"))
    .filter(col("s.hash") != col("t.hash"))                    # same key, different content
    .select("s.*")                                             # keep the source schema
)
merge_source = new_records.unionByName(changed_records)
4. Broadcast small source:
from pyspark.sql.functions import broadcast

target.alias("t").merge(
    broadcast(source_df).alias("s"),  # broadcast hint if source fits in executor memory
    "t.id = s.id"
)
5. Compact target first:
OPTIMIZE target; -- Fewer, larger files = fewer tasks
6. Use Photon runtime — significantly faster MERGE operations.
Q12: MERGE with schema evolution — how does it work?
Answer:
# Enable automatic schema merge
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
# Now source with new columns will automatically add them to target
target.alias("t").merge(
source_df.alias("s"), # source has columns not in target
"t.id = s.id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# New columns from source are added to target table
SECTION 3: TIME TRAVEL & RECOVERY
Q13: How does time travel work? Show all query methods.
Answer:
-- By version number
SELECT * FROM orders VERSION AS OF 5;
SELECT * FROM orders@v5;
-- By timestamp
SELECT * FROM orders TIMESTAMP AS OF '2025-01-15 10:30:00';
-- Check history first
DESCRIBE HISTORY orders;
-- Restore entire table
RESTORE TABLE orders TO VERSION AS OF 5;
RESTORE TABLE orders TO TIMESTAMP AS OF '2025-01-15';
PySpark:
# By version
df = spark.read.format("delta").option("versionAsOf", 5).load(path)
# By timestamp
df = spark.read.format("delta").option("timestampAsOf", "2025-01-15").load(path)
Q14: Scenario — You accidentally deleted critical data 3 days ago. How do you recover?
Answer:
-- Step 1: Check history to find the version before the delete
DESCRIBE HISTORY customers;
-- Let's say the DELETE was version 42, so we want version 41
-- Step 2: Option A — Restore entire table (fastest, simplest)
RESTORE TABLE customers TO VERSION AS OF 41;
-- Step 2: Option B — Selectively restore only deleted rows
INSERT INTO customers
SELECT * FROM customers VERSION AS OF 41
WHERE customer_id NOT IN (SELECT customer_id FROM customers);
-- Step 2: Option C — MERGE for surgical recovery
MERGE INTO customers AS target
USING customers VERSION AS OF 41 AS source
ON target.customer_id = source.customer_id
WHEN NOT MATCHED THEN INSERT *;
Time travel limits:
- Default data retention: 7 days (delta.deletedFileRetentionDuration)
- Default log retention: 30 days (delta.logRetentionDuration)
- VACUUM removes old files → breaks time travel for those versions
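The interaction of the two retention settings can be sketched as a rule of thumb in plain Python. This is a simplification under the assumption that VACUUM runs regularly — versions whose data files were never removed can stay queryable longer; all names here are illustrative:

```python
from datetime import datetime, timedelta

def queryable(version_time, now,
              deleted_file_retention=timedelta(days=7),
              log_retention=timedelta(days=30)):
    """Rule of thumb (assuming VACUUM runs regularly): a past version is
    reliably time-travel-queryable only while BOTH its data files and its
    log entries are still within their retention windows."""
    return (now - version_time) <= min(deleted_file_retention, log_retention)

now = datetime(2025, 1, 20)
assert queryable(datetime(2025, 1, 18), now)      # 2 days old -> still queryable
assert not queryable(datetime(2025, 1, 10), now)  # 10 days old -> files may be vacuumed
```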
SECTION 4: Z-ORDERING, LIQUID CLUSTERING, OPTIMIZATION
Q15: What is Z-Ordering? How does it differ from partitioning?
Answer:
| Aspect | Partitioning | Z-Ordering |
|---|---|---|
| How it works | Creates separate directories per value | Co-locates related data within files using space-filling Z-curve |
| Cardinality | Low cardinality (< 1000 values) | High cardinality (user_id, order_id) |
| Query pattern | Almost always filter on partition column | Frequently filter on the Z-ordered column |
| Overhead | Over-partitioning creates small files | Requires running OPTIMIZE |
| Combination | Can be combined | Z-ORDER should NOT use partition columns |
-- Partition by date (low cardinality), Z-order by customer_id (high cardinality)
CREATE TABLE orders (
order_id LONG,
customer_id LONG,
order_date DATE,
amount DECIMAL(10,2)
) PARTITIONED BY (order_date);
OPTIMIZE orders ZORDER BY (customer_id);
Z-ORDER best practices:
- Max 4 columns (effectiveness decreases with more)
- Choose columns in WHERE, JOIN, MERGE conditions
- High cardinality columns benefit most
- NOT idempotent — running again rewrites files
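The "space-filling Z-curve" works by interleaving the bits of the clustering columns, so rows that are close in all dimensions sort near each other and land in the same files. A minimal sketch of the interleaving (a Morton code; illustrative, not Delta's internal implementation):

```python
def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x and y (Morton / Z-curve encoding).
    Sorting rows by this value co-locates rows close in BOTH dimensions,
    which is what OPTIMIZE ZORDER BY (x, y) exploits for data skipping."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)      # x's bit i -> even position
        z |= ((y >> i) & 1) << (2 * i + 1)  # y's bit i -> odd position
    return z

# The four cells of the 2x2 grid map to consecutive Z-values 0..3,
# so they end up adjacent in the sorted (and therefore file) order:
assert [z_value(0, 0), z_value(1, 0), z_value(0, 1), z_value(1, 1)] == [0, 1, 2, 3]
```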
Q16: What is Liquid Clustering? How does it improve over Z-Ordering + Partitioning?
Answer: Liquid Clustering (GA in Databricks) is the next-generation data layout that replaces both partitioning and Z-Ordering.
| Aspect | Z-Ordering | Liquid Clustering |
|---|---|---|
| When applied | Manual OPTIMIZE ZORDER BY | Automatic on writes (or OPTIMIZE) |
| Incremental | No (rewrites ALL files) | Yes (only new/changed data) |
| Change keys | Must re-OPTIMIZE everything | Just ALTER — new writes use new keys |
| Partitioning | Separate concept | Replaces partitioning |
| Small files | Doesn't address | Handles automatically |
-- Create with liquid clustering (replaces PARTITIONED BY + ZORDER)
CREATE TABLE orders (
order_id LONG,
customer_id LONG,
order_date DATE,
amount DECIMAL(10,2)
) CLUSTER BY (customer_id, order_date);
-- Change clustering keys without rewriting existing data
ALTER TABLE orders CLUSTER BY (order_date);
-- Trigger optimization
OPTIMIZE orders;
When to use what (2024+ recommendation):
- New tables: Use Liquid Clustering
- Existing tables with partitioning: Migrate to Liquid Clustering if possible
- Legacy tables: Continue with partitioning + Z-Ordering
Q17: What are Deletion Vectors? How do they improve UPDATE/DELETE performance?
Answer: Instead of rewriting entire Parquet files for DELETE/UPDATE/MERGE, deletion vectors mark individual rows as deleted in a separate lightweight file.
Without deletion vectors:
- DELETE 1 row from a 1 GB Parquet file → rewrite entire 1 GB file
- MERGE affecting 100 files → rewrite all 100 files
With deletion vectors:
- DELETE 1 row → write a tiny deletion vector file (~bytes)
- Reads filter out deleted rows using the deletion vector
- Physical rewrite is deferred to next OPTIMIZE
-- Enable deletion vectors
ALTER TABLE my_table SET TBLPROPERTIES ('delta.enableDeletionVectors' = 'true');
Trade-off: Slightly slower reads (must apply deletion vectors) but much faster writes. OPTIMIZE reclaims space.
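The read path can be sketched in plain Python: the Parquet file is untouched, and the scan filters out row positions listed in the deletion vector (an illustrative model; real deletion vectors are compressed bitmaps stored alongside the data files):

```python
def read_with_deletion_vector(rows: list, deletion_vector: set) -> list:
    """Scan-time filtering: rows whose positions appear in the deletion
    vector are dropped without the data file ever being rewritten."""
    return [row for i, row in enumerate(rows) if i not in deletion_vector]

rows = ["r0", "r1", "r2", "r3"]
dv = {1, 3}  # DELETE marked rows 1 and 3; the 4-row file stays as-is
assert read_with_deletion_vector(rows, dv) == ["r0", "r2"]
```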
Q18: What are the most important Delta table properties?
Answer:
ALTER TABLE my_table SET TBLPROPERTIES (
-- Write optimization
'delta.autoOptimize.optimizeWrite' = 'true', -- Coalesce small files on write
'delta.autoOptimize.autoCompact' = 'true', -- Auto compaction after writes
-- Change Data Feed
'delta.enableChangeDataFeed' = 'true', -- Track row-level changes
-- Retention
'delta.logRetentionDuration' = 'interval 30 days', -- How long to keep commit logs
'delta.deletedFileRetentionDuration' = 'interval 7 days', -- VACUUM threshold
-- Column mapping (enables column rename/drop)
'delta.columnMapping.mode' = 'name',
-- Deletion vectors (faster deletes)
'delta.enableDeletionVectors' = 'true',
-- Checkpoint interval
'delta.checkpointInterval' = '10'
);
Q19: What is the difference between managed and external tables in Databricks?
Answer:
| Aspect | Managed Table | External Table |
|---|---|---|
| Storage | Databricks-managed location | User-specified external location |
| DROP TABLE | Deletes both metadata AND data | Deletes only metadata — data survives |
| Use case | Default for most tables | Shared data, data must survive table drops |
| Unity Catalog | Managed by metastore | Requires External Location grant |
-- Managed (data stored in default warehouse location)
CREATE TABLE managed_orders (id LONG, amount DECIMAL);
-- External (data at your specified location)
CREATE TABLE external_orders (id LONG, amount DECIMAL)
LOCATION 's3://my-bucket/orders/';
Q20: What are clone operations? Explain deep clone vs shallow clone.
Answer:
| Aspect | Deep Clone | Shallow Clone |
|---|---|---|
| Copies data? | YES (full copy) | NO (references source files) |
| Independent? | Yes — fully independent | No — depends on source data files |
| Speed | Slow (copies all data) | Fast (copies only metadata) |
| Use case | Production copies, migration | Testing, experimentation, quick snapshots |
| VACUUM safe? | Yes | NO — vacuuming source can break clone |
-- Deep clone (full copy)
CREATE TABLE orders_backup DEEP CLONE orders;
-- Shallow clone (metadata only)
CREATE TABLE orders_test SHALLOW CLONE orders;
-- Incremental deep clone (re-running copies only changes since the last clone)
CREATE OR REPLACE TABLE orders_backup DEEP CLONE orders;
Q21: Explain schema evolution in Delta Lake. What are the options?
Answer:
Add new columns during write:
# mergeSchema — adds new columns, preserves existing
df.write.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.saveAsTable("my_table")
Replace entire schema:
# overwriteSchema — replaces schema completely
df.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.saveAsTable("my_table")
Column rename/drop (requires column mapping):
-- Enable column mapping first
ALTER TABLE my_table SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');
-- Now you can rename and drop columns
ALTER TABLE my_table RENAME COLUMN old_name TO new_name;
ALTER TABLE my_table DROP COLUMN unused_column;
SECTION 5: LAKEHOUSE ARCHITECTURE
Q22: What is a data lakehouse? How does it differ from data lake and data warehouse?
Answer:
| Aspect | Data Lake | Data Warehouse | Data Lakehouse |
|---|---|---|---|
| Storage | Cheap object storage | Proprietary | Cheap object storage |
| Format | Open (Parquet, ORC) | Proprietary | Open (Delta, Iceberg, Hudi) |
| ACID | No | Yes | Yes |
| Schema | Schema-on-read | Schema-on-write | Both |
| Performance | Slow for BI | Fast for BI | Fast (OPTIMIZE, caching, Photon) |
| ML support | Good | Poor | Excellent |
| Governance | Limited | Strong | Strong (Unity Catalog) |
| Cost | Low | High | Low-Medium |
Q23: What key technologies enable the lakehouse?
Answer:
- Delta Lake / Iceberg / Hudi: ACID transactions on data lakes
- Photon / vectorized engines: Warehouse-level query performance
- Unity Catalog: Unified governance across all data assets
- Serverless compute: On-demand, auto-scaling
- SQL endpoints / SQL Warehouses: Direct BI tool connectivity
- MLflow: Integrated ML lifecycle management
Q24: What problems does the lakehouse solve?
Answer:
- Eliminates the "two-tier" architecture (data lake + data warehouse)
- No more ETL from lake to warehouse
- Single copy of data serves all workloads
- Reduces data duplication and ETL complexity
- Single source of truth for BI and ML
- Open formats prevent vendor lock-in
- Cost-effective storage with warehouse-level performance
- Unified governance across all workloads
Q25: Scenario — Your company has a data lake on S3, a Snowflake warehouse, and a separate ML platform. The CEO wants to consolidate. How do you design the lakehouse migration?
Answer:
A phased migration that converges on a single copy of the data:
1. Standardize storage: convert the raw S3 data to Delta tables and organize them with the medallion architecture (Bronze → Silver → Gold).
2. Establish governance first: stand up Unity Catalog for catalogs, access control, and lineage before moving any workloads.
3. Migrate ETL: re-point the pipelines that currently load Snowflake to build Silver/Gold Delta tables instead.
4. Migrate BI incrementally: move dashboards from Snowflake to SQL Warehouses, validating results side by side, and decommission Snowflake tables as consumers cut over.
5. Consolidate ML: point the ML platform's feature pipelines at the same Gold tables and manage the model lifecycle with MLflow, eliminating the separate data copies.
6. Measure success: one copy of the data, one governance layer, retired warehouse and ML silos, lower ETL and storage cost.
Q26: What is Delta Sharing? How does it work?
Answer: Delta Sharing is an open protocol for secure data sharing across organizations.
How it works:
- Provider shares a Delta table via a Delta Sharing Server
- Provider generates a sharing profile (JSON with endpoint + credentials)
- Recipient uses any client (Spark, pandas, Power BI) to read shared data
- Data is read directly from the provider's storage — no copying
- Recipient gets read-only access with the provider's access controls
# Recipient reads shared data
df = spark.read.format("deltaSharing").load("profile.json#share.schema.table")
Key benefits:
- No data copying (cost-effective)
- Open protocol (not Databricks-specific)
- Audit logging on provider side
- Fine-grained access control
Q27: Scenario — A Delta table has 10,000 small files (each <1 MB). Queries are slow. How do you fix this?
Answer:
-- Step 1: Compact files immediately
OPTIMIZE slow_table; -- Merge into ~1 GB files
OPTIMIZE slow_table ZORDER BY (frequently_filtered_col); -- Plus data skipping
-- Step 2: Clean up old files
VACUUM slow_table RETAIN 168 HOURS;
-- Step 3: Prevent future small files
ALTER TABLE slow_table SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true', -- Coalesce on write
'delta.autoOptimize.autoCompact' = 'true' -- Auto compact after writes
);
-- Step 4: For streaming sources, increase trigger interval
-- In PySpark: .trigger(processingTime="5 minutes") instead of "10 seconds"
-- Step 5: For new tables, use Liquid Clustering
-- CREATE TABLE ... CLUSTER BY (col) -- Handles compaction automatically
Q28: What is the difference between DataFrame.write.mode("overwrite") and REPLACE TABLE in Delta?
Answer:
| Aspect | mode("overwrite") | REPLACE TABLE |
|---|---|---|
| Scope | Overwrites data (optionally per partition) | Replaces entire table definition |
| Schema | Keeps existing schema (unless overwriteSchema=true) | Can change schema |
| History | Maintains history (time travel works) | Maintains history |
| Partition overwrite | Supports replaceWhere for surgical overwrites | N/A |
# Overwrite specific partitions only
df.write.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "date = '2025-01-15'") \
.saveAsTable("orders")
# This is IDEMPOTENT — safe for retry/re-run
Q29: How do you implement write-audit-publish (WAP) pattern with Delta Lake?
Answer: WAP ensures data quality before making data visible to consumers.
# Step 1: Write to a staging area (or use table clones)
staging = "staging_orders"
df.write.format("delta").mode("overwrite").saveAsTable(staging)
# Step 2: Audit — run quality checks
quality_check = spark.sql(f"""
SELECT
COUNT(*) as total_rows,
SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) as null_ids,
SUM(CASE WHEN amount < 0 THEN 1 ELSE 0 END) as negative_amounts
FROM {staging}
""").collect()[0]
assert quality_check["null_ids"] == 0, "Null order IDs found!"
assert quality_check["negative_amounts"] == 0, "Negative amounts found!"
# Step 3: Publish — atomically swap
spark.sql(f"""
INSERT OVERWRITE TABLE production_orders
SELECT * FROM {staging}
""")
Alternative with Delta's RESTORE:
If a quality check fails after data has already been written to production, roll back with RESTORE TABLE production_orders TO VERSION AS OF <last known good version>.