Python Data Structures & Coding Patterns — Data Engineer Edition
Memory Map
Q01 — Set Operations for Data Comparison
Question: Given two sets of team members, find (a) common members, (b) members only in team A, (c) members in either but not both, and (d) all members combined.
Sample Input:
Expected Output:
Solution 1 — Manual loop approach
team_a = {"Alice", "Bob", "Charlie", "Diana"}
team_b = {"Bob", "Diana", "Eve", "Frank"}
# Find common members by checking each member of A in B
common = []
for member in team_a:
    if member in team_b:
        common.append(member)
print("Common:", sorted(common))
# Output: Common: ['Bob', 'Diana']
# Find members only in team A
only_a = []
for member in team_a:
    if member not in team_b:
        only_a.append(member)
print("Only in A:", sorted(only_a))
# Output: Only in A: ['Alice', 'Charlie']
# Find symmetric difference (in one but not both)
sym_diff = []
for member in team_a:
    if member not in team_b:
        sym_diff.append(member)
for member in team_b:
    if member not in team_a:
        sym_diff.append(member)
print("Symmetric difference:", sorted(sym_diff))
# Output: Symmetric difference: ['Alice', 'Charlie', 'Eve', 'Frank']
# Combine all members using a helper set
all_members = set()
for member in team_a:
    all_members.add(member)
for member in team_b:
    all_members.add(member)
print("All members:", sorted(all_members))
# Output: All members: ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve', 'Frank']
Solution 2 — List comprehension approach
team_a = {"Alice", "Bob", "Charlie", "Diana"}
team_b = {"Bob", "Diana", "Eve", "Frank"}
# Use list comprehensions instead of explicit loops
common = sorted([m for m in team_a if m in team_b])
print("Common:", common)
# Output: Common: ['Bob', 'Diana']
only_a = sorted([m for m in team_a if m not in team_b])
print("Only in A:", only_a)
# Output: Only in A: ['Alice', 'Charlie']
sym_diff = sorted([m for m in team_a if m not in team_b] +
                  [m for m in team_b if m not in team_a])
print("Symmetric difference:", sym_diff)
# Output: Symmetric difference: ['Alice', 'Charlie', 'Eve', 'Frank']
all_members = sorted(set(list(team_a) + list(team_b)))
print("All members:", all_members)
# Output: All members: ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve', 'Frank']
Solution 3 — Optimal: Set operators (Pythonic)
team_a = {"Alice", "Bob", "Charlie", "Diana"}
team_b = {"Bob", "Diana", "Eve", "Frank"}
# Step 1: Let's SEE what these sets look like
print("Team A:", sorted(team_a))
# Output: Team A: ['Alice', 'Bob', 'Charlie', 'Diana']
print("Team B:", sorted(team_b))
# Output: Team B: ['Bob', 'Diana', 'Eve', 'Frank']
# ↑ Sets are unordered — sorted() just makes output readable
# Step 2: & operator = intersection (common members)
common = team_a & team_b
print("Common (A & B):", sorted(common))
# Output: Common (A & B): ['Bob', 'Diana']
# ↑ Only Bob and Diana are in BOTH sets
# Step 3: - operator = difference (only in A, not in B)
only_a = team_a - team_b
print("Only in A (A - B):", sorted(only_a))
# Output: Only in A (A - B): ['Alice', 'Charlie']
# ↑ Alice and Charlie are in A but NOT in B — this is like a LEFT ANTI JOIN
# Step 4: ^ operator = symmetric difference (in one but not both)
sym_diff = team_a ^ team_b
print("In one but not both (A ^ B):", sorted(sym_diff))
# Output: In one but not both (A ^ B): ['Alice', 'Charlie', 'Eve', 'Frank']
# ↑ Excludes Bob and Diana (they're in both) — like a FULL OUTER minus INNER
# Step 5: | operator = union (all combined, deduplicated)
all_members = team_a | team_b
print("All members (A | B):", sorted(all_members))
# Output: All members (A | B): ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve', 'Frank']
# ↑ 6 unique members — Bob and Diana appear once, not twice
# Bonus: Subset check
small = {"Bob", "Diana"}
print("Is small ⊆ team_a?", small <= team_a)
# Output: Is small ⊆ team_a? True
# ↑ <= checks if every element in small exists in team_a
Interview Tip: "Set intersection is O(min(len(a), len(b))), and union/difference are linear in the set sizes -- far faster than nested list-comparison loops, which are O(n*m). In data engineering, use sets to compare column lists, find missing columns, and deduplicate."
What NOT to Say: "I would loop through both lists to find common elements." -- This shows you do not know set operations, which are fundamental in Python and data engineering.
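The column-comparison use case from the tip can be sketched like this (the schema and column names are made up for illustration):

```python
# Hypothetical schema check: which expected columns are missing from a load?
expected_cols = {"id", "name", "email", "created_at"}
actual_cols = {"id", "name", "signup_ts"}

missing = expected_cols - actual_cols      # must still be provided
unexpected = actual_cols - expected_cols   # arrived but were not asked for

print("Missing:", sorted(missing))
# Output: Missing: ['created_at', 'email']
print("Unexpected:", sorted(unexpected))
# Output: Unexpected: ['signup_ts']
```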
Q02 — defaultdict and Counter Patterns
Question: Given a list of words, count the frequency of each word. Then, given a list of transactions with categories and amounts, group the amounts by category.
Sample Input:
Expected Output:
Solution 1 — Manual dict approach
words = ["hello", "world", "hello", "foo", "world", "hello"]
# Count words using a plain dict with key existence check
word_count = {}
for word in words:
    if word not in word_count:
        word_count[word] = 0  # initialize key if missing
    word_count[word] += 1
print("Word counts:", dict(sorted(word_count.items())))
# Output: Word counts: {'foo': 1, 'hello': 3, 'world': 2}
# Group transactions using plain dict
transactions = [("food", 50), ("transport", 30), ("food", 25),
                ("entertainment", 100), ("transport", 45)]
by_category = {}
for category, amount in transactions:
    if category not in by_category:
        by_category[category] = []  # initialize empty list
    by_category[category].append(amount)
print("Grouped:", dict(sorted(by_category.items())))
# Output: Grouped: {'entertainment': [100], 'food': [50, 25], 'transport': [30, 45]}
Solution 2 — defaultdict approach
from collections import defaultdict
words = ["hello", "world", "hello", "foo", "world", "hello"]
# defaultdict(int) auto-creates missing keys with value 0
word_count = defaultdict(int)
for word in words:
    word_count[word] += 1  # no need to check if key exists
print("Word counts:", dict(sorted(word_count.items())))
# Output: Word counts: {'foo': 1, 'hello': 3, 'world': 2}
# defaultdict(list) auto-creates missing keys with empty list
transactions = [("food", 50), ("transport", 30), ("food", 25),
                ("entertainment", 100), ("transport", 45)]
by_category = defaultdict(list)
for category, amount in transactions:
    by_category[category].append(amount)  # no init needed
print("Grouped:", dict(sorted(by_category.items())))
# Output: Grouped: {'entertainment': [100], 'food': [50, 25], 'transport': [30, 45]}
Solution 3 — Optimal: Counter (for counting)
from collections import Counter
words = ["hello", "world", "hello", "foo", "world", "hello"]
# Step 1: Counter scans the list once and builds {element: count}
word_count = Counter(words)
print("Step 1 - Raw Counter:", word_count)
# Output: Step 1 - Raw Counter: Counter({'hello': 3, 'world': 2, 'foo': 1})
# ↑ Counter is a dict subclass — keys are elements, values are their counts
# Step 2: Sort by key for consistent display
print("Step 2 - Sorted:", dict(sorted(word_count.items())))
# Output: Step 2 - Sorted: {'foo': 1, 'hello': 3, 'world': 2}
# ↑ dict(sorted(...)) gives us alphabetical order for readability
# Step 3: most_common(n) returns top n as list of (element, count) tuples
top_2 = word_count.most_common(2)
print("Step 3 - Top 2:", top_2)
# Output: Step 3 - Top 2: [('hello', 3), ('world', 2)]
# ↑ Already sorted by count descending — no need to sort manually!
# Step 4: Counter arithmetic — useful for comparing frequency distributions
c1 = Counter("aabbc")
c2 = Counter("abbcc")
print("Step 4a - c1:", c1)
# Output: Step 4a - c1: Counter({'a': 2, 'b': 2, 'c': 1})
print("Step 4b - c2:", c2)
# Output: Step 4b - c2: Counter({'b': 2, 'c': 2, 'a': 1})
print("c1 + c2:", c1 + c2) # add counts element-wise
# Output: c1 + c2: Counter({'b': 4, 'a': 3, 'c': 3})
# ↑ 'b' had 2+2=4, 'a' had 2+1=3, 'c' had 1+2=3
print("c1 - c2:", c1 - c2) # subtract, drops zero/negative
# Output: c1 - c2: Counter({'a': 1})
# ↑ Only 'a' has more in c1 (2) than c2 (1) → keeps 2-1=1
print("c1 & c2:", c1 & c2) # min of each count (intersection)
# Output: c1 & c2: Counter({'b': 2, 'a': 1, 'c': 1})
# ↑ min(2,1)=1 for 'a', min(2,2)=2 for 'b', min(1,2)=1 for 'c'
Interview Tip:
"Counter supports arithmetic: +, -, & (min), | (max). These are incredibly useful for frequency analysis in ETL. defaultdict eliminates boilerplate key-existence checks."
What NOT to Say:
"I would use .setdefault() for everything." -- setdefault works but is less readable than defaultdict. Know both, but prefer defaultdict in interviews.
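For reference, the setdefault pattern mentioned above looks like this -- correct, just noisier than defaultdict because the default is repeated at every call site:

```python
transactions = [("food", 50), ("transport", 30), ("food", 25)]
by_category = {}
for category, amount in transactions:
    # setdefault returns the existing list, inserting [] first if the key is new
    by_category.setdefault(category, []).append(amount)
print(by_category)
# Output: {'food': [50, 25], 'transport': [30]}
```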
Q03 — Sorting with Custom Keys
Question: Given a list of student tuples (name, grade), sort them by grade descending. If grades are equal, sort by name ascending.
Sample Input:
Expected Output:
Solution 1 — Manual: Bubble sort with custom comparison
students = [("Alice", 85), ("Bob", 85), ("Charlie", 92), ("Diana", 78)]
# Copy the list so we do not mutate original
result = students[:]
# Bubble sort: compare grade desc, then name asc
for i in range(len(result)):
    for j in range(i + 1, len(result)):
        name_i, grade_i = result[i]
        name_j, grade_j = result[j]
        # Swap if j should come before i
        should_swap = False
        if grade_j > grade_i:  # higher grade first
            should_swap = True
        elif grade_j == grade_i and name_j < name_i:  # same grade, alpha by name
            should_swap = True
        if should_swap:
            result[i], result[j] = result[j], result[i]
print(result)
# Output: [('Charlie', 92), ('Alice', 85), ('Bob', 85), ('Diana', 78)]
Solution 2 — sorted() with lambda key
students = [("Alice", 85), ("Bob", 85), ("Charlie", 92), ("Diana", 78)]
# Sort by grade descending using reverse=True
by_grade_desc = sorted(students, key=lambda x: x[1], reverse=True)
print(by_grade_desc)
# Output: [('Charlie', 92), ('Alice', 85), ('Bob', 85), ('Diana', 78)]
# Note: Alice and Bob tie at 85; sorted() is stable, so ties keep their original order, which is not guaranteed to be alphabetical
Solution 3 — Optimal: Tuple key with negation for multi-criteria
students = [("Alice", 85), ("Bob", 85), ("Charlie", 92), ("Diana", 78)]
# Step 1: Let's see what the lambda key produces for each student
for s in students:
    print(f" {s[0]:>8} → key = ({-s[1]}, '{s[0]}') = {(-s[1], s[0])}")
# Output: Alice → key = (-85, 'Alice') = (-85, 'Alice')
# Output: Bob → key = (-85, 'Bob') = (-85, 'Bob')
# Output: Charlie → key = (-92, 'Charlie') = (-92, 'Charlie')
# Output: Diana → key = (-78, 'Diana') = (-78, 'Diana')
# ↑ Negating the grade means -92 < -85 < -78, so highest grade sorts FIRST
# Step 2: Python compares tuples left to right
# -92 comes first (smallest). For tied grades (-85), 'Alice' < 'Bob' alphabetically
result = sorted(students, key=lambda x: (-x[1], x[0]))
print("Step 2 - Sorted:", result)
# Output: Step 2 - Sorted: [('Charlie', 92), ('Alice', 85), ('Bob', 85), ('Diana', 78)]
# ↑ Charlie (92) first, then Alice before Bob (both 85, but A < B alphabetically)
# Bonus: Sort a dict by values
scores = {'alice': 85, 'bob': 92, 'charlie': 78}
print("Step 3 - Dict keys:", list(scores.keys()))
# Output: Step 3 - Dict keys: ['alice', 'bob', 'charlie']
# ↑ sorted(scores) iterates over KEYS, but sorts them by scores.get (the values)
sorted_names = sorted(scores, key=scores.get, reverse=True)
print("Step 4 - Sorted by value desc:", sorted_names)
# Output: Step 4 - Sorted by value desc: ['bob', 'alice', 'charlie']
# ↑ bob(92) > alice(85) > charlie(78) — keys sorted by their values!
Interview Tip: "For multi-criteria sorting, use a tuple in the key function. Negate numeric values for reverse order on specific fields. Python's sort is stable, so you can also do two passes — but the tuple key is cleaner."
What NOT to Say: "I would sort twice — once by name, then by grade." -- While stable sort makes this technically correct, it shows you do not know the tuple-key trick, which is the standard approach.
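For completeness, the two-pass stable-sort alternative the tip alludes to: sort by the secondary key first, then by the primary key (input order shuffled here so the effect is visible):

```python
students = [("Diana", 78), ("Bob", 85), ("Charlie", 92), ("Alice", 85)]
# Pass 1: secondary key (name ascending)
by_name = sorted(students, key=lambda s: s[0])
# Pass 2: primary key (grade descending); stability preserves name order for ties
result = sorted(by_name, key=lambda s: s[1], reverse=True)
print(result)
# Output: [('Charlie', 92), ('Alice', 85), ('Bob', 85), ('Diana', 78)]
```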
Q04 — Stack and Queue Patterns
Question: Implement a function that checks whether a string of brackets is balanced. Valid pairs are (), [], {}.
Sample Input:
Expected Output:
Solution 1 — Manual: Repeated string replacement
def is_balanced(s):
    # Keep removing innermost pairs until nothing is left
    while "()" in s or "[]" in s or "{}" in s:
        s = s.replace("()", "")
        s = s.replace("[]", "")
        s = s.replace("{}", "")
    # If empty, all brackets were matched
    return len(s) == 0
print(is_balanced("({[]})")) # Output: True
print(is_balanced("({[})")) # Output: False
print(is_balanced("((()))")) # Output: True
Solution 2 — Stack with list
def is_balanced(s):
    stack = []
    # Map each closing bracket to its opening counterpart
    pairs = {')': '(', ']': '[', '}': '{'}
    for char in s:
        if char in '([{':
            stack.append(char)  # push opening bracket
        elif char in ')]}':
            if not stack:  # nothing to match against
                return False
            if stack[-1] != pairs[char]:  # top of stack must match
                return False
            stack.pop()  # matched — remove from stack
    return len(stack) == 0  # stack must be empty at end
print(is_balanced("({[]})")) # Output: True
print(is_balanced("({[})")) # Output: False
print(is_balanced("((()))")) # Output: True
Solution 3 — Optimal: Stack with deque (O(1) operations)
from collections import deque
def is_balanced(s):
    # deque gives O(1) append and pop from both ends
    stack = deque()
    pairs = {')': '(', ']': '[', '}': '{'}
    openers = set('([{')
    for char in s:
        if char in openers:
            stack.append(char)
        elif char in pairs:
            # If stack empty or top does not match, it is unbalanced
            if not stack or stack[-1] != pairs[char]:
                return False
            stack.pop()
    return len(stack) == 0
print(is_balanced("({[]})")) # Output: True
print(is_balanced("({[})")) # Output: False
print(is_balanced("((()))")) # Output: True
# Bonus: Stack vs Queue demonstration
# Stack = LIFO (Last In, First Out) — use append/pop
stack = []
stack.append(1); stack.append(2); stack.append(3)
print(stack.pop()) # Output: 3
# Queue = FIFO (First In, First Out) — use deque with append/popleft
queue = deque()
queue.append(1); queue.append(2); queue.append(3)
print(queue.popleft()) # Output: 1
Interview Tip:
"Never use list.pop(0) for queues -- it is O(n) because every element shifts. Use deque.popleft() which is O(1). For stacks, list is fine since append/pop from the end are both O(1)."
What NOT to Say:
"I would use a list for the queue." -- This reveals you do not understand time complexity. list.pop(0) is O(n), while deque.popleft() is O(1).
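deque has one more trick worth knowing for streaming data: a maxlen bound that evicts the oldest element automatically -- a ready-made sliding window:

```python
from collections import deque

window = deque(maxlen=3)  # keeps only the 3 most recent items
for reading in [10, 20, 30, 40, 50]:
    window.append(reading)  # when full, the oldest item is dropped
print(list(window))
# Output: [30, 40, 50]
```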
Q05 — Lambda, Map, Filter in One-Liners
Question: Given a list of integers, filter out the negative numbers and zero, then return the squares of the remaining positive numbers.
Sample Input:
Expected Output:
Solution 1 — Manual loop
numbers = [-3, -1, 0, 2, 4, 7]
result = []
for x in numbers:
    if x > 0:  # filter: keep only positive numbers
        result.append(x ** 2)  # transform: square each one
print(result)
# Output: [4, 16, 49]
Solution 2 — map() and filter() with lambda
numbers = [-3, -1, 0, 2, 4, 7]
# filter() keeps elements where the lambda returns True
# map() applies the lambda to each remaining element
result = list(map(lambda x: x ** 2,
                  filter(lambda x: x > 0, numbers)))
print(result)
# Output: [4, 16, 49]
# Other useful lambda patterns:
names = ["alice", "BOB", "Charlie"]
normalized = list(map(str.title, names))
print(normalized)
# Output: ['Alice', 'Bob', 'Charlie']
# Sort dicts with lambda
data = [{"name": "Alice", "age": 30},
        {"name": "Bob", "age": 25}]
sorted_data = sorted(data, key=lambda d: d["age"])
print(sorted_data)
# Output: [{'name': 'Bob', 'age': 25}, {'name': 'Alice', 'age': 30}]
Solution 3 — Optimal: List comprehension (Pythonic)
numbers = [-3, -1, 0, 2, 4, 7]
# Combines filter and transform in one readable expression
result = [x ** 2 for x in numbers if x > 0]
print(result)
# Output: [4, 16, 49]
# More one-liner patterns:
# Conditional expression in comprehension
classified = ["even" if x % 2 == 0 else "odd" for x in range(5)]
print(classified)
# Output: ['even', 'odd', 'even', 'odd', 'even']
# Filter even numbers
evens = [x for x in range(20) if x % 2 == 0]
print(evens)
# Output: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Interview Tip: "Prefer list comprehensions over map/filter with lambdas -- they are more readable and more Pythonic. But know both styles because you will encounter map/filter in legacy codebases."
What NOT to Say: "I always use map and filter because they are more functional." -- In Python, list comprehensions are considered more idiomatic. Guido van Rossum himself prefers them.
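One related pattern: when the filtered and transformed values only feed an aggregate, a generator expression avoids materializing the intermediate list:

```python
numbers = [-3, -1, 0, 2, 4, 7]
# Squares are produced lazily and consumed by sum() one at a time
total = sum(x ** 2 for x in numbers if x > 0)
print(total)
# Output: 69
```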
Q06 — namedtuple and dataclass
Question: Create a structured record for an Employee with name, department, and salary. Show how to create instances and access fields. Demonstrate both immutable and mutable approaches.
Sample Input:
Expected Output:
Solution 1 — Plain dictionary (simplest, no structure enforcement)
# Use a dict — flexible but no field validation
emp = {
    "name": "Alice",
    "department": "Data Engineering",
    "salary": 95000
}
print(f"Name: {emp['name']}, Dept: {emp['department']}, Salary: {emp['salary']}")
# Output: Name: Alice, Dept: Data Engineering, Salary: 95000
# Mutable — just update the key
emp["salary"] = 100000
print(f"Updated salary: {emp['salary']}")
# Output: Updated salary: 100000
# Downside: no field validation, typos are silent
emp["salry"] = 99999 # typo creates a NEW key, no error
Solution 2 — namedtuple (immutable, tuple-like)
from collections import namedtuple
# Define the structure — fields are fixed at creation
Employee = namedtuple('Employee', ['name', 'department', 'salary'])
emp = Employee("Alice", "Data Engineering", 95000)
print(f"Immutable: {emp}")
# Output: Immutable: Employee(name='Alice', department='Data Engineering', salary=95000)
# Access by name or index
print(emp.name) # Output: Alice
print(emp[0]) # Output: Alice
# Immutable — cannot change after creation
# emp.salary = 100000 # Raises: AttributeError: can't set attribute
# To "update", use _replace() which returns a NEW namedtuple
updated = emp._replace(salary=100000)
print(f"Updated copy: {updated}")
# Output: Updated copy: Employee(name='Alice', department='Data Engineering', salary=100000)
Solution 3 — Optimal: dataclass (Python 3.7+, mutable with defaults)
from dataclasses import dataclass, asdict, astuple
@dataclass
class Employee:
    name: str
    department: str
    salary: float = 50000.0  # default value
# Step 1: Create instance — @dataclass auto-generates __init__ from type hints
emp = Employee("Alice", "Data Engineering", 95000)
print("Step 1 - Created:", emp)
# Output: Step 1 - Created: Employee(name='Alice', department='Data Engineering', salary=95000)
# ↑ __repr__ is auto-generated too — no need to write it yourself!
# Step 2: Mutable by default — direct field assignment works
emp.salary = 100000
print("Step 2 - After mutation:", emp)
# Output: Step 2 - After mutation: Employee(name='Alice', department='Data Engineering', salary=100000)
# ↑ Unlike namedtuple, you CAN modify fields in-place
# Step 3: Convert to dict/tuple — useful for serialization and DB inserts
print("Step 3a - As dict:", asdict(emp))
# Output: Step 3a - As dict: {'name': 'Alice', 'department': 'Data Engineering', 'salary': 100000}
print("Step 3b - As tuple:", astuple(emp))
# Output: Step 3b - As tuple: ('Alice', 'Data Engineering', 100000)
# ↑ asdict() is perfect for JSON serialization or DataFrame row creation
# Step 4: Auto-generated __eq__ compares all fields
emp2 = Employee("Alice", "Data Engineering", 100000)
print("Step 4 - emp == emp2:", emp == emp2)
# Output: Step 4 - emp == emp2: True
# ↑ Compares by value, not identity — no need to write __eq__ yourself
# Step 5: For immutable dataclass, use frozen=True
@dataclass(frozen=True)
class ImmutableEmployee:
    name: str
    department: str
    salary: float
frozen_emp = ImmutableEmployee("Bob", "Analytics", 80000)
# frozen_emp.salary = 90000 # Raises: FrozenInstanceError
print("Step 5 - Frozen:", frozen_emp)
# Output: Step 5 - Frozen: ImmutableEmployee(name='Bob', department='Analytics', salary=80000)
# ↑ frozen=True also makes it hashable — can use as dict key or set member
Interview Tip:
"Use namedtuple for simple immutable records (like a row from a query). Use dataclass when you need mutability, defaults, or methods. dataclass also gives you __eq__, __repr__ for free."
What NOT to Say:
"I would just use a regular class with __init__." -- This shows you are not aware of modern Python. dataclass eliminates boilerplate and is the standard for structured data since Python 3.7.
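One dataclass gotcha worth anticipating as a follow-up question: mutable defaults must go through field(default_factory=...):

```python
from dataclasses import dataclass, field

@dataclass
class Team:
    name: str
    # `members: list = []` would raise ValueError -- a shared mutable default,
    # so dataclasses force you to use a factory instead
    members: list = field(default_factory=list)

a = Team("ETL")
b = Team("ML")
a.members.append("Alice")
print(a.members, b.members)  # each instance gets its own list
# Output: ['Alice'] []
```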
Q07 — Decorators (Simplified)
Question: Write a decorator called timer that measures and prints how long a function takes to execute. Then show how @lru_cache can speed up a recursive Fibonacci function.
Sample Input:
Expected Output:
Solution 1 — Manual timing without a decorator
import time
def slow_function():
    time.sleep(1)
    return "done"
# Manually measure time around the function call
start = time.time()
result = slow_function()
elapsed = time.time() - start
print(f"slow_function took {elapsed:.3f}s")
# Output: slow_function took 1.001s
# Fibonacci without cache — exponential time
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
start = time.time()
print(f"fibonacci(10) = {fibonacci(10)}")
elapsed = time.time() - start
print(f"Took {elapsed:.6f}s")
# Output: fibonacci(10) = 55
# Output: Took 0.000XXXs (small n is fast, but try n=35 and it crawls)
Solution 2 — Custom decorator with @wraps
import time
from functools import wraps
# A decorator is a function that takes a function and returns a wrapper
def timer(func):
    @wraps(func)  # preserves original function's name and docstring
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)  # call the original function
        elapsed = time.time() - start
        print(f"{func.__name__} took {elapsed:.3f}s")
        return result
    return wrapper
# Apply decorator with @ syntax — equivalent to: slow_function = timer(slow_function)
@timer
def slow_function():
    time.sleep(1)
    return "done"
slow_function()
# Output: slow_function took 1.001s
# The function's identity is preserved thanks to @wraps
print(slow_function.__name__)
# Output: slow_function (without @wraps, it would say "wrapper")
Solution 3 — Optimal: Built-in decorators (@lru_cache, @property, etc.)
from functools import lru_cache
# @lru_cache memoizes results — stores return values for given arguments
@lru_cache(maxsize=128)
def fibonacci(n):
    if n < 2:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)
# Step 1: Call fibonacci(10) — internally computes fib(0) through fib(10)
print(f"Step 1 - fibonacci(10) = {fibonacci(10)}")
# Output: Step 1 - fibonacci(10) = 55
# Step 2: Inspect the cache — see what's stored
print(f"Step 2 - Cache info: {fibonacci.cache_info()}")
# Output: Step 2 - Cache info: CacheInfo(hits=8, misses=11, maxsize=128, currsize=11)
# ↑ 11 unique calls (fib(0)..fib(10)), 8 cache hits (fib reuses previous results)
# Without cache: fib(10) would make 177 recursive calls. With cache: only 11!
# Step 3: Now call fibonacci(100) — fib(0)..fib(10) are already cached!
print(f"Step 3 - fibonacci(100) = {fibonacci(100)}")
# Output: Step 3 - fibonacci(100) = 354224848179261915075
print(f"Step 4 - Cache after fib(100): {fibonacci.cache_info()}")
# Output: Step 4 - Cache after fib(100): CacheInfo(hits=99, misses=101, maxsize=128, currsize=101)
# ↑ Only 90 new computations (fib(11)..fib(100)) -- fib(10) and fib(9) came straight from the cache
# Step 5: You can clear the cache if needed
fibonacci.cache_clear()
print(f"Step 5 - After clear: {fibonacci.cache_info()}")
# Output: Step 5 - After clear: CacheInfo(hits=0, misses=0, maxsize=128, currsize=0)
# Common built-in decorators to know:
# @staticmethod — no self/cls, just a function inside a class
# @classmethod — gets cls instead of self, used for factory methods
# @property — makes a method look like an attribute (getter)
# @lru_cache — memoization (caches return values by arguments)
Interview Tip:
"Always use @wraps(func) in custom decorators -- it preserves the original function's name and docstring, which matters for debugging and logging."
What NOT to Say: "A decorator modifies the function." -- It WRAPS, not modifies. The original function is unchanged. The decorator returns a new function that calls the original.
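A natural follow-up question is a decorator that takes arguments -- a decorator factory. A minimal sketch (the label text and function names here are arbitrary):

```python
import time
from functools import wraps

def timed(label):
    # Outer function receives the argument and returns the actual decorator
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            print(f"{label}: {func.__name__} took {time.time() - start:.3f}s")
            return result
        return wrapper
    return decorator

@timed("ETL step")
def extract():
    return "rows"

print(extract())  # prints the timing line, then "rows"
```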
Q08 — Context Managers (with statement)
Question: Write a custom context manager called Timer that measures the elapsed time of a code block. Show both the class-based and the contextlib approach.
Sample Input:
Expected Output:
Solution 1 — Manual try/finally (no context manager)
import time
# Without a context manager, you must handle cleanup manually
start = time.time()
try:
    time.sleep(1)  # simulate work
finally:
    elapsed = time.time() - start
    print(f"Elapsed: {elapsed:.3f}s")
# Output: Elapsed: 1.001s
# Same pattern for file handling
f = open("example.txt", "w")
try:
    f.write("hello")
finally:
    f.close()  # must close manually, even if exception occurs
Solution 2 — Class-based context manager (enter / exit)
import time
class Timer:
    def __enter__(self):
        # Setup: record the start time
        self.start = time.time()
        return self  # the object bound to the "as" variable

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Cleanup: calculate and print elapsed time
        self.elapsed = time.time() - self.start
        print(f"Elapsed: {self.elapsed:.3f}s")
        return False  # False = do not suppress exceptions
with Timer() as t:
    time.sleep(1)
# Output: Elapsed: 1.001s
# File handling — the classic context manager use case
with open("example.txt", "w") as f:
    f.write("hello")
# File is automatically closed, even if exception occurs
Solution 3 — Optimal: contextlib.contextmanager (Pythonic)
import time
from contextlib import contextmanager
@contextmanager
def timer(label="Block"):
    # Everything before yield is __enter__
    start = time.time()
    yield  # control passes to the with-block here
    # Everything after yield is __exit__
    elapsed = time.time() - start
    print(f"{label} elapsed: {elapsed:.3f}s")
with timer("My task"):
    time.sleep(1)
# Output: My task elapsed: 1.001s
# Real-world example: database connection manager
@contextmanager
def database_connection(db_url):
    conn = connect(db_url)  # setup: open connection (connect is a stand-in for your driver, e.g. psycopg2.connect)
    try:
        yield conn  # give connection to the with-block
    finally:
        conn.close()  # cleanup: always close, even on error
# Usage:
# with database_connection("postgres://...") as conn:
# conn.execute("SELECT * FROM users")
# Connection is automatically closed
Interview Tip:
"Context managers are critical in data engineering -- database connections, file handles, Spark sessions. Always use with to prevent resource leaks. The contextlib.contextmanager decorator is the simplest way to write one."
What NOT to Say:
"I close files manually with f.close()." -- This leaks resources if an exception occurs before close(). Always use with for automatic cleanup.
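contextlib has other helpers worth name-dropping; suppress, for instance, replaces a try/except/pass for errors you expect and can safely ignore (the filename here is made up):

```python
import os
from contextlib import suppress

# Equivalent to try: os.remove(...) / except FileNotFoundError: pass
with suppress(FileNotFoundError):
    os.remove("some_temp_file_that_may_not_exist.txt")
print("continued cleanly")
# Output: continued cleanly
```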
Q09 — Itertools for Data Processing
Question: Given multiple lists of data, (a) merge them into one, (b) group records by a key, and (c) generate all combinations of sizes and colors.
Sample Input:
Expected Output:
Solution 1 — Manual loops (no itertools)
# Merge multiple lists manually
list_a, list_b, list_c = [1, 2], [3, 4], [5, 6]
merged = []
for lst in [list_a, list_b, list_c]:
    for item in lst:
        merged.append(item)
print("Chained:", merged)
# Output: Chained: [1, 2, 3, 4, 5, 6]
# Group by key manually with a dict (no sorting needed, unlike itertools.groupby)
data = [("NY", 100), ("NY", 200), ("CA", 300), ("CA", 400)]
groups = {}
for state, value in data:
    if state not in groups:
        groups[state] = []
    groups[state].append((state, value))
for state in sorted(groups):
    print(f" {state} -> {groups[state]}")
# Output:
# CA -> [('CA', 300), ('CA', 400)]
# NY -> [('NY', 100), ('NY', 200)]
# Cartesian product manually
sizes = ['S', 'M', 'L']
colors = ['red', 'blue']
combos = []
for s in sizes:
    for c in colors:
        combos.append((s, c))
print("Product:", combos)
# Output: Product: [('S', 'red'), ('S', 'blue'), ('M', 'red'), ('M', 'blue'), ('L', 'red'), ('L', 'blue')]
Solution 2 — List comprehension and unpacking
# Merge with unpacking
list_a, list_b, list_c = [1, 2], [3, 4], [5, 6]
merged = [*list_a, *list_b, *list_c]
print("Chained:", merged)
# Output: Chained: [1, 2, 3, 4, 5, 6]
# Cartesian product with nested comprehension
sizes = ['S', 'M', 'L']
colors = ['red', 'blue']
combos = [(s, c) for s in sizes for c in colors]
print("Product:", combos)
# Output: Product: [('S', 'red'), ('S', 'blue'), ('M', 'red'), ('M', 'blue'), ('L', 'red'), ('L', 'blue')]
# Combinations with comprehension
items = [1, 2, 3]
pairs = [(items[i], items[j]) for i in range(len(items)) for j in range(i + 1, len(items))]
print("Combos:", pairs)
# Output: Combos: [(1, 2), (1, 3), (2, 3)]
Solution 3 — Optimal: itertools (memory-efficient, lazy)
from itertools import chain, groupby, islice, product, combinations
# chain — merge multiple iterables lazily
merged = list(chain([1, 2], [3, 4], [5, 6]))
print("Chained:", merged)
# Output: Chained: [1, 2, 3, 4, 5, 6]
# groupby — group CONSECUTIVE items by a key (data must be sorted!)
data = [("CA", 300), ("CA", 400), ("NY", 100), ("NY", 200)] # sorted by state
for state, group in groupby(data, key=lambda x: x[0]):
    print(f" {state} -> {list(group)}")
# Output:
# CA -> [('CA', 300), ('CA', 400)]
# NY -> [('NY', 100), ('NY', 200)]
# product — cartesian product (replaces nested loops)
combos = list(product(['S', 'M', 'L'], ['red', 'blue']))
print("Product:", combos)
# Output: Product: [('S', 'red'), ('S', 'blue'), ('M', 'red'), ('M', 'blue'), ('L', 'red'), ('L', 'blue')]
# combinations — choose k items from n without replacement
print("Combos:", list(combinations([1, 2, 3], 2)))
# Output: Combos: [(1, 2), (1, 3), (2, 3)]
# islice — slice an iterator lazily (no memory for huge files)
# with open("huge_file.csv") as f:
# first_10 = list(islice(f, 10)) # reads only first 10 lines
Interview Tip:
"groupby requires SORTED data -- it only groups CONSECUTIVE items. This is the number one mistake candidates make. Always sort before groupby."
What NOT to Say:
"groupby works like SQL GROUP BY." -- It does NOT. SQL GROUP BY processes all rows regardless of order. Python's groupby only groups consecutive identical keys.
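The pitfall described above is easy to demonstrate: on unsorted input, groupby happily emits the same key more than once:

```python
from itertools import groupby

data = [("NY", 100), ("CA", 300), ("NY", 200)]  # NOT sorted by state
result = [(state, [amt for _, amt in grp])
          for state, grp in groupby(data, key=lambda x: x[0])]
print(result)
# Output: [('NY', [100]), ('CA', [300]), ('NY', [200])]  <- 'NY' appears twice!
```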
Q10 — Exception Handling Best Practices
Question: Write a function that safely converts a string to an integer. Handle invalid input with specific exceptions, and show how to create a custom exception.
Sample Input:
Expected Output:
Solution 1 — Basic try/except with specific exceptions
def convert(value):
    try:
        return int(value)
    except ValueError:
        # Raised when the string cannot be parsed as an int
        return f"Error: '{value}' is not a valid number"
    except TypeError:
        # Raised when value is not a string or number at all (e.g. None)
        return "Error: Input cannot be None"
print(convert("42"))
# Output: 42
print(convert("abc"))
# Output: Error: 'abc' is not a valid number
print(convert(None))
# Output: Error: Input cannot be None
Solution 2 — Custom exception class
class DataValidationError(Exception):
    """Custom exception for data validation failures."""
    def __init__(self, column, value, message):
        self.column = column
        self.value = value
        # Call parent __init__ with a formatted message
        super().__init__(f"Column '{column}': {message} (got: {value})")

def validate_age(data):
    age = data.get("age")
    if age is None:
        raise DataValidationError("age", age, "must not be None")
    if not isinstance(age, int):
        raise DataValidationError("age", age, "must be an integer")
    if age < 0:
        raise DataValidationError("age", age, "must be positive")
    return age
# Test with valid data
try:
    result = validate_age({"age": 25})
    print(f"Valid age: {result}")
except DataValidationError as e:
    print(f"Validation failed: {e}")
# Output: Valid age: 25
# Test with invalid data
try:
    result = validate_age({"age": -5})
    print(f"Valid age: {result}")
except DataValidationError as e:
    print(f"Validation failed: {e}")
# Output: Validation failed: Column 'age': must be positive (got: -5)
Solution 3 — Optimal: Exception chaining and best practices
class DataValidationError(Exception):
    def __init__(self, column, value, message):
        self.column = column
        self.value = value
        super().__init__(f"Column '{column}': {message} (got: {value})")
def safe_convert(value):
    try:
        return int(value)
    except (ValueError, TypeError) as e:
        # Chain exceptions with "from e" to preserve the original traceback
        raise DataValidationError("input", value, "must be a valid integer") from e
# Test
try:
    result = safe_convert("abc")
except DataValidationError as e:
    print(f"Error: {e}")
    print(f"  Column: {e.column}")
    print(f"  Value: {e.value}")
# Output: Error: Column 'input': must be a valid integer (got: abc)
# Output: Column: input
# Output: Value: abc
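To see that "from e" really preserves the original error, a quick sketch (repeating the class and function above so it runs standalone) inspects the chained exception on __cause__:

```python
class DataValidationError(Exception):
    def __init__(self, column, value, message):
        self.column = column
        self.value = value
        super().__init__(f"Column '{column}': {message} (got: {value})")

def safe_convert(value):
    try:
        return int(value)
    except (ValueError, TypeError) as e:
        raise DataValidationError("input", value, "must be a valid integer") from e

try:
    safe_convert("abc")
except DataValidationError as e:
    # "from e" stores the original exception on __cause__
    print(type(e.__cause__).__name__)
    # Output: ValueError
```

This is what makes chaining valuable in pipelines: the high-level error tells you which column failed, while __cause__ keeps the low-level reason for debugging.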
# Anti-patterns to AVOID:
# BAD — bare except catches EVERYTHING including KeyboardInterrupt
# try:
# result = process(data)
# except:
# pass
# BAD — too broad, hides real bugs
# try:
# result = process(data)
# except Exception:
# print("something went wrong")
# GOOD — always catch SPECIFIC exceptions
try:
result = int("abc")
except ValueError:
print("Caught specific ValueError")
# Output: Caught specific ValueError
Interview Tip:
"Always catch specific exceptions. except Exception hides bugs. Use raise ... from e to chain exceptions and preserve the traceback. In data pipelines, custom exceptions make debugging much easier."
What NOT to Say:
"I use try/except with bare except." -- This catches SystemExit and KeyboardInterrupt, making the program unkillable. Always catch specific exception types.
Q11 — String Formatting (f-strings, format, %)
Question: Given variables for name, age, and salary, format them in three different ways: (a) basic string with variables, (b) salary with commas and 2 decimal places, (c) aligned columns.
Sample Input:
Expected Output:
Solution 1 — %-formatting (old style, know it but do not use it)
name = "Alice"
age = 30
salary = 95432.567
# Basic substitution with %s (string) and %d (integer)
print("%s is %d years old" % (name, age))
# Output: Alice is 30 years old
# Float formatting with %.2f (2 decimal places)
print("Salary: $%.2f" % salary)
# Output: Salary: $95432.57
# Note: %-formatting does NOT support comma separators easily
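If legacy code forces %-formatting, one illustrative workaround is to build the comma-formatted string first with the built-in format() and substitute it as a plain string:

```python
salary = 95432.567
# %-style has no comma flag, so pre-format the number, then substitute with %s
print("Salary: $%s" % format(salary, ",.2f"))
# Output: Salary: $95,432.57
```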
Solution 2 — .format() method
name = "Alice"
age = 30
salary = 95432.567
# Positional arguments
print("{} is {} years old".format(name, age))
# Output: Alice is 30 years old
# Named arguments
print("{name} is {age} years old".format(name=name, age=age))
# Output: Alice is 30 years old
# Format spec: comma separator and 2 decimal places
print("Salary: ${:,.2f}".format(salary))
# Output: Salary: $95,432.57
# Alignment: < left, ^ center, > right (within 20 chars)
print("{:<20}|{:^20}|{:>20}".format("left", "center", "right"))
# Output: left                |       center       |               right
Solution 3 — Optimal: f-strings (Python 3.6+, always use this)
name = "Alice"
age = 30
salary = 95432.567
# Basic f-string — variables directly in curly braces
print(f"{name} is {age} years old")
# Output: Alice is 30 years old
# Format spec after colon: comma separator, 2 decimals
print(f"Salary: ${salary:,.2f}")
# Output: Salary: $95,432.57
# Alignment: < left, ^ center, > right
print(f"{'left':<20}|{'center':^20}|{'right':>20}")
# Output: left                |       center       |               right
# Zero-padded numbers
print(f"{age:05d}")
# Output: 00030
# Debug syntax (Python 3.8+) — shows variable name AND value
x = 42
print(f"{x = }")
# Output: x = 42
print(f"{x**2 = }")
# Output: x**2 = 1764
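Format specs can also be built at runtime by nesting expressions inside the spec itself, which is useful when column widths are not known in advance. A minimal sketch (width and prec are illustrative names, not a standard API):

```python
salary = 95432.567
width, prec = 12, 1
# The spec is assembled at runtime: here it becomes "12,.1f"
print(f"${salary:{width},.{prec}f}")
# Output: $    95,432.6
```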
Interview Tip:
"Always use f-strings -- they are the fastest and most readable. The f'{x = }' debug syntax (Python 3.8+) is a great trick for quick debugging, as it shows both the variable name and its value."
What NOT to Say: "I use %-formatting because it is like C." -- This is outdated since Python 3.6. f-strings are faster, more readable, and the modern standard.
Q12 — Common One-Liners Data Engineers Should Know
Question: Demonstrate essential Python one-liners: (a) flatten a nested list, (b) transpose a matrix, (c) get unique items preserving order, (d) merge two dicts, (e) create a dict from two lists.
Sample Input:
Expected Output:
Solution 1 — Manual loops for each operation
# Flatten nested list
nested = [[1, 2], [3, 4], [5, 6]]
flat = []
for sublist in nested:
for item in sublist:
flat.append(item)
print("Flat:", flat)
# Output: Flat: [1, 2, 3, 4, 5, 6]
# Transpose matrix
matrix = [[1, 2, 3], [4, 5, 6]]
transposed = []
for col in range(len(matrix[0])):
row = []
for r in range(len(matrix)):
row.append(matrix[r][col])
transposed.append(tuple(row))
print("Transposed:", transposed)
# Output: Transposed: [(1, 4), (2, 5), (3, 6)]
# Unique items preserving order
items = [3, 1, 2, 1, 3, 4]
seen = set()
unique = []
for item in items:
if item not in seen:
seen.add(item)
unique.append(item)
print("Unique:", unique)
# Output: Unique: [3, 1, 2, 4]
# Merge two dicts
dict1 = {'a': 1, 'b': 2}
dict2 = {'b': 3, 'c': 4}
merged = {}
for k, v in dict1.items():
merged[k] = v
for k, v in dict2.items():
merged[k] = v # dict2 values overwrite dict1
print("Merged:", merged)
# Output: Merged: {'a': 1, 'b': 3, 'c': 4}
# Dict from two lists
keys = ['a', 'b', 'c']
values = [1, 2, 3]
d = {}
for i in range(len(keys)):
d[keys[i]] = values[i]
print("Zipped:", d)
# Output: Zipped: {'a': 1, 'b': 2, 'c': 3}
Solution 2 — Built-in functions
from itertools import chain
# Flatten with itertools.chain
nested = [[1, 2], [3, 4], [5, 6]]
flat = list(chain.from_iterable(nested))
print("Flat:", flat)
# Output: Flat: [1, 2, 3, 4, 5, 6]
# Transpose with zip and unpacking
matrix = [[1, 2, 3], [4, 5, 6]]
transposed = list(zip(*matrix))
print("Transposed:", transposed)
# Output: Transposed: [(1, 4), (2, 5), (3, 6)]
# Unique preserving order with dict.fromkeys
items = [3, 1, 2, 1, 3, 4]
unique = list(dict.fromkeys(items))
print("Unique:", unique)
# Output: Unique: [3, 1, 2, 4]
# Merge dicts with unpacking (Python 3.5+)
dict1 = {'a': 1, 'b': 2}
dict2 = {'b': 3, 'c': 4}
merged = {**dict1, **dict2}
print("Merged:", merged)
# Output: Merged: {'a': 1, 'b': 3, 'c': 4}
# Dict from two lists with zip
keys = ['a', 'b', 'c']
values = [1, 2, 3]
d = dict(zip(keys, values))
print("Zipped:", d)
# Output: Zipped: {'a': 1, 'b': 2, 'c': 3}
Solution 3 — Optimal: Pythonic one-liners
from collections import Counter
# Flatten — list comprehension (no import needed)
nested = [[1, 2], [3, 4], [5, 6]]
flat = [x for sub in nested for x in sub]
print("Flat:", flat)
# Output: Flat: [1, 2, 3, 4, 5, 6]
# Transpose — zip unpacking (cleanest)
matrix = [[1, 2, 3], [4, 5, 6]]
transposed = list(zip(*matrix))
print("Transposed:", transposed)
# Output: Transposed: [(1, 4), (2, 5), (3, 6)]
# Unique preserving order — dict.fromkeys (3.7+ guarantees order)
items = [3, 1, 2, 1, 3, 4]
unique = list(dict.fromkeys(items))
print("Unique:", unique)
# Output: Unique: [3, 1, 2, 4]
# Merge dicts — pipe operator (Python 3.9+)
dict1 = {'a': 1, 'b': 2}
dict2 = {'b': 3, 'c': 4}
merged = dict1 | dict2
print("Merged:", merged)
# Output: Merged: {'a': 1, 'b': 3, 'c': 4}
# Dict from two lists
keys = ['a', 'b', 'c']
values = [1, 2, 3]
d = dict(zip(keys, values))
print("Zipped:", d)
# Output: Zipped: {'a': 1, 'b': 2, 'c': 3}
# Bonus one-liners:
# Swap two variables
a, b = 1, 2
a, b = b, a
print(a, b)
# Output: 2 1
# Most common element
items = ["a", "b", "a", "c", "a", "b"]
most_common = Counter(items).most_common(1)[0][0]
print("Most common:", most_common)
# Output: Most common: a
# Check all items satisfy a condition
numbers = [1, 2, 3, 4, 5]
all_positive = all(x > 0 for x in numbers)
print("All positive:", all_positive)
# Output: All positive: True
# Conditional assignment (ternary expression)
n = 7
x = 42 if n > 5 else 0
print(x)
# Output: 42
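One caveat: the flattening comprehension above handles exactly one level of nesting. For arbitrarily nested lists, a short recursive generator works; this deep_flatten helper is an illustrative sketch, not a stdlib function:

```python
def deep_flatten(items):
    """Recursively yield atoms from arbitrarily nested lists."""
    for item in items:
        if isinstance(item, list):
            yield from deep_flatten(item)
        else:
            yield item

print(list(deep_flatten([1, [2, [3, [4]], 5]])))
# Output: [1, 2, 3, 4, 5]
```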
Interview Tip:
"These one-liners show Python fluency. Interviewers love seeing clean, idiomatic Python. Know dict.fromkeys for order-preserving dedup, zip(*matrix) for transpose, and dict1 | dict2 for merging."
What NOT to Say: "I would use a for loop to flatten a list." -- While correct, it signals you are not comfortable with Pythonic patterns. Always show the comprehension version first, then mention the loop if asked.