Day 2: DataFrame + SparkSQL — Deep Interview Guide
MASTER MEMORY MAP — Day 2
SECTION 1: RDD vs DataFrame vs Dataset
Definition: A DataFrame is a distributed collection of data organized into named columns, equivalent to a table in a relational database, optimized by Spark's Catalyst optimizer and Tungsten execution engine.
Simple Explanation: Think of an RDD as a raw list of items with no labels. A DataFrame is like an Excel spreadsheet with column headers. A Dataset adds strict data typing (only in Scala/Java).
Real-world Analogy: RDD is a box of unsorted papers. DataFrame is papers organized in a filing cabinet with labeled folders. Dataset is a filing cabinet where each folder only accepts a specific document type.
| Feature | RDD | DataFrame | Dataset (Scala/Java) |
|---|---|---|---|
| API Level | Low-level | High-level SQL | High-level typed |
| Schema | No | Yes (column names) | Yes (typed case class) |
| Type Safety | Python runtime | Runtime only | COMPILE TIME |
| Catalyst Optim. | None | Full | Full |
| Tungsten | None | Full | Full |
| Performance | Slowest | Fast | Fast |
| Language | Python/Scala/Java | All languages | Scala/Java ONLY |
| Null Handling | Manual | Automatic | Automatic |
| When to use | Unstructured data, complex custom logic | Structured data, SQL-like ops (most cases) | N/A in PySpark |
What NOT to say: "Dataset is available in PySpark." It is not — Dataset is Scala/Java only. In PySpark, DataFrame IS the Dataset[Row] equivalent.
SECTION 2: SparkSession + DataFrame Creation
What is SparkSession?
Definition: SparkSession is the unified entry point for all Spark functionality — reading data, running SQL, accessing the catalog, and configuring the application.
Simple Explanation: Before you do anything in Spark, you need a SparkSession. It is the single door you walk through to access everything Spark offers.
Interview tip: mention getOrCreate() — it prevents creating multiple sessions in the same application. Mention that SparkSession replaced the older SparkContext + SQLContext + HiveContext pattern.
What NOT to say: "You need a SparkContext to use DataFrames." SparkSession has been the entry point since Spark 2.0. SparkContext is still used internally, but you access it via spark.sparkContext.
Creating SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("DataPipeline") \
.master("yarn") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.config("spark.executor.instances", "10") \
.config("spark.sql.shuffle.partitions", "400") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.autoBroadcastJoinThreshold", str(50 * 1024 * 1024)) \
.enableHiveSupport() \
.getOrCreate()
# getOrCreate() -> creates new session OR returns existing one
# Prevents creating multiple SparkSessions in the same application
DataFrame Creation Methods
Definition: A DataFrame can be created from files (CSV, JSON, Parquet, etc.), databases (JDBC), in-memory data (lists, RDDs), or existing tables.
# --- FROM A PYTHON LIST ---
data = [("Alice", 30, "Engineering"), ("Bob", 25, "Marketing")]
columns = ["name", "age", "department"]
df = spark.createDataFrame(data, columns)
# --- FROM AN RDD ---
rdd = spark.sparkContext.parallelize([("Alice", 30), ("Bob", 25)])
df = rdd.toDF(["name", "age"])
# --- FROM A PANDAS DATAFRAME ---
import pandas as pd
pdf = pd.DataFrame({"name": ["Alice", "Bob"], "age": [30, 25]})
df = spark.createDataFrame(pdf)
# --- FROM AN EXISTING TABLE (Hive/Delta metastore) ---
df = spark.table("database_name.table_name")
# --- FROM A RANGE ---
df = spark.range(0, 100, 1) # single column 'id' with values 0-99
Interview tip: use spark.createDataFrame() for testing and spark.read for production.
SECTION 3: Schema Definition (StructType, StructField)
Definition: A schema in PySpark defines the structure of a DataFrame — column names, data types, and nullability — using StructType (the table) and StructField (each column).
Simple Explanation: A schema is the blueprint of your data. Just like a building blueprint specifies where every room goes, a schema specifies what every column looks like before any data arrives.
Real-world Analogy: Schema is like the column headers and data validation rules in an Excel template — it says "Column A must be text, Column B must be a number, Column C cannot be blank."
from pyspark.sql.types import (
StructType, StructField,
StringType, IntegerType, LongType, DoubleType, FloatType,
BooleanType, DateType, TimestampType, ArrayType, MapType
)
# Define schema explicitly (no inferSchema scan needed)
booking_schema = StructType([
StructField("booking_id", StringType(), nullable=False),
StructField("customer_id", LongType(), nullable=True),
StructField("flight_code", StringType(), nullable=True),
StructField("amount", DoubleType(), nullable=True),
StructField("booking_date", DateType(), nullable=True),
StructField("is_cancelled", BooleanType(), nullable=True),
# Nested struct:
StructField("address", StructType([
StructField("city", StringType(), True),
StructField("country", StringType(), True)
]), True),
# Array column:
StructField("tags", ArrayType(StringType()), True),
# Map column:
StructField("metadata", MapType(StringType(), StringType()), True)
])
df = spark.read.schema(booking_schema).csv("/data/bookings.csv", header=True)
| Aspect | inferSchema=True | Explicit Schema |
|---|---|---|
| Performance | Extra pass over data | No extra pass |
| Type accuracy | Often wrong (123 as Long) | You control every type |
| Null handling | Guesses nullability | You define nullable |
| Streaming support | NOT supported | Required |
| Schema enforcement | None | Fail fast on bad data |
| Production use | NEVER | ALWAYS |
What NOT to say: "I just use inferSchema=True." This signals you have not worked on production pipelines. Also do not say "I use DDL strings for complex schemas" — StructType is the standard.
SECTION 4: Reading Data (CSV, JSON, Parquet, JDBC, Delta)
Definition: spark.read is the DataFrameReader API that loads data from external storage into a DataFrame, supporting multiple formats and configuration options.
Simple Explanation: spark.read is how you bring outside data into Spark. You specify the format, set options (like whether there is a header row), and point it to the file path.
Reading Single Files — All Formats
# --- CSV ---
df_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.option("delimiter", ",") \
.option("nullValue", "N/A") \
.option("dateFormat", "yyyy-MM-dd") \
.csv("/data/bookings.csv")
# WARNING: inferSchema triggers extra read pass — use explicit schema in prod
# --- JSON ---
df_json = spark.read \
.option("multiLine", "true") \
.option("mode", "PERMISSIVE") \
.json("/data/events/*.json")
# mode options: PERMISSIVE (default, sets malformed fields to null) | DROPMALFORMED (drops bad rows) | FAILFAST (throws on first bad row)
# --- PARQUET (PREFERRED FORMAT) ---
df_parquet = spark.read.parquet("/data/warehouse/bookings/")
# Schema embedded in file (no inferSchema needed)
# Columnar format with predicate pushdown
# Efficient compression (Snappy by default)
# --- ORC ---
df_orc = spark.read.orc("/data/hive_table/")
# --- JDBC (relational database) ---
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://db-host:5432/mydb") \
.option("dbtable", "bookings") \
.option("user", "etl_user") \
.option("password", "secret") \
.option("driver", "org.postgresql.Driver") \
.option("numPartitions", "10") \
.option("partitionColumn", "booking_id") \
.option("lowerBound", "1") \
.option("upperBound", "1000000") \
.load()
# --- AVRO ---
df_avro = spark.read.format("avro").load("/data/kafka_output/")
# --- DELTA LAKE ---
df_delta = spark.read.format("delta").load("/data/delta/bookings/")
# Or using table name if registered in metastore:
df_delta = spark.table("bookings_db.bookings")
Reading Multiple Files from Multiple Sources
# --- GLOB PATTERN (most common) ---
df = spark.read.csv("/data/2024/*/bookings_*.csv", header=True)
# Reads: /data/2024/jan/bookings_1.csv, /data/2024/feb/bookings_2.csv etc.
# --- LIST OF SPECIFIC FILES ---
df = spark.read.parquet(
"/data/2024/jan/bookings.parquet",
"/data/2024/feb/bookings.parquet",
"/data/2024/mar/bookings.parquet"
)
# Or pass as Python list:
files = ["/data/2024/jan/bookings.parquet", "/data/2024/feb/bookings.parquet"]
df = spark.read.parquet(*files) # unpack list
# --- DIRECTORY (all files in dir with same format) ---
df = spark.read.parquet("/data/2024/") # reads ALL parquet files in dir
# --- ADD SOURCE FILE NAME COLUMN ---
from pyspark.sql.functions import input_file_name
df = spark.read.option("header", "true") \
.csv("/data/2024/*/bookings_*.csv") \
.withColumn("source_file", input_file_name())
# --- READ MULTIPLE FORMATS AND UNION ---
df_csv = spark.read.option("header", True).csv("/landing/batch/")
df_json = spark.read.json("/landing/api/events/")
df_parquet = spark.read.parquet("/landing/warehouse/")
# Align columns then union
cols = ["booking_id", "customer_id", "amount", "booking_date"]
df_all = df_csv.select(cols) \
.union(df_json.select(cols)) \
.union(df_parquet.select(cols))
# Better: unionByName (aligns by column name, not position)
df_all = df_csv.unionByName(df_json, allowMissingColumns=True) \
.unionByName(df_parquet, allowMissingColumns=True)
# --- READ FROM DIFFERENT STORAGE SYSTEMS ---
df_s3 = spark.read.parquet("s3a://my-bucket/data/")
df_adls = spark.read.parquet("abfss://container@storage.dfs.core.windows.net/data/")
df_hdfs = spark.read.parquet("hdfs://namenode:8020/data/")
df_local = spark.read.parquet("file:///local/path/data/")
# Combine multiple storage locations:
df_combined = spark.read.parquet(
"s3a://bucket/data/2023/",
"s3a://bucket/data/2024/",
"hdfs://namenode/archive/2022/"
)
Interview tip: for JDBC reads, mention numPartitions + partitionColumn for parallel reads.
What NOT to say: "I just use spark.read.csv(path) with defaults." This shows you do not understand production considerations like schema enforcement, read modes, or performance tuning.
SECTION 5: DataFrame Transformations
Definition: Transformations are lazy operations that define a computation plan on a DataFrame without executing it. They produce a new DataFrame (DataFrames are immutable).
Simple Explanation: Transformations are instructions you stack up. Nothing actually runs until you call an action (like .show(), .count(), .collect()). Spark waits so it can optimize the entire chain at once.
Real-world Analogy: Writing a recipe (transformations) vs actually cooking it (actions). You write all the steps first, then the chef (Catalyst optimizer) rearranges them for efficiency before cooking.
select
Definition: Projects a set of columns or expressions from a DataFrame, equivalent to SELECT in SQL.
df.select("id", "name", "amount") # by name
df.select(col("id"), col("amount") * 1.1) # with expression
df.select("*", (col("amount") * 1.1).alias("new_amount")) # all + new column
filter / where
Definition: Returns rows that satisfy a given condition. filter() and where() are identical — aliases of each other.
df.filter(col("age") > 30)
df.filter("age > 30") # SQL string form
df.where((col("country") == "IN") & (col("amount") > 100))
df.filter(col("status").isin("CONFIRMED", "PENDING"))
df.filter(col("name").startswith("A"))
df.filter(col("name").like("A%"))
df.filter(~col("is_cancelled")) # NOT condition
Interview tip: filter() and where() are the same — interviewers sometimes test this. Also mention that Spark pushes filter predicates down to the data source (predicate pushdown) for formats like Parquet and JDBC.
withColumn
Definition: Returns a new DataFrame with a column added or replaced. If the column name already exists, it replaces the column.
df.withColumn("tax", col("amount") * 0.18)
df.withColumn("amount", col("amount").cast("double")) # modify existing column
df.withColumn("status", when(col("amount") > 1000, "high")
.when(col("amount") > 100, "medium")
.otherwise("low"))
Interview tip: chaining many withColumn calls grows the query plan one projection at a time — use a single select() with multiple expressions instead for better performance.
drop
Definition: Returns a new DataFrame with specified columns removed.
df.drop("col1", "col2")
df.drop(col("temporary_column"))
distinct / dropDuplicates
Definition: distinct() removes duplicate rows considering ALL columns. dropDuplicates() removes duplicates based on a SUBSET of columns.
df.distinct() # all columns must match
df.dropDuplicates(["customer_id", "booking_date"]) # subset columns
Interview tip: use dropDuplicates for subset-based dedup. For "keep the latest record per key", use a window function with row_number() — not dropDuplicates, because dropDuplicates gives you an arbitrary row, not the most recent.
What NOT to say: "I use distinct() to deduplicate by key." distinct() considers ALL columns. If you only want unique keys, use dropDuplicates(["key_col"]).
withColumnRenamed / sort
# --- RENAME ---
df.withColumnRenamed("old_name", "new_name")
# --- SORT ---
df.orderBy("amount") # ascending
df.orderBy(col("amount").desc()) # descending
df.orderBy(col("date").asc(), col("amount").desc()) # multi-column
SECTION 6: Column Operations (col, lit, when/otherwise, cast)
Definition: Column operations are functions that create, transform, or reference individual columns within a DataFrame expression.
Simple Explanation: col() points to an existing column. lit() creates a constant value column. when().otherwise() is your IF-ELSE logic. cast() converts data types.
col() and lit()
from pyspark.sql.functions import col, lit
# col() — reference an existing column
df.select(col("name"), col("amount") * 2)
# lit() — create a constant value column
df.withColumn("country", lit("India"))
df.withColumn("multiplier", lit(1.18))
# Using a Python variable directly works via implicit conversion,
# but wrapping it in lit() makes the intent explicit:
tax_rate = 0.18
df.withColumn("tax", col("amount") * tax_rate)       # works, but implicit
df.withColumn("tax", col("amount") * lit(tax_rate))  # preferred: explicit constant
Interview tip: lit() is needed when you want to add a constant column or use a Python variable as a column value. Without lit(), Python values sometimes work due to implicit conversion, but lit() makes intent explicit.
when / otherwise (Conditional Logic)
Definition: when().otherwise() is the PySpark equivalent of SQL's CASE WHEN. It evaluates conditions in order and returns the first matching result.
from pyspark.sql.functions import when
# Simple condition
df.withColumn("category",
when(col("amount") > 1000, "high")
.when(col("amount") > 100, "medium")
.otherwise("low")
)
# Multiple conditions combined
df.withColumn("flag",
when((col("status") == "CANCELLED") & (col("amount") > 500), "refund_priority")
.when(col("status") == "CANCELLED", "standard_refund")
.otherwise("no_action")
)
# Nested when for complex logic
df.withColumn("tier",
when(col("total_spend") > 10000, "platinum")
.when((col("total_spend") > 5000) & (col("years") > 3), "gold")
.when(col("total_spend") > 1000, "silver")
.otherwise("bronze")
)
cast (Type Conversion)
Definition: Converts a column from one data type to another.
df.withColumn("amount", col("amount").cast("double"))
df.withColumn("booking_date", col("booking_date").cast("date"))
df.withColumn("amount_int", col("amount").cast(IntegerType()))
# Common cast operations:
# "string" -> "integer", "long", "double", "float", "date", "timestamp", "boolean"
What NOT to say: "I use int() or float() to convert column types." Those are Python functions, not Spark functions. Always use .cast() for DataFrame column type conversion.
SECTION 7: Aggregations (groupBy, agg, sum, avg, count, min, max)
Definition: Aggregation operations group rows by one or more columns and compute summary statistics (count, sum, average, etc.) for each group.
Simple Explanation: Aggregation is like creating a pivot table in Excel — you pick the grouping columns and then calculate totals, averages, or counts for each group.
Real-world Analogy: Counting how many passengers boarded each flight and the total revenue per flight — that is a groupBy("flight_code") with count and sum aggregations.
from pyspark.sql.functions import count, sum, avg, max, min, countDistinct, collect_list, collect_set
# --- BASIC GROUPBY + AGG ---
df.groupBy("country") \
.agg(
count("*").alias("booking_count"),
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount"),
max("amount").alias("max_amount"),
min("amount").alias("min_amount"),
countDistinct("customer_id").alias("unique_customers")
)
# --- MULTIPLE GROUPBY COLUMNS ---
df.groupBy("country", "booking_year") \
.agg(
count("*").alias("bookings"),
sum("amount").alias("revenue")
)
# --- COLLECT VALUES INTO A LIST/SET ---
df.groupBy("customer_id") \
.agg(
collect_list("product").alias("all_products"), # allows duplicates
collect_set("product").alias("unique_products") # no duplicates
)
# --- SHORTCUT METHODS (less flexible but quick) ---
df.groupBy("country").count()
df.groupBy("country").sum("amount")
df.groupBy("country").avg("amount")
df.groupBy("country").max("amount")
| Function | What it does | Null behavior |
|---|---|---|
| count("*") | Counts all rows including nulls | Counts everything |
| count("col") | Counts non-null values in column | Ignores nulls |
| countDistinct() | Counts unique non-null values | Ignores nulls |
| sum() | Sum of values | Ignores nulls |
| avg() / mean() | Average of values | Ignores nulls |
| max() / min() | Maximum / minimum value | Ignores nulls |
| collect_list() | Collects into array (with duplicates) | Includes nulls |
| collect_set() | Collects into array (unique only) | Excludes nulls |
| first() / last() | First / last value in group | Depends on ignorenulls |
Interview tip: count("*") counts all rows including nulls, while count("column_name") skips nulls. This is a common trick question. Also mention that collect_list and collect_set can cause OOM if groups are very large.
What NOT to say: "count() and count(column) are the same." They are not. count("*") includes null rows; count("col") excludes them.
SECTION 8: Joins
Definition: A join combines rows from two DataFrames based on a related column (join key), similar to SQL joins.
Simple Explanation: Joins connect two tables using a shared column. Think of matching employee IDs in a "people" table with the same IDs in a "salary" table to get each person's salary.
Real-world Analogy: You have a guest list (names + invite codes) and a seating chart (invite codes + table numbers). Joining them on invite code gives you name + table number.
from pyspark.sql.functions import broadcast, col
# --- INNER JOIN (only matching rows from both sides) ---
result = df1.join(df2, on="customer_id", how="inner")
# --- LEFT JOIN (all rows from left, matching from right, null if no match) ---
result = df1.join(df2, on=["customer_id", "date"], how="left")
# --- RIGHT JOIN (all rows from right, matching from left) ---
result = df1.join(df2, on="customer_id", how="right")
# --- FULL OUTER JOIN (all rows from both, nulls where no match) ---
result = df1.join(df2, on="customer_id", how="full")
# --- CROSS JOIN (cartesian product — every row with every row) ---
result = df1.crossJoin(df2)
# WARNING: N x M rows — use only when intentional (e.g., generating combinations)
# --- LEFT SEMI JOIN (rows in df1 that HAVE a match in df2, NO df2 columns) ---
result = df1.join(df2, on="customer_id", how="left_semi")
# Equivalent to: WHERE customer_id IN (SELECT customer_id FROM df2)
# --- LEFT ANTI JOIN (rows in df1 that do NOT have a match in df2) ---
result = df1.join(df2, on="customer_id", how="left_anti")
# Equivalent to: WHERE customer_id NOT IN (SELECT customer_id FROM df2)
# --- JOIN WITH DIFFERENT COLUMN NAMES ---
result = df1.join(df2, df1["cust_id"] == df2["customer_id"], how="inner")
# After join, drop the duplicate column:
result = result.drop(df2["customer_id"])
# --- BROADCAST JOIN (force small table broadcast — no shuffle) ---
result = large_df.join(broadcast(small_df), on="airport_code")
# Spark sends the small table to every executor (avoids shuffle of large table)
# Default broadcast threshold: 10 MB (spark.sql.autoBroadcastJoinThreshold)
| Join Type | Left rows | Right rows | When no match | Use case |
|---|---|---|---|---|
| inner | Matched | Matched | Row dropped | Only common records |
| left | ALL | Matched | Right cols = NULL | Keep all from primary table |
| right | Matched | ALL | Left cols = NULL | Keep all from lookup table |
| full | ALL | ALL | Opposite side = NULL | Merge two full datasets |
| cross | ALL x ALL | ALL x ALL | N/A (cartesian) | Generate all combinations |
| left_semi | Matched | NONE | Row dropped | EXISTS / IN subquery |
| left_anti | Unmatched | NONE | Row kept | NOT EXISTS / NOT IN subquery |
What NOT to say: "Semi join returns columns from both tables." It only returns columns from the LEFT table. Also never say "I always use inner join" — show awareness of when to use left/anti/semi.
SECTION 9: Window Functions
Definition: Window functions perform calculations across a set of rows (a "window") that are related to the current row, without collapsing rows like GROUP BY does.
Simple Explanation: GROUP BY gives you one row per group. Window functions give you one result per ROW, but that result is computed from a group of related rows. You keep all your original rows.
Real-world Analogy: In a race, GROUP BY tells you "the fastest time per age group." A window function tells you "each runner's rank within their age group" — every runner keeps their row, but now has a rank attached.
Defining a Window
from pyspark.sql.window import Window
from pyspark.sql.functions import (
row_number, rank, dense_rank, lag, lead,
sum, avg, max, min, count,
ntile, percent_rank, cume_dist,
first, last
)
# --- DEFINE A WINDOW ---
windowSpec = Window \
.partitionBy("department") \
.orderBy(col("salary").desc())
Ranking Functions (row_number, rank, dense_rank)
Definition: Ranking functions assign a position number to each row within a partition based on the ordering.
df = df.withColumn("row_num", row_number().over(windowSpec)) # 1,2,3,4,5 (no ties)
df = df.withColumn("rank", rank().over(windowSpec)) # 1,2,2,4,5 (ties skip)
df = df.withColumn("drank", dense_rank().over(windowSpec)) # 1,2,2,3,4 (ties no skip)
| Salary | row_number | rank | dense_rank |
|---|---|---|---|
| 100 | 1 | 1 | 1 |
| 90 | 2 | 2 | 2 |
| 90 | 3 | 2 | 2 |
| 80 | 4 | 4 | 3 |
| 70 | 5 | 5 | 4 |
Interview tip: for top-N per group, use row_number() or dense_rank() with a window function, filter by rank, then drop the rank column. Know the difference: row_number breaks ties arbitrarily, rank skips numbers after ties, dense_rank never skips.
What NOT to say: "I use GROUP BY with LIMIT to get top N per group." That does not work — LIMIT applies globally, not per group. You MUST use a window function.
TOP N Per Group Pattern
# Top 2 highest-paid employees per department
window = Window.partitionBy("department").orderBy(col("salary").desc())
top2 = df.withColumn("rank", dense_rank().over(window)) \
.filter(col("rank") <= 2) \
.drop("rank")
lag / lead (Access Previous/Next Row)
Definition: lag() accesses a value from a previous row; lead() accesses a value from the next row, within the window partition.
lag_spec = Window.partitionBy("customer_id").orderBy("purchase_date")
df = df.withColumn("prev_purchase", lag("amount", 1, 0).over(lag_spec))
# column, offset, default_if_null
df = df.withColumn("next_purchase", lead("amount", 1).over(lag_spec))
# Month-over-month change
df = df.withColumn("mom_change",
round((col("revenue") - lag("revenue", 1).over(lag_spec))
/ lag("revenue", 1).over(lag_spec) * 100, 2))
Running Totals and Rolling Averages
Definition: A running total accumulates values from the beginning of the partition to the current row. A rolling average computes the average over a fixed sliding window of rows.
# --- RUNNING TOTAL ---
running_spec = Window.partitionBy("region") \
.orderBy("sale_date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df = df.withColumn("running_total", sum("revenue").over(running_spec))
# --- ROLLING 7-DAY AVERAGE ---
rolling_spec = Window.partitionBy("user_id") \
.orderBy("event_date") \
.rowsBetween(-6, 0) # current + 6 before = 7 rows
df = df.withColumn("rolling_7day_avg", avg("daily_count").over(rolling_spec))
| Frame Type | Syntax | Based on |
|---|---|---|
| ROWS | rowsBetween(-2, 0) | Physical row positions |
| RANGE | rangeBetween(-7, 0) | Logical value range |
| Unbounded | Window.unboundedPreceding | From start of partition |
| Current | Window.currentRow | Current row |
ntile / percentile / first / last
# --- NTILE (divide into N equal buckets) ---
quartile_spec = Window.orderBy("spend")
df = df.withColumn("quartile", ntile(4).over(quartile_spec))
df = df.withColumn("percentile", percent_rank().over(quartile_spec))
# --- FIRST / LAST ---
first_spec = Window.partitionBy("customer_id").orderBy("purchase_date")
df = df.withColumn("first_purchase_amount",
first("amount", ignorenulls=True).over(first_spec))
What NOT to say: "Window functions are the same as GROUP BY." They are fundamentally different — GROUP BY collapses rows, window functions preserve every row. Also do not say "rowsBetween and rangeBetween are the same" — rows is physical position, range is logical value.
SECTION 10: Temp Views and Running SQL
Definition: A temporary view registers a DataFrame as a named table that can be queried with SQL. Session-scoped views are visible only within the current SparkSession; global views are visible across all sessions in the application.
Simple Explanation: Creating a temp view is like giving your DataFrame a table name so you can write SQL queries against it. The data is not copied — it is just a reference.
# --- SESSION-SCOPED VIEW (most common) ---
df.createOrReplaceTempView("bookings")
# --- GLOBAL VIEW (visible across sessions) ---
df.createOrReplaceGlobalTempView("bookings")
# Access global views with: global_temp.bookings
# --- RUNNING SQL ---
result = spark.sql("""
SELECT
country,
COUNT(*) as booking_count,
SUM(amount) as total_revenue,
ROUND(AVG(amount), 2) as avg_booking
FROM bookings
WHERE booking_date >= '2024-01-01'
AND is_cancelled = false
GROUP BY country
HAVING COUNT(*) > 100
ORDER BY total_revenue DESC
""")
# --- SQL WITH WINDOW FUNCTIONS ---
spark.sql("""
SELECT *,
ROW_NUMBER() OVER (PARTITION BY department ORDER BY salary DESC) as rank
FROM employees
""")
# --- MIXING SQL AND DATAFRAME API ---
spark.sql("SELECT * FROM bookings WHERE amount > 1000").groupBy("country").count()
Interview tip: createOrReplaceTempView is session-scoped and disappears when the session ends. For shared views across notebooks in Databricks, use createOrReplaceGlobalTempView, accessed via the global_temp database.
What NOT to say: "Temp views persist after the application ends." They do not — they live only as long as the SparkSession. Also do not confuse temp views with managed/external tables, which persist in the metastore.
SECTION 11: UDFs (User-Defined Functions)
Definition: A UDF is a custom function that extends Spark's built-in functions, allowing you to apply arbitrary Python/Scala logic to DataFrame columns.
Simple Explanation: When Spark's built-in functions cannot do what you need, you write your own function and register it as a UDF. But UDFs are slow because data must be serialized between the JVM and Python.
Real-world Analogy: Built-in Spark functions are like the kitchen tools already in a restaurant. A UDF is bringing your own tool from home — it works, but it slows everything down because the kitchen was not designed for it.
Regular Python UDF
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType, ArrayType
# --- REGULAR PYTHON UDF ---
def clean_phone(phone):
"""Remove non-digit chars from phone number"""
import re
return re.sub(r'\D', '', phone) if phone else None
# Register as UDF
clean_phone_udf = udf(clean_phone, StringType())
# Use with DataFrame API:
df = df.withColumn("clean_phone", clean_phone_udf(col("phone")))
# Register for SQL use:
spark.udf.register("clean_phone", clean_phone, StringType())
spark.sql("SELECT clean_phone(phone) FROM customers")
UDF with Decorator
@udf(returnType=DoubleType())
def calculate_tax(amount, rate):
return amount * rate if amount and rate else 0.0
df = df.withColumn("tax", calculate_tax(col("amount"), col("tax_rate")))
Pandas UDF (Vectorized — Much Faster)
# Uses Apache Arrow for zero-copy data transfer
# Processes data in COLUMNAR BATCHES (not row by row)
# 3x to 100x faster than regular Python UDF!
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def normalize_name(names: pd.Series) -> pd.Series:
"""Vectorized: processes entire column as pandas Series"""
return names.str.strip().str.title().fillna("Unknown")
df = df.withColumn("clean_name", normalize_name(col("name")))
# Pandas UDF with multiple columns:
@pandas_udf(DoubleType())
def calculate_discount(amount: pd.Series, tier: pd.Series) -> pd.Series:
discount_map = {"gold": 0.15, "silver": 0.10, "bronze": 0.05}
discount = tier.map(discount_map).fillna(0)
return amount * discount
df = df.withColumn("discount", calculate_discount(col("amount"), col("tier")))
| Type | Speed | Why |
|---|---|---|
| Built-in Spark functions | Fastest | No serialization, Catalyst optimized, codegen |
| Pandas UDF (vectorized) | Fast | Arrow-based batch transfer, columnar |
| Regular Python UDF | Slowest | Row-by-row serialization, Python<->JVM overhead |
What NOT to say: "UDFs are fine for production at scale." They should be a last resort. Also do not say "Pandas UDFs and regular UDFs have the same performance" — Pandas UDFs are 3x-100x faster due to Apache Arrow.
SECTION 12: Writing Files
Definition: df.write is the DataFrameWriter API that saves a DataFrame to external storage in various formats with configurable write modes and partitioning.
# --- WRITE MODES ---
# overwrite: replace existing data entirely
# append: add to existing data
# ignore: don't write if destination already exists
# error / errorIfExists: fail if destination exists (default)
df.write.mode("overwrite").parquet("/output/bookings/")
# --- WRITE PARTITIONED (Hive-style partitioning) ---
df.write \
.mode("overwrite") \
.partitionBy("booking_year", "booking_month") \
.parquet("/output/bookings_partitioned/")
# Creates: /output/bookings_partitioned/booking_year=2024/booking_month=01/part-00000.parquet
# --- WRITE BUCKETED (for joins -- avoids shuffle!) ---
df.write \
.mode("overwrite") \
.bucketBy(32, "customer_id") \
.sortBy("customer_id") \
.saveAsTable("default.bookings_bucketed")
# When two tables bucketed by same column + same N buckets: JOIN has NO SHUFFLE!
# --- WRITE AS DELTA ---
df.write \
.format("delta") \
.mode("overwrite") \
.save("/delta/bookings/")
# --- WRITE CSV WITH OPTIONS ---
df.write \
.option("header", "true") \
.option("delimiter", "|") \
.mode("overwrite") \
.csv("/output/bookings.csv")
# --- WRITE TO JDBC ---
df.write \
.format("jdbc") \
.option("url", "jdbc:postgresql://db:5432/mydb") \
.option("dbtable", "bookings") \
.option("user", "user") \
.option("password", "pass") \
.mode("append") \
.save()
# --- CONTROL NUMBER OF OUTPUT FILES ---
df.repartition(10).write.parquet("/output/") # exactly 10 files
df.coalesce(1).write.csv("/output/single_file/") # single file (avoid for large data)
| Mode | If path exists | If path does NOT exist |
|---|---|---|
| overwrite | Replaces all data | Creates new |
| append | Adds to existing data | Creates new |
| ignore | Does nothing (no error) | Creates new |
| error (default) | Throws error | Creates new |
Interview tip: mention partitionBy for write optimization — it enables partition pruning on reads. Mention that bucketBy only works with saveAsTable, not save(). For controlling output file count, use repartition() before write.
What NOT to say: "I use coalesce(1) to write a single file in production." This forces all data through one partition/core and is extremely slow for large datasets. Only acceptable for small lookup files.
SECTION 13: Handling NULL Values
Definition: NULL represents a missing or unknown value. PySpark provides isNull(), isNotNull(), na.fill(), na.drop(), and coalesce() for null handling.
Simple Explanation: NULLs are empty cells in your data. You need to decide: drop the row, fill with a default value, or handle them in your logic. Ignoring nulls causes silent bugs.
Real-world Analogy: A survey form with blank answers. You can throw away incomplete forms (dropna), write "N/A" in blank fields (fillna), or use the first available answer from a backup list (coalesce).
from pyspark.sql.functions import col, coalesce, lit, isnull
# --- FINDING NULLS ---
df.filter(col("amount").isNull()) # rows where amount is null
df.filter(col("amount").isNotNull()) # rows where amount is not null
# --- DROPPING ROWS WITH NULLS ---
df.na.drop() # drop rows with ANY null
df.na.drop(how="all") # drop only if ALL columns null
df.na.drop(subset=["customer_id", "amount"]) # only check these columns
df.na.drop(thresh=3) # keep rows with at least 3 non-null values (when thresh is set, how is ignored)
# --- FILLING NULLS ---
df.na.fill(0) # fill all numeric nulls with 0
df.na.fill("") # fill all string nulls with empty string
df.na.fill({"amount": 0, "country": "UNKNOWN"}) # per-column fill
# --- COALESCE (first non-null value from multiple columns) ---
df.withColumn("phone",
coalesce(col("mobile_phone"), col("home_phone"), col("work_phone"), lit("N/A"))
)
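coalesce picks the first non-null argument per row. The same idea in plain Python (a sketch of the semantics, not Spark code):

```python
def first_non_null(*values, default=None):
    # Return the first value that is not None, like SQL COALESCE
    return next((v for v in values if v is not None), default)

print(first_non_null(None, None, "555-1234", default="N/A"))  # 555-1234
print(first_non_null(None, None, default="N/A"))              # N/A
```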
# --- SAFE DIVISION (avoid null/zero errors) ---
from pyspark.sql.functions import when
df.withColumn("rate",
when(col("impressions") != 0,
col("clicks") / col("impressions"))
.otherwise(None))
# --- NULL-SAFE EQUALITY (<=>) ---
# Regular ==: NULL == NULL returns NULL (not True!)
# Null-safe <=>: NULL <=> NULL returns True
df.filter(col("a").eqNullSafe(col("b")))
NULL == NULL returns NULL, not True. This means NULL join keys never match. Always mention eqNullSafe or <=> for null-safe comparisons. In aggregations, nulls are silently ignored — know this for accurate counts.
What NOT to say: "NULL equals NULL in Spark." It does not. NULL == NULL evaluates to NULL, which is falsy. This is standard SQL three-valued logic.
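Why NULL == NULL is not True: SQL uses three-valued logic, where any comparison involving NULL yields NULL (unknown). A plain-Python sketch with None standing in for NULL:

```python
def sql_eq(a, b):
    """Regular == under three-valued logic: True, False, or None (unknown)."""
    if a is None or b is None:
        return None                 # any comparison with NULL is NULL
    return a == b

def sql_eq_null_safe(a, b):
    """Null-safe equality (<=> / eqNullSafe): NULL <=> NULL is True."""
    if a is None and b is None:
        return True
    if a is None or b is None:
        return False
    return a == b

print(sql_eq(None, None))            # None -> falsy, so the row is filtered out
print(sql_eq_null_safe(None, None))  # True -> rows with NULL keys can match
```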
SECTION 14: String Functions
Definition: PySpark provides built-in string functions for text manipulation including concatenation, trimming, pattern matching, and extraction.
from pyspark.sql.functions import (
concat, concat_ws, substring, trim, ltrim, rtrim,
lower, upper, initcap, length, lpad, rpad,
regexp_replace, regexp_extract, split,
instr, locate, translate, reverse,
col, lit
)
# --- CONCATENATION ---
df.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
df.withColumn("full_name", concat_ws(" ", col("first_name"), col("middle"), col("last_name")))
# concat_ws skips NULLs; concat returns NULL if any input is NULL
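The concat vs concat_ws NULL behavior, mimicked in plain Python (a semantics sketch, not Spark code):

```python
def sql_concat(*parts):
    # concat: NULL-propagating -- any None input makes the whole result None
    return None if any(p is None for p in parts) else "".join(parts)

def sql_concat_ws(sep, *parts):
    # concat_ws: skips None inputs, joins the rest with the separator
    return sep.join(p for p in parts if p is not None)

print(sql_concat("Alice", None, "Smith"))          # None
print(sql_concat_ws(" ", "Alice", None, "Smith"))  # Alice Smith
```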
# --- CASE CONVERSION ---
df.withColumn("upper_name", upper(col("name")))
df.withColumn("lower_name", lower(col("name")))
df.withColumn("title_name", initcap(col("name"))) # "alice smith" -> "Alice Smith"
# --- TRIMMING ---
df.withColumn("clean", trim(col("name"))) # both sides
df.withColumn("clean", ltrim(col("name"))) # left only
df.withColumn("clean", rtrim(col("name"))) # right only
# --- SUBSTRING ---
df.withColumn("area_code", substring(col("phone"), 1, 3)) # first 3 chars (1-indexed)
# --- REGEX REPLACE ---
df.withColumn("clean_phone", regexp_replace(col("phone"), r"[^0-9]", ""))
df.withColumn("no_special", regexp_replace(col("text"), r"[^a-zA-Z0-9 ]", ""))
# --- REGEX EXTRACT ---
df.withColumn("domain", regexp_extract(col("email"), r"@(.+)", 1))
# Group 1 extracts the domain from "user@example.com" -> "example.com"
# --- SPLIT ---
df.withColumn("parts", split(col("full_name"), " ")) # returns array
df.withColumn("first", split(col("full_name"), " ")[0]) # first element
df.withColumn("last", split(col("full_name"), " ")[1]) # second element
# --- PADDING ---
df.withColumn("padded_id", lpad(col("id"), 10, "0")) # "42" -> "0000000042"
# --- LENGTH ---
df.withColumn("name_len", length(col("name")))
| Function | Example Input | Output |
|---|---|---|
| upper("hello") | "hello" | "HELLO" |
| lower("HELLO") | "HELLO" | "hello" |
| initcap("hello") | "hello world" | "Hello World" |
| trim(" hi ") | " hi " | "hi" |
| substring(s,1,3) | "abcdef" | "abc" |
| length("hello") | "hello" | 5 |
| lpad("42",5,"0") | "42" | "00042" |
| concat_ws("-", "a", NULL) | "a", NULL | "a" (nulls skipped) |
| concat("a", NULL) | "a", NULL | NULL |
concat and concat_ws: concat returns NULL if ANY input is NULL, while concat_ws (with separator) skips NULLs. This is frequently asked.
What NOT to say: "I use Python string operations in a UDF for text manipulation." Always use built-in Spark string functions — they are optimized by Catalyst and avoid serialization overhead.
SECTION 15: Date Functions
Definition: PySpark provides built-in functions for date/timestamp creation, extraction, arithmetic, and formatting.
from pyspark.sql.functions import (
current_date, current_timestamp,
datediff, months_between, date_add, date_sub,
date_format, to_date, to_timestamp,
year, month, dayofmonth, dayofweek, dayofyear,
hour, minute, second, weekofyear, quarter,
last_day, next_day, trunc, date_trunc,
col, lit
)
# --- CURRENT DATE/TIMESTAMP ---
df.withColumn("today", current_date())
df.withColumn("now", current_timestamp())
# --- STRING TO DATE/TIMESTAMP ---
df.withColumn("parsed_date", to_date(col("date_str"), "yyyy-MM-dd"))
df.withColumn("parsed_ts", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
# Common formats: "yyyy-MM-dd", "MM/dd/yyyy", "dd-MMM-yyyy", "yyyyMMdd"
# --- DATE FORMATTING ---
df.withColumn("formatted", date_format(col("booking_date"), "MMM dd, yyyy"))
# "2024-03-15" -> "Mar 15, 2024"
df.withColumn("year_month", date_format(col("booking_date"), "yyyy-MM"))
# --- DATE ARITHMETIC ---
df.withColumn("next_week", date_add(col("booking_date"), 7))
df.withColumn("last_week", date_sub(col("booking_date"), 7))
df.withColumn("days_diff", datediff(col("end_date"), col("start_date")))
df.withColumn("months_diff", months_between(col("end_date"), col("start_date")))
# --- EXTRACTING PARTS ---
df.withColumn("year", year(col("booking_date")))
df.withColumn("month", month(col("booking_date")))
df.withColumn("day", dayofmonth(col("booking_date")))
df.withColumn("dow", dayofweek(col("booking_date"))) # 1=Sunday, 7=Saturday
df.withColumn("quarter", quarter(col("booking_date")))
df.withColumn("week", weekofyear(col("booking_date")))
# --- TRUNCATION ---
df.withColumn("month_start", trunc(col("booking_date"), "month")) # first day of month
df.withColumn("year_start", trunc(col("booking_date"), "year")) # first day of year
df.withColumn("hour_start", date_trunc("hour", col("timestamp_col")))
# --- LAST DAY OF MONTH ---
df.withColumn("month_end", last_day(col("booking_date")))
# --- NEXT SPECIFIC DAY ---
df.withColumn("next_monday", next_day(col("booking_date"), "Monday"))
| Function | Example | Output |
|---|---|---|
| current_date() | - | 2024-03-15 |
| datediff(end, start) | (Mar 15, Mar 10) | 5 |
| date_add(date, 7) | (Mar 15, 7) | Mar 22 |
| months_between(end, start) | (Jun 15, Mar 15) | 3.0 |
| year(date) | Mar 15, 2024 | 2024 |
| date_format(date, "MMM yyyy") | 2024-03-15 | "Mar 2024" |
| to_date(str, "yyyy-MM-dd") | "2024-03-15" | Date object |
| trunc(date, "month") | 2024-03-15 | 2024-03-01 |
| last_day(date) | 2024-03-15 | 2024-03-31 |
datediff(end, start) takes the end date FIRST, which is counterintuitive. Also know that dayofweek returns 1 for Sunday (not Monday), which is a common gotcha.
What NOT to say: "I use Python datetime in a UDF for date calculations." Always use built-in Spark date functions. They are Catalyst-optimized and handle distributed data correctly.
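Both gotchas can be verified with Python's standard datetime. Note the numbering mismatch: Spark's dayofweek maps Sunday to 1, while Python's isoweekday() maps Monday to 1, hence the conversion below:

```python
from datetime import date

def datediff(end, start):
    # Spark's datediff(end, start): end date comes FIRST
    return (end - start).days

def spark_dayofweek(d):
    # Spark: 1=Sunday ... 7=Saturday; Python isoweekday(): 1=Monday ... 7=Sunday
    return d.isoweekday() % 7 + 1

print(datediff(date(2024, 3, 15), date(2024, 3, 10)))  # 5
print(spark_dayofweek(date(2024, 3, 17)))              # 1 (March 17, 2024 is a Sunday)
```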
SECTION 16: Nested Data — Struct, Array, Map
Definition: PySpark supports complex nested data types: StructType (nested objects), ArrayType (lists), and MapType (key-value pairs), commonly found in JSON data.
Simple Explanation: Real-world data is not always flat tables. JSON from APIs has nested objects (structs), lists (arrays), and dictionaries (maps). PySpark can read, query, and flatten all of these.
from pyspark.sql.functions import explode, explode_outer, col, flatten, map_keys, map_values
# --- NESTED STRUCT ---
# Schema: address STRUCT<city STRING, country STRING, zip STRING>
df.select(
col("id"),
col("address.city").alias("city"),
col("address.country").alias("country"),
col("address.zip").alias("zip")
)
# --- ARRAY COLUMN ---
# Schema: tags ARRAY<STRING>
# explode: creates one row per element (NULL arrays -> zero rows)
df.withColumn("tag", explode(col("tags"))) \
.select("id", "tag")
# explode_outer: creates one row per element (NULL arrays -> one row with NULL)
df.withColumn("tag", explode_outer(col("tags"))) \
.select("id", "tag")
# size: number of elements in array
from pyspark.sql.functions import size
df.withColumn("tag_count", size(col("tags")))
# contains: check if array contains value
from pyspark.sql.functions import array_contains
df.filter(array_contains(col("tags"), "business"))
# flatten: flatten nested arrays
from pyspark.sql.functions import flatten
df.withColumn("flat_tags", flatten(col("nested_tags")))
# --- MAP COLUMN ---
# Schema: metadata MAP<STRING, STRING>
# Get value by key:
df.withColumn("status", col("metadata")["status"])
df.withColumn("status", col("metadata").getItem("status"))
# Explode map into key-value rows:
# (explode on a MapType produces TWO columns, key and value, so use select, not withColumn)
df.select("id", explode(col("metadata"))) # columns: id, key, value
# --- READING NESTED JSON ---
json_data = """
{"booking_id": "B001", "customer": {"name": "Alice", "email": "a@b.com"}, "tags": ["biz", "premium"]}
"""
df = spark.read.json(spark.sparkContext.parallelize([json_data]))
# Flatten it:
df_flat = df.select(
col("booking_id"),
col("customer.name").alias("customer_name"),
col("customer.email").alias("customer_email"),
explode(col("tags")).alias("tag")
)
| Function | NULL array input | Empty array input | Output |
|---|---|---|---|
| explode | Drops row | Drops row | One row per element |
| explode_outer | Keeps row (NULL) | Keeps row (NULL) | One row per element |
| posexplode | Drops row | Drops row | Row + position index |
explode and explode_outer — explode drops null arrays silently, which can cause data loss. Use explode_outer when you want to preserve rows with null arrays.
What NOT to say: "I use UDFs to parse nested JSON." Spark handles nested JSON natively with dot notation for structs and explode for arrays. UDFs are unnecessary and slow for this.
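The explode vs explode_outer behavior from the table, as a plain-Python generator sketch (None stands in for a NULL array):

```python
def explode(rows, key):
    # One output row per array element; None or empty arrays DROP the row
    for row in rows:
        for item in (row[key] or []):
            yield {**row, key: item}

def explode_outer(rows, key):
    # Same, but None/empty arrays KEEP the row with a None element
    for row in rows:
        arr = row[key]
        if not arr:
            yield {**row, key: None}
        else:
            for item in arr:
                yield {**row, key: item}

rows = [{"id": 1, "tags": ["a", "b"]}, {"id": 2, "tags": None}]
print(list(explode(rows, "tags")))        # id=2 silently dropped
print(list(explode_outer(rows, "tags")))  # id=2 kept with tags=None
```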
SECTION 17: Scenario-Based Interview Questions
Scenario 1: Reading 50 CSV files from multiple folders at once
What the interviewer is testing: Can you read multiple files efficiently with proper schema management and file tracking?
# Requirement: read all booking CSVs from 2023 and 2024 directories
# Add source file name column to track origin
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType
schema = StructType([
StructField("booking_id", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("booking_date", DateType(), True)
])
df = spark.read \
.schema(schema) \
.option("header", "true") \
.csv(
"/data/bookings/2023/*/bookings_*.csv",
"/data/bookings/2024/*/bookings_*.csv"
) \
.withColumn("source_file", input_file_name())
print(f"Total records: {df.count()}")
print(f"Files loaded: {df.select('source_file').distinct().count()}")
Follow-up they will ask: "What if some files have extra columns?" Use unionByName(allowMissingColumns=True) or define a superset schema.
Scenario 2: Deduplicate CDC records keeping latest
What the interviewer is testing: Do you know the window function dedup pattern? This is the single most common PySpark interview coding question.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col
# CDC table: same customer_id may appear multiple times (updates over time)
# Keep only the MOST RECENT record per customer_id
window = Window.partitionBy("customer_id").orderBy(col("updated_at").desc())
df_deduped = df \
.withColumn("rn", row_number().over(window)) \
.filter(col("rn") == 1) \
.drop("rn")
# In Databricks/Delta Lake: MERGE INTO handles this as SCD Type 1
Follow-up they will ask: "Why row_number() and not dense_rank()?" Because row_number() guarantees exactly 1 row per partition (breaks ties), while dense_rank() can return multiple rows if timestamps are identical.
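The same keep-latest-per-key logic in plain Python, handy for sanity-checking the window version on a small sample (a sketch, not a substitute for the distributed version):

```python
def dedupe_latest(rows, key, ts):
    # Keep the single most recent row per key, like filtering row_number() == 1
    latest = {}
    for row in rows:
        k = row[key]
        if k not in latest or row[ts] > latest[k][ts]:
            latest[k] = row          # strict > keeps the first row seen on a tie
    return list(latest.values())

rows = [
    {"customer_id": "C1", "updated_at": 1, "status": "new"},
    {"customer_id": "C1", "updated_at": 3, "status": "active"},
    {"customer_id": "C2", "updated_at": 2, "status": "new"},
]
print(dedupe_latest(rows, "customer_id", "updated_at"))
```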
Scenario 3: Self-join to find pairs
What the interviewer is testing: Can you handle self-joins and avoid duplicate/self pairs?
# Find all flight booking pairs for the same customer on the same day
bookings1 = df.alias("b1")
bookings2 = df.alias("b2")
pairs = bookings1.join(bookings2,
(col("b1.customer_id") == col("b2.customer_id")) &
(col("b1.booking_date") == col("b2.booking_date")) &
(col("b1.booking_id") < col("b2.booking_id")) # avoid duplicates and self-pairs
).select(
col("b1.booking_id").alias("booking_1"),
col("b2.booking_id").alias("booking_2"),
col("b1.customer_id"),
col("b1.booking_date")
)
Follow-up they will ask: "Why < and not !=?" Using < ensures each pair appears only once (A,B but not B,A), while != would give both (A,B) and (B,A).
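The `<` trick corresponds to taking unordered combinations within each group. A plain-Python equivalent (illustrative sketch; field names match the example above):

```python
from itertools import combinations
from collections import defaultdict

def booking_pairs(rows):
    # Group by (customer_id, booking_date), then emit each unordered pair once
    groups = defaultdict(list)
    for r in rows:
        groups[(r["customer_id"], r["booking_date"])].append(r["booking_id"])
    for (cust, day), ids in groups.items():
        for a, b in combinations(sorted(ids), 2):  # a < b: no dupes, no self-pairs
            yield (a, b, cust, day)

rows = [
    {"booking_id": "B1", "customer_id": "C1", "booking_date": "2024-03-15"},
    {"booking_id": "B2", "customer_id": "C1", "booking_date": "2024-03-15"},
    {"booking_id": "B3", "customer_id": "C2", "booking_date": "2024-03-15"},
]
print(list(booking_pairs(rows)))  # [('B1', 'B2', 'C1', '2024-03-15')]
```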
Scenario 4: Find customers who bought in January but NOT in February
What the interviewer is testing: Do you know anti joins?
jan_customers = df.filter(month(col("purchase_date")) == 1).select("customer_id").distinct()
feb_customers = df.filter(month(col("purchase_date")) == 2).select("customer_id").distinct()
# Anti join: customers in Jan who are NOT in Feb
jan_only = jan_customers.join(feb_customers, on="customer_id", how="left_anti")
Follow-up they will ask: "How would you do this in SQL?"
SELECT DISTINCT customer_id
FROM purchases
WHERE MONTH(purchase_date) = 1
AND customer_id NOT IN (
SELECT DISTINCT customer_id FROM purchases WHERE MONTH(purchase_date) = 2
)
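On distinct keys, a left-anti join is just a set difference, as this plain-Python check shows. One caveat worth raising in the SQL version: if the NOT IN subquery returns any NULL customer_id, NOT IN matches no rows at all (three-valued logic), so NOT EXISTS or the anti join is safer:

```python
jan = {"C1", "C2", "C3"}   # customers who bought in January
feb = {"C2"}               # customers who bought in February

jan_only = jan - feb       # left_anti: in jan, not in feb
print(sorted(jan_only))    # ['C1', 'C3']
```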
Scenario 5: Running total of revenue by date per region
What the interviewer is testing: Window functions with frame specification.
from pyspark.sql.window import Window
from pyspark.sql.functions import sum as spark_sum # alias avoids shadowing Python's built-in sum
window = Window.partitionBy("region") \
.orderBy("sale_date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_running = df.withColumn("running_revenue", spark_sum("revenue").over(window))
Follow-up they will ask: "What if you want a 30-day rolling sum instead of a running total?" Change the frame to .rowsBetween(-29, 0) for row-based, or use rangeBetween with days cast to numeric for date-based windows.
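Both frames are easy to picture on a single region's ordered rows in plain Python (a semantics sketch, not Spark code):

```python
from itertools import accumulate

revenue = [10, 20, 5, 15]  # one region, ordered by sale_date

# Running total: rowsBetween(unboundedPreceding, currentRow)
print(list(accumulate(revenue)))  # [10, 30, 35, 50]

# Row-based rolling sum over the last 3 rows: rowsBetween(-2, 0)
rolling = [sum(revenue[max(0, i - 2): i + 1]) for i in range(len(revenue))]
print(rolling)  # [10, 30, 35, 40]
```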
Scenario 6: Replace nulls with the previous non-null value (forward fill)
What the interviewer is testing: Advanced window function usage.
from pyspark.sql.window import Window
from pyspark.sql.functions import last, col
window = Window.partitionBy("sensor_id") \
.orderBy("timestamp") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_filled = df.withColumn("value_filled",
last("value", ignorenulls=True).over(window)
)
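What last("value", ignorenulls=True) over that running frame computes, shown on a single sensor's ordered values in plain Python (a semantics sketch — leading nulls stay null because there is no previous value to carry forward):

```python
def forward_fill(values):
    # Carry the last non-None value forward, like last(..., ignorenulls=True)
    # over rowsBetween(unboundedPreceding, currentRow)
    out, last_seen = [], None
    for v in values:
        if v is not None:
            last_seen = v
        out.append(last_seen)
    return out

print(forward_fill([None, 3, None, None, 7, None]))  # [None, 3, 3, 3, 7, 7]
```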