PySpark Day 2 — Quick Recall: DataFrame + SparkSQL
PySpark · Section 5 of 8

Legend: Must know · ⚠️ Trap · 🧠 Memory map · 📝 One-liner

🧠 MASTER MNEMONICS

DATAFRAME vs RDD vs DATASET — "TYPE-OPT"
DataFrame → No type safety, Catalyst optimized, use in PySpark always
Dataset   → Type safe (Scala/Java only), NOT available in Python
RDD       → Low-level, no optimizer, use only when DF can't express logic
READ FORMATS — "CPJ-OAD"
C → CSV (inferSchema → false in prod, always specify schema)
P → Parquet (columnar, default Spark format, predicate pushdown)
J → JSON (multiline option for wrapped arrays)
O → ORC (Hive-native, great for Hive tables)
A → Avro (row-based, streaming, schema evolution)
D → Delta (ACID, time travel, upserts — production standard)
WINDOW FUNCTION SKELETON — "POF"
P → partitionBy (what resets the window)
O → orderBy (ordering within each partition)
F → Frame (ROWS BETWEEN ... — which rows to include)
UDF PERFORMANCE ORDER (fastest → slowest)
Built-in Spark functions >> Pandas UDF (Arrow) >> Python UDF >> RDD operations
WRITE MODES — "O-A-I-E"
O → overwrite (replace entire output)
A → append (add to existing)
I → ignore (skip if exists)
E → error (fail if exists — default)

SECTION 1: RDD vs DataFrame vs Dataset FLASH CARDS

Q: When do you use RDD over DataFrame?

Use RDD when:
1. Complex functional transformations not expressible in DataFrame
2. Unstructured data (text, binary) where schema doesn't apply
3. Need fine-grained control over partitioning
4. Custom partitioners
Use DataFrame otherwise (Catalyst optimization = much faster)
Q: Why is Dataset NOT available in PySpark?

Dataset requires compile-time type checking (JVM generics). Python is dynamically typed — no compile-time types.

Q: What gives DataFrame its performance advantage over RDD?

Catalyst Optimizer (logical/physical plan optimization) + Tungsten Engine (memory management, code generation)

SECTION 2: READING DATA FLASH CARDS

Q: Why avoid inferSchema=True in production?

1. Spark does a FULL SCAN of data just to infer types (costs 1 extra read)
2. May infer wrong types (e.g., "2023-01-01" as string, not date)
3. Schema instability — new data can change inferred schema
→ Always use explicit StructType in production
Q: Read CSV with explicit schema — template:
python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("amount", DoubleType(), nullable=True)
])
df = spark.read.schema(schema).option("header", True).csv("path/to/*.csv")
Q: Read multiple files — 4 ways:
python
# 1. Glob pattern
df = spark.read.parquet("s3://bucket/data/year=2024/month=*/")

# 2. Python list
df = spark.read.csv(["file1.csv", "file2.csv", "file3.csv"])

# 3. Directory (reads all files matching format)
df = spark.read.parquet("s3://bucket/data/")

# 4. Multiple sources → union
df1 = spark.read.parquet("s3://bucket/source1/")
df2 = spark.read.parquet("s3://bucket/source2/")
df = df1.unionByName(df2, allowMissingColumns=True)
Q: How to track which file each row came from?
python
from pyspark.sql.functions import input_file_name
df = spark.read.parquet("s3://bucket/data/").withColumn("source_file", input_file_name())
Q: Read Delta table with time travel:
python
# By version
df = spark.read.format("delta").option("versionAsOf", 5).load("path/to/delta")

# By timestamp
df = spark.read.format("delta").option("timestampAsOf", "2024-01-01").load("path/to/delta")
Q: Read from JDBC (database):
python
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/db") \
    .option("dbtable", "schema.table") \
    .option("user", "user").option("password", "pass") \
    .option("numPartitions", 10) \
    .option("partitionColumn", "id") \
    .option("lowerBound", 1).option("upperBound", 1000000) \
    .load()

⚠️ TRAP: Without numPartitions + partitionColumn, JDBC reads as 1 partition (single thread!)

SECTION 3: CORE TRANSFORMATIONS FLASH CARDS

Q: select vs selectExpr — difference?
python
df.select("col1", "col2")                  # column names only
df.select(col("col1"), col("col2") * 2)    # Column expressions
df.selectExpr("col1", "col2 * 2 as doubled", "YEAR(date_col) as yr")  # SQL strings
Q: filter vs where — difference?

None. They are aliases. Both take a condition.

python
df.filter(col("age") > 18)
df.where("age > 18")  # SQL string syntax
Q: groupBy + agg — template:
python
from pyspark.sql.functions import sum, avg, count, max, min, countDistinct

df.groupBy("dept", "year") \
  .agg(
      sum("salary").alias("total_salary"),
      avg("salary").alias("avg_salary"),
      countDistinct("employee_id").alias("headcount")
  )
Q: All join types — quick reference:
python
# Join types: inner, left, right, full, left_anti, left_semi, cross
df1.join(df2, on="id", how="inner")        # default
df1.join(df2, on="id", how="left")         # all left rows
df1.join(df2, on="id", how="left_anti")    # in df1 NOT in df2
df1.join(df2, on="id", how="left_semi")    # in df1 WHERE exists in df2 (no df2 cols)
df1.join(df2, df1.id == df2.id, how="inner")  # explicit condition (different col names)
Q: left_anti vs left_semi — use cases?
left_anti → "Find customers with NO orders" (NOT IN equivalent)
left_semi → "Find customers who HAVE orders" (EXISTS equivalent, no order cols)
Q: withColumn — create/update column:
python
from pyspark.sql.functions import col, lit, when, round

df.withColumn("tax", col("amount") * 0.18) \
  .withColumn("category", when(col("amount") > 1000, "high").otherwise("low")) \
  .withColumn("pi", lit(3.14159))
Q: Handle NULLs — common patterns:
python
df.na.drop()                            # drop rows with ANY null
df.na.drop(subset=["col1", "col2"])     # drop rows with null in these cols
df.na.fill(0)                           # fill all numeric nulls with 0
df.na.fill({"col1": 0, "col2": "N/A"}) # fill per column
df.filter(col("col1").isNotNull())      # filter out nulls
df.filter(col("col1").isNull())         # find nulls

SECTION 4: WINDOW FUNCTIONS FLASH CARDS

Q: Window function skeleton — 3 parts (POF):
python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, rank, dense_rank, lag, lead, sum, avg

# 1. Define the window spec
w = Window.partitionBy("dept").orderBy(col("salary").desc())

# 2. Apply function
df.withColumn("rank", dense_rank().over(w))
Q: RANK vs DENSE_RANK vs ROW_NUMBER — differences?
🧠 Memory Map
Values: [100, 100, 90, 80]
ROW_NUMBER: 1, 2, 3, 4 → unique (use for dedup, partition-first)
RANK:       1, 1, 3, 4 → gaps (use when ties should skip positions)
DENSE_RANK: 1, 1, 2, 3 → no gaps (use for Top-N with ties)
Interview: Top-N per group → ALWAYS use DENSE_RANK
Q: Running total + rolling 7-day average:
python
from pyspark.sql.window import Window
from pyspark.sql.functions import sum, avg, col

# Running total
w_running = Window.partitionBy("user_id").orderBy("date") \
                  .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("running_total", sum("amount").over(w_running))

# Rolling 7-day average (6 previous rows + current)
w_rolling = Window.partitionBy("user_id").orderBy("date") \
                  .rowsBetween(-6, Window.currentRow)
df.withColumn("rolling_7d_avg", avg("amount").over(w_rolling))
Q: LAG and LEAD — use cases:
python
from pyspark.sql.window import Window
from pyspark.sql.functions import col, lag, lead

w = Window.partitionBy("user_id").orderBy("date")

df.withColumn("prev_amount", lag("amount", 1).over(w)) \
  .withColumn("next_amount", lead("amount", 1).over(w)) \
  .withColumn("pct_change",
      (col("amount") - col("prev_amount")) / col("prev_amount") * 100)
  • LAG → MoM/YoY change, detect consecutive events
  • LEAD → "What did user buy next?", time to next event

⚠️ TRAP: First row LAG = null. Last row LEAD = null. Handle with coalesce or isNull.

Q: What is ROWS BETWEEN vs RANGE BETWEEN?
ROWS BETWEEN  → Physical rows (recommended, predictable)
RANGE BETWEEN → Logical range based on ORDER BY values (tricky with duplicates)
Always use ROWS BETWEEN unless you specifically need RANGE semantics.

SECTION 5: UDF FLASH CARDS

Q: Python UDF vs Pandas UDF — key difference?
Python UDF:
  • Row-by-row Python function
  • Data serialized: JVM → Python → JVM for EVERY row
  • No vectorization
  • Slowest
Pandas UDF (Arrow-based):
  • Operates on Pandas Series (batch of rows)
  • Apache Arrow eliminates serialization overhead
  • Vectorized operations
  • 3-100x faster than Python UDF
  • Requires pyarrow installed
Q: Python UDF — template:
python
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def clean_phone(phone):
    return phone.replace("-", "").replace(" ", "") if phone else None

df.withColumn("clean_phone", clean_phone(col("phone")))
Q: Pandas UDF — template:
python
from pyspark.sql.functions import pandas_udf, col
from pyspark.sql.types import DoubleType
import pandas as pd

@pandas_udf(DoubleType())
def apply_tax(amount: pd.Series) -> pd.Series:
    return amount * 1.18

df.withColumn("total_with_tax", apply_tax(col("amount")))
Q: When to use UDFs vs built-in functions?
Built-in   → ALWAYS try first (Catalyst can optimize)
UDF        → Complex business logic that can't be expressed with built-ins
Pandas UDF → Heavy computation (ML inference, string parsing) on batches
Performance order (fastest → slowest):
spark.sql functions > Pandas UDF > Python UDF

⚠️ TRAP: UDFs are BLACK BOXES to Catalyst — no optimization, no predicate pushdown.

SECTION 6: WRITING DATA FLASH CARDS

Q: Write modes — O-A-I-E:
python
df.write.mode("overwrite").parquet("output/path")   # replace all
df.write.mode("append").parquet("output/path")       # add to existing
df.write.mode("ignore").parquet("output/path")       # skip if exists
df.write.mode("error").parquet("output/path")        # fail if exists (DEFAULT)
Q: partitionBy vs bucketBy — difference?
partitionBy:
  • Creates directory hierarchy: output/year=2024/month=01/
  • Good for: date-based filtering, Hive partition pruning
  • Files per partition: can create many small files
  • Used for: storage/query filtering efficiency
bucketBy:
  • Distributes data into fixed N buckets by hash
  • Must saveAsTable (Hive metastore)
  • Pre-shuffles data for future joins on the bucket column
  • Used for: eliminating shuffle in repeated joins
df.write.partitionBy("year", "month").parquet("output/")
df.write.bucketBy(50, "customer_id").sortBy("order_date").saveAsTable("orders")
Q: Write to Delta with upsert (merge):
python
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, "/path/to/delta")
delta_table.alias("target").merge(
    new_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

SECTION 7: SPARKSQL FLASH CARDS

Q: createTempView vs createOrReplaceTempView vs createGlobalTempView:
python
df.createTempView("orders")              # fails if view exists
df.createOrReplaceTempView("orders")     # overwrites if exists (use this!)
df.createGlobalTempView("orders")        # accessible across SparkSessions
# access global: spark.sql("SELECT * FROM global_temp.orders")

# scope:
# TempView        → only in this SparkSession
# GlobalTempView  → all SparkSessions in same application
Q: Spark SQL — example query:
python
result = spark.sql("""
    SELECT dept, year, SUM(salary) as total
    FROM employees
    WHERE year >= 2020
    GROUP BY dept, year
    HAVING SUM(salary) > 1000000
    ORDER BY total DESC
""")
Q: Explain a query plan:
python
df.explain()          # physical plan only
df.explain(True)      # all plans: parsed, analyzed, optimized, physical
df.explain("cost")    # logical plan with cost statistics, when available

SECTION 8: NESTED DATA FLASH CARDS

Q: Access struct field:
python
df.select("address.city")           # dot notation
df.select(col("address.city"))      # Column API
df.withColumn("city", col("address.city"))
Q: explode vs explode_outer — difference?
python
from pyspark.sql.functions import explode, explode_outer

# explode → drops rows where array is null/empty
df.withColumn("tag", explode(col("tags")))

# explode_outer → keeps rows where array is null/empty (produces NULL row)
df.withColumn("tag", explode_outer(col("tags")))

⚠️ ALWAYS use explode_outer if you need to preserve all rows
Q: Flatten nested JSON — step by step:
python
from pyspark.sql.functions import col, explode

# Input: {"user": {"id": 1, "name": "Alice"}, "items": [{"sku": "A1"}, {"sku": "B2"}]}
df.select(
    col("user.id").alias("user_id"),
    col("user.name").alias("user_name"),
    explode("items").alias("item")          # one row per item
).select("user_id", "user_name", "item.sku")
Q: Map column operations:
python
from pyspark.sql.functions import map_keys, map_values, col

# Access map value by key
df.select(col("metadata")["env"].alias("environment"))

# Get all keys/values
df.select(map_keys("metadata").alias("keys"),
          map_values("metadata").alias("values"))

🧠 DAY 2 ULTRA SUMMARY CARD

📐 Summary Diagram
╔═══════════════════════════════════════════════════════════════╗
║         PYSPARK DAY 2 — DATAFRAME + SPARKSQL                  ║
╠═══════════════════════════════════════════════════════════════╣
║                                                               ║
║  READ FORMATS (CPJ-OAD):                                      ║
║  CSV(header+schema) | Parquet(col) | JSON(multiline)          ║
║  ORC(Hive) | Avro(stream) | Delta(ACID+time travel)           ║
║  ⚠ Always explicit schema in prod (inferSchema = 1 extra scan) ║
║  ⚠ JDBC needs numPartitions+partitionColumn or 1 thread!      ║
║                                                               ║
║  MULTIPLE FILES:                                              ║
║  Glob: "path/year=2024/month=*/"                              ║
║  List: read.csv(["f1.csv","f2.csv"])                          ║
║  Directory: read.parquet("dir/")                              ║
║  Union: df1.unionByName(df2, allowMissingColumns=True)        ║
║  Track source: input_file_name()                              ║
║                                                               ║
║  WINDOW FUNCTIONS (POF):                                      ║
║  W = Window.partitionBy("col").orderBy("col")                 ║
║  ROW_NUMBER: unique | RANK: gaps | DENSE_RANK: no gaps        ║
║  Top-N → DENSE_RANK | Dedup → ROW_NUMBER = 1                  ║
║  Running total: rowsBetween(unboundedPreceding, currentRow)   ║
║  Rolling N: rowsBetween(-(N-1), currentRow)                   ║
║  LAG/LEAD: first row LAG=NULL, last row LEAD=NULL             ║
║                                                               ║
║  UDF PERFORMANCE: Built-in >> Pandas UDF >> Python UDF        ║
║  Python UDF → row-by-row, JVM↔Python serialization           ║
║  Pandas UDF → Arrow-based batch, vectorized, 3-100x faster    ║
║  ⚠ UDFs are opaque to Catalyst — no optimization!             ║
║                                                               ║
║  JOIN TYPES:                                                  ║
║  inner | left | right | full | cross                          ║
║  left_anti → NOT IN equivalent (in A, NOT in B)               ║
║  left_semi → EXISTS equivalent (in A WHERE exists in B)       ║
║                                                               ║
║  WRITE MODES (O-A-I-E): overwrite | append | ignore | error   ║
║  partitionBy → directory hierarchy, query pruning             ║
║  bucketBy   → hash bucketing, eliminates join shuffles        ║
║  Delta merge → whenMatchedUpdateAll + whenNotMatchedInsertAll  ║
║                                                               ║
║  SPARKSQL:                                                    ║
║  createOrReplaceTempView → session-scoped                     ║
║  createGlobalTempView → app-scoped (global_temp.name)         ║
║  df.explain(True) → see all 4 plan phases                     ║
║                                                               ║
║  NESTED DATA:                                                 ║
║  struct: col("address.city") or "address.city"                ║
║  array:  explode (drops nulls), explode_outer (keeps nulls)   ║
║  map:    col("metadata")["key"] for value access              ║
║                                                               ║
║  TOP TRAPS:                                                   ║
║  1. inferSchema = extra scan (always explicit in prod)        ║
║  2. RANK ≠ DENSE_RANK for top-N with ties                     ║
║  3. UDF black box = no Catalyst optimization                  ║
║  4. explode drops null rows (use explode_outer)               ║
║  5. JDBC default = 1 partition = single thread!               ║
║  6. left_anti ≠ left_semi (know the difference!)             ║
║                                                               ║
╚═══════════════════════════════════════════════════════════════╝

QUICK DECISION TABLE

🧠 TASK → SOLUTION
────────────────────────────────────────────────────────────────────────
Top-3 products per category            → DENSE_RANK OVER(PARTITION BY cat)
Remove duplicate rows (keep latest)    → ROW_NUMBER OVER(PARTITION BY key ORDER BY ts DESC) = 1
Running total per user                 → SUM OVER(PARTITION BY user ORDER BY date ROWS BETWEEN...)
Previous row value (MoM change)        → LAG(col, 1) OVER(PARTITION BY user ORDER BY date)
Track which file each row came from    → input_file_name()
Combine 2 DFs with different schemas   → unionByName(allowMissingColumns=True)
Complex business logic in column       → Pandas UDF (prefer) or Python UDF
Find customers with no orders          → left_anti join
Find customers with at least one order → left_semi join
Save data partitioned by date for Hive → write.partitionBy("year","month").parquet()
Pre-shuffle for repeated joins         → bucketBy(N, "join_col").saveAsTable()
Upsert/merge into Delta table          → DeltaTable.merge().whenMatched().whenNotMatched()
Expand array column to multiple rows   → explode_outer (keeps null rows)
Access nested JSON field               → col("parent.child") dot notation