PySpark Day 3 — Quick Recall: Optimization + Performance
PySpark · Section 7 of 8

🧠 MASTER MNEMONICS

🧠 CATALYST 4 PHASES → "ALPC"
A — Analysis (resolve columns, parse SQL → Unresolved Logical Plan)
L — Logical Opt (predicate pushdown, column pruning, constant folding)
P — Physical Plan (pick join strategy, CBO picks cheapest)
C — Code Generation (Tungsten whole-stage CodeGen → JVM bytecode)
🧠 TUNGSTEN 3 FEATURES → "OCW"
O — Off-heap memory (bypass GC, binary format)
C — Cache-aware ops (CPU L1/L2 cache efficiency)
W — Whole-stage CodeGen (collapse operators → compiled JVM function)
🧠 JOIN STRATEGIES → "BSS-BC" (fastest → slowest)
B — Broadcast Hash Join (NO shuffle, < threshold or hint)
S — Shuffle Hash Join (shuffle, medium tables, hash map in memory)
S — Sort-Merge Join (shuffle + sort, large-large, DEFAULT)
B — Broadcast Nested Loop (non-equi joins, O(n×m))
C — Cartesian Join (CROSS JOIN, no condition)
🧠 AQE 3 FEATURES → "CDS"
C — Coalesce shuffle partitions (merge tiny post-shuffle partitions)
D — Dynamic join switching (SMJ → BHJ at runtime if table shrinks)
S — Skew join optimization (split hot partitions automatically)
🧠 DATA SKEW SOLUTIONS → "LABS"
L — Let AQE handle it (Spark 3.0+, automatic)
A — Add broadcast hint (if small side → broadcast(df))
B — Build salted keys (random salt → distribute hot keys)
S — Split aggregation (2-phase) (partial agg → final agg)

SECTION 1: CATALYST FLASH CARDS

Q: What are the 4 phases of Catalyst? (ALPC)

🧠 Memory Map
1. Analysis — parse + resolve column names/types against the Catalog
2. Logical Opt — rule-based: predicate pushdown, column pruning, constant folding
3. Physical — generate N physical plans, CBO picks cheapest
4. CodeGen — Tungsten whole-stage codegen → JVM bytecode
Q: What is predicate pushdown?

Moving filter conditions as close to the data source as possible (before joins/aggregations). For Parquet: skips row groups. For JDBC: adds WHERE clause to SQL sent to DB.
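The row-group skipping idea can be sketched in plain Python (no Spark needed): each "row group" carries min/max stats for a column, and a pushed-down predicate skips whole groups whose stats cannot match. The names here are illustrative, not Spark or Parquet APIs.

```python
# Plain-Python sketch of Parquet-style row-group skipping via min/max stats.
# (Illustrative only — not a Spark or Parquet API.)

row_groups = [  # each group stores its rows plus min/max stats for "year"
    {"min": 2020, "max": 2021, "rows": [(2020, "a"), (2021, "b")]},
    {"min": 2022, "max": 2023, "rows": [(2022, "c"), (2023, "d")]},
    {"min": 2024, "max": 2024, "rows": [(2024, "e"), (2024, "f")]},
]

def scan(groups, predicate, lo, hi):
    """Read only groups whose [min, max] range can satisfy year in [lo, hi]."""
    out, groups_read = [], 0
    for g in groups:
        if g["max"] < lo or g["min"] > hi:
            continue                    # pushed-down predicate: skip whole group
        groups_read += 1
        out.extend(r for r in g["rows"] if predicate(r))
    return out, groups_read

rows, read = scan(row_groups, lambda r: r[0] == 2024, 2024, 2024)
print(rows, read)   # only 1 of 3 row groups is touched
```

Without pushdown, all three groups would be deserialized and the filter applied afterwards — same answer, 3× the I/O.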

Q: When does predicate pushdown FAIL?

1. Filter uses a UDF — the UDF is opaque to Catalyst
2. Filter on a computed column: .withColumn("yr", year("date")).filter("yr = 2024")
Fix: .filter(year("date") == 2024) BEFORE withColumn
3. Non-pushdown sources (some custom data sources)
Q: What is column pruning?

Catalyst reads only the columns referenced in the query. Parquet (columnar) makes this very efficient — unneeded columns are never deserialized from disk.
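Why pruning is cheap in a columnar layout can be shown with a plain-Python sketch: each column is stored (and decoded) independently, so untouched columns are simply never read. This is illustrative only, not Parquet internals.

```python
# Plain-Python sketch of column pruning over a columnar layout.
# (Illustrative only — not Parquet internals.)

table = {                       # columnar storage: one list per column
    "id":      [1, 2, 3],
    "name":    ["a", "b", "c"],
    "payload": ["big1", "big2", "big3"],   # expensive column we never need
}

decoded_columns = []

def read_column(tbl, col):
    decoded_columns.append(col)  # track what actually gets deserialized
    return tbl[col]

# "SELECT id, name" → only those two column chunks are decoded
result = {c: read_column(table, c) for c in ("id", "name")}
print(decoded_columns)   # ['id', 'name'] — "payload" was never touched
```

A row-oriented format (CSV, JSON) must parse every byte of every row to get at two fields; a columnar one touches only the requested column chunks.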

Q: How to see Catalyst optimization in action?
python — editable
df.explain("formatted")   # shows all 4 plan phases
df.explain("cost")        # shows with CBO cost estimates
# Look for: PushedFilters in FileScan = pushdown working
# Look for: BroadcastHashJoin = correct join chosen

SECTION 2: TUNGSTEN FLASH CARDS

Q: What is Tungsten? Name 3 features (OCW)

Spark's low-level execution engine for memory + CPU efficiency.

🧠 Memory Map
1. Off-heap memory management — stores data in binary format, bypasses GC
2. Cache-aware computation — optimizes access patterns for CPU cache
3. Whole-stage CodeGen — compiles the pipeline into a single JVM function
Q: What problem does whole-stage CodeGen solve?

Eliminates virtual function call overhead per row (e.g., calling filter() then map() on every row). Instead, Spark generates a single compiled Java method that processes all operators in one loop → ~10x CPU efficiency.

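The effect can be mimicked in plain Python (an analogy only — Spark generates Java bytecode, not Python): operator-at-a-time execution makes one pass and many calls per operator, while the "generated" version fuses filter + map into a single loop.

```python
# Plain-Python analogy for whole-stage CodeGen (illustrative only).

rows = list(range(10))

# Operator-at-a-time: each operator is its own pass with per-row calls
def op_filter(data):
    return [r for r in data if r % 2 == 0]

def op_map(data):
    return [r * 10 for r in data]

volcano_result = op_map(op_filter(rows))

# "Whole-stage" version: one fused loop, no per-operator call overhead
fused_result = [r * 10 for r in rows if r % 2 == 0]

assert volcano_result == fused_result   # same answer, fewer passes/calls
print(fused_result)   # [0, 20, 40, 60, 80]
```

In `df.explain()` output, fused operators appear under a single `WholeStageCodegen` node (marked with `*` in the simple plan format).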
Q: How to enable off-heap Tungsten?
python — editable
spark.conf.set("spark.memory.offHeap.enabled", "true")
spark.conf.set("spark.memory.offHeap.size", "4g")

SECTION 3: REPARTITION vs COALESCE FLASH CARDS

Q: repartition vs coalesce — 5 differences:

Property     repartition(N)              coalesce(N)
Type         WIDE (full shuffle)         NARROW (no shuffle)
Direction    Up or down                  DOWN ONLY
Balance      Perfectly even              May be uneven
Speed        Slow (shuffle cost)         Fast (local merge)
By column?   YES (.repartition(N, col))  NO
Q: When to use each?
repartition():
✓ Before a join (even distribution by join key)
✓ Increasing parallelism
✓ Fixing data skew before aggregation
coalesce():
✓ After large filter (less data now)
✓ Before write (reduce output file count)
✓ When partitions are already somewhat even

⚠️ TRAP: coalesce(1) = single partition = single writer = bottleneck for large data. Use repartition(small_N) instead.

Q: What is spark.sql.shuffle.partitions and when to tune?
Default: 200
Rule: 1 partition per 100-200 MB of shuffled data
Too few (e.g., 10 for 1 TB): OOM, slow tasks
Too many (e.g., 1000 for 1 MB): scheduler overhead, tiny files
AQE auto-tunes this at runtime — enable AQE and set it conservatively high
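The sizing rule above is just arithmetic; a hypothetical helper (not a Spark API) makes it concrete:

```python
# Hypothetical helper applying the "1 partition per 100-200 MB" rule of thumb.
# recommended_shuffle_partitions is NOT a Spark API — just the arithmetic.

def recommended_shuffle_partitions(shuffled_bytes, target_mb=150):
    """Aim for roughly target_mb of shuffled data per partition."""
    target = target_mb * 1024 * 1024
    return max(1, shuffled_bytes // target)

GB = 1024 ** 3
print(recommended_shuffle_partitions(1 * GB))       # → 6
print(recommended_shuffle_partitions(2048 * GB))    # → 13981
```

So the default of 200 is far too many for 1 GB of shuffle data and far too few for 2 TB — which is exactly the gap AQE's partition coalescing closes at runtime.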

SECTION 4: JOIN STRATEGIES FLASH CARDS

Q: Which join is fastest? Which is the default for large-large? (BSS-BC)
Fastest: Broadcast Hash Join (no shuffle at all)
Default: Sort-Merge Join (large + large equi-join)
Q: Broadcast Hash Join — when and how to trigger:
AUTO: table < spark.sql.autoBroadcastJoinThreshold (default 10 MB)
MANUAL HINT
Python: large_df.join(broadcast(small_df), "key")
SQL: SELECT /*+ BROADCAST(small_tbl) */ ...
⚠️ Threshold too low → tables that would fit are not auto-broadcasted
Tune: spark.sql.autoBroadcastJoinThreshold = "50m"
⚠️ Threshold too high → OOM from broadcasting a too-large table
Q: Sort-Merge Join — how does it work?
1. Shuffle both sides by join key
2. Sort both sides by join key
3. Merge (like merge-sort algorithm) without hash map
→ Memory-efficient but CPU/network expensive (double shuffle + sort)
→ Used when neither table fits in memory for hash map
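The merge phase (step 3) can be sketched in plain Python: after the shuffle + sort, both sides arrive ordered by key, and a single forward pass joins them with no hash map. This is an illustrative sketch, not Spark's implementation.

```python
# Plain-Python sketch of the merge phase of a Sort-Merge Join.
# Both inputs must already be sorted by key (Spark's shuffle+sort guarantees this).

def merge_join(left, right):
    """Equi-join two key-sorted lists of (key, value) without a hash map."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1                      # advance the smaller side
        elif lk > rk:
            j += 1
        else:
            # emit one output row per matching right row for the current left row
            k = j
            while k < len(right) and right[k][0] == lk:
                out.append((lk, left[i][1], right[k][1]))
                k += 1
            i += 1                      # j stays put so left duplicates re-match
    return out

left  = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]   # sorted by key
right = [(2, "x"), (3, "y"), (4, "z")]             # sorted by key
print(merge_join(left, right))   # [(2, 'b', 'x'), (2, 'c', 'x'), (4, 'd', 'z')]
```

Note the memory profile: only the current "run" of equal keys matters at any moment, which is why SMJ survives tables that would blow up a hash join.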
Q: When do you get a Broadcast Nested Loop Join?
Non-equi joins (>, <, >=, !=, BETWEEN, LIKE, theta-joins)
Example: "find all orders within 30 days of each event"
→ O(n×m) nested loop — SLOW
→ Try to convert to equi-join where possible

SECTION 5: AQE FLASH CARDS (MOST IMPORTANT!)

Q: What is AQE and when was it introduced?

Adaptive Query Execution — Spark 3.0+, default ON in Spark 3.2+. Re-optimizes query plans at runtime using actual statistics (not estimates).

Q: 3 features of AQE (CDS):
🧠 Memory Map
1. COALESCE SHUFFLE PARTITIONS
Problem: shuffle.partitions=200 → 200 tiny partitions for small data
Fix: After each shuffle, merges small partitions into target size (64 MB default)
Config: spark.sql.adaptive.advisoryPartitionSizeInBytes = "64m"
2. DYNAMIC JOIN STRATEGY SWITCHING
Problem: Catalyst planned SMJ (large table), but after a filter it's 5 MB
Fix: At runtime, if a table shrinks below the threshold → switches to BHJ
No code change needed!
3. SKEW JOIN OPTIMIZATION
Problem: "US" key has 80% of rows → 1 task runs for hours
Fix: Detects skewed partitions at runtime, splits them, duplicates the other side
Config: spark.sql.adaptive.skewJoin.skewedPartitionFactor = 5 (5x median)
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes = "256m"
Q: How to enable AQE (full config):
python — editable
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
Q: Does AQE eliminate the need for manual tuning?

Not completely. AQE helps with partition coalescing, join switching, and skew — but you still need to: tune executor memory, choose repartition/coalesce, add broadcast hints for non-stats-based decisions, and handle the small files problem after writes.

SECTION 6: DATA SKEW FLASH CARDS

Q: How do you detect data skew?
1. Spark UI — Stages tab → Task Duration: max >> median (e.g., 500s vs 5s)
2. Code: df.groupBy("key").count().orderBy(desc("count")).show(20)
3. Partition check:
   df.withColumn("pid", spark_partition_id()) \
     .groupBy("pid").count() \
     .orderBy(desc("count")).show()
Q: 4 solutions for data skew (LABS):
🧠 Memory Map
1. LET AQE handle: skewJoin.enabled=true → auto at runtime (easiest)
2. ADD broadcast: broadcast(small_df) → if skewed side is small enough
3. BUILD salted keys:
large: concat(key, "_", (rand()*10).cast("int")) as salted_key
small: EXPLODE key × 10 salt values (cross join with range 0-9)
JOIN on salted_key → 10× distributed
4. SPLIT aggregation (2-phase):
Phase 1: groupBy(key, salt) → partial result
Phase 2: groupBy(key) → final result
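The 2-phase pattern above can be sketched in plain Python (in Spark it would be two groupBy calls; this just shows why the answer comes out the same):

```python
# Plain-Python sketch of 2-phase (salted) aggregation. Illustrative only.
import random
random.seed(0)

rows = [("US", 1)] * 8 + [("DE", 1)] * 2     # "US" is the hot key

# Phase 1: group by (key, salt) → partial sums spread across many "tasks"
partial = {}
for key, val in rows:
    salt = random.randrange(3)               # 3 salt buckets
    partial[(key, salt)] = partial.get((key, salt), 0) + val

# Phase 2: group by key alone → final sums from the small partial map
final = {}
for (key, _salt), s in partial.items():
    final[key] = final.get(key, 0) + s

print(final)   # same totals as a direct groupBy, but the hot key was split
```

Phase 1 is where the skew is broken up: the hot key's rows land in several (key, salt) groups instead of one, and phase 2 only has to combine a handful of partial sums.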
Q: What is salting — when does it apply?

Salting works for JOIN skew (one key appears too much on one side). It appends a random number (0-9) to the hot key in the large table, then replicates the matching small table 10 times with each salt value. The hot key is distributed across 10 tasks instead of 1.
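The mechanics can be shown in plain Python (illustrative only; in PySpark the large side would use `concat`/`rand` and the small side a cross join with `range(0, 10)`):

```python
# Plain-Python sketch of key salting for a skewed join. Illustrative only.
import random
random.seed(42)

SALTS = 10
hot_rows = [("US", i) for i in range(1000)]   # hot key on the large side
small    = [("US", "usa-dim")]                 # matching small-side row

# Large side: append a random salt to each hot key
salted_large = [(f"{k}_{random.randrange(SALTS)}", v) for k, v in hot_rows]

# Small side: replicate each row once per salt value so every salted key matches
salted_small = {f"{k}_{s}": v for k, v in small for s in range(SALTS)}

# The single hot key now hashes into up to 10 distinct join keys ("tasks")
buckets = {k for k, _ in salted_large}
print(len(buckets))                            # up to 10 buckets instead of 1
assert all(k in salted_small for k in buckets) # the join still matches fully
```

The trade-off: the small side grows 10×, and every join key gets longer — worth it only when one task is orders of magnitude slower than the rest.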

SECTION 7: CACHE + PERSIST FLASH CARDS

Q: cache() vs persist() — code difference:
python — editable
from pyspark import StorageLevel

df.cache()                                     # = MEMORY_AND_DISK (Spark 3.x)
df.persist()                                   # same as cache()
df.persist(StorageLevel.MEMORY_ONLY)           # fast, recomputes on eviction
df.persist(StorageLevel.MEMORY_AND_DISK)       # spills to disk, safer
df.persist(StorageLevel.DISK_ONLY)             # slowest, always disk
df.persist(StorageLevel.MEMORY_ONLY_SER)       # smaller memory, slower access (Scala/Java only; PySpark always serializes)
df.persist(StorageLevel.OFF_HEAP)              # Tungsten off-heap

df.unpersist()  # release cache
Q: When to cache vs when NOT to cache:
CACHE when:
✓ Used in 2+ actions (count AND write AND show)
✓ Complex computation reused in loop
✓ Iterative algorithms
DON'T CACHE when:
✗ Used once only
✗ Too large for executor memory (eviction thrashing)
✗ Need freshest data (cache can be stale)
Q: Caching vs Checkpointing — key differences:

Property   Cache               Checkpoint
Storage    Executor mem/disk   HDFS / S3 (persistent)
Lineage    KEPT                CUT
Scope      App lifetime only   Survives restarts
Use when   Reuse in same job   Long iterative (cut DAG)

SECTION 8: SPARK CONFIGS FLASH CARDS

Q: Core memory configs — what do they do?
python — editable
spark.executor.memory        # JVM heap for tasks + storage
spark.executor.memoryOverhead # off-heap (Python UDFs, native)
spark.driver.memory           # Driver JVM heap (collect() lives here)
spark.driver.maxResultSize    # max data sent to Driver (2g default)
spark.memory.fraction         # % of heap for Spark (default 0.6 = 60%)
spark.memory.storageFraction  # % of Spark memory for cache (0.5 = 50%)
Q: Memory layout for a 10g executor:
Total JVM heap = 10g
User memory = 10g × (1 - 0.6) = 4g ← Python, UDFs, non-Spark
Spark memory = 10g × 0.6 = 6g
Storage pool = 6g × 0.5 = 3g ← cache (can borrow from execution)
Execution pool= 6g × 0.5 = 3g ← joins, sorts (can borrow from storage)
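A hypothetical calculator (not a Spark API) reproduces the simplified layout above; it deliberately ignores the ~300 MB reserved region that the real formula subtracts first:

```python
# Hypothetical calculator for the simplified unified-memory layout.
# Ignores the ~300 MB reserved region for readability — illustrative only.

def memory_layout(executor_gb, fraction=0.6, storage_fraction=0.5):
    spark_mem = executor_gb * fraction              # unified Spark memory
    return {
        "user":      executor_gb - spark_mem,       # Python objects, UDFs
        "storage":   spark_mem * storage_fraction,  # cache (can borrow)
        "execution": spark_mem * (1 - storage_fraction),  # joins, sorts
    }

print(memory_layout(10))   # user ≈ 4g, storage ≈ 3g, execution ≈ 3g
```

Storage and execution can borrow from each other at runtime, so the 3g/3g split is a starting point, not a hard wall.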
Q: 3 shuffle configs to know:
python — editable
spark.sql.shuffle.partitions = "200"    # number of post-shuffle partitions
spark.shuffle.compress = "true"         # compress shuffle data (default: true)
spark.io.compression.codec = "snappy"  # codec: snappy (fast) vs lz4 vs zstd
Q: Dynamic allocation — why does it require the shuffle service?

When an Executor is released, it takes its shuffle files with it. The External Shuffle Service (running on YARN NodeManager) keeps shuffle files available after the Executor is gone. Without it, releasing executors breaks downstream stages.

Q: Kryo vs Java serialization:
Java (default): Safe, works with any class, ~3-4x larger output
Kryo: 3-10x smaller, faster, but needs registration for custom classes
Enable: spark.serializer = "org.apache.spark.serializer.KryoSerializer"
When: RDD-heavy jobs with lots of shuffle (most benefit)
DataFrame API already uses Tungsten internal format (less benefit)

SECTION 9: SPARK UI READING FLASH CARDS

Q: Where do you look for data skew in the Spark UI?

Stages tab → click a Stage → "Task Duration" summary table. If max >> median (e.g., max=500s, median=5s) → skew.

Q: Where do you see if a Broadcast Join was used?

SQL tab → click a query → expand physical plan. Look for BroadcastHashJoin node. If you see SortMergeJoin instead → opportunity to add broadcast hint.

Q: GC time > 10% of task time — what does it mean?

Memory pressure. Too many Java objects in heap → GC runs too long. Fix: reduce executor memory fraction, use G1GC, use Kryo, use serialized storage level for cache, or add more memory.

Q: What is "Exchange" in the physical plan?

A shuffle operation. Each Exchange = data moves across the network. Minimize Exchanges = minimize shuffles = faster job.

Q: How to tell if predicate pushdown is working?

SQL tab → physical plan → FileScan parquet node should show PushedFilters: [EqualTo(...)]. If PushedFilters is empty, pushdown is not working.

SECTION 10: SMALL FILES FLASH CARDS

Q: What causes the small files problem?
1. partitionBy() with high cardinality (1000 dates × 50 regions = 50,000 files)
2. Each streaming micro-batch writes its own files
3. Filter reduces data to tiny amounts per partition
4. Writing with too many partitions (shuffle.partitions = 200, tiny data)
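The file-count arithmetic behind causes 1 and 4 can be sketched in plain Python (the numbers are illustrative):

```python
# Back-of-envelope math for the small files problem. Illustrative numbers.
dates, regions = 1000, 50          # partitionBy("date", "region") cardinality
writers = 200                      # e.g., shuffle.partitions at write time

min_dirs = dates * regions         # one directory per partition combination
worst_case_files = min_dirs * writers  # if every writer holds rows for every combo

print(min_dirs)            # 50000
print(worst_case_files)    # 10000000
```

Even the floor of 50,000 directories is painful for a metastore; the worst case shows why repartitioning by the partition columns before the write matters.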
Q: 3 solutions for small files:
python — editable
# 1. Coalesce before write
df.coalesce(10).write.parquet("output/")

# 2. Repartition by target file size (~128 MB per file)
n_parts = max(1, estimated_gb * 1024 // 128)   # estimated_gb = your data size estimate
df.repartition(n_parts).write.parquet("output/")

# 3. Delta OPTIMIZE (compact existing small files)
spark.sql("OPTIMIZE delta.`/path/` ZORDER BY (date)")

PYSPARK ULTRA CHEAT SHEET — ALL 3 DAYS

📐 Architecture Diagram
╔═════════════════════════════════════════════════════════════════════════╗
║                    PYSPARK — ALL 3 DAYS CHEAT SHEET                     ║
╠═════════════════════════════════════════════════════════════════════════╣
║                                                                         ║
║  DAY 1: ARCHITECTURE + RDD                                              ║
║  ─────────────────────────────────────────────────────────────────────  ║
║  Architecture (DCE): Driver + Cluster Manager + Executors               ║
║  DAG → Stages (at shuffle) → Tasks (1 per partition)                    ║
║  Narrow: map,flatMap,filter,union,coalesce → NO shuffle                  ║
║  Wide:   groupBy,join,sort,distinct,repartition → shuffle+new stage      ║
║                                                                         ║
║  RDD Properties (RD-ILP): Resilient|Distributed|Immutable|Lazy|Part     ║
║  map()     → 1-to-1  | flatMap() → 1-to-N (flattens)                   ║
║  mapPartitions() → 1 call per partition (DB conn, model load)           ║
║  reduceByKey  → LOCAL pre-agg + shuffle (PREFER)                        ║
║  groupByKey   → ALL values shuffle (AVOID for aggregation)              ║
║  aggregateByKey → output type ≠ input type (avg = sum/count)            ║
║                                                                         ║
║  Persistence (MDSRO):                                                   ║
║  MEMORY_ONLY > MEMORY_AND_DISK > DISK_ONLY > MEMORY_ONLY_SER            ║
║  cache() = persist(MEMORY_AND_DISK)                                     ║
║  Broadcast = send once per Executor (not per task)                      ║
║  Accumulator = add-only in tasks, read-only on Driver                   ║
║  Checkpoint = HDFS, cuts lineage, survives restart                      ║
║                                                                         ║
║  DAY 2: DATAFRAME + SPARKSQL                                            ║
║  ─────────────────────────────────────────────────────────────────────  ║
║  Read (CPJ-OAD): CSV|Parquet|JSON|ORC|Avro|Delta                       ║
║  Always explicit StructType in prod (no inferSchema)                    ║
║  Multiple files: glob("path/*/"), list, directory, unionByName          ║
║  input_file_name() → track source file                                  ║
║  JDBC: numPartitions + partitionColumn (else 1 thread!)                 ║
║                                                                         ║
║  Window (POF): partitionBy().orderBy().rowsBetween()                    ║
║  ROW_NUMBER: unique | RANK: gaps | DENSE_RANK: no gaps                  ║
║  Top-N: DENSE_RANK | Dedup: ROW_NUMBER = 1                              ║
║  LAG/LEAD: first row=NULL, last row=NULL                                ║
║                                                                         ║
║  UDF Speed: Built-in >> Pandas UDF (Arrow) >> Python UDF                ║
║  Pandas UDF: batch-based, vectorized, Arrow zero-copy, 3-100x faster   ║
║  UDFs are opaque to Catalyst → no optimization!                         ║
║                                                                         ║
║  Joins: inner|left|right|full|cross|left_anti|left_semi                 ║
║  left_anti = NOT IN | left_semi = EXISTS (no right cols)                ║
║  Write modes (OACEI): overwrite|append|ignore|error                     ║
║  partitionBy → directories | bucketBy → hash for join opt               ║
║  explode → drops null rows | explode_outer → keeps null rows            ║
║                                                                         ║
║  DAY 3: OPTIMIZATION                                                    ║
║  ─────────────────────────────────────────────────────────────────────  ║
║  Catalyst (ALPC): Analysis→Logical Opt→Physical→CodeGen                 ║
║  Predicate pushdown: filter early, NO UDFs in filter, no computed cols  ║
║  Column pruning: Parquet reads only referenced columns                  ║
║  Tungsten (OCW): Off-heap|Cache-aware|Whole-stage CodeGen               ║
║                                                                         ║
║  repartition(N): WIDE, any direction, even, by column                   ║
║  coalesce(N):    NARROW, down only, faster, may be uneven               ║
║  shuffle.partitions: tune to 1 per 100-200 MB shuffled data              ║
║                                                                         ║
║  Joins (BSS-BC fastest→slowest):                                        ║
║  BroadcastHash(<threshold/hint) → ShuffleHash → SortMerge              ║
║  → BroadcastNestedLoop(non-equi) → Cartesian(CROSS)                    ║
║  Broadcast hint: large_df.join(broadcast(small_df), "key")             ║
║  SMJ default for large-large equi-joins                                 ║
║                                                                         ║
║  AQE (CDS) — enable: spark.sql.adaptive.enabled=true                   ║
║  C = Coalesce: merge tiny post-shuffle partitions automatically         ║
║  D = Dynamic: SMJ→BHJ at runtime if table shrinks after filter         ║
║  S = Skew: detect+split hot partitions, duplicate other side            ║
║                                                                         ║
║  Skew solutions (LABS):                                                 ║
║  L=AQE auto | A=broadcast hint | B=salt keys | S=2-phase agg           ║
║                                                                         ║
║  Key Configs:                                                           ║
║  executor.memory + memoryOverhead | driver.memory | shuffle.partitions  ║
║  autoBroadcastJoinThreshold=50m | adaptive.enabled=true                 ║
║  dynamicAllocation.enabled=true (needs shuffle.service.enabled=true)   ║
║  serializer=KryoSerializer (for RDD-heavy shuffle jobs)                 ║
║                                                                         ║
╠═════════════════════════════════════════════════════════════════════════╣
║                                                                         ║
║  TOP 10 INTERVIEW TRAPS:                                                ║
║  1. groupByKey → use reduceByKey (pre-aggregation = less shuffle)       ║
║  2. RANK ≠ DENSE_RANK: top-N with ties → DENSE_RANK                    ║
║  3. collect() on large data → Driver OOM                                ║
║  4. coalesce cannot increase partitions (use repartition)               ║
║  5. inferSchema=True → full extra scan (always explicit in prod)        ║
║  6. UDF in filter = no predicate pushdown                               ║
║  7. JDBC without numPartitions = 1 thread (serial read)                 ║
║  8. explode drops null rows (use explode_outer)                         ║
║  9. Accumulator in map() may double-count on retries                    ║
║  10. Dynamic allocation needs External Shuffle Service (or files lost)  ║
║                                                                         ║
╠═════════════════════════════════════════════════════════════════════════╣
║                                                                         ║
║  DEBUGGING SLOW JOBS — WHERE TO LOOK:                                   ║
║  Stages tab → max task >> median → SKEW                                 ║
║  SQL tab → SortMergeJoin on small table → add broadcast hint            ║
║  Executors tab → GC > 10% → memory pressure, tune G1GC                 ║
║  SQL plan → Exchange = shuffle = network cost = minimize                ║
║  SQL plan → no PushedFilters → predicate pushdown broken                ║
║                                                                         ║
╚═════════════════════════════════════════════════════════════════════════╝

QUICK DECISION TABLE — OPTIMIZATION

🧠 Memory Map
SYMPTOM / SCENARIO → SOLUTION
──────────────────────────────────────────────────────────────────────────
One task 100x slower than others → DATA SKEW: AQE+skewJoin or salting
SortMergeJoin in plan (table < 100 MB) → ADD: broadcast(small_df) hint
shuffle.partitions=200, data is 1 GB → REDUCE to ~10 (or let AQE coalesce)
shuffle.partitions=200, data is 2 TB → INCREASE to 2000
Executor OOM on aggregation → Replace groupByKey → reduceByKey
Executor OOM on cache → Use MEMORY_AND_DISK, unpersist after
Driver OOM → Remove collect(), use write() instead
10,000 small output files → coalesce(N) or Delta OPTIMIZE
Job fails only when data is large → Dynamic allocation + AQE
GC time > 10% in Executors tab → G1GC + reduce memory fraction
JDBC reads slowly (1 thread) → Add numPartitions + partitionColumn
UDF making query slow → Replace with built-in functions, or convert to a Pandas UDF
Checkpoint vs Cache for iterative job → BOTH: cache() then checkpoint()
Want AQE to auto-tune partitions → Set shuffle.partitions high (1000); AQE coalesces down to the right size
Long lineage chain (PageRank iteration) → checkpoint() every 10 iterations

PRODUCTION CONFIG TEMPLATE

python — editable
from pyspark.sql import SparkSession

# Parenthesized builder chain: allows inline comments
# (comment lines inside a backslash-continued chain are a syntax error)
spark = (
    SparkSession.builder
    .appName("ProductionETL")
    # Memory
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.executor.memoryOverhead", "1g")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    # AQE (most important!)
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    # Shuffle
    .config("spark.sql.shuffle.partitions", "400")          # tune for your data
    .config("spark.io.compression.codec", "snappy")
    # Joins
    .config("spark.sql.autoBroadcastJoinThreshold", "50m")  # 50 MB (default 10 MB)
    # Serialization
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    # Dynamic Allocation
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "2")
    .config("spark.dynamicAllocation.maxExecutors", "50")
    .config("spark.shuffle.service.enabled", "true")
    # GC
    .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35")
    .getOrCreate()
)