🐘 Hadoop · Section 2 of 8

Day 1: HDFS + YARN + MapReduce — Deep Dive
Time: 5-6 hours | Experience: 10 years — expect DEEP internals + scenario questions
Philosophy: Learn the WHY, not just the WHAT. Every concept taught through the problem it solves.
Levels: ⬜ Direct (what/define) | 🟨 Mid-level (how/why) | 🟥 Scenario (design/debug/fix)

SECTION 1: HDFS — HOW HADOOP STORES DATA

🧠 The Core Problem HDFS Solves

Before HDFS: You have 100 TB of booking logs. One server has 4 TB disk. You need 25 servers. But:

  • If one server crashes → that portion of data is LOST
  • To process data → move it all to one machine → network bottleneck
  • No way to scale beyond one machine's processing power

HDFS solution: Split files into blocks, store each block on 3 different servers (replication), run computation WHERE the data lives (data locality). No data movement, built-in fault tolerance.
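The split-and-replicate arithmetic above can be sketched in a few lines of Python (a back-of-envelope helper, not a real HDFS API — the function name and units are illustrative):

```python
import math

def plan_blocks(file_size_mb, block_mb=128, replication=3):
    """Back-of-envelope: HDFS block count and raw storage for one file."""
    n_blocks = math.ceil(file_size_mb / block_mb)   # last block may be partial
    raw_stored_mb = file_size_mb * replication      # every byte is kept 3x by default
    return n_blocks, raw_stored_mb

# A 400 MB file → 4 blocks, 1200 MB of raw storage across the cluster:
print(plan_blocks(400))

# The 100 TB of booking logs from above:
blocks, raw_mb = plan_blocks(100 * 1024 * 1024)     # 100 TB expressed in MB
print(blocks)        # 819200 blocks
print(raw_mb)        # 314572800 MB ≈ 300 TB raw (3 copies of everything)
```

Note the trade-off this makes explicit: fault tolerance costs 3x raw disk, which is exactly what Hadoop 3's erasure coding (later in this section) reduces.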

Q1: What is HDFS Architecture? Explain every component.

Simple Explanation: HDFS (Hadoop Distributed File System) is a distributed file system that splits large files into fixed-size blocks and stores them across many commodity servers. It has two types of nodes: one master (NameNode) that tracks WHERE everything is, and many workers (DataNodes) that actually store the data blocks.

📐 Architecture Diagram
HDFS CLUSTER ARCHITECTURE:
═══════════════════════════════════════════════════════════════════

Client (your application)
    │
    ▼
┌─────────────────────────────────────────────────────────────┐
│  NAMENODE (Master — 1 per cluster, critical!)               │
│                                                             │
│  What it stores IN MEMORY (RAM — everything in RAM!):       │
│  ┌───────────────────────────────────────────────────────┐  │
│  │ FsImage  = snapshot of entire filesystem namespace    │  │
│  │           (all file paths, permissions, replication)  │  │
│  │ EditLog  = transaction log of every change since      │  │
│  │           last FsImage snapshot                       │  │
│  │ Block Map = file → list of block IDs                  │  │
│  │ Location Map = block ID → DataNode IPs (NOT on disk!) │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                             │
│  What it NEVER stores: actual data bytes                    │
│  It only stores: metadata (where data is)                  │
└────────────────────────┬────────────────────────────────────┘
                         │ heartbeat every 3 seconds
                         │ block report every 6 hours
         ┌───────────────┼───────────────┐
         ▼               ▼               ▼
┌──────────────┐  ┌──────────────┐  ┌──────────────┐
│  DATANODE 1  │  │  DATANODE 2  │  │  DATANODE 3  │
│  (Worker)    │  │  (Worker)    │  │  (Worker)    │
│              │  │              │  │              │
│  Block A     │  │  Block A     │  │  Block B     │
│  Block B     │  │  Block C     │  │  Block C     │
│  ...         │  │  ...         │  │  ...         │
│              │  │              │  │              │
│  Stores:     │  │  Stores:     │  │  Stores:     │
│  actual data │  │  actual data │  │  actual data │
│  block files │  │  block files │  │  block files │
└──────────────┘  └──────────────┘  └──────────────┘

Key: Block A is on DN1 AND DN2 → that's REPLICATION (default 3x)
     If DN1 crashes → data is still safe on DN2 and others

Key numbers to memorize:

Default block size: 128 MB (Hadoop 2+), was 64 MB in Hadoop 1
Default replication: 3 copies per block
Heartbeat interval: 3 seconds (DataNode → NameNode)
Block report interval: 6 hours (full block list from DataNode)
Dead-node threshold: ~10.5 minutes without heartbeat → DataNode marked dead
Replica placement: rack-aware (1st replica on the writer's node/rack, 2nd and 3rd together on a different rack)

Q2: How does HDFS WRITE work? Step-by-step.

Why this matters: This is the most asked HDFS internals question. If you understand the write path, you understand replication, pipeline writes, fault tolerance, and checksum verification — all in one answer.

📐 Architecture Diagram
HDFS WRITE PATH — Step by Step:
════════════════════════════════

CLIENT wants to write: booking_logs.csv (400 MB)

STEP 1: Client contacts NameNode
    Client → NameNode: "I want to write booking_logs.csv, 400 MB"
    NameNode checks:
      - Does this file already exist? (no → proceed)
      - Is there enough space across DataNodes?
    NameNode → Client: "Split into 4 blocks of 128 MB. Here are the DataNodes:"
      - Block 1 → store on DN1, DN2, DN3 (replication pipeline)
      - Block 2 → store on DN1, DN4, DN2
      - Block 3 → store on DN2, DN3, DN5
      - Block 4 → store on DN3, DN4, DN1

STEP 2: Client writes Block 1 using PIPELINE
    Client writes to DN1 → DN1 streams to DN2 → DN2 streams to DN3
    ┌────────┐   128 MB    ┌──────┐   128 MB   ┌──────┐   128 MB   ┌──────┐
    │ Client │──────────► │ DN1  │──────────►│ DN2  │──────────►│ DN3  │
    └────────┘            └──────┘           └──────┘           └──────┘
                                      ▲ pipeline — not 3 separate uploads!
                          Each packet ~64 KB, acknowledged back

STEP 3: Acknowledgement chain
    DN3 sends ACK → DN2 → DN1 → Client (block written successfully!)
    If DN2 fails mid-write:
      - Client is notified via the broken ACK chain
      - Pipeline is rebuilt from the remaining DataNodes (DN1, DN3) and the write continues
      - NameNode later schedules re-replication to restore the replication factor

STEP 4: NameNode updates metadata
    After ALL blocks written: NameNode records file→block mapping in the EditLog (merged into FsImage at the next checkpoint)
    File is now visible in HDFS namespace

STEP 5: Checksum verification
    Every 512 bytes, HDFS writes a checksum (CRC32)
    On every READ, checksum is verified → detects bit-rot automatically!
    ⚠️ If checksum fails → HDFS uses another replica and flags the bad block
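The per-chunk checksum idea in STEP 5 is easy to demonstrate (a simplified sketch: plain CRC32 via `zlib`, one checksum per 512-byte chunk; recent Hadoop versions actually use CRC32C, and real checksums live in separate `.meta` files):

```python
import zlib

CHUNK = 512  # HDFS default: one checksum per 512 bytes (io.bytes.per.checksum)

def chunk_checksums(data, chunk=CHUNK):
    """One CRC32 per chunk — roughly what HDFS stores alongside each block."""
    return [zlib.crc32(data[i:i + chunk]) for i in range(0, len(data), chunk)]

def verify(data, checksums):
    """On read, recompute and compare; a mismatch means bit-rot in that chunk."""
    return chunk_checksums(data) == checksums

block = b"x" * 2048
sums = chunk_checksums(block)                      # 4 checksums for 2048 bytes
corrupted = b"x" * 1000 + b"y" + b"x" * 1047       # one flipped byte
print(verify(block, sums))        # True
print(verify(corrupted, sums))    # False → HDFS would read another replica instead
```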

What-if scenarios:

🧠 Memory Map
WHAT IF NameNode crashes during write?
→ Client gets connection error
→ Orphaned partial blocks on DataNodes are eventually cleaned up (flagged invalid via later block reports)
→ File is NOT visible in namespace (atomicity — all or nothing for file creation)
WHAT IF a DataNode in the pipeline fails?
→ Client notified via ACK failure
→ Pipeline reformed with remaining DataNodes
→ NameNode schedules re-replication to hit the replication factor again
→ Write continues (no data loss, just slight delay)
WHAT IF disk is full on a DataNode?
→ DataNode reports low disk space in heartbeat
→ NameNode stops assigning new blocks to that DataNode
→ DataNode continues serving reads for existing blocks

Q3: How does HDFS READ work? (Data Locality)

📐 Architecture Diagram
HDFS READ PATH:
═══════════════

STEP 1: Client asks NameNode for block locations
    Client → NameNode: "Give me locations for all blocks of booking_logs.csv"
    NameNode → Client: block list with DataNode locations (sorted by proximity)
    NameNode does NOT participate in actual data transfer after this!

STEP 2: Client reads blocks directly from DataNodes
    Client reads Block 1 from DN1 (closest/fastest)
    Client reads Block 2 from DN3 (if DN3 is on same rack)
    Client reads Block 3 from DN2

    DATA LOCALITY MAGIC:
    If your MapReduce job runs ON DN1 → reads Block 1 from LOCAL disk
    → No network transfer! This is "data locality" — the big Hadoop performance win

STEP 3: If a DataNode fails during read
    Client switches to another replica automatically
    NameNode is notified of the bad DataNode

DATA LOCALITY PRIORITY (YARN allocates tasks in this order):
    1. LOCAL node (same machine as data)     → fastest (local disk)
    2. LOCAL rack (same rack, different node) → fast (intra-rack network)
    3. REMOTE rack (different rack)           → slowest (inter-rack network)
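The three-level locality preference can be expressed as a small classifier (a sketch; `NODE_LOCAL`, `RACK_LOCAL`, and `OFF_SWITCH` are YARN's actual locality level names, but the host/rack data here is made up):

```python
def locality(task_host, replica_hosts, rack_of):
    """Best read a task can get, in YARN's preference order."""
    if task_host in replica_hosts:
        return "NODE_LOCAL"                            # data on the same machine
    task_rack = rack_of[task_host]
    if any(rack_of[h] == task_rack for h in replica_hosts):
        return "RACK_LOCAL"                            # same rack, different node
    return "OFF_SWITCH"                                # must cross racks (slowest)

racks = {"dn1": "r1", "dn2": "r1", "dn3": "r2", "dn4": "r2"}
print(locality("dn1", ["dn1", "dn3"], racks))   # NODE_LOCAL
print(locality("dn2", ["dn1", "dn3"], racks))   # RACK_LOCAL
print(locality("dn4", ["dn1", "dn2"], racks))   # OFF_SWITCH
```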

Q4: What is NameNode High Availability (HA)? — CRITICAL QUESTION

The Problem with Single NameNode (Hadoop 1.x):

Hadoop 1.x had ONE NameNode. It was the SPOF (Single Point of Failure).
If the NameNode server crashes → ENTIRE CLUSTER IS DOWN.
No one can read or write ANY file.
For a 10PB production cluster — this is catastrophic.
Also: NameNode maintenance (upgrades, patches) = cluster downtime.

The Solution — NameNode HA (Hadoop 2.x):

📐 Architecture Diagram
NAMENODE HIGH AVAILABILITY ARCHITECTURE:
═════════════════════════════════════════

┌──────────────────┐              ┌──────────────────┐
│  ACTIVE          │              │  STANDBY         │
│  NAMENODE        │◄────────────►│  NAMENODE        │
│                  │   Shared     │                  │
│  Serves clients  │   EditLog    │  In warm standby │
│  Handles all     │              │  Applies edits   │
│  metadata ops    │              │  from JournalNodes│
└────────┬─────────┘              └─────────┬────────┘
         │                                  │
         │ both write/read                  │ reads same
         ▼                                  ▼
┌──────────────────────────────────────────────────────┐
│          JOURNAL NODES (3 or 5 — odd number!)         │
│                                                      │
│  Journal Node 1   Journal Node 2   Journal Node 3    │
│  [EditLog copy]   [EditLog copy]   [EditLog copy]    │
│                                                      │
│  Active NN writes edits → majority must confirm      │
│  Standby NN reads edits → stays in sync              │
│  Quorum write: 2 of 3 must succeed (majority)        │
└──────────────────────────────────────────────────────┘
         │                     │
         ▼                     ▼
┌──────────────────────────────────────────────────────┐
│                    ZOOKEEPER                          │
│          (decides who is ACTIVE NameNode)             │
│                                                      │
│  ZooKeeper watches both NameNodes                    │
│  If Active NN misses heartbeat → ZK starts FAILOVER  │
│  ZKFC (ZooKeeper Failover Controller) on each NN     │
│  ZKFC monitors NN health → tells ZK → ZK triggers   │
│  failover → Standby becomes Active                   │
└──────────────────────────────────────────────────────┘
         │
         ▼  sends heartbeats + block reports to BOTH NNs
┌──────────────────────────────────────────────────────┐
│           ALL DATANODES                               │
│  Report to BOTH Active and Standby NameNodes         │
│  So Standby always has up-to-date block locations    │
└──────────────────────────────────────────────────────┘

FAILOVER PROCESS (automatic, ~30-60 seconds):
  1. Active NN misses ZooKeeper heartbeat
  2. ZKFC detects health failure
  3. ZooKeeper triggers failover
  4. Standby NN takes ACTIVE lock in ZooKeeper
  5. Standby NN reads remaining edits from JournalNodes
  6. Standby NN is now Active → resumes serving clients
  7. Old Active NN is FENCED (SSH kill, IPMI power off) to prevent split-brain
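The "majority must confirm" rule from the JournalNode box is just quorum arithmetic, and it explains why 3 or 5 (odd) JournalNodes are deployed (a minimal sketch, function names are illustrative):

```python
def quorum(n_journal_nodes):
    """Majority needed for a JournalNode quorum write."""
    return n_journal_nodes // 2 + 1

def edit_committed(acks, n_journal_nodes):
    """An EditLog entry is durable only once a majority has acknowledged it."""
    return acks >= quorum(n_journal_nodes)

print(quorum(3))              # 2 of 3 must succeed
print(quorum(5))              # 3 of 5 — tolerates 2 JournalNode failures
print(edit_committed(2, 3))   # True
print(edit_committed(1, 3))   # False → Active NN cannot proceed with this edit
```

This is also why 4 JournalNodes buy nothing over 3: `quorum(4)` is 3, so both configurations tolerate exactly one failure.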

⚠️ SPLIT-BRAIN PROBLEM — critical to mention:

🧠 Memory Map
SPLIT-BRAIN: Both NameNodes think they are Active.
Both accept writes → metadata diverges → DATA CORRUPTION!
SOLUTION: FENCING
Before Standby becomes Active, it MUST kill the old Active.
Methods:
1. SSH fencing: SSH into old Active → kill -9 the NameNode process
2. IPMI/DRAC: Power off the old Active server at hardware level
3. Shared storage fencing: revoke old Active's access to shared disk
⚠️ If fencing FAILS → failover is ABORTED (better to be down than corrupted!)

Interview tip (10-year framing):

"In Hadoop 1, I've personally dealt with NameNode failures taking down entire clusters. We implemented NameNode HA in Hadoop 2 with 3 JournalNodes and ZooKeeper for automatic failover. The key lesson: always configure FENCING — without it, split-brain can corrupt the entire namespace, which is worse than downtime."

Q5: What is HDFS Federation? When do you need it?

Problem that Federation solves:

📐 Architecture Diagram
Single NameNode (even HA) has limits:
  - All metadata in ONE NameNode's RAM
  - NameNode RAM = limiting factor for cluster size
  - 1 billion files × ~150 bytes metadata = ~150 GB RAM just for metadata!
  - Active NameNode serves ALL clients → becomes bottleneck

FEDERATION SOLUTION: Multiple NameNodes, each owning a namespace volume

HDFS FEDERATION ARCHITECTURE:
═══════════════════════════════

  NameNode 1                NameNode 2                NameNode 3
  (namespace: /user)        (namespace: /data)        (namespace: /tmp)
  [metadata for /user]      [metadata for /data]      [metadata for /tmp]
        │                         │                         │
        └─────────────────────────┼─────────────────────────┘
                                  │ all NameNodes share the same DataNodes
                      ┌───────────┼───────────┐
                      ▼           ▼           ▼
                    DN 1        DN 2        DN 3
                 (all blocks  (all blocks  (all blocks
                  for all      for all      for all
                  namespaces)  namespaces)  namespaces)

Block Pools: Each NameNode has its own "block pool" on DataNodes
             Block pool = separate namespace on same disk
             DN stores blocks for ALL NameNodes (separated by pool ID)
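The "metadata in RAM" ceiling that motivates Federation follows from the ~150 bytes-per-object rule of thumb quoted above (a sketch; the 150-byte figure is the guide's own heuristic, real heap usage varies with path lengths and block counts):

```python
def namenode_heap_gb(n_objects, bytes_per_object=150):
    """Rough NameNode heap needed for namespace metadata (files + blocks)."""
    return n_objects * bytes_per_object / 1e9

print(namenode_heap_gb(1_000_000_000))   # 150.0 GB for 1 billion objects
print(namenode_heap_gb(300_000_000))     # 45.0 GB — around where Federation starts to pay off
```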

When to use:

USE FEDERATION when:
✓ Cluster > 300 million files (RAM becoming the limit)
✓ Multiple teams with different SLAs (isolate namespaces)
✓ Want to scale metadata independently
✓ Cluster > 10,000 nodes
DO NOT use when:
✗ Small cluster (< 100M files) — complexity not worth it
✗ Single team/use case — HA alone is sufficient

Q6: The Small Files Problem — THE most common production issue

What is the small files problem?

🧠 Memory Map
HDFS is designed for LARGE files (GBs, TBs).
Problem: When you have MILLIONS of tiny files (KBs):
Example: 10 million log files, each 1 KB:
Total data: 10 million × 1 KB = ~10 GB (not that much data!)
But: each file = at least 1 metadata entry in NameNode RAM
10 million files × 150 bytes metadata = 1.5 GB RAM just for metadata!
Each file = at least 1 HDFS block entry (a 1 KB file occupies only 1 KB on disk, but still costs a full block's worth of metadata and scheduling)
MapReduce: creates 1 Map task per block → 10 million Map tasks!
Task startup overhead × 10 million = pipeline takes HOURS
THE MATH PROBLEM
1 file of 128 MB → 1 block, 1 Map task, 1 metadata entry
128,000 files of 1 KB each → 128,000 blocks, 128,000 Map tasks!
Same data volume, 128,000x more overhead!
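The math problem above generalizes to a tiny calculator (illustrative names; assumes the default one-map-task-per-block input format):

```python
import math

def job_overhead(total_mb, file_mb, block_mb=128):
    """For the same data volume, count files and block entries (≈ map tasks)."""
    n_files = int(total_mb / file_mb)
    # Even a 1 KB file costs one full block entry in NameNode metadata:
    blocks_per_file = max(1, math.ceil(file_mb / block_mb))
    n_blocks = n_files * blocks_per_file
    return n_files, n_blocks

print(job_overhead(128, 128))        # (1, 1)            — one big file
print(job_overhead(128, 1 / 1024))   # (131072, 131072)  — the "~128,000 files" case
print(job_overhead(1024, 128))       # (8, 8)            — 1 GB as 128 MB files
```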

Solutions — know ALL of these:

🧠 Memory Map
SOLUTION 1: HAR FILES (Hadoop Archive)
─────────────────────────────────────
Groups many small files into one HAR archive
Single metadata entry, but files accessible individually
hadoop archive -archiveName bookings.har -p /logs/small/ /archives/
Pros: Reduces NameNode metadata pressure
Cons: Read-only (can't append), slower random access
Use when: archiving old data you rarely need to process
SOLUTION 2: SEQUENCE FILES
─────────────────────────────────────
Binary format: key-value pairs, multiple small files merged into one
Key = filename, Value = file content
Used in: MapReduce as input, Kafka → HDFS pipelines
Pros: Splittable (supports parallel reads), fast
Cons: Not human-readable, only MapReduce/Spark can read
Use when: processing pipeline, not for human inspection
SOLUTION 3: COMBINE INPUT FORMAT (MapReduce)
─────────────────────────────────────
CombineFileInputFormat groups multiple small files into one Map task
Instead of 10,000 files = 10,000 map tasks:
CombineFileInputFormat = 100 map tasks (each processes 100 files)
In Hive: SET hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
SOLUTION 4: HIVE MERGE ON INSERT (most practical)
─────────────────────────────────────
Hive creates small files on every INSERT INTO partition
Solution: merge small files after write
SET hive.merge.mapfiles=true; -- merge after map-only jobs
SET hive.merge.mapredfiles=true; -- merge after map-reduce jobs
SET hive.merge.size.per.task=256000000; -- target 256 MB per merged file
SET hive.merge.smallfiles.avgsize=16000000; -- trigger if avg < 16 MB
SOLUTION 5: SPARK COALESCE/REPARTITION (modern approach)
─────────────────────────────────────
After processing, write fewer, larger files:
df.coalesce(10).write.parquet("/output/bookings/")
-- Reduces from 10,000 tiny files to 10 files of ~128 MB each
This is the MODERN solution — most interviewers expect this answer!
SOLUTION 6: AVOID CREATING SMALL FILES
─────────────────────────────────────
Root cause fix: why are small files being created?
Streaming writes: use batching (Kafka → buffer 128 MB before writing)
Many small partitions: use fewer, coarser partitions
Dynamic partitions: too many partition values → too many files
Solution: partition by month instead of day if data is sparse
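The core idea behind Solution 2 (SequenceFiles) — pack many filename→content records into one container file — can be sketched in plain Python. The byte format below is invented for illustration only; real SequenceFiles use a binary header, sync markers, and optional compression, and assume nothing about names (this toy format assumes names contain no tabs or newlines):

```python
def pack(files):
    """Merge {filename: bytes} into one blob: '<name>\\t<size>\\n<bytes>' records."""
    out = bytearray()
    for name, data in files.items():
        out += f"{name}\t{len(data)}\n".encode() + data
    return bytes(out)

def unpack(blob):
    """Recover the individual small files from the merged container."""
    files, i = {}, 0
    while i < len(blob):
        nl = blob.index(b"\n", i)
        name, size = blob[i:nl].decode().split("\t")
        start = nl + 1
        files[name] = blob[start:start + int(size)]
        i = start + int(size)
    return files

small = {"log1": b"a" * 10, "log2": b"bb", "log3": b"ccc"}
blob = pack(small)
print(unpack(blob) == small)   # True — one HDFS file, three recoverable records
```

One container file means one NameNode metadata entry and one map task instead of three, which is the entire point of the real formats.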

Q7: YARN Architecture — How Hadoop manages resources

The problem YARN solves:

Hadoop 1.x: MapReduce did BOTH resource management AND job execution.
JobTracker = resource manager + job scheduler (one server does everything)
TaskTracker = executes Map/Reduce tasks on each node
Problems:
Only MapReduce could run (no Spark, no Storm, no other frameworks)
JobTracker was SPOF and bottleneck (managed everything for ALL jobs)
No resource isolation between jobs
YARN = "Yet Another Resource Negotiator"
Decouples resource management from job execution
ANY computation framework can now run: Spark, MapReduce, Flink, Tez, Storm
📐 Architecture Diagram
YARN ARCHITECTURE:
══════════════════

┌───────────────────────────────────────────────────────────────┐
│  RESOURCE MANAGER (one per cluster — HA supported)            │
│                                                               │
│  ┌─────────────────────┐  ┌─────────────────────────────────┐ │
│  │   SCHEDULER         │  │   APPLICATIONS MANAGER          │ │
│  │                     │  │                                 │ │
│  │  Allocates          │  │  Accepts job submissions        │ │
│  │  containers based   │  │  Starts ApplicationMaster       │ │
│  │  on policy          │  │  Monitors AM health             │ │
│  │  (Capacity/Fair)    │  │  Restarts AM on failure         │ │
│  └─────────────────────┘  └─────────────────────────────────┘ │
└───────────────────────────────────────────────────────────────┘
              │
              │ manages resources on
              ▼
┌─────────────────────┐  ┌─────────────────────┐  ┌─────────────────────┐
│  NODE MANAGER       │  │  NODE MANAGER       │  │  NODE MANAGER       │
│  (one per node)     │  │  (one per node)     │  │  (one per node)     │
│                     │  │                     │  │                     │
│  Reports available  │  │  Manages containers │  │  Monitors container │
│  CPU/RAM/disk       │  │  on this node       │  │  health             │
│  to ResourceManager │  │                     │  │                     │
│                     │  │  Container = unit   │  │  Kills containers   │
│  Runs containers    │  │  of resource        │  │  when RM says so    │
│  (actual tasks)     │  │  (CPU + RAM slice)  │  │                     │
└─────────────────────┘  └─────────────────────┘  └─────────────────────┘
         │
         ▼ runs inside containers
┌─────────────────────────────────────────────────────────────────────┐
│  APPLICATION MASTER (one per job — runs IN a container!)            │
│                                                                     │
│  For each submitted job:                                            │
│  1. RM starts one AM for this specific job                          │
│  2. AM negotiates containers from RM for its tasks                  │
│  3. AM communicates with NMs to launch tasks in containers          │
│  4. AM monitors task progress, handles failures, re-submits tasks   │
│  5. AM reports job completion to RM, then exits                     │
│                                                                     │
│  Key insight: AM is JOB-SPECIFIC — Spark has SparkAM,              │
│  MapReduce has MRAppMaster, Flink has its own AM                    │
└─────────────────────────────────────────────────────────────────────┘

WHAT-IF scenarios for YARN:

🧠 Memory Map
WHAT IF ResourceManager crashes?
→ RM HA: Active/Standby RM with ZooKeeper (like NameNode HA)
→ Running jobs continue (AM is running independently in containers)
→ New RM recovers job state from ZooKeeper (work preservation)
→ AM reconnects to new RM
WHAT IF ApplicationMaster crashes?
→ RM detects AM failure (missed heartbeat)
→ RM restarts AM (up to yarn.resourcemanager.am.max-attempts times, default 2)
→ Restarted AM can recover progress from already-completed tasks
→ ⚠️ If AM fails more than max-attempts → entire job fails
WHAT IF a NodeManager crashes?
→ NM stops sending heartbeats to RM
→ RM marks NM as dead after timeout
→ RM asks other NMs to run the tasks that were on dead NM
→ Containers on dead NM = their tasks are rescheduled

Q8: YARN Schedulers — Three types, know when to use each

🧠 Memory Map
THREE SCHEDULERS
1. FIFO SCHEDULER (First In, First Out)
───────────────────────────────────
How it works: Jobs run one at a time in submission order.
Job 2 waits until Job 1 is 100% complete before starting.
Queue: [Job1 (ETL, 100 GB)] [Job2 (analyst query)] [Job3 (ML)]
Job 1 uses ALL cluster resources until done.
Job 2 and 3 wait.
✓ Simple, maximum resource use for one job
✗ Interactive queries starve behind long batch jobs
Use when: single-user cluster, dev/test only
NEVER use in production with multiple teams.
2. CAPACITY SCHEDULER (default in Apache Hadoop)
───────────────────────────────────
How it works: Divide cluster into QUEUES with guaranteed capacity.
Each queue gets a % of cluster resources.
Queue can borrow from others if they're idle.
Config: capacity-scheduler.xml
Example at Amadeus:
ETL queue: 40% guaranteed (overnight batch jobs)
Analytics queue: 30% guaranteed (BI queries, 9 AM-6 PM)
ML queue: 20% guaranteed (data science)
Default queue: 10% guaranteed (everything else)
✓ Multiple teams share cluster fairly
✓ Guaranteed minimum resources per team
✓ Elastic: can use idle capacity from other queues
Use when: multi-tenant cluster, multiple teams with SLAs
3. FAIR SCHEDULER (default in Cloudera CDH)
───────────────────────────────────
How it works: ALL jobs get equal share of resources.
Resources rebalanced as jobs arrive/finish.
3 jobs running → each gets 33%
2 jobs done → remaining job gets 100%
New job arrives → each gets 50%
✓ No starvation — every job makes progress
✓ Interactive + batch mix well
✓ Preemption: can kill low-priority tasks to give resources to high-priority
Use when: mixed workloads (interactive + batch), fairness is priority
WHICH TO USE
Dev/test only → FIFO
Enterprise production → Capacity (guaranteed SLAs per team)
Mixed interactive+batch → Fair (Cloudera default)
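The Fair Scheduler's rebalancing described above (33% each → 100% → 50%) is just an instantaneous equal split (a sketch of the equal-weight, no-pools case; real fair scheduling also handles weights, min shares, and preemption):

```python
def fair_shares(n_jobs, cluster_fraction=1.0):
    """Instantaneous fair share per running job, equal weights, no pools."""
    return [cluster_fraction / n_jobs] * n_jobs

print(fair_shares(3))   # three jobs → ~33% each
print(fair_shares(1))   # [1.0] — a lone job gets the whole cluster
print(fair_shares(2))   # [0.5, 0.5] — a new arrival halves everyone's share
```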

Q9: MapReduce — How it REALLY works inside

The mental model: MapReduce is a distributed computing framework. You describe WHAT to compute (not HOW to distribute it). Hadoop handles: data distribution, parallel execution, fault tolerance, and aggregation.

📐 Architecture Diagram
MAPREDUCE COMPLETE EXECUTION FLOW:
════════════════════════════════════

INPUT DATA (in HDFS):
/bookings/2026/march/*.csv (10 GB of booking files)

STEP 1: JOB SUBMISSION
  Client submits job to YARN ResourceManager
  RM starts ApplicationMaster (MRAppMaster) in a container
  AM reads input splits from HDFS (one split per HDFS block = 128 MB)
  AM requests containers from RM for Map tasks

STEP 2: INPUT SPLITS & RECORD READER
  InputFormat splits input into InputSplits (usually = 1 HDFS block each)
  RecordReader reads each split → produces key-value pairs for Mapper
  Default: TextInputFormat → key=line offset (Long), value=line text (Text)

  Split 1 (block 1, 128 MB) → Map Task 1 on Node 1 (DATA LOCALITY!)
  Split 2 (block 2, 128 MB) → Map Task 2 on Node 2
  ...
  Split 80 (block 80) → Map Task 80 on Node 80

STEP 3: MAP PHASE (runs in parallel on all nodes)
  Each Mapper:
    - Reads its InputSplit line by line
    - Applies your map() function to each record
    - Outputs intermediate key-value pairs

  Example: Count bookings per airline
    map("AI,BOM,LHR,500") → emit("AI", 1)
    map("LH,FRA,JFK,800") → emit("LH", 1)
    map("AI,DEL,DXB,300") → emit("AI", 1)

STEP 4: COMBINER (optional, runs on each node BEFORE shuffle)
  "Mini-reducer" that runs locally on each Map task output
  Reduces data volume BEFORE sending to reducers (network optimization!)

  Without combiner: send 1,000,000 ("AI", 1) pairs across network
  With combiner:    combine locally first → send ("AI", 50000) per node

  ⚠️ Combiner function must be same as Reducer function (associative+commutative)
  ⚠️ Combiner is an OPTIMIZATION, not guaranteed to run (Hadoop may skip it)
  Use when: reduce function is associative (sum, count, max/min — YES; avg — NO!)

STEP 5: PARTITIONER (decides which Reducer gets which keys)
  After Map+Combine: output sorted by key within each Map task
  Partitioner determines: which Reducer handles which keys

  Default: HashPartitioner → (key.hashCode() & Integer.MAX_VALUE) % numReducers
  Result: ALL records with key "AI" → same Reducer (regardless of which Map produced it)

  ⚠️ BAD partitioner → DATA SKEW (one reducer gets 90% of data, others idle)
  Custom partitioner: override to distribute evenly by key range or custom logic

STEP 6: SHUFFLE & SORT (the heart of MapReduce, most expensive phase)
  ────────────────────────────────────────────────────────────────────
  What happens:
  1. Map output written to LOCAL DISK (not HDFS!) in Map task's buffer
  2. Map output sorted by key in buffer (in-memory sort)
  3. When buffer is 80% full → SPILL to local disk (sort + spill file)
  4. Multiple spill files → MERGED and sorted (merge sort)
  5. Each Reducer FETCHES (via HTTP) its assigned partitions from ALL Mappers
     → This is the NETWORK TRANSFER (the expensive part!)
  6. Reducer MERGES all fetched files → one sorted input stream

  The shuffle data flow:
  Mapper 1 output → Sort → [Partition for R0][Partition for R1][Partition for R2]
  Mapper 2 output → Sort → [Partition for R0][Partition for R1][Partition for R2]
  Mapper N output → Sort → [Partition for R0][Partition for R1][Partition for R2]
                             ▼                  ▼                  ▼
                         Reducer 0          Reducer 1          Reducer 2
                         fetches all        fetches all        fetches all
                         R0 partitions      R1 partitions      R2 partitions

STEP 7: REDUCE PHASE
  Each Reducer:
    - Receives sorted input: all ("AI", 1), all ("LH", 1), etc.
    - Calls reduce() once per unique key with all values
    - Outputs final key-value results

  reduce("AI", [1,1,1,...,1]) → emit("AI", 150000) [total AI bookings]
  reduce("LH", [1,1,1,...,1]) → emit("LH", 89000)

STEP 8: OUTPUT
  Reducer output written to HDFS (final output directory)
  Output: _SUCCESS file (job succeeded) + part-r-00000, part-r-00001 (output files)
  Number of output files = number of Reducers
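The whole flow above (map → combine → partition → shuffle → reduce) fits in a toy Python simulation of the airline-count example. This is a sketch, not Hadoop: the `java_hash` helper reimplements Java's `String.hashCode()` so the partitioner matches the default HashPartitioner formula from STEP 5, and everything runs in one process:

```python
from collections import defaultdict

def java_hash(s):
    """Java String.hashCode(): h = 31*h + char, as a 32-bit signed int."""
    h = 0
    for c in s:
        h = (31 * h + ord(c)) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h

def partition(key, num_reducers):
    """Hadoop's default HashPartitioner: (hashCode & Integer.MAX_VALUE) % numReducers."""
    return (java_hash(key) & 0x7FFFFFFF) % num_reducers

def run_job(splits, num_reducers=2):
    """Toy booking count per airline: map → combine → partition/shuffle → reduce."""
    shuffle = [defaultdict(list) for _ in range(num_reducers)]
    for split in splits:                     # one "map task" per input split
        combined = defaultdict(int)          # combiner: local pre-aggregation
        for line in split:
            airline = line.split(",")[0]     # map(): emit (airline, 1)
            combined[airline] += 1
        for key, count in combined.items():  # only ONE record per key leaves the node
            shuffle[partition(key, num_reducers)][key].append(count)
    result = {}
    for part in shuffle:                     # reduce(): sum the per-mapper counts
        for key, counts in part.items():
            result[key] = sum(counts)
    return result

splits = [["AI,BOM,LHR,500", "LH,FRA,JFK,800"], ["AI,DEL,DXB,300"]]
print(run_job(splits))    # {'AI': 2, 'LH': 1}
```

Note how the combiner shrinks shuffle traffic exactly as STEP 4 describes: each map task sends one `("AI", n)` record instead of n separate `("AI", 1)` pairs, and the hash partitioner guarantees every "AI" record lands on the same reducer.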

What-if scenarios:

🧠 Memory Map
WHAT IF a Map task fails?
→ AM detects failure (task tracker error or timeout)
→ AM reschedules the same Map task on a different node
→ Default retries: 4 times
→ If all 4 fail → job fails
WHAT IF a Reduce task fails?
→ Same: AM reschedules
→ But Reduce must re-fetch all shuffle data again (expensive!)
WHAT IF one Reducer is much slower than others (SPECULATIVE EXECUTION)?
→ Hadoop launches a DUPLICATE task on another node for slow tasks
→ Whichever finishes first → wins, the other is killed
→ Controlled by: mapreduce.map.speculative=true (default)
→ Helps when: node is degraded (bad disk, CPU issue) but not failed
WHAT IF output directory already exists?
→ Job FAILS immediately with FileAlreadyExistsException
→ HDFS does NOT overwrite directories (prevents accidental data loss)
→ Fix: delete output directory before job: hdfs dfs -rm -r /output/

Q10: HDFS Important Commands — Know these cold

bash
# ═══════════════════════════════════
# FILE SYSTEM OPERATIONS
# ═══════════════════════════════════
hdfs dfs -ls /user/data/bookings/          # List directory contents
hdfs dfs -ls -R /user/data/                # List RECURSIVELY (all subdirs)
hdfs dfs -du -s -h /user/data/bookings/   # Disk usage (-h=human readable, -s=summary)
hdfs dfs -mkdir -p /user/data/2026/march/ # Create directory (and parents with -p)
hdfs dfs -put bookings.csv /user/data/    # Upload local file to HDFS
hdfs dfs -get /user/data/bookings.csv .   # Download from HDFS to local
hdfs dfs -cat /user/data/file.csv         # Print file contents to stdout
hdfs dfs -tail /user/data/logs.txt        # Print last 1 KB of file (like Unix tail)
hdfs dfs -cp /src/file.csv /dst/          # Copy within HDFS
hdfs dfs -mv /src/file.csv /dst/          # Move within HDFS
hdfs dfs -rm /user/data/old.csv           # Delete file (goes to Trash by default!)
hdfs dfs -rm -r /user/data/old_dir/      # Delete directory recursively
hdfs dfs -rm -skipTrash /user/data/old.csv  # Delete PERMANENTLY (bypass Trash)
hdfs dfs -expunge                          # Empty Trash (permanently delete)
hdfs dfs -setrep -R 2 /user/data/archive/ # Change replication factor to 2 (save space)
hdfs dfs -checksum /user/data/file.csv    # Get MD5/CRC checksum of file

# ═══════════════════════════════════
# ADMIN OPERATIONS
# ═══════════════════════════════════
hdfs dfsadmin -report                     # Cluster status (live nodes, capacity, used)
hdfs dfsadmin -safemode get               # Check if HDFS is in safe mode
hdfs dfsadmin -safemode leave             # Force exit safe mode (after NameNode restart)
hdfs dfsadmin -refreshNodes               # Re-read includes/excludes (add/remove DNs)
hdfs dfsadmin -setQuota 100 /user/team1/ # Set namespace quota (max 100 files)
hdfs dfsadmin -setSpaceQuota 1t /user/team1/ # Set storage quota (1 TB)

# ═══════════════════════════════════
# FSCK — Filesystem health check
# ═══════════════════════════════════
hdfs fsck /user/data/bookings/            # Check health of all files
hdfs fsck / -files -blocks -locations     # Show all files + block locations
hdfs fsck / -list-corruptfileblocks       # List corrupted blocks
# Output to watch for:
# "HEALTHY" = all good
# "Under-replicated blocks" = some blocks don't have 3 replicas (DataNode down?)
# "Missing blocks" = data loss! Block has 0 replicas (need to recover from backup)
# "Corrupt blocks" = checksum mismatch (need to restore from another replica)

# ═══════════════════════════════════
# BALANCER — Fix data skew across DataNodes
# ═══════════════════════════════════
hdfs balancer -threshold 10               # Balance cluster (max 10% imbalance)
# Run when: adding new DataNodes (they start empty, all writes go to them)
# Balancer moves blocks from full nodes to empty nodes gradually

# ═══════════════════════════════════
# SAFE MODE — Important to understand
# ═══════════════════════════════════
# Safe mode = NameNode startup state
# During safe mode: HDFS is READ-ONLY (no writes allowed)
# NameNode waits for DataNodes to report their blocks
# Safe mode exits when: enough blocks have minimum replication
# ⚠️ If NameNode is stuck in safe mode → cluster appears DOWN
# Fix: hdfs dfsadmin -safemode leave  (only if blocks are actually replicated!)

Q11: Hadoop 1 vs Hadoop 2 vs Hadoop 3 — The evolution

🧠 Memory Map
HADOOP 1 (2006-2012)
✓ HDFS + MapReduce (one framework)
✗ Single NameNode (SPOF)
✗ JobTracker bottleneck (all scheduling in one daemon)
✗ Only MapReduce (no other frameworks)
✗ Max cluster: ~4000 nodes
HADOOP 2 (2012-2017): Major redesign
+ YARN: decoupled resource management → any framework runs
+ NameNode HA: Active/Standby + JournalNodes
+ HDFS Federation: multiple NameNodes
+ Snapshot support (create point-in-time HDFS snapshots)
+ NFS gateway (mount HDFS as NFS drive)
Max cluster: 10,000+ nodes
HADOOP 3 (2017-present): Key improvements
+ HDFS ERASURE CODING (replaces 3x replication!)
Old: store 128 MB block → 384 MB used (3 copies)
New: store 128 MB block → ~192 MB used (EC-encoded)
50% storage savings! But: higher CPU cost for reconstruction
Use for: cold/archive data (rarely read, storage matters)
Keep replication for hot data (often read, fast recovery needed)
+ YARN Timeline Server v2 (better job history + metrics)
+ Opportunistic Containers (run low-priority tasks in spare capacity)
+ Multiple Standby NameNodes (more than 1 standby for HA)
+ Minimum Java 8 (drops Java 7 support)
+ Intra-DataNode balancer (balance disks within one DataNode)

⚡ SECTION 8: OPTIMIZATIONS — THE MOST IMPORTANT SECTION

Senior engineers are judged on this. Knowing "what is MapReduce" is basic. Knowing "which config knob to turn and WHY" is what gets you hired at 10 years.

🧠 OPTIMIZATION MASTER MEMORY MAP

HADOOP OPTIMIZATION → "MY BLOCK COMPRESS FLY"
M → Memory sizing (mapper/reducer heap, YARN container sizes)
Y → YARN container settings (vcores, memory ratio)
B → Block size tuning (bigger blocks for large files)
L → Locality maximized (rack-aware placement)
O → Output compression (reduce network + disk IO)
C → Combiner (mini-reducer, cuts network traffic by up to 80%)
K → Key partitioning (uniform reduce load distribution)
C → Compression codecs (Snappy = fast, GZIP = small, LZO = splittable)
O → ORC/Parquet for Hive (columnar = 10x faster)
M → Map join (broadcast small tables, avoid shuffle)
P → Parallel copies during shuffle (mapreduce.reduce.shuffle.parallelcopies)
R → Replication factor for cold data (drop to 2 or 1)
E → Erasure coding (Hadoop 3, ~50% storage savings for cold data)
S → Speculative execution (re-run slow tasks automatically)
S → Sort buffer tuning (mapreduce.task.io.sort.mb)

Q12: MapReduce Optimization — All Settings Explained

💡 Interview Tip
🎤 Interviewer will ask: "You have a MapReduce job that runs for 6 hours. How do you optimize it?"

STEP 1: Optimize Memory (Most Common Problem)

xml
<!-- mapred-site.xml — MapReduce memory config -->

<!-- Container memory for Map tasks -->
<property>
  <name>mapreduce.map.memory.mb</name>
  <value>2048</value>
  <!-- Default is 1024 MB. Increase if mappers are getting killed (OOM) -->
  <!-- Rule: set to 1.5x - 2x your data per mapper -->
</property>

<!-- Container memory for Reduce tasks -->
<property>
  <name>mapreduce.reduce.memory.mb</name>
  <value>4096</value>
  <!-- Reducers need more memory than Mappers (they hold sorted data) -->
  <!-- Rule: 2x mapper memory is a safe starting point -->
</property>

<!-- JVM heap for Map tasks (must be LESS than container memory) -->
<property>
  <name>mapreduce.map.java.opts</name>
  <value>-Xmx1638m</value>
  <!-- Rule: 80% of mapreduce.map.memory.mb -->
  <!-- 2048 * 0.8 = 1638 MB — remaining 20% is for JVM overhead -->
</property>

<!-- JVM heap for Reduce tasks -->
<property>
  <name>mapreduce.reduce.java.opts</name>
  <value>-Xmx3276m</value>
  <!-- Rule: 80% of mapreduce.reduce.memory.mb = 4096 * 0.8 = 3276 MB -->
</property>
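The 80% heap rule above is simple arithmetic; here is a quick sketch of it (`HeapSizing` and `heapMb` are illustrative names of mine, not a Hadoop API):

```java
// Sketch of the container-vs-heap sizing rule described above.
public class HeapSizing {
    // -Xmx should be ~80% of the YARN container size; the remaining
    // ~20% covers JVM metaspace, thread stacks, and native buffers.
    public static int heapMb(int containerMb) {
        return (int) (containerMb * 0.8);
    }

    public static void main(String[] args) {
        System.out.println(heapMb(2048)); // 1638 -> -Xmx1638m for 2 GB mappers
        System.out.println(heapMb(4096)); // 3276 -> -Xmx3276m for 4 GB reducers
    }
}
```

If the heap is set equal to the container size instead, YARN kills the task the moment JVM overhead pushes it past the container limit.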

STEP 2: Sort Buffer (Shuffle Performance)

xml
<!-- The sort buffer is IN-MEMORY — bigger = fewer disk spills = FASTER shuffle -->

<property>
  <name>mapreduce.task.io.sort.mb</name>
  <value>512</value>
  <!-- Default: 100 MB — way too small for large jobs -->
  <!-- Increase to 512 MB or more to reduce disk spills during shuffle -->
  <!-- Must fit within map container memory (mapreduce.map.memory.mb) -->
  <!-- Tuning: If you see "Spilling to disk" logs → increase this first! -->
</property>

<!-- When buffer is X% full, start spilling to disk (while still mapping) -->
<property>
  <name>mapreduce.map.sort.spill.percent</name>
  <value>0.80</value>
  <!-- Default: 0.80 (80%) — spill when 80% of sort buffer is used -->
  <!-- Increasing to 0.90 keeps more in memory but risks OOM -->
</property>

<!-- Number of parallel sort/merge threads for spill files -->
<property>
  <name>mapreduce.task.io.sort.factor</name>
  <value>100</value>
  <!-- Default: 10 — how many spill files merged at once -->
  <!-- Increase to reduce the number of merge passes → fewer I/O operations -->
</property>

STEP 3: Compression (Biggest Network Win)

xml
<!-- Compress map output (the shuffle data — this crosses network!) -->
<property>
  <name>mapreduce.map.output.compress</name>
  <value>true</value>
  <!-- TRUE = compress data during shuffle → HUGE network bandwidth savings -->
  <!-- Cost: CPU for compress/decompress (usually worth it on network-bound jobs) -->
</property>

<property>
  <name>mapreduce.map.output.compress.codec</name>
  <value>org.apache.hadoop.io.compress.SnappyCodec</value>
  <!-- Snappy: FAST compress/decompress, moderate compression ratio -->
  <!-- Best choice for intermediate shuffle data (speed matters here) -->
  <!-- Alternatives: LZ4 (even faster), Zlib (smaller but slower) -->
</property>

<!-- Compress final job output -->
<property>
  <name>mapreduce.output.fileoutputformat.compress</name>
  <value>true</value>
</property>

<property>
  <name>mapreduce.output.fileoutputformat.compress.codec</name>
  <value>org.apache.hadoop.io.compress.GzipCodec</value>
  <!-- GZIP for final output: better compression ratio (smaller files stored long-term) -->
  <!-- ⚠️ GZIP is NOT splittable! Use LZO or Snappy if next job needs to read this -->
  <!-- LZO: splittable + fast — best for output that will be input to another MR job -->
  <!-- Bzip2: most compressed, splittable, but VERY slow compression -->
</property>

COMPRESSION CODEC COMPARISON TABLE

CODEC        | Speed  | Ratio   | Splittable | Best For
Snappy       | Fast   | Medium  | No         | Shuffle intermediate data, hot data
LZ4          | Faster | Medium  | No         | Real-time, fastest option
LZO          | Fast   | Medium  | Yes*       | Pipeline data (input to next job)
GZIP/Zlib    | Medium | High    | No         | Final archive output (read rarely)
Bzip2        | Slow   | Highest | Yes        | Cold archive (max compression)
ORC/Parquet  | N/A    | Highest | Yes        | Hive tables (columnar, built-in compression)

* LZO is splittable only after an index file is built for it (e.g. with hadoop-lzo's LzoIndexer)

STEP 4: Number of Reducers

xml
<!-- Number of reduce tasks (parallel reducers) -->
<property>
  <name>mapreduce.job.reduces</name>
  <value>10</value>
  <!-- Default: 1 (terrible for large jobs!) -->
  <!-- Rule of thumb: 0.95 × (nodes × max_containers_per_node) -->
  <!-- Example: 10 nodes × 4 containers = 40 → set reducers = 38 -->
  <!-- Too many reducers: overhead of managing many tasks + small output files -->
  <!-- Too few reducers: bottleneck, data skew impact is worse -->
  <!-- 0 reducers: no shuffle phase (map-only job) — for filter/transform jobs! -->
</property>
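The 0.95 rule of thumb above, as a quick sketch (`recommendedReducers` is a hypothetical helper of mine, not a Hadoop API):

```java
// Sketch of the reducer-count rule of thumb: 0.95 x (nodes x max containers per node).
public class ReducerSizing {
    // The 5% headroom lets all reducers launch in one wave while leaving
    // slack for task failures and speculative re-execution.
    public static int recommendedReducers(int nodes, int containersPerNode) {
        return (int) Math.floor(0.95 * nodes * containersPerNode);
    }

    public static void main(String[] args) {
        // 10 nodes x 4 containers = 40 slots -> 38 reducers
        System.out.println(recommendedReducers(10, 4));
    }
}
```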

STEP 5: JVM Reuse (Huge Win for Many Small Tasks)

xml
<!-- Reuse JVM containers across multiple mapper tasks -->
<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <!-- Default: 1 (a new JVM per task = startup cost paid on every task!) -->
  <!-- Setting to 10: the same JVM runs 10 tasks before restarting -->
  <!-- Setting to -1: unlimited reuse (one JVM per slot for the entire job) -->
  <!-- HUGE gain when you have MANY small mapper tasks (small-files problem!) -->
  <!-- ⚠️ JVM reuse is a classic-MRv1 feature; on YARN/MRv2, uber mode (mapreduce.job.ubertask.enable) is the analogue for tiny jobs -->
  <!-- ⚠️ Reused JVMs accumulate leaked memory from buggy tasks → use with stable code only -->
</property>

STEP 6: Speculative Execution

xml
<!-- Automatically re-run slow (straggler) tasks on another node -->
<property>
  <name>mapreduce.map.speculative</name>
  <value>true</value>
  <!-- Default: true — Hadoop automatically detects slow mappers and duplicates them -->
</property>

<property>
  <name>mapreduce.reduce.speculative</name>
  <value>true</value>
  <!-- Default: true -->
  <!-- ⚠️ Disable if your tasks have side effects (writing to DB, calling API) -->
  <!-- Otherwise same row could be written TWICE to the external system! -->
</property>

Q12 INTERVIEW ANSWER — "How to optimize a 6-hour MapReduce job?"

🧠 Memory Map
"I would approach this systematically, in six steps:
1. DIAGNOSE FIRST: Look at the job logs — is it slow in map phase or reduce phase?
Is there 'Spilling to disk' in logs? Are reducers getting skewed data?
2. MEMORY + SORT BUFFER:
Increase mapreduce.task.io.sort.mb from 100 MB → 512 MB (reduce disk spills)
Set map memory to 2 GB, Java opts to 80% = 1.6 GB
Result: fewer shuffle spills → faster map phase
3. COMPRESSION:
Enable mapreduce.map.output.compress with Snappy codec
This cuts shuffle network traffic by 50-70%
Result: HUGE win on network-bound jobs
4. COMBINERS:
Add a Combiner class (often the Reducer itself, when the aggregation is associative and commutative, e.g. sum/count/max)
Runs on mapper's local output BEFORE shuffle
Cuts data volume going over network by 60-80% for aggregation jobs
5. REDUCERS + PARTITIONING:
If job has only 1 reducer → parallelism is the bottleneck
Increase mapreduce.job.reduces to nodes × containers × 0.95
Use custom Partitioner to ensure even data distribution
6. JVM REUSE:
If job has thousands of small tasks, set mapreduce.job.jvm.numtasks=10
Saves JVM startup overhead for every task"
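The combiner win in step 4 can be sketched without a cluster: pre-aggregating each mapper's output locally collapses repeated keys before they cross the network. This is an illustrative simulation, not Hadoop API code:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;

// Sketch of why a Combiner shrinks shuffle volume: pre-aggregate each
// mapper's output locally before it crosses the network.
public class CombinerDemo {
    // Without a combiner: one (key, 1) record shuffled per input word
    public static int shuffleRecordsRaw(List<String> mapperInput) {
        return mapperInput.size();
    }

    // With a combiner: one (key, partialSum) record per distinct key per mapper
    public static int shuffleRecordsCombined(List<String> mapperInput) {
        return new HashSet<>(mapperInput).size();
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("US", "US", "US", "US", "FJ", "US", "US", "IN");
        System.out.println(shuffleRecordsRaw(words));      // 8 records shuffled
        System.out.println(shuffleRecordsCombined(words)); // 3 records shuffled
    }
}
```

On real aggregation jobs with millions of repeated keys per mapper, this same effect is where the 60-80% reduction comes from.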

Q13: HDFS Block Size Optimization

🧠 Memory Map
BLOCK SIZE TUNING
Default: 128 MB (Hadoop 2/3), was 64 MB in Hadoop 1
WHEN TO INCREASE BLOCK SIZE (256 MB, 512 MB, 1 GB)
✓ Very large files (terabytes of log data, genomics data)
✓ Sequential scan workloads (full table scans in Hive/MapReduce)
✓ Want fewer blocks → less NameNode memory pressure
✓ Fewer mapper tasks → less task-scheduling overhead
WHEN TO KEEP SMALL BLOCK SIZE (64 MB, 128 MB)
✓ Lots of small-to-medium files
✓ Random read patterns (HBase-style workloads favor smaller blocks)
✓ When parallelism matters more than per-task overhead
FORMULA FOR OPTIMAL BLOCK SIZE
Goal: each mapper processes 1-2 blocks in 1-2 minutes
If mappers finish in 2 seconds → blocks too small → increase
If mappers run for 30 minutes → blocks too large → decrease
PRACTICAL EXAMPLE
Daily log file: 10 GB → at 128 MB = 80 blocks = 80 mappers (good!)
Daily log file: 10 GB → at 64 MB = 160 blocks = 160 mappers (more parallelism, more overhead)
Yearly archive: 10 TB → at 128 MB = ~80,000 blocks (high NameNode memory)
→ at 512 MB = ~20,000 blocks (4x less NameNode memory!)
CHANGE BLOCK SIZE PER FILE
bash
# Set block size when writing a specific file (512 MB blocks)
hdfs dfs -D dfs.blocksize=536870912 -put large_file.dat /data/archive/

# Or set in your application
# In MapReduce job config:
job.getConfiguration().set("dfs.blocksize", "536870912");
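The block-count arithmetic above can be checked with a small sketch (`BlockMath` and `blockCount` are hypothetical helper names of mine):

```java
// Sketch of the block-count math: number of HDFS blocks for a file,
// which is also the default number of mappers scheduled to read it.
public class BlockMath {
    public static long blockCount(long fileBytes, long blockBytes) {
        return (fileBytes + blockBytes - 1) / blockBytes; // ceiling division
    }

    public static void main(String[] args) {
        long GB = 1024L * 1024 * 1024;
        System.out.println(blockCount(10 * GB, 128L * 1024 * 1024));        // 80
        System.out.println(blockCount(10 * 1024 * GB, 512L * 1024 * 1024)); // 20480
    }
}
```

The last file's final block is usually partial; ceiling division counts it, which is exactly how HDFS does (a file occupies only the bytes it has, not a full last block).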

Q14: YARN Resource Optimization

xml
<!-- yarn-site.xml — resource manager settings -->

<!-- Total memory available for YARN on each NodeManager -->
<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>49152</value>
  <!-- 49152 MB = 48 GB. On a 64 GB node: reserve ~16 GB for OS, DataNode, and other services → YARN gets 48 GB -->
  <!-- Rule: YARN memory = total RAM - OS overhead (8-16 GB) - HBase/etc memory -->
</property>

<!-- Total CPU vcores available for YARN on each NodeManager -->
<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>16</value>
  <!-- On a 16-core node: give all 16 to YARN (OS uses very little normally) -->
</property>

<!-- Minimum container size (floor) -->
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>512</value>
  <!-- No container gets less than 512 MB — prevents tiny useless containers -->
</property>

<!-- Maximum container size (ceiling) -->
<property>
  <name>yarn.scheduler.maximum-allocation-mb</name>
  <value>16384</value>
  <!-- No single container gets more than 16 GB (prevents one job hogging all RAM) -->
</property>

<!-- Minimum vcores per container -->
<property>
  <name>yarn.scheduler.minimum-allocation-vcores</name>
  <value>1</value>
</property>

<!-- Maximum vcores per container -->
<property>
  <name>yarn.scheduler.maximum-allocation-vcores</name>
  <value>4</value>
</property>

YARN Capacity Scheduler Optimization

xml
<!-- capacity-scheduler.xml -->

<!-- Define queue capacities (must add up to 100%) -->
<property>
  <name>yarn.scheduler.capacity.root.production.capacity</name>
  <value>70</value>
  <!-- Production queue gets 70% of cluster resources guaranteed -->
</property>

<property>
  <name>yarn.scheduler.capacity.root.development.capacity</name>
  <value>20</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.adhoc.capacity</name>
  <value>10</value>
</property>

<!-- Maximum capacity (can use idle resources from other queues) -->
<property>
  <name>yarn.scheduler.capacity.root.production.maximum-capacity</name>
  <value>90</value>
  <!-- Production can burst to 90% if other queues are idle -->
</property>

<!-- User limit: how much one user can use within a queue -->
<property>
  <name>yarn.scheduler.capacity.root.production.user-limit-factor</name>
  <value>2</value>
  <!-- One user can use up to 2x their fair share within the queue -->
</property>

Q15: Data Skew in MapReduce — Problem + Solutions

⚠️ Common Trap
THIS IS A MUST-KNOW SCENARIO QUESTION FOR 10-YEAR ENGINEERS
PROBLEM: Data Skew
Your MapReduce job has 100 reducers, 99 finish in 5 minutes
But 1 reducer takes 3 HOURS — the whole job waits for it!
Reason: one key has 10 million values, others have 100 values each
EXAMPLE
Booking data grouped by country_code
US: 10 million bookings → one reducer handles ALL US data
Fiji: 200 bookings
SOLUTIONS
1. SALTING (most common technique):
Add random suffix to hot key: "US_0", "US_1", "US_2" ... "US_99"
Now US data splits across 100 reducers
Downside: need a second step to re-aggregate the salted results
2. CUSTOM PARTITIONER:
Write custom Partitioner that knows "US" is hot
Sends US data to multiple reducers intentionally
More maintenance but precise control
3. SAMPLING:
Sample data first, find hot keys
Use TotalOrderPartitioner for sorted output without skew
4. COMBINERS:
Reduce data at mapper output BEFORE shuffle
Cuts how much data hot-key reducer receives from each mapper
5. FOR HIVE (Day 2 — but mention here):
SET hive.groupby.skewindata=true → Hive automatically handles skew
Uses two-phase aggregation for skewed GROUP BY
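Solution 1 (salting) can be sketched in plain Java, assuming the default hash-partitioning rule (hash(key) mod numReducers, as in Hadoop's HashPartitioner); the class and method names are illustrative, not Hadoop API:

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

// Sketch of salting: fan a hot key out over many reducers by appending
// a random suffix, then re-aggregate the partial results in a second pass.
public class SaltingDemo {
    static final int SALTS = 100;
    static final Random RNG = new Random();

    // Default MapReduce-style partitioning: non-negative hash mod numReducers
    public static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Hot key "US" becomes "US_0" .. "US_99", spreading its records out
    public static String salt(String key) {
        return key + "_" + RNG.nextInt(SALTS);
    }

    public static void main(String[] args) {
        Set<Integer> reducersHit = new HashSet<>();
        for (int i = 0; i < 10_000; i++) {
            reducersHit.add(partition(salt("US"), 100));
        }
        // Unsalted, every "US" record lands on ONE reducer; salted, on many
        System.out.println(reducersHit.size());
    }
}
```

The downside noted above is visible here: a second aggregation pass must strip the `_N` suffix and merge the 100 partial results per hot key.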

Q16: HDFS Replication Optimization

bash
# Change replication factor for cold/archive data (save storage)
hdfs dfs -setrep -w 1 /data/archive/2020/     # Old archive → 1 copy (risk: data loss!)
hdfs dfs -setrep -w 2 /data/historical/2023/  # Historical → 2 copies (balanced)
hdfs dfs -setrep -w 3 /data/production/       # Hot production data → 3 copies (default)

# Check current replication factor of a file
hdfs fsck /data/archive/2020/ -files | grep "replication"

# HDFS Balancer — rebalance blocks across DataNodes (run after adding new nodes)
hdfs balancer -threshold 10
# threshold: consider node "balanced" if within 10% of average utilization
# Run this AFTER adding new DataNodes — new nodes start empty, data doesn't auto-move!

# Or run balancer as background process
hdfs balancer -threshold 5 &
🧠 Memory Map
REPLICATION TUNING STRATEGY
HOT DATA (accessed daily)
→ replication = 3 (default, fast recovery, data locality for jobs)
WARM DATA (accessed weekly/monthly)
→ replication = 2 (saves 33% storage)
COLD DATA (accessed rarely, archive)
→ replication = 1 (saves 67% storage vs default)
→ OR: Hadoop 3 ERASURE CODING (saves ~50% vs default 3x replication)
→ RS-6-3-1024k policy: store 6 data blocks + 3 parity blocks
→ can lose any 3 of 9 blocks and still recover
→ storage overhead: 1.5x vs 3x replication = 50% savings
⚠️Erasure coding has higher CPU cost for recovery — use only for cold/archive data!
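The storage arithmetic behind 3x replication vs RS-6-3 erasure coding, as a small sketch (helper names are mine):

```java
// Sketch of the storage math: disk bytes consumed per logical byte
// under N-way replication vs Reed-Solomon RS(data, parity) encoding.
public class StorageOverhead {
    public static double replicated(double logicalBytes, int replicas) {
        return logicalBytes * replicas;
    }

    // RS(d, p): every d data blocks carry p extra parity blocks
    public static double erasureCoded(double logicalBytes, int data, int parity) {
        return logicalBytes * (data + parity) / data;
    }

    public static void main(String[] args) {
        double tb = 100.0; // 100 TB of logical data
        System.out.println(replicated(tb, 3));      // 300.0 TB on disk (3x)
        System.out.println(erasureCoded(tb, 6, 3)); // 150.0 TB on disk (1.5x)
    }
}
```

That 1.5x vs 3x ratio is the 50% savings quoted above; the price is CPU-heavy Reed-Solomon reconstruction whenever a block is lost, which is why EC fits cold data best.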

Q17: MapReduce Performance Summary — Interview Answer Framework

"When I get a slow MapReduce job, I follow this checklist (mnemonic: MCJ-COMP):
M → Memory: increase sort buffer (task.io.sort.mb), container memory, Java heap
C → Combiners: add a combiner to cut shuffle data 60-80% on aggregation jobs
J → JVM Reuse: set jvm.numtasks=10 for jobs with many small tasks
C → Compression: enable map output compression with Snappy (cuts network traffic 50-70%)
O → Output: compress final output with GZIP (smaller files on disk)
M → More Reducers: increase from 1 to nodes × containers × 0.95
P → Partitioner: ensure even key distribution (custom partitioner for skewed data)
MOST COMMON FIX: 90% of slow jobs = sort buffer too small OR map output not compressed
SECOND MOST COMMON: only 1 reducer when there should be 20+
For Hive jobs (same cluster): ORC format + vectorization + Tez engine — Day 2 topic."