PySpark
Day 2: DataFrame + SparkSQL — Deep Interview Guide
PySpark · Section 4 of 8

💡 Interview Tip
DataFrames are the primary API for PySpark. Know these inside out. Code questions here are very common — interviewers will ask you to write live.

MASTER MEMORY MAP — Day 2

RDD vs DataFrame vs Dataset = "Low -> High -> Typed"
  RDD: Low-level, no schema, no optimizer, Python/Java/Scala
  DataFrame: High-level, schema, Catalyst/Tungsten optimized, SQL-like
  Dataset: High-level, TYPED (compile-time safety), JVM only (Scala/Java)
  PySpark: Only RDD + DataFrame (no Dataset in Python!)
🧠 SPARKSESSION = "Entry point for EVERYTHING"
  spark.read.* -> read data
  spark.sql("...") -> run SQL
  spark.sparkContext -> access RDD API
  spark.createDataFrame() -> create DF from RDD or list
  spark.catalog.* -> manage databases, tables, functions
READING FILES = "Format.Option.Schema.Load"
  format: csv, json, parquet, orc, delta, jdbc, avro
  option: header, inferSchema, delimiter, mode
  schema: StructType([StructField(...)]) — ALWAYS explicit in production!
  load: .load("/path/") or a format shortcut like .csv("/path/")
READ MULTIPLE FILES = "Glob/List/Directory"
  Glob: spark.read.csv("/data/2024/*") <- all files matching pattern
  List: spark.read.csv(["/f1", "/f2"]) <- specific files
  Dir: spark.read.parquet("/data/") <- all parquet in dir
TRANSFORMATIONS = "SWGJW-NAU"
  S = select() / withColumn()
  W = where() / filter()
  G = groupBy() + agg()
  J = join()
  W = withColumnRenamed() / drop()
  N = na.fill() / na.drop()
  A = alias() / cast()
  U = union() / unionByName()
WINDOW FUNCTIONS = "PARTITION + ORDER + FRAME"
  Window.partitionBy("col").orderBy("col")
  rowsBetween(Window.unboundedPreceding, Window.currentRow)
COLUMN OPERATIONS = "col / lit / when / cast"
  col("name") -> reference a column
  lit(100) -> create a constant column
  when().otherwise() -> conditional logic (CASE WHEN)
  cast("double") -> type conversion
NULL HANDLING = "isNull / fillna / dropna / coalesce"
  isNull() / isNotNull() -> filter nulls
  na.fill() / na.drop() -> handle null rows
  coalesce(a, b, c) -> first non-null value
STRING FUNCTIONS = "concat / trim / regexp / split"
  concat, concat_ws, substring, trim, ltrim, rtrim
  regexp_replace, regexp_extract, split, lower, upper
DATE FUNCTIONS = "current / diff / add / format"
  current_date(), current_timestamp()
  datediff(), months_between(), date_add(), date_sub()
  date_format(), to_date(), to_timestamp()

SECTION 1: RDD vs DataFrame vs Dataset

Definition: A DataFrame is a distributed collection of data organized into named columns, equivalent to a table in a relational database, optimized by Spark's Catalyst optimizer and Tungsten execution engine.

Simple Explanation: Think of an RDD as a raw list of items with no labels. A DataFrame is like an Excel spreadsheet with column headers. A Dataset adds strict data typing (only in Scala/Java).

Real-world Analogy: RDD is a box of unsorted papers. DataFrame is papers organized in a filing cabinet with labeled folders. Dataset is a filing cabinet where each folder only accepts a specific document type.

Feature         | RDD                  | DataFrame                | Dataset (Scala/Java)
----------------|----------------------|--------------------------|------------------------
API Level       | Low-level            | High-level SQL           | High-level typed
Schema          | No                   | Yes (column names)       | Yes (typed case class)
Type Safety     | Python runtime       | Runtime only             | COMPILE TIME
Catalyst Optim. | None                 | Full                     | Full
Tungsten        | None                 | Full                     | Full
Performance     | Slowest              | Fast                     | Fast
Language        | Python/Scala/Java    | All languages            | Scala/Java ONLY
Null Handling   | Manual               | Automatic                | Automatic
When to use     | Unstructured data,   | Structured data,         | N/A in PySpark
                | complex custom logic | SQL-like ops, most cases |
Pro Tip
Interview Tip: "In PySpark, the choice is RDD vs DataFrame. I use DataFrame by default — it is faster (Catalyst + Tungsten), has a cleaner API, and supports SQL syntax. I use RDD only when the DataFrame API cannot express my logic or for unstructured text/binary data."

What NOT to say: "Dataset is available in PySpark." It is not — Dataset is Scala/Java only. In PySpark, DataFrame IS the Dataset[Row] equivalent.

SECTION 2: SparkSession + DataFrame Creation

What is SparkSession?

Definition: SparkSession is the unified entry point for all Spark functionality — reading data, running SQL, accessing the catalog, and configuring the application.

Simple Explanation: Before you do anything in Spark, you need a SparkSession. It is the single door you walk through to access everything Spark offers.

Pro Tip
Interview Tip: Always mention getOrCreate() — it prevents creating multiple sessions in the same application. Mention that SparkSession replaced the older SparkContext + SQLContext + HiveContext pattern.

What NOT to say: "You need a SparkContext to use DataFrames." SparkSession is the entry point since Spark 2.0. SparkContext is still used internally but you access it via spark.sparkContext.

Creating SparkSession

python — editable
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DataPipeline") \
    .master("yarn") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.instances", "10") \
    .config("spark.sql.shuffle.partitions", "400") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024)) \
    .enableHiveSupport() \
    .getOrCreate()

# getOrCreate() -> creates new session OR returns existing one
# Prevents creating multiple SparkSessions in the same application

DataFrame Creation Methods

Definition: A DataFrame can be created from files (CSV, JSON, Parquet, etc.), databases (JDBC), in-memory data (lists, RDDs), or existing tables.

python — editable
# --- FROM A PYTHON LIST ---
data = [("Alice", 30, "Engineering"), ("Bob", 25, "Marketing")]
columns = ["name", "age", "department"]
df = spark.createDataFrame(data, columns)

# --- FROM AN RDD ---
rdd = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25)])
df = rdd.toDF(["name", "age"])

# --- FROM A PANDAS DATAFRAME ---
import pandas as pd
pdf = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
df = spark.createDataFrame(pdf)

# --- FROM AN EXISTING TABLE (Hive/Delta metastore) ---
df = spark.table("database_name.table_name")

# --- FROM A RANGE ---
df = spark.range(0, 100, 1)  # single column 'id' with values 0-99
Pro Tip
Interview Tip: When asked "how do you create a DataFrame?", list at least 3 methods: from files, from in-memory data, and from existing tables. Mention spark.createDataFrame() for testing and spark.read for production.

SECTION 3: Schema Definition (StructType, StructField)

Definition: A schema in PySpark defines the structure of a DataFrame — column names, data types, and nullability — using StructType (the table) and StructField (each column).

Simple Explanation: A schema is the blueprint of your data. Just like a building blueprint specifies where every room goes, a schema specifies what every column looks like before any data arrives.

Real-world Analogy: Schema is like the column headers and data validation rules in an Excel template — it says "Column A must be text, Column B must be a number, Column C cannot be blank."

python — editable
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, LongType, DoubleType, FloatType,
    BooleanType, DateType, TimestampType, ArrayType, MapType
)

# Define schema explicitly (no inferSchema scan needed)
booking_schema = StructType([
    StructField("booking_id",   StringType(),    nullable=False),
    StructField("customer_id",  LongType(),      nullable=True),
    StructField("flight_code",  StringType(),    nullable=True),
    StructField("amount",       DoubleType(),    nullable=True),
    StructField("booking_date", DateType(),      nullable=True),
    StructField("is_cancelled", BooleanType(),   nullable=True),
    # Nested struct:
    StructField("address", StructType([
        StructField("city",    StringType(), True),
        StructField("country", StringType(), True)
    ]), True),
    # Array column:
    StructField("tags", ArrayType(StringType()), True),
    # Map column:
    StructField("metadata", MapType(StringType(), StringType()), True)
])

df = spark.read.schema(booking_schema).csv("/data/bookings.csv", header=True)
Aspect             | inferSchema=True           | Explicit Schema
-------------------|----------------------------|------------------------
Performance        | Extra pass over data       | No extra pass
Type accuracy      | Often wrong (123 as Long)  | You control every type
Null handling      | Guesses nullability        | You define nullable
Streaming support  | NOT supported              | Required
Schema enforcement | None                       | Fail fast on bad data
Production use     | NEVER                      | ALWAYS
Pro Tip
Interview Tip: Always say "In production I define schemas explicitly using StructType. inferSchema triggers an additional read pass over the data and can misdetect types." Mention that streaming sources REQUIRE explicit schemas.

What NOT to say: "I just use inferSchema=True." This signals you have not worked on production pipelines. Also do not say "I use DDL strings for complex schemas" — StructType is the standard.

SECTION 4: Reading Data (CSV, JSON, Parquet, JDBC, Delta)

Pro Tip
Definition: spark.read is the DataFrameReader API that loads data from external storage into a DataFrame, supporting multiple formats and configuration options.

Simple Explanation: spark.read is how you bring outside data into Spark. You specify the format, set options (like whether there is a header row), and point it to the file path.

Reading Single Files — All Formats

python — editable
# --- CSV ---
df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .option("nullValue", "N/A") \
    .option("dateFormat", "yyyy-MM-dd") \
    .csv("/data/bookings.csv")
# WARNING: inferSchema triggers extra read pass — use explicit schema in prod

# --- JSON ---
df_json = spark.read \
    .option("multiLine", "true") \
    .option("mode", "PERMISSIVE") \
    .json("/data/events/*.json")
# mode options: PERMISSIVE (default, nulls bad fields) | DROPMALFORMED | FAILFAST

# --- PARQUET (PREFERRED FORMAT) ---
df_parquet = spark.read.parquet("/data/warehouse/bookings/")
# Schema embedded in file (no inferSchema needed)
# Columnar format with predicate pushdown
# Efficient compression (Snappy by default)

# --- ORC ---
df_orc = spark.read.orc("/data/hive_table/")

# --- JDBC (relational database) ---
df_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://db-host:5432/mydb") \
    .option("dbtable", "bookings") \
    .option("user", "etl_user") \
    .option("password", "secret") \
    .option("driver", "org.postgresql.Driver") \
    .option("numPartitions", "10") \
    .option("partitionColumn", "booking_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .load()

# --- AVRO ---
df_avro = spark.read.format("avro").load("/data/kafka_output/")

# --- DELTA LAKE ---
df_delta = spark.read.format("delta").load("/data/delta/bookings/")
# Or using table name if registered in metastore:
df_delta = spark.table("bookings_db.bookings")
READ MODE COMPARISON TABLE
Mode | Behavior on Bad Records
--------------|----------------------------------------------------
PERMISSIVE | Sets bad fields to null, stores raw in _corrupt_record
DROPMALFORMED | Silently drops bad rows
FAILFAST | Throws exception immediately on bad data

Reading Multiple Files from Multiple Sources

python — editable
# --- GLOB PATTERN (most common) ---
df = spark.read.csv("/data/2024/*/bookings_*.csv", header=True)
# Reads: /data/2024/jan/bookings_1.csv, /data/2024/feb/bookings_2.csv etc.

# --- LIST OF SPECIFIC FILES ---
df = spark.read.parquet(
    "/data/2024/jan/bookings.parquet",
    "/data/2024/feb/bookings.parquet",
    "/data/2024/mar/bookings.parquet"
)
# Or pass as Python list:
files = ["/data/2024/jan/bookings.parquet", "/data/2024/feb/bookings.parquet"]
df = spark.read.parquet(*files)   # unpack list

# --- DIRECTORY (all files in dir with same format) ---
df = spark.read.parquet("/data/2024/")   # reads ALL parquet files in dir

# --- ADD SOURCE FILE NAME COLUMN ---
from pyspark.sql.functions import input_file_name
df = spark.read.option("header", "true") \
    .csv("/data/2024/*/bookings_*.csv") \
    .withColumn("source_file", input_file_name())

# --- READ MULTIPLE FORMATS AND UNION ---
df_csv = spark.read.option("header", True).csv("/landing/batch/")
df_json = spark.read.json("/landing/api/events/")
df_parquet = spark.read.parquet("/landing/warehouse/")

# Align columns then union
cols = ["booking_id", "customer_id", "amount", "booking_date"]
df_all = df_csv.select(cols) \
    .union(df_json.select(cols)) \
    .union(df_parquet.select(cols))

# Better: unionByName (aligns by column name, not position)
df_all = df_csv.unionByName(df_json, allowMissingColumns=True) \
               .unionByName(df_parquet, allowMissingColumns=True)

# --- READ FROM DIFFERENT STORAGE SYSTEMS ---
df_s3 = spark.read.parquet("s3a://my-bucket/data/")
df_adls = spark.read.parquet("abfss://container@storage.dfs.core.windows.net/data/")
df_hdfs = spark.read.parquet("hdfs://namenode:8020/data/")
df_local = spark.read.parquet("file:///local/path/data/")

# Combine multiple storage locations:
df_combined = spark.read.parquet(
    "s3a://bucket/data/2023/",
    "s3a://bucket/data/2024/",
    "hdfs://namenode/archive/2022/"
)
Pro Tip
Interview Tip: When discussing file reading, always mention: (1) explicit schema over inferSchema, (2) Parquet as the preferred format for analytics, (3) partition pruning for performance. For JDBC, mention numPartitions + partitionColumn for parallel reads.

What NOT to say: "I just use spark.read.csv(path) with defaults." This shows you do not understand production considerations like schema enforcement, read modes, or performance tuning.

SECTION 5: DataFrame Transformations

Definition: Transformations are lazy operations that define a computation plan on a DataFrame without executing it. They produce a new DataFrame (DataFrames are immutable).

Simple Explanation: Transformations are instructions you stack up. Nothing actually runs until you call an action (like .show(), .count(), .collect()). Spark waits so it can optimize the entire chain at once.

Real-world Analogy: Writing a recipe (transformations) vs actually cooking it (actions). You write all the steps first, then the chef (Catalyst optimizer) rearranges them for efficiency before cooking.

select

Definition: Projects a set of columns or expressions from a DataFrame, equivalent to SELECT in SQL.

python — editable
df.select("id", "name", "amount")                    # by name
df.select(col("id"), col("amount") * 1.1)            # with expression
df.select("*", (col("amount") * 1.1).alias("new_amount"))  # all + new column

filter / where

Definition: Returns rows that satisfy a given condition. filter() and where() are identical — aliases of each other.

python — editable
df.filter(col("age") > 30)
df.filter("age > 30")                                 # SQL string form
df.where((col("country") == "IN") & (col("amount") > 100))
df.filter(col("status").isin("CONFIRMED", "PENDING"))
df.filter(col("name").startswith("A"))
df.filter(col("name").like("A%"))
df.filter(~col("is_cancelled"))                       # NOT condition
Pro Tip
Interview Tip: Mention that filter() and where() are the same — interviewers sometimes test this. Also mention that Spark pushes filter predicates down to the data source (predicate pushdown) for formats like Parquet and JDBC.

withColumn

Definition: Returns a new DataFrame with a column added or replaced. If the column name already exists, it replaces the column.

python — editable
df.withColumn("tax", col("amount") * 0.18)
df.withColumn("amount", col("amount").cast("double"))  # modify existing column
df.withColumn("status", when(col("amount") > 1000, "high")
                        .when(col("amount") > 100, "medium")
                        .otherwise("low"))
Pro Tip
What NOT to say: "I chain 20 withColumn calls to add columns." Chaining many withColumn calls creates a deep logical plan that slows Catalyst. Use select() with multiple expressions instead for better performance.

drop

Definition: Returns a new DataFrame with specified columns removed.

python — editable
df.drop("col1", "col2")
df.drop(col("temporary_column"))

distinct / dropDuplicates

Definition: distinct() removes duplicate rows considering ALL columns. dropDuplicates() removes duplicates based on a SUBSET of columns.

python — editable
df.distinct()                                     # all columns must match
df.dropDuplicates(["customer_id", "booking_date"])  # subset columns
Pro Tip
Interview Tip: If asked "how do you deduplicate?", mention dropDuplicates for subset-based dedup. For "keep the latest record per key", use window function with row_number() — not dropDuplicates, because dropDuplicates gives you an arbitrary row, not the most recent.

What NOT to say: "I use distinct() to deduplicate by key." distinct() considers ALL columns. If you only want unique keys, use dropDuplicates(["key_col"]).

withColumnRenamed / sort

python — editable
# --- RENAME ---
df.withColumnRenamed("old_name", "new_name")

# --- SORT ---
df.orderBy("amount")                              # ascending
df.orderBy(col("amount").desc())                  # descending
df.orderBy(col("date").asc(), col("amount").desc())  # multi-column

SECTION 6: Column Operations (col, lit, when/otherwise, cast)

Definition: Column operations are functions that create, transform, or reference individual columns within a DataFrame expression.

Simple Explanation: col() points to an existing column. lit() creates a constant value column. when().otherwise() is your IF-ELSE logic. cast() converts data types.

col() and lit()

python — editable
from pyspark.sql.functions import col, lit

# col() — reference an existing column
df.select(col("name"), col("amount") * 2)

# lit() — create a constant value column
df.withColumn("country", lit("India"))
df.withColumn("multiplier", lit(1.18))

# Common mistake: trying to use a Python variable directly
tax_rate = 0.18
# WRONG: df.withColumn("tax", col("amount") * tax_rate)  # works but unclear
# RIGHT: df.withColumn("tax", col("amount") * lit(tax_rate))  # explicit
Pro Tip
Interview Tip: lit() is needed when you want to add a constant column or use a Python variable as a column value. Without lit(), Python values sometimes work due to implicit conversion, but lit() makes intent explicit.

when / otherwise (Conditional Logic)

Definition: when().otherwise() is the PySpark equivalent of SQL's CASE WHEN. It evaluates conditions in order and returns the first matching result.

python — editable
from pyspark.sql.functions import when

# Simple condition
df.withColumn("category",
    when(col("amount") > 1000, "high")
    .when(col("amount") > 100, "medium")
    .otherwise("low")
)

# Multiple conditions combined
df.withColumn("flag",
    when((col("status") == "CANCELLED") & (col("amount") > 500), "refund_priority")
    .when(col("status") == "CANCELLED", "standard_refund")
    .otherwise("no_action")
)

# Nested when for complex logic
df.withColumn("tier",
    when(col("total_spend") > 10000, "platinum")
    .when((col("total_spend") > 5000) & (col("years") > 3), "gold")
    .when(col("total_spend") > 1000, "silver")
    .otherwise("bronze")
)

cast (Type Conversion)

Definition: Converts a column from one data type to another.

python — editable
from pyspark.sql.types import IntegerType  # needed for cast(IntegerType())

df.withColumn("amount", col("amount").cast("double"))
df.withColumn("booking_date", col("booking_date").cast("date"))
df.withColumn("amount_int", col("amount").cast(IntegerType()))

# Common cast operations:
# "string" -> "integer", "long", "double", "float", "date", "timestamp", "boolean"

What NOT to say: "I use int() or float() to convert column types." Those are Python functions, not Spark functions. Always use .cast() for DataFrame column type conversion.

SECTION 7: Aggregations (groupBy, agg, sum, avg, count, min, max)

Definition: Aggregation operations group rows by one or more columns and compute summary statistics (count, sum, average, etc.) for each group.

Simple Explanation: Aggregation is like creating a pivot table in Excel — you pick the grouping columns and then calculate totals, averages, or counts for each group.

Real-world Analogy: Counting how many passengers boarded each flight and the total revenue per flight — that is a groupBy("flight_code") with count and sum aggregations.

python — editable
from pyspark.sql.functions import count, sum, avg, max, min, countDistinct, collect_list, collect_set

# --- BASIC GROUPBY + AGG ---
df.groupBy("country") \
  .agg(
      count("*").alias("booking_count"),
      sum("amount").alias("total_amount"),
      avg("amount").alias("avg_amount"),
      max("amount").alias("max_amount"),
      min("amount").alias("min_amount"),
      countDistinct("customer_id").alias("unique_customers")
  )

# --- MULTIPLE GROUPBY COLUMNS ---
df.groupBy("country", "booking_year") \
  .agg(
      count("*").alias("bookings"),
      sum("amount").alias("revenue")
  )

# --- COLLECT VALUES INTO A LIST/SET ---
df.groupBy("customer_id") \
  .agg(
      collect_list("product").alias("all_products"),     # allows duplicates
      collect_set("product").alias("unique_products")    # no duplicates
  )

# --- SHORTCUT METHODS (less flexible but quick) ---
df.groupBy("country").count()
df.groupBy("country").sum("amount")
df.groupBy("country").avg("amount")
df.groupBy("country").max("amount")
Function         | What it does                           | Null behavior
-----------------|----------------------------------------|--------------------------
count("*")       | Counts all rows including nulls        | Counts everything
count("col")     | Counts non-null values in column       | Ignores nulls
countDistinct()  | Counts unique non-null values          | Ignores nulls
sum()            | Sum of values                          | Ignores nulls
avg() / mean()   | Average of values                      | Ignores nulls
max() / min()    | Maximum / minimum value                | Ignores nulls
collect_list()   | Collects into array (with duplicates)  | Excludes nulls
collect_set()    | Collects into array (unique only)      | Excludes nulls
first() / last() | First / last value in group            | Depends on ignorenulls
Pro Tip
Interview Tip: Always mention that count("*") counts all rows including nulls, while count("column_name") skips nulls. This is a common trick question. Also mention that collect_list and collect_set can cause OOM if groups are very large.

What NOT to say: "count() and count(column) are the same." They are not. count("*") includes null rows; count("col") excludes them.

SECTION 8: Joins

Definition: A join combines rows from two DataFrames based on a related column (join key), similar to SQL joins.

Simple Explanation: Joins connect two tables using a shared column. Think of matching employee IDs in a "people" table with the same IDs in a "salary" table to get each person's salary.

Real-world Analogy: You have a guest list (names + invite codes) and a seating chart (invite codes + table numbers). Joining them on invite code gives you name + table number.

python — editable
from pyspark.sql.functions import broadcast, col

# --- INNER JOIN (only matching rows from both sides) ---
result = df1.join(df2, on="customer_id", how="inner")

# --- LEFT JOIN (all rows from left, matching from right, null if no match) ---
result = df1.join(df2, on=["customer_id", "date"], how="left")

# --- RIGHT JOIN (all rows from right, matching from left) ---
result = df1.join(df2, on="customer_id", how="right")

# --- FULL OUTER JOIN (all rows from both, nulls where no match) ---
result = df1.join(df2, on="customer_id", how="full")

# --- CROSS JOIN (cartesian product — every row with every row) ---
result = df1.crossJoin(df2)
# WARNING: N x M rows — use only when intentional (e.g., generating combinations)

# --- LEFT SEMI JOIN (rows in df1 that HAVE a match in df2, NO df2 columns) ---
result = df1.join(df2, on="customer_id", how="left_semi")
# Equivalent to: WHERE customer_id IN (SELECT customer_id FROM df2)

# --- LEFT ANTI JOIN (rows in df1 that do NOT have a match in df2) ---
result = df1.join(df2, on="customer_id", how="left_anti")
# Equivalent to: WHERE customer_id NOT IN (SELECT customer_id FROM df2)

# --- JOIN WITH DIFFERENT COLUMN NAMES ---
result = df1.join(df2, df1["cust_id"] == df2["customer_id"], how="inner")
# After join, drop the duplicate column:
result = result.drop(df2["customer_id"])

# --- BROADCAST JOIN (force small table broadcast — no shuffle) ---
result = large_df.join(broadcast(small_df), on="airport_code")
# Spark sends the small table to every executor (avoids shuffle of large table)
# Default broadcast threshold: 10 MB (spark.sql.autoBroadcastJoinThreshold)
Join Type | Left rows | Right rows | When no match        | Use case
----------|-----------|------------|----------------------|------------------------------
inner     | Matched   | Matched    | Row dropped          | Only common records
left      | ALL       | Matched    | Right cols = NULL    | Keep all from primary table
right     | Matched   | ALL        | Left cols = NULL     | Keep all from lookup table
full      | ALL       | ALL        | Opposite side = NULL | Merge two full datasets
cross     | ALL x ALL | ALL x ALL  | N/A (cartesian)      | Generate all combinations
left_semi | Matched   | NONE       | Row dropped          | EXISTS / IN subquery
left_anti | Unmatched | NONE       | Row kept             | NOT EXISTS / NOT IN subquery
Pro Tip
Interview Tip: Semi and anti joins are interview favorites. Explain them as "filter joins" — they filter the left table based on existence in the right table but never add right-side columns. Always mention broadcast joins for small-large table joins.

What NOT to say: "Semi join returns columns from both tables." It only returns columns from the LEFT table. Also never say "I always use inner join" — show awareness of when to use left/anti/semi.

Pro Tip
Follow-up they will ask: "What happens with duplicate keys in a join?" Answer: rows multiply. If key "A" appears 3 times in left and 2 times in right, inner join produces 6 rows for key "A".

SECTION 9: Window Functions

Definition: Window functions perform calculations across a set of rows (a "window") that are related to the current row, without collapsing rows like GROUP BY does.

Simple Explanation: GROUP BY gives you one row per group. Window functions give you one result per ROW, but that result is computed from a group of related rows. You keep all your original rows.

Real-world Analogy: In a race, GROUP BY tells you "the fastest time per age group." A window function tells you "each runner's rank within their age group" — every runner keeps their row, but now has a rank attached.

Defining a Window

python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, round, row_number, rank, dense_rank, lag, lead,
    sum, avg, max, min, count,
    ntile, percent_rank, cume_dist,
    first, last
)

# --- DEFINE A WINDOW ---
windowSpec = Window \
    .partitionBy("department") \
    .orderBy(col("salary").desc())

Ranking Functions (row_number, rank, dense_rank)

Definition: Ranking functions assign a position number to each row within a partition based on the ordering.

python — editable
df = df.withColumn("row_num", row_number().over(windowSpec))  # 1,2,3,4,5 (no ties)
df = df.withColumn("rank",    rank().over(windowSpec))         # 1,2,2,4,5 (ties skip)
df = df.withColumn("drank",   dense_rank().over(windowSpec))   # 1,2,2,3,4 (ties no skip)
Salary | row_number | rank | dense_rank
-------|------------|------|-----------
100    | 1          | 1    | 1
90     | 2          | 2    | 2
90     | 3          | 2    | 2
80     | 4          | 4    | 3
70     | 5          | 5    | 4
Pro Tip
Interview Tip: The most common interview question is "get top N per group." Always use row_number() or dense_rank() with a window function, filter by rank, then drop the rank column. Know the difference: row_number breaks ties arbitrarily, rank skips numbers after ties, dense_rank never skips.

What NOT to say: "I use GROUP BY with LIMIT to get top N per group." That does not work — LIMIT applies globally, not per group. You MUST use a window function.

TOP N Per Group Pattern

python — editable
# Top 2 highest-paid employees per department
window = Window.partitionBy("department").orderBy(col("salary").desc())

top2 = df.withColumn("rank", dense_rank().over(window)) \
         .filter(col("rank") <= 2) \
         .drop("rank")

lag / lead (Access Previous/Next Row)

Definition: lag() accesses a value from a previous row; lead() accesses a value from the next row, within the window partition.

python — editable
lag_spec = Window.partitionBy("customer_id").orderBy("purchase_date")

df = df.withColumn("prev_purchase", lag("amount", 1, 0).over(lag_spec))
#                                   column, offset, default_if_null

df = df.withColumn("next_purchase", lead("amount", 1).over(lag_spec))

# Month-over-month change
df = df.withColumn("mom_change",
    round((col("revenue") - lag("revenue", 1).over(lag_spec))
          / lag("revenue", 1).over(lag_spec) * 100, 2))

Running Totals and Rolling Averages

Definition: A running total accumulates values from the beginning of the partition to the current row. A rolling average computes the average over a fixed sliding window of rows.

python — editable
# --- RUNNING TOTAL ---
running_spec = Window.partitionBy("region") \
    .orderBy("sale_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df = df.withColumn("running_total", sum("revenue").over(running_spec))

# --- ROLLING 7-DAY AVERAGE ---
rolling_spec = Window.partitionBy("user_id") \
    .orderBy("event_date") \
    .rowsBetween(-6, 0)  # current + 6 before = 7 rows

df = df.withColumn("rolling_7day_avg", avg("daily_count").over(rolling_spec))
Frame Type | Syntax                    | Based on
-----------|---------------------------|------------------------
ROWS       | rowsBetween(-2, 0)        | Physical row positions
RANGE      | rangeBetween(-7, 0)       | Logical value range
Unbounded  | Window.unboundedPreceding | From start of partition
Current    | Window.currentRow         | Current row

ntile / percentile / first / last

python — editable
# --- NTILE (divide into N equal buckets) ---
quartile_spec = Window.orderBy("spend")
df = df.withColumn("quartile", ntile(4).over(quartile_spec))
df = df.withColumn("percentile", percent_rank().over(quartile_spec))

# --- FIRST / LAST ---
first_spec = Window.partitionBy("customer_id").orderBy("purchase_date")
df = df.withColumn("first_purchase_amount",
    first("amount", ignorenulls=True).over(first_spec))

What NOT to say: "Window functions are the same as GROUP BY." They are fundamentally different — GROUP BY collapses rows, window functions preserve every row. Also do not say "rowsBetween and rangeBetween are the same" — rows is physical position, range is logical value.

SECTION 10: Temp Views and Running SQL

Definition: A temporary view registers a DataFrame as a named table that can be queried with SQL. Session-scoped views are visible only within the current SparkSession; global views are visible across all sessions in the application.

Simple Explanation: Creating a temp view is like giving your DataFrame a table name so you can write SQL queries against it. The data is not copied — it is just a reference.

python — editable
# --- SESSION-SCOPED VIEW (most common) ---
df.createOrReplaceTempView("bookings")

# --- GLOBAL VIEW (visible across sessions) ---
df.createOrReplaceGlobalTempView("bookings")
# Access global views with: global_temp.bookings

# --- RUNNING SQL ---
result = spark.sql("""
    SELECT
        country,
        COUNT(*) as booking_count,
        SUM(amount) as total_revenue,
        ROUND(AVG(amount), 2) as avg_booking
    FROM bookings
    WHERE booking_date >= '2024-01-01'
      AND is_cancelled = false
    GROUP BY country
    HAVING COUNT(*) > 100
    ORDER BY total_revenue DESC
""")

# --- SQL WITH WINDOW FUNCTIONS ---
spark.sql("""
    SELECT *,
        ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank
    FROM employees
""")

# --- MIXING SQL AND DATAFRAME API ---
spark.sql("SELECT * FROM bookings WHERE amount > 1000").groupBy("country").count()
Pro Tip
Interview Tip: Mention that createOrReplaceTempView is session-scoped and disappears when the session ends. For shared views across notebooks in Databricks, use createOrReplaceGlobalTempView accessed via the global_temp database.

What NOT to say: "Temp views persist after the application ends." They do not — they live only as long as the SparkSession. Also do not confuse temp views with managed/external tables which persist in the metastore.

SECTION 11: UDFs (User-Defined Functions)

Definition: A UDF is a custom function that extends Spark's built-in functions, allowing you to apply arbitrary Python/Scala logic to DataFrame columns.

Simple Explanation: When Spark's built-in functions cannot do what you need, you write your own function and register it as a UDF. But UDFs are slow because data must be serialized between the JVM and Python.

Real-world Analogy: Built-in Spark functions are like the kitchen tools already in a restaurant. A UDF is bringing your own tool from home — it works, but it slows everything down because the kitchen was not designed for it.

Regular Python UDF

python — editable
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType, ArrayType

# --- REGULAR PYTHON UDF ---
def clean_phone(phone):
    """Remove non-digit chars from phone number"""
    import re
    return re.sub(r'\D', '', phone) if phone else None

# Register as UDF
clean_phone_udf = udf(clean_phone, StringType())

# Use with DataFrame API:
df = df.withColumn("clean_phone", clean_phone_udf(col("phone")))

# Register for SQL use:
spark.udf.register("clean_phone", clean_phone, StringType())
spark.sql("SELECT clean_phone(phone) FROM customers")

UDF with Decorator

python — editable
@udf(returnType=DoubleType())
def calculate_tax(amount, rate):
    return amount * rate if amount and rate else 0.0

df = df.withColumn("tax", calculate_tax(col("amount"), col("tax_rate")))

Pandas UDF (Vectorized — Much Faster)

python — editable
# Uses Apache Arrow for zero-copy data transfer
# Processes data in COLUMNAR BATCHES (not row by row)
# 3x to 100x faster than regular Python UDF!

from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def normalize_name(names: pd.Series) -> pd.Series:
    """Vectorized: processes entire column as pandas Series"""
    return names.str.strip().str.title().fillna("Unknown")

df = df.withColumn("clean_name", normalize_name(col("name")))

# Pandas UDF with multiple columns:
@pandas_udf(DoubleType())
def calculate_discount(amount: pd.Series, tier: pd.Series) -> pd.Series:
    discount_map = {"gold": 0.15, "silver": 0.10, "bronze": 0.05}
    discount = tier.map(discount_map).fillna(0)
    return amount * discount

df = df.withColumn("discount", calculate_discount(col("amount"), col("tier")))
Type                     | Speed   | Why
-------------------------|---------|-----------------------------------------------
Built-in Spark functions | Fastest | No serialization, Catalyst optimized, codegen
Pandas UDF (vectorized)  | Fast    | Arrow-based batch transfer, columnar
Regular Python UDF       | Slowest | Row-by-row serialization, Python<->JVM overhead
Pro Tip
Interview Tip: When asked about UDFs, immediately mention the performance penalty and say "I always try to use built-in Spark SQL functions first. If I must use a UDF, I use Pandas UDF for vectorized processing." Explain WHY they are slow: data serialization between JVM and Python interpreter.

What NOT to say: "UDFs are fine for production at scale." They should be a last resort. Also do not say "Pandas UDFs and regular UDFs have the same performance" — Pandas UDFs are 3x-100x faster due to Apache Arrow.

SECTION 12: Writing Files

Definition: df.write is the DataFrameWriter API that saves a DataFrame to external storage in various formats with configurable write modes and partitioning.

python — editable
# --- WRITE MODES ---
# overwrite: replace existing data entirely
# append: add to existing data
# ignore: don't write if destination already exists
# error / errorIfExists: fail if destination exists (default)

df.write.mode("overwrite").parquet("/output/bookings/")

# --- WRITE PARTITIONED (Hive-style partitioning) ---
df.write \
  .mode("overwrite") \
  .partitionBy("booking_year", "booking_month") \
  .parquet("/output/bookings_partitioned/")
# Creates: /output/bookings_partitioned/booking_year=2024/booking_month=01/part-00000.parquet

# --- WRITE BUCKETED (for joins -- avoids shuffle!) ---
df.write \
  .mode("overwrite") \
  .bucketBy(32, "customer_id") \
  .sortBy("customer_id") \
  .saveAsTable("default.bookings_bucketed")
# When two tables bucketed by same column + same N buckets: JOIN has NO SHUFFLE!

# --- WRITE AS DELTA ---
df.write \
  .format("delta") \
  .mode("overwrite") \
  .save("/delta/bookings/")

# --- WRITE CSV WITH OPTIONS ---
df.write \
  .option("header", "true") \
  .option("delimiter", "|") \
  .mode("overwrite") \
  .csv("/output/bookings.csv")  # NOTE: creates a DIRECTORY of part files, not a single CSV

# --- WRITE TO JDBC ---
df.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://db:5432/mydb") \
  .option("dbtable", "bookings") \
  .option("user", "user") \
  .option("password", "pass") \
  .mode("append") \
  .save()

# --- CONTROL NUMBER OF OUTPUT FILES ---
df.repartition(10).write.parquet("/output/")     # exactly 10 files
df.coalesce(1).write.csv("/output/single_file/") # single file (avoid for large data)
Mode            | If path exists          | If path does NOT exist
----------------|-------------------------|-----------------------
overwrite       | Replaces all data       | Creates new
append          | Adds to existing data   | Creates new
ignore          | Does nothing (no error) | Creates new
error (default) | Throws error            | Creates new
Pro Tip
Interview Tip: Always mention partitionBy for write optimization — it enables partition pruning on reads. Mention that bucketBy only works with saveAsTable, not save(). For controlling output file count, use repartition() before write.

What NOT to say: "I use coalesce(1) to write a single file in production." This forces all data through one partition/core and is extremely slow for large datasets. Only acceptable for small lookup files.

SECTION 13: Handling NULL Values

Definition: NULL represents a missing or unknown value. PySpark provides isNull(), isNotNull(), na.fill(), na.drop(), and coalesce() for null handling.

Simple Explanation: NULLs are empty cells in your data. You need to decide: drop the row, fill with a default value, or handle them in your logic. Ignoring nulls causes silent bugs.

Real-world Analogy: A survey form with blank answers. You can throw away incomplete forms (dropna), write "N/A" in blank fields (fillna), or use the first available answer from a backup list (coalesce).

python — editable
from pyspark.sql.functions import col, coalesce, lit, isnull

# --- FINDING NULLS ---
df.filter(col("amount").isNull())                  # rows where amount is null
df.filter(col("amount").isNotNull())               # rows where amount is not null

# --- DROPPING ROWS WITH NULLS ---
df.na.drop()                                       # drop rows with ANY null
df.na.drop(how="all")                              # drop only if ALL columns null
df.na.drop(subset=["customer_id", "amount"])       # only check these columns
df.na.drop(thresh=3)                               # keep rows with at least 3 non-null values (thresh overrides how)

# --- FILLING NULLS ---
df.na.fill(0)                                      # fill all numeric nulls with 0
df.na.fill("")                                     # fill all string nulls with empty string
df.na.fill({"amount": 0, "country": "UNKNOWN"})    # per-column fill

# --- COALESCE (first non-null value from multiple columns) ---
df.withColumn("phone",
    coalesce(col("mobile_phone"), col("home_phone"), col("work_phone"), lit("N/A"))
)

# --- SAFE DIVISION (avoid null/zero errors) ---
from pyspark.sql.functions import when
df.withColumn("rate",
    when(col("impressions") != 0,
         col("clicks") / col("impressions"))
    .otherwise(None))

# --- NULL-SAFE EQUALITY (<=>) ---
# Regular ==: NULL == NULL returns NULL (not True!)
# Null-safe <=>: NULL <=> NULL returns True
df.filter(col("a").eqNullSafe(col("b")))
NULL BEHAVIOR IN OPERATIONS
Operation | NULL behavior
--------------------|-------------------------------------------
NULL + 5 | NULL (any arithmetic with NULL = NULL)
NULL == NULL | NULL (not True!)
NULL <=> NULL | True (null-safe equality)
count("*") | Counts rows including nulls
count("col") | Skips null values
sum/avg/max/min | Ignores nulls
ORDER BY | NULLs are LAST by default (ascending)
GROUP BY | NULL is treated as a group
JOIN on NULL keys | NULL keys do NOT match (NULL != NULL)
💡 Insight
Interview Tip: The key insight is that NULL == NULL returns NULL, not True. This means NULL join keys never match. Always mention eqNullSafe or <=> for null-safe comparisons. In aggregations, nulls are silently ignored — know this for accurate counts.

What NOT to say: "NULL equals NULL in Spark." It does not. NULL == NULL evaluates to NULL, which is falsy. This is standard SQL three-valued logic.

SECTION 14: String Functions

Definition: PySpark provides built-in string functions for text manipulation including concatenation, trimming, pattern matching, and extraction.

python — editable
from pyspark.sql.functions import (
    concat, concat_ws, substring, trim, ltrim, rtrim,
    lower, upper, initcap, length, lpad, rpad,
    regexp_replace, regexp_extract, split,
    instr, locate, translate, reverse,
    col, lit
)

# --- CONCATENATION ---
df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
df.withColumn("full_name", concat_ws(" ", col("first_name"), col("middle"), col("last_name")))
# concat_ws skips NULLs; concat returns NULL if any input is NULL

# --- CASE CONVERSION ---
df.withColumn("upper_name", upper(col("name")))
df.withColumn("lower_name", lower(col("name")))
df.withColumn("title_name", initcap(col("name")))  # "alice smith" -> "Alice Smith"

# --- TRIMMING ---
df.withColumn("clean", trim(col("name")))           # both sides
df.withColumn("clean", ltrim(col("name")))          # left only
df.withColumn("clean", rtrim(col("name")))          # right only

# --- SUBSTRING ---
df.withColumn("area_code", substring(col("phone"), 1, 3))  # first 3 chars (1-indexed)

# --- REGEX REPLACE ---
df.withColumn("clean_phone", regexp_replace(col("phone"), r"[^0-9]", ""))
df.withColumn("no_special", regexp_replace(col("text"), r"[^a-zA-Z0-9 ]", ""))

# --- REGEX EXTRACT ---
df.withColumn("domain", regexp_extract(col("email"), r"@(.+)", 1))
# Group 1 extracts the domain from "user@example.com" -> "example.com"

# --- SPLIT ---
df.withColumn("parts", split(col("full_name"), " "))       # returns array
df.withColumn("first", split(col("full_name"), " ")[0])    # first element
df.withColumn("last", split(col("full_name"), " ")[1])     # second element

# --- PADDING ---
df.withColumn("padded_id", lpad(col("id"), 10, "0"))   # "42" -> "0000000042"

# --- LENGTH ---
df.withColumn("name_len", length(col("name")))
Function         | Example Input | Output
-----------------|---------------|--------------------------
upper            | "hello"       | "HELLO"
lower            | "HELLO"       | "hello"
initcap          | "hello world" | "Hello World"
trim             | " hi "        | "hi"
substring(s,1,3) | "abcdef"      | "abc"
length           | "hello"       | 5
lpad("42",5,"0") | "42"          | "00042"
concat_ws        | NULL handling | Skips nulls
concat           | NULL handling | Returns NULL if any null
Pro Tip
Interview Tip: Know the difference between concat and concat_ws: concat returns NULL if ANY input is NULL, while concat_ws (with separator) skips NULLs. This is frequently asked.

What NOT to say: "I use Python string operations in a UDF for text manipulation." Always use built-in Spark string functions — they are optimized by Catalyst and avoid serialization overhead.

SECTION 15: Date Functions

Definition: PySpark provides built-in functions for date/timestamp creation, extraction, arithmetic, and formatting.

python — editable
from pyspark.sql.functions import (
    current_date, current_timestamp,
    datediff, months_between, date_add, date_sub,
    date_format, to_date, to_timestamp,
    year, month, dayofmonth, dayofweek, dayofyear,
    hour, minute, second, weekofyear, quarter,
    last_day, next_day, trunc, date_trunc,
    col, lit
)

# --- CURRENT DATE/TIMESTAMP ---
df.withColumn("today", current_date())
df.withColumn("now", current_timestamp())

# --- STRING TO DATE/TIMESTAMP ---
df.withColumn("parsed_date", to_date(col("date_str"), "yyyy-MM-dd"))
df.withColumn("parsed_ts", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
# Common formats: "yyyy-MM-dd", "MM/dd/yyyy", "dd-MMM-yyyy", "yyyyMMdd"

# --- DATE FORMATTING ---
df.withColumn("formatted", date_format(col("booking_date"), "MMM dd, yyyy"))
# "2024-03-15" -> "Mar 15, 2024"
df.withColumn("year_month", date_format(col("booking_date"), "yyyy-MM"))

# --- DATE ARITHMETIC ---
df.withColumn("next_week", date_add(col("booking_date"), 7))
df.withColumn("last_week", date_sub(col("booking_date"), 7))
df.withColumn("days_diff", datediff(col("end_date"), col("start_date")))
df.withColumn("months_diff", months_between(col("end_date"), col("start_date")))

# --- EXTRACTING PARTS ---
df.withColumn("year", year(col("booking_date")))
df.withColumn("month", month(col("booking_date")))
df.withColumn("day", dayofmonth(col("booking_date")))
df.withColumn("dow", dayofweek(col("booking_date")))     # 1=Sunday, 7=Saturday
df.withColumn("quarter", quarter(col("booking_date")))
df.withColumn("week", weekofyear(col("booking_date")))

# --- TRUNCATION ---
df.withColumn("month_start", trunc(col("booking_date"), "month"))   # first day of month
df.withColumn("year_start", trunc(col("booking_date"), "year"))     # first day of year
df.withColumn("hour_start", date_trunc("hour", col("timestamp_col")))

# --- LAST DAY OF MONTH ---
df.withColumn("month_end", last_day(col("booking_date")))

# --- NEXT SPECIFIC DAY ---
df.withColumn("next_monday", next_day(col("booking_date"), "Monday"))
Function                      | Example          | Output
------------------------------|------------------|------------
current_date()                | -                | 2024-03-15
datediff(end, start)          | (Mar 15, Mar 10) | 5
date_add(date, 7)             | (Mar 15, 7)      | Mar 22
months_between(end, start)    | (Jun 15, Mar 15) | 3.0
year(date)                    | Mar 15, 2024     | 2024
date_format(date, "MMM yyyy") | 2024-03-15       | "Mar 2024"
to_date(str, "yyyy-MM-dd")    | "2024-03-15"     | Date object
trunc(date, "month")          | 2024-03-15       | 2024-03-01
last_day(date)                | 2024-03-15       | 2024-03-31
Pro Tip
Interview Tip: Know that datediff(end, start) takes the end date FIRST, which is counterintuitive. Also know that dayofweek returns 1 for Sunday (not Monday), which is a common gotcha.

What NOT to say: "I use Python datetime in a UDF for date calculations." Always use built-in Spark date functions. They are Catalyst-optimized and handle distributed data correctly.

SECTION 16: Nested Data — Struct, Array, Map

Definition: PySpark supports complex nested data types: StructType (nested objects), ArrayType (lists), and MapType (key-value pairs), commonly found in JSON data.

Simple Explanation: Real-world data is not always flat tables. JSON from APIs has nested objects (structs), lists (arrays), and dictionaries (maps). PySpark can read, query, and flatten all of these.

python — editable
from pyspark.sql.functions import explode, explode_outer, col, flatten, map_keys, map_values

# --- NESTED STRUCT ---
# Schema: address STRUCT<city STRING, country STRING, zip STRING>
df.select(
    col("id"),
    col("address.city").alias("city"),
    col("address.country").alias("country"),
    col("address.zip").alias("zip")
)

# --- ARRAY COLUMN ---
# Schema: tags ARRAY<STRING>
# explode: creates one row per element (NULL or empty arrays -> zero rows)
df.withColumn("tag", explode(col("tags"))) \
  .select("id", "tag")

# explode_outer: creates one row per element (NULL or empty arrays -> one row with NULL)
df.withColumn("tag", explode_outer(col("tags"))) \
  .select("id", "tag")

# size: number of elements in array
from pyspark.sql.functions import size
df.withColumn("tag_count", size(col("tags")))

# contains: check if array contains value
from pyspark.sql.functions import array_contains
df.filter(array_contains(col("tags"), "business"))

# flatten: flatten nested arrays
from pyspark.sql.functions import flatten
df.withColumn("flat_tags", flatten(col("nested_tags")))

# --- MAP COLUMN ---
# Schema: metadata MAP<STRING, STRING>
# Get value by key:
df.withColumn("status", col("metadata")["status"])
df.withColumn("status", col("metadata").getItem("status"))

# Explode map into key-value rows
# (explode on a map yields TWO columns, so use select, not withColumn):
df.select("id", explode(col("metadata")))   # columns: id, key, value

# --- READING NESTED JSON ---
json_data = """
{"booking_id": "B001", "customer": {"name": "Alice", "email": "a@b.com"}, "tags": ["biz", "premium"]}
"""
df = spark.read.json(spark.sparkContext.parallelize([json_data]))

# Flatten it:
df_flat = df.select(
    col("booking_id"),
    col("customer.name").alias("customer_name"),
    col("customer.email").alias("customer_email"),
    explode(col("tags")).alias("tag")
)
Function      | NULL array input | Empty array input | Output
--------------|------------------|-------------------|---------------------
explode       | Drops row        | Drops row         | One row per element
explode_outer | Keeps row (NULL) | Keeps row (NULL)  | One row per element
posexplode    | Drops row        | Drops row         | Row + position index
Pro Tip
Interview Tip: Always mention the difference between explode and explode_outer: explode drops null arrays silently, which can cause data loss. Use explode_outer when you want to preserve rows with null arrays.

What NOT to say: "I use UDFs to parse nested JSON." Spark handles nested JSON natively with dot notation for structs and explode for arrays. UDFs are unnecessary and slow for this.

SECTION 17: Scenario-Based Interview Questions

Scenario 1: Reading 50 CSV files from multiple folders at once

What the interviewer is testing: Can you read multiple files efficiently with proper schema management and file tracking?

python — editable
# Requirement: read all booking CSVs from 2023 and 2024 directories
# Add source file name column to track origin

from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

schema = StructType([
    StructField("booking_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("booking_date", DateType(), True)
])

df = spark.read \
    .schema(schema) \
    .option("header", "true") \
    .csv([
        "/data/bookings/2023/*/bookings_*.csv",
        "/data/bookings/2024/*/bookings_*.csv"
    ]) \
    .withColumn("source_file", input_file_name())

print(f"Total records: {df.count()}")
print(f"Files loaded: {df.select('source_file').distinct().count()}")

Follow-up they will ask: "What if some files have extra columns?" Use unionByName(allowMissingColumns=True) or define a superset schema.

Scenario 2: Deduplicate CDC records keeping latest

What the interviewer is testing: Do you know the window function dedup pattern? This is the single most common PySpark interview coding question.

python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# CDC table: same customer_id may appear multiple times (updates over time)
# Keep only the MOST RECENT record per customer_id

window = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())

df_deduped = df \
    .withColumn("rn", row_number().over(window)) \
    .filter(col("rn") == 1) \
    .drop("rn")

# In Databricks/Delta Lake: MERGE INTO handles this as SCD Type 1

Follow-up they will ask: "Why row_number() and not dense_rank()?" Because row_number() guarantees exactly 1 row per partition (breaks ties), while dense_rank() can return multiple rows if timestamps are identical.

Scenario 3: Self-join to find pairs

What the interviewer is testing: Can you handle self-joins and avoid duplicate/self pairs?

python — editable
# Find all flight booking pairs for the same customer on the same day
bookings1 = df.alias("b1")
bookings2 = df.alias("b2")

pairs = bookings1.join(bookings2,
    (col("b1.customer_id") == col("b2.customer_id")) &
    (col("b1.booking_date") == col("b2.booking_date")) &
    (col("b1.booking_id") < col("b2.booking_id"))   # avoid duplicates and self-pairs
).select(
    col("b1.booking_id").alias("booking_1"),
    col("b2.booking_id").alias("booking_2"),
    col("b1.customer_id"),
    col("b1.booking_date")
)

Follow-up they will ask: "Why < and not !=?" Using < ensures each pair appears only once (A,B but not B,A), while != would give both (A,B) and (B,A).

Scenario 4: Find customers who bought in January but NOT in February

What the interviewer is testing: Do you know anti joins?

python — editable
from pyspark.sql.functions import month, col

jan_customers = df.filter(month(col("purchase_date")) == 1).select("customer_id").distinct()
feb_customers = df.filter(month(col("purchase_date")) == 2).select("customer_id").distinct()

# Anti join: customers in Jan who are NOT in Feb
jan_only = jan_customers.join(feb_customers, on="customer_id", how="left_anti")

Follow-up they will ask: "How would you do this in SQL?" (If customer_id can be NULL, prefer NOT EXISTS: a single NULL returned by the subquery makes NOT IN match nothing.)

sql
SELECT DISTINCT customer_id
FROM purchases
WHERE MONTH(purchase_date) = 1
  AND customer_id NOT IN (
      SELECT DISTINCT customer_id FROM purchases WHERE MONTH(purchase_date) = 2
  )

Scenario 5: Running total of revenue by date per region

What the interviewer is testing: Window functions with frame specification.

python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import sum

window = Window.partitionBy("region") \
    .orderBy("sale_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_running = df.withColumn("running_revenue", sum("revenue").over(window))

Follow-up they will ask: "What if you want a 30-day rolling sum instead of a running total?" Change the frame to .rowsBetween(-29, 0) for row-based, or use rangeBetween with days cast to numeric for date-based windows.

Scenario 6: Replace nulls with the previous non-null value (forward fill)

What the interviewer is testing: Advanced window function usage.

python — editable
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col

window = Window.partitionBy("sensor_id") \
    .orderBy("timestamp") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_filled = df.withColumn("value_filled",
    last("value", ignorenulls=True).over(window)
)

Quick Reference: DataFrame API vs SQL Equivalents

DataFrame API | SQL Equivalent
---------------------------------------|----------------------------------
df.select("a", "b") | SELECT a, b FROM table
df.filter(col("a") > 10) | WHERE a > 10
df.groupBy("a").agg(count("*")) | GROUP BY a ... COUNT(*)
df.orderBy(col("a").desc()) | ORDER BY a DESC
df.limit(10) | LIMIT 10
df.distinct() | SELECT DISTINCT *
df.join(df2, on="key", how="left") | LEFT JOIN df2 ON key = key
df.withColumn("b", col("a") * 2) | SELECT *, a * 2 AS b
df.drop("col") | (no direct equivalent, use SELECT)
df.union(df2) | UNION ALL
df.na.fill(0) | COALESCE(col, 0)
df.filter(col("a").isNull()) | WHERE a IS NULL