# ── PROBLEM: job runs fine at 10 GB, fails at 500 GB ──────────────────────────
# ── SOLUTION 1: Enable AQE (auto-adapts at runtime) ───────────────────────────
# AQE merges tiny shuffle partitions on small days and splits large ones on
# big days; shuffle.partitions can stay at 2000 and AQE coalesces it down.
# ── SOLUTION 2: Dynamic Allocation (scale executors to data) ──────────────────
# On a 10 GB day: ~5 executors. On a 500 GB Monday: auto-scales to 100+.
# The external shuffle service keeps shuffle files alive while executors
# are released.
_elastic_conf = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.minExecutors": "5",
    "spark.dynamicAllocation.maxExecutors": "200",  # headroom for Monday spikes
    "spark.shuffle.service.enabled": "true",
}
for _key, _value in _elastic_conf.items():
    spark.conf.set(_key, _value)
# ── SOLUTION 3: Data-driven shuffle partition calculation ─────────────────────
# Before running the main job, estimate data size and set partitions
def estimate_partitions(spark, path, target_mb=128, min_partitions=200):
    """Estimate an optimal shuffle-partition count from the input's on-disk size.

    Sums the total byte size under *path* via the JVM Hadoop FileSystem API
    and targets roughly ``target_mb`` megabytes per partition.

    Parameters
    ----------
    spark : SparkSession
        Active session; used to reach the JVM Hadoop FileSystem gateway.
    path : str
        Input path (e.g. ``s3://bucket/data/``).
    target_mb : int, optional
        Desired partition size in megabytes (default 128).
    min_partitions : int, optional
        Floor on the returned count so small days still get reasonable
        parallelism.  Default 200 (Spark's own ``shuffle.partitions``
        default) — previously hard-coded, now parameterized.

    Returns
    -------
    int
        ``max(min_partitions, total_size_mb // target_mb)``.
    """
    jvm = spark.sparkContext._jvm
    hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(
        jvm.java.net.URI.create(path), hadoop_conf
    )
    total_bytes = fs.getContentSummary(jvm.org.apache.hadoop.fs.Path(path)).getLength()
    size_mb = total_bytes / (1024 * 1024)
    return max(min_partitions, int(size_mb / target_mb))
# Right-size the shuffle BEFORE the main job runs, based on today's actual
# input volume rather than a fixed guess.
n_parts = estimate_partitions(spark, "s3://bucket/data/")
spark.conf.set("spark.sql.shuffle.partitions", str(n_parts))
# ── SOLUTION 4: Add checkpoint mid-job (survive retries on large runs) ─────────
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df = spark.read.parquet("s3://bucket/data/")
# After the expensive join: checkpoint to cut lineage + save progress.
# NOTE(review): `.filter(...)` is placeholder pseudocode from the original
# sketch — supply a real predicate before running.
joined = df.join(reference, "id").filter(...)
# cache() first so the checkpoint job does not recompute the join from scratch.
joined.cache()
# BUG FIX: DataFrame.checkpoint() RETURNS the checkpointed frame; the original
# discarded it, so downstream code kept the full lineage. Rebind the result,
# then release the cache on the pre-checkpoint frame.
after_join = joined.checkpoint()  # writes to S3 — on failure, restart from here
joined.unpersist()
# ── SOLUTION 5: Partition the input table by date for pruning ─────────────────
# Persist the data partitioned by date so the Monday run touches only the
# weekend's directories instead of scanning the entire history.
(
    df.write
    .partitionBy("year", "month", "day")
    .mode("append")
    .parquet("s3://bucket/data-partitioned/")
)
# Partition pruning: these predicates are applied at directory-listing time,
# so only year=2024/month=1/day={6,7} (Saturday + Sunday) is ever read.
df_weekend = (
    spark.read.parquet("s3://bucket/data-partitioned/")
    .filter((col("year") == 2024) & (col("month") == 1) & (col("day").isin(6, 7)))
)
# ── COMPLETE ROBUST ETL PATTERN ───────────────────────────────────────────────
from datetime import datetime, timedelta
def run_etl(spark, date_str):
    """Aggregate one day's valid records by category and overwrite its output.

    Combines the elasticity levers above (AQE, dynamic allocation, and a
    deliberately high shuffle-partition count that AQE coalesces back down on
    small days) so the same job handles 10 GB and 500 GB inputs.

    Parameters
    ----------
    spark : SparkSession
    date_str : str
        Partition date; selects ``s3://bucket/data/date=<date_str>/`` as input
        and ``s3://bucket/output/date=<date_str>/`` as output.
    """
    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.enabled", "true")
    spark.conf.set("spark.dynamicAllocation.maxExecutors", "200")
    spark.conf.set("spark.sql.shuffle.partitions", "2000")  # AQE coalesces down

    df = (
        spark.read.parquet(f"s3://bucket/data/date={date_str}/")
        .filter(col("is_valid") == True)  # Column comparison, not a Python bool
        .select("id", "amount", "category")
    )
    # NOTE(review): `sum` here must resolve to pyspark.sql.functions.sum —
    # the Python builtin cannot aggregate a Column. Confirm the module-level
    # import (not visible in this chunk).
    result = df.groupBy("category").agg(sum("amount").alias("total"))
    # BUG FIX: removed stray `">` garbage fused onto the end of this line in
    # the original, which made the file unparseable.
    result.write.mode("overwrite").parquet(f"s3://bucket/output/date={date_str}/")
# Baseline elasticity settings: AQE right-sizes shuffle partitions at runtime,
# dynamic allocation right-sizes the executor fleet, and the external shuffle
# service lets executors be released without losing shuffle files.
for _k, _v in [
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
    ("spark.dynamicAllocation.enabled", "true"),
    ("spark.dynamicAllocation.minExecutors", "5"),
    ("spark.dynamicAllocation.maxExecutors", "200"),
    ("spark.shuffle.service.enabled", "true"),
]:
    spark.conf.set(_k, _v)
def estimate_partitions(spark, path, target_mb=128):
    """Derive a shuffle-partition count from the input's total on-disk size.

    Sums the byte size under *path* via the JVM Hadoop FileSystem API and
    targets roughly ``target_mb`` megabytes per partition, never returning
    fewer than 200 (Spark's default ``shuffle.partitions``).
    """
    gateway = spark.sparkContext._jvm
    hadoop_fs = gateway.org.apache.hadoop.fs
    filesystem = hadoop_fs.FileSystem.get(
        gateway.java.net.URI.create(path),
        spark.sparkContext._jsc.hadoopConfiguration(),
    )
    total_bytes = filesystem.getContentSummary(hadoop_fs.Path(path)).getLength()
    megabytes = total_bytes / (1024 * 1024)
    return max(200, int(megabytes / target_mb))
# Size the shuffle to today's actual input volume before the job runs.
n_parts = estimate_partitions(spark, "s3://bucket/data/")
spark.conf.set("spark.sql.shuffle.partitions", str(n_parts))
# Checkpoint after the expensive join: truncates lineage and persists progress
# to S3 so a retry restarts from here instead of from the raw input.
spark.sparkContext.setCheckpointDir("s3://bucket/checkpoints/")
df = spark.read.parquet("s3://bucket/data/")
# NOTE(review): `.filter(...)` is placeholder pseudocode — supply a predicate.
joined = df.join(reference, "id").filter(...)
joined.cache()  # avoid recomputing the join during the checkpoint job
# BUG FIX: checkpoint() returns the checkpointed DataFrame; the original threw
# it away, keeping the full lineage. Rebind, then release the cache.
after_join = joined.checkpoint()
joined.unpersist()
# Write partitioned by date so later reads can prune to just the needed days.
df.write.partitionBy("year", "month", "day").mode("append").parquet(
    "s3://bucket/data-partitioned/"
)
# Only the year=2024/month=1/day={6,7} directories are listed and read.
weekend_predicate = (
    (col("year") == 2024) & (col("month") == 1) & (col("day").isin(6, 7))
)
df_weekend = spark.read.parquet("s3://bucket/data-partitioned/").filter(
    weekend_predicate
)
from datetime import datetime, timedelta
def run_etl(spark, date_str):
    """Aggregate one day's valid records by category and overwrite its output.

    Applies the elasticity settings per-run (AQE, dynamic allocation, and a
    high shuffle-partition count that AQE coalesces on small days) so the
    same code handles both light and heavy input days.
    """
    for key, value in (
        ("spark.sql.adaptive.enabled", "true"),
        ("spark.dynamicAllocation.enabled", "true"),
        ("spark.dynamicAllocation.maxExecutors", "200"),
        ("spark.sql.shuffle.partitions", "2000"),
    ):
        spark.conf.set(key, value)
    records = (
        spark.read.parquet(f"s3://bucket/data/date={date_str}/")
        .filter(col("is_valid") == True)  # Column comparison, not a Python bool
        .select("id", "amount", "category")
    )
    # NOTE(review): `sum` must resolve to pyspark.sql.functions.sum — the
    # Python builtin cannot aggregate a Column; confirm the module import.
    totals = records.groupBy("category").agg(sum("amount").alias("total"))
    totals.write.mode("overwrite").parquet(f"s3://bucket/output/date={date_str}/")