Skip to main content

Documentation Index

Fetch the complete documentation index at: https://resources.devweekends.com/llms.txt

Use this file to discover all available pages before exploring further.

Chapter 6: Fault Tolerance and Reliability

In distributed systems, failure is not an exception — it is the norm. Hadoop was designed from the ground up to operate reliably in environments where individual components fail regularly. This chapter explores the sophisticated fault tolerance mechanisms that make Hadoop one of the most resilient distributed systems ever built. The intellectual foundation for Hadoop’s fault tolerance comes from two sources. First, the Google File System paper (2003), which reported that Google’s clusters experienced roughly 1,000 individual disk failures and dozens of machine failures per day across their fleet. Google’s engineers did not try to prevent these failures — they designed a system that treated them as routine events. Second, Jim Gray’s landmark work on fault-tolerant computing at Tandem Computers (1980s), which established the principle that hardware will fail and software must compensate. The Hadoop project inherited both philosophies: replicate data, retry failed computations, and never trust any single machine. What makes Hadoop’s approach distinctive is the separation of concerns. HDFS handles data durability (your data survives hardware failures), MapReduce handles computation reliability (your job completes despite task failures), and YARN handles resource availability (the cluster continues scheduling work despite node losses). Each layer has its own fault tolerance mechanisms, and they compose together to create a system where the end user rarely sees the failures that are constantly happening underneath.
Chapter Goals:
  • Master HDFS replication and recovery mechanisms
  • Understand MapReduce fault tolerance strategies
  • Learn YARN reliability features
  • Explore failure detection and handling
  • Design fault-tolerant Hadoop applications

The Philosophy of Failure in Distributed Systems

The intellectual roots of Hadoop’s fault tolerance trace back to a single, uncomfortable truth that Jim Gray articulated in his 1985 paper “Why Do Computers Stop and What Can Be Done About It?”: hardware fails, software has bugs, and operators make mistakes. The question is never whether a component will fail, but when and how many at once. Google internalized this reality when building GFS and MapReduce in the early 2000s. Their commodity hardware clusters experienced disk failures daily, node reboots weekly, and entire rack outages quarterly. The original GFS paper (2003) explicitly states that “component failures are the norm rather than the exception.” Hadoop inherited this philosophy wholesale. Every design decision — from triple replication to speculative execution — flows from the assumption that individual components are unreliable, and reliability must be an emergent property of the system as a whole. This is the same principle that underpins modern cloud-native systems: AWS designs every service to tolerate Availability Zone failures, Kubernetes restarts crashed pods automatically, and Cassandra replicates across data centers. Hadoop was arguably the first open-source system to make this philosophy accessible to the entire industry.

Why Fault Tolerance is Critical

+---------------------------------------------------------------+
|              FAILURE PROBABILITY AT SCALE                     |
+---------------------------------------------------------------+
|                                                               |
|  Single Machine (99.9% uptime):                               |
|  ┌────────────────────────────────────────┐                   |
|  │ Downtime: ~8.7 hours/year             │                   |
|  │ Reliability: Good for single server    │                   |
|  └────────────────────────────────────────┘                   |
|                                                               |
|  1000-Node Cluster (each 99.9% uptime):                       |
|  ┌────────────────────────────────────────┐                   |
|  │ At any moment:                         │                   |
|  │ • 1 node down on average               │                   |
|  │ • Multiple failures per day likely     │                   |
|  │ • Correlated failures possible         │                   |
|  │                                        │                   |
|  │ Without fault tolerance:               │                   |
|  │ ✗ Jobs fail constantly                 │                   |
|  │ ✗ Data loss is inevitable              │                   |
|  │ ✗ System becomes unusable              │                   |
|  └────────────────────────────────────────┘                   |
|                                                               |
|  With Hadoop Fault Tolerance:                                 |
|  ┌────────────────────────────────────────┐                   |
|  │ ✓ Jobs complete despite failures       │                   |
|  │ ✓ Data is never lost                   │                   |
|  │ ✓ System remains operational           │                   |
|  │ ✓ Transparent recovery                 │                   |
|  └────────────────────────────────────────┘                   |
|                                                               |
+---------------------------------------------------------------+

Types of Failures Hadoop Handles

Disk Failures:
  • Individual disk corruption or crash
  • RAID controller failures
  • Storage network issues
  • Silent data corruption
Node Failures:
  • Complete server crash
  • Power supply failure
  • Memory errors (ECC failures)
  • CPU/motherboard failure
  • Network interface card failure
Network Failures:
  • Switch failures
  • Cable disconnections
  • Network partition (split-brain)
  • Bandwidth congestion
  • DNS resolution failures
Process Crashes:
  • Out of memory errors
  • Segmentation faults
  • Uncaught exceptions
  • Resource exhaustion
Bugs:
  • Data processing errors
  • Infinite loops
  • Deadlocks
  • Race conditions
Configuration Errors:
  • Incorrect settings
  • Version mismatches
  • Permission issues
Slow Nodes (Stragglers):
  • Disk degradation
  • CPU thermal throttling
  • Memory pressure
  • Network congestion
Resource Contention:
  • Competing workloads
  • Background processes
  • I/O bottlenecks

HDFS Fault Tolerance Mechanisms

Data Replication Architecture

+---------------------------------------------------------------+
|                 HDFS REPLICATION STRATEGY                     |
+---------------------------------------------------------------+
|                                                               |
|  Block Placement Policy (Rack-Aware):                         |
|                                                               |
|    Rack 1              Rack 2              Rack 3             |
|  ┌─────────┐        ┌─────────┐        ┌─────────┐           |
|  │ Node A  │        │ Node C  │        │ Node E  │           |
|  │  Blk 1  │◄───────│  Blk 1  │        │         │           |
|  │ [Replica│   Copy │ [Replica│        │         │           |
|  │    1]   │        │    2]   │        │         │           |
|  └─────────┘        └─────────┘        └─────────┘           |
|  ┌─────────┐        ┌─────────┐        ┌─────────┐           |
|  │ Node B  │        │ Node D  │        │ Node F  │           |
|  │  Blk 1  │        │         │        │         │           |
|  │ [Replica│        │         │        │         │           |
|  │    3]   │        │         │        │         │           |
|  └─────────┘        └─────────┘        └─────────┘           |
|      ▲                                                        |
|      │                                                        |
|      └──────────────────────────────────┐                     |
|                                         │                     |
|  Placement Strategy:                    │                     |
|  1. First replica: Same node as writer (or random)            |
|  2. Second replica: Different rack                            |
|  3. Third replica: Same rack as #2, different node            |
|                                                               |
|  Benefits:                                                    |
|  • Survives rack failure (lose 1 replica)                     |
|  • Survives node failures (lose 1-2 replicas)                 |
|  • Fast local reads when possible                             |
|  • Balanced network usage                                     |
|                                                               |
+---------------------------------------------------------------+

Deep Dive: Rack Awareness vs. DynamoDB AZ-Awareness

Hadoop’s rack-aware placement policy was not the first system to distribute replicas across failure domains, but it was one of the first to make the topology explicitly configurable by operators. The concept maps directly to the “failure domain” abstraction used across distributed systems. AWS Availability Zones, Azure Fault Domains, and GCP Zones all implement the same fundamental idea: ensure that a single blast radius event (power failure, network partition, cooling system outage) cannot destroy all copies of your data. Understanding this pattern is essential because interviewers will expect you to draw connections between Hadoop’s rack awareness and how modern cloud-native databases like DynamoDB, Cosmos DB, and Cloud Spanner handle the same problem automatically. Hadoop’s Rack Awareness and DynamoDB’s AZ-Awareness share the same goal: ensuring that a single infrastructure failure (like a top-of-rack switch or a data center power outage) does not result in data loss.

1. The Strategy Comparison

FeatureHadoop Rack AwarenessDynamoDB AZ-Awareness
Fault DomainPhysical Server Rack (Power/Network)AWS Availability Zone (Data Center)
Data Placement1st: Local, 2nd: Remote Rack, 3rd: Same as 2ndAutomatically spread across 3 AZs
Distance MetricNetwork hops (Topology script)Logical AZ IDs
ConsistencyEventual (Replication Pipeline)Strong or Eventual (Paxos Quorum)

2. Implementation Differences

  • Hadoop: Relies on a Topology Script (typically /etc/hadoop/conf/topology.sh) that maps IP addresses to rack IDs (e.g., /dc1/rack1). The NameNode uses this map to calculate the “distance” between nodes.
  • DynamoDB: Managed by AWS. The partitioning logic automatically ensures that replicas for a single partition are never placed in the same AZ. This is transparent to the user, unlike Hadoop where you must manually configure the topology.

3. Locality vs. Durability

  • Hadoop Locality: Prioritizes “Local Reads” (Reading from the same node). If the local node is down, it falls back to the same rack.
  • DynamoDB Locality: Prioritizes “AZ Locality” for reads. While you can’t request a “Node Local” read, DynamoDB routes requests to the closest healthy replica to minimize latency.

Replication Factor Configuration

// HDFS replication configuration examples

// 1. Global default (hdfs-site.xml)
<property>
  <name>dfs.replication</name>
  <value>3</value>
  <description>Default block replication factor</description>
</property>

// 2. Per-file replication at write time
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);

// Set replication for specific file
Path filePath = new Path("/data/important.txt");
FSDataOutputStream out = fs.create(
    filePath,
    true,        // overwrite
    4096,        // buffer size
    (short) 5,   // replication factor = 5 (critical data)
    134217728    // block size (128MB)
);

// 3. Change replication for existing file
fs.setReplication(new Path("/data/logs.txt"), (short) 2);

// 4. Directory-level replication policy
// All files in this directory get replication=2
fs.setReplication(new Path("/data/temp/"), (short) 2);

// 5. Programmatic replication management
public class ReplicationManager {

    public void adjustReplicationByImportance(
            FileSystem fs,
            Path path,
            String importance) throws IOException {

        short replication;
        switch (importance) {
            case "CRITICAL":
                replication = 5;  // High durability
                break;
            case "IMPORTANT":
                replication = 3;  // Standard
                break;
            case "TEMPORARY":
                replication = 2;  // Reduce storage
                break;
            case "CACHE":
                replication = 1;  // Minimal (risky)
                break;
            default:
                replication = 3;
        }

        fs.setReplication(path, replication);
    }

    // Monitor and maintain replication levels
    public void ensureReplication(FileSystem fs, Path path)
            throws IOException {

        FileStatus status = fs.getFileStatus(path);
        BlockLocation[] blocks = fs.getFileBlockLocations(
            status, 0, status.getLen()
        );

        for (BlockLocation block : blocks) {
            int currentReplicas = block.getHosts().length;
            int targetReplicas = status.getReplication();

            if (currentReplicas < targetReplicas) {
                System.err.println(
                    "Under-replicated block: " +
                    block.getNames()[0] +
                    " (has " + currentReplicas +
                    ", needs " + targetReplicas + ")"
                );
            }
        }
    }
}

Deep Dive: Erasure Coding (Reed-Solomon) in Hadoop 3.x

Erasure coding is not a Hadoop invention — it has been used in RAID systems since the 1980s (RAID-5 and RAID-6 are special cases of erasure coding) and in telecommunications for decades before that. The Reed-Solomon algorithm itself dates back to a 1960 paper by Irving Reed and Gustave Solomon. What Hadoop 3.x did was bring erasure coding to the distributed file system layer, applying it across nodes and racks rather than across disks within a single machine. This was a significant engineering achievement because distributed erasure coding introduces network latency during reconstruction — a trade-off that does not exist in local RAID. Facebook (now Meta) was one of the earliest large-scale adopters of HDFS erasure coding through their HDFS-RAID project (circa 2010), which predated the official Hadoop 3.x implementation and saved them petabytes of storage. The same principle now powers cloud storage backends: Azure Storage uses erasure coding (Local Reconstruction Codes) internally, and Amazon S3’s eleven-nines durability guarantee relies on similar Reed-Solomon-based schemes spread across Availability Zones. One of the biggest limitations of standard 3-way replication is the 200% storage overhead. For every 1TB of data, you need 3TB of physical disk space. Hadoop 3.x introduced Erasure Coding (EC) to solve this, providing the same level of fault tolerance with significantly less storage.

1. How Erasure Coding Works

Unlike replication, which copies the entire block, EC uses mathematical formulas (typically Reed-Solomon) to generate “parity” data.
  • Data Cells (d): The original data blocks.
  • Parity Cells (p): Calculated from the data cells.
  • Policy (d+p): A common policy is RS-6-3 (6 data blocks, 3 parity blocks).
If you lose any 3 out of the 9 total blocks, you can mathematically reconstruct the missing data using the remaining 6.
REPLICATION (3x) vs. ERASURE CODING (RS-6-3)
─────────────────────────────────────────────

REPLICATION:
[Block A] [Block A] [Block A]
Total: 3x Storage

ERASURE CODING (RS-6-3):
[d1] [d2] [d3] [d4] [d5] [d6] | [p1] [p2] [p3]
Total: 1.5x Storage (50% overhead)

2. Trade-offs: Durability vs. Performance

Feature3x ReplicationErasure Coding (RS-6-3)
Storage Overhead200%50%
Fault ToleranceCan lose 2 nodesCan lose 3 nodes
Write PerformanceHigh (Sequential I/O)Lower (Requires parity calculation)
Read PerformanceHigh (Local reads)Lower (Requires network for reconstruction)
CPU UsageLowHigh (XOR and Galois Field math)
Best ForHot data (frequent access)Cold data (archives)

3. Striped vs. Contiguous Layout

HDFS Erasure Coding uses a Striped Layout. Instead of coding across entire large blocks (128MB), it codes across small “cells” (typically 64KB or 1MB) within a block group. This allows for parallel I/O and reduces the “wait time” for reconstruction.

The Reed-Solomon (RS-6-3) Mathematical Proof

To understand why RS-6-3 can lose 3 blocks, we look at the Vandermonde matrix used in Galois Fields (GF(28)GF(2^8)):
  1. Data Vector (DD): [d1,d2,d3,d4,d5,d6]T[d_1, d_2, d_3, d_4, d_5, d_6]^T
  2. Coding Matrix (AA): A 9×69 \times 6 matrix where the top 6×66 \times 6 is an Identity Matrix (II) and the bottom 3×63 \times 6 is the Generator Matrix (GG).
  3. Resulting Code (CC): A×D=[d1,d2,d3,d4,d5,d6,p1,p2,p3]TA \times D = [d_1, d_2, d_3, d_4, d_5, d_6, p_1, p_2, p_3]^T
Reconstruction: If blocks d1,d4,d_1, d_4, and p2p_2 are lost, the NameNode identifies the remaining 6 blocks. It creates a new 6×66 \times 6 matrix AA' by deleting the rows corresponding to the lost indices from AA. Since AA is a Vandermonde matrix, any 6×66 \times 6 sub-matrix is guaranteed to be invertible. D=(A)1×CD = (A')^{-1} \times C' Where CC' is the vector of the 6 surviving blocks. The missing data is recovered by a single matrix-vector multiplication.

Block Recovery Process

+---------------------------------------------------------------+
|              HDFS BLOCK RECOVERY WORKFLOW                     |
+---------------------------------------------------------------+
|                                                               |
|  Step 1: Failure Detection                                    |
|  ┌────────────────────────────────────────┐                   |
|  │ NameNode monitors DataNodes via        │                   |
|  │ heartbeats (every 3 seconds)           │                   |
|  │                                        │                   |
|  │ DataNode Timeout:                      │                   |
|  │ • Heartbeat interval: 3s               │                   |
|  │ • Timeout threshold: 10 min            │                   |
|  │ • Recheck interval: 5 min              │                   |
|  │                                        │                   |
|  │ If no heartbeat → Node marked DEAD     │                   |
|  └────────────────────────────────────────┘                   |
|           ↓                                                   |
|  Step 2: Identify Under-Replicated Blocks                     |
|  ┌────────────────────────────────────────┐                   |
|  │ NameNode scans block metadata:         │                   |
|  │                                        │                   |
|  │ Block B1:                              │                   |
|  │   Expected replicas: 3                 │                   |
|  │   Current replicas: 2 (Node3 died)     │                   |
|  │   Status: UNDER-REPLICATED             │                   |
|  │                                        │                   |
|  │ Priority Queue:                        │                   |
|  │   1. Blocks with 0 replicas (HIGHEST)  │                   |
|  │   2. Blocks with 1 replica             │                   |
|  │   3. Blocks below target               │                   |
|  └────────────────────────────────────────┘                   |
|           ↓                                                   |
|  Step 3: Select Replication Source & Target                   |
|  ┌────────────────────────────────────────┐                   |
|  │ Source Selection:                      │                   |
|  │ • Choose healthy DataNode with copy    │                   |
|  │ • Prefer nodes with low load           │                   |
|  │ • Consider network topology            │                   |
|  │                                        │                   |
|  │ Target Selection:                      │                   |
|  │ • Choose node with free space          │                   |
|  │ • Follow rack-awareness policy         │                   |
|  │ • Balance cluster utilization          │                   |
|  └────────────────────────────────────────┘                   |
|           ↓                                                   |
|  Step 4: Copy Block                                           |
|  ┌────────────────────────────────────────┐                   |
|  │   Source DN          →        Target DN│                   |
|  │  ┌──────────┐              ┌──────────┐│                   |
|  │  │ Block B1 │─────Copy────→│ Block B1 ││                   |
|  │  │ (Replica)│              │  (New)   ││                   |
|  │  └──────────┘              └──────────┘│                   |
|  │                                        │                   |
|  │ • Stream block data                    │                   |
|  │ • Verify checksum                      │                   |
|  │ • Report to NameNode                   │                   |
|  └────────────────────────────────────────┘                   |
|           ↓                                                   |
|  Step 5: Update Metadata                                      |
|  ┌────────────────────────────────────────┐                   |
|  │ NameNode updates:                      │                   |
|  │                                        │                   |
|  │ Block B1:                              │                   |
|  │   Replicas: 3 ✓                        │                   |
|  │   Locations: [Node1, Node2, Node5]     │                   |
|  │   Status: HEALTHY                      │                   |
|  └────────────────────────────────────────┘                   |
|                                                               |
+---------------------------------------------------------------+

4. The Block Lifecycle State Machine

The NameNode maintains a state machine for every block in the system. Understanding these transitions is key to debugging “missing” or “corrupt” data.
  • UNDER_REPLICATED: Replicas < configured replication factor (RFRF).
  • PENDING: The NameNode has commanded a DataNode to copy the block, but the DataNode hasn’t reported back yet.
  • HEALTHY: Replicas = RFRF.
  • CORRUPT: A client or DataNode background scanner reported a checksum mismatch. The NameNode will NOT use this block for replication; it will only use healthy replicas to fix it.


## Deep Dive: The Replication Pipeline

The replication pipeline is one of Hadoop's most elegant design decisions, and it directly mirrors the chain replication approach described in the 2004 USENIX paper "Chain Replication for Supporting High Throughput and Availability" by van Renesse and Schneider. The key insight is that having the client send data to all replicas in parallel would saturate the client's outbound network link (it would need to send 3x the data). By chaining the writes -- client sends to node 1, node 1 forwards to node 2, node 2 forwards to node 3 -- the client only sends the data once, and the replication cost is distributed across the network. This same pattern appears in modern systems: Apache Kafka uses a leader-follower replication model (though followers pull rather than having the leader push), and CockroachDB's Raft-based replication chains writes through the leader to followers.

When a client writes a block, it doesn't send data to all three DataNodes simultaneously. Instead, it uses a **Pipeline** to minimize network bottlenecks.

### 1. Chain Replication
The client sends the data in 64KB packets to the first DataNode in the pipeline. The first DataNode receives the packet, writes it to its local disk, and immediately forwards it to the second DataNode. This continues until the last DataNode in the chain.

- **Pipeline Ordering**: The NameNode orders the pipeline to optimize for distance (Local -> Same Rack -> Remote Rack).
- **Acknowledge Pipeline**: ACKs flow in the reverse direction. The client only considers the packet "done" once it receives an ACK from the first DataNode (meaning all nodes have the packet).

### 2. Handling Pipeline Failures
If a DataNode fails during the write:
1.  **Close Pipeline**: The client stops sending data and closes the current pipeline.
2.  **Report Failure**: The client notifies the NameNode of the failed node.
3.  **Bypass/Resume**: The client removes the bad node from the list and opens a new pipeline with the remaining healthy nodes.
4.  **Re-replicate**: After the file is closed, the NameNode notices the replication factor is below target (e.g., 2 instead of 3) and triggers a background re-replication to a new node.

## Deep Dive: Heartbeats, Block Reports, and Scaling the NameNode

The heartbeat mechanism is the simplest form of failure detection in distributed systems, and it traces back to foundational work on group membership protocols. The trade-off is straightforward: frequent heartbeats detect failures faster but consume more network bandwidth and CPU. Hadoop's 3-second heartbeat interval with a 10-minute timeout represents a deliberate choice to avoid false positives (marking a healthy but temporarily slow node as dead) at the cost of slower failure detection. Compare this with Kubernetes, which uses a similar approach -- the kubelet sends heartbeats to the API server, and nodes are marked NotReady after a configurable timeout (default 40 seconds). The difference in timeout values reflects the different failure recovery costs: in Kubernetes, restarting a pod is fast and cheap, so aggressive timeouts are acceptable. In Hadoop, marking a DataNode dead triggers massive block re-replication, so conservative timeouts prevent unnecessary data movement.

The NameNode's primary job is keeping the **Block Map** in sync with reality. It relies on three communication channels from every DataNode:

### 1. Heartbeats (Liveness)
- **Frequency**: Every 3 seconds (`dfs.heartbeat.interval`).
- **Content**: Node status (Disk space, CPU, Load).
- **Failure Detection**: If no heartbeat is received for 10 minutes, the NameNode marks the DataNode as "Dead".

### 2. Full Block Reports (Initial & Periodic)
- **Frequency**: Every 6 hours (`dfs.blockreport.intervalMsec`).
- **Content**: A complete list of ALL blocks stored on that DataNode.
- **Purpose**: This is the source of truth for the NameNode. On startup, the NameNode enters **Safe Mode** until a threshold percentage of blocks are reported by DataNodes.

### 3. Incremental Block Reports (Dynamic)
- **Trigger**: Whenever a block is received, deleted, or moved.
- **Content**: Only the changes since the last report.
- **Why it matters**: Full block reports are massive and can crash a NameNode if thousands of nodes report at once (the "Block Report Storm"). Incremental reports allow the NameNode to stay updated with minimal CPU overhead.

### NameNode High Availability

The NameNode was Hadoop's most glaring single point of failure for years. Before Hadoop 2.0 (released in 2013), a NameNode crash meant the entire cluster was unusable until an operator manually restarted it and waited for it to replay the edit log and receive block reports from all DataNodes -- a process that could take 30 minutes or more on large clusters. This was Hadoop's equivalent of a "brain death" scenario. The Quorum Journal Manager (QJM) solution, contributed primarily by engineers at Cloudera, drew on the same quorum-based consensus principles that underpin Paxos and Raft. The epoch-based fencing mechanism is particularly important because it solves the "zombie leader" problem -- a challenge that every leader-election system must address. In modern cloud deployments, managed Hadoop services (Amazon EMR, Google Dataproc, Azure HDInsight) abstract this complexity away, but understanding the QJM architecture remains essential because the same patterns appear in every highly available stateful system, from etcd to CockroachDB to Apache BookKeeper.

```text
+---------------------------------------------------------------+
|           NAMENODE HA ARCHITECTURE (QJM)                      |
+---------------------------------------------------------------+
|                                                               |
|  Active NameNode              Standby NameNode                |
|  ┌─────────────────┐          ┌─────────────────┐            |
|  │  • Serves reads │          │  • Warm standby │            |
|  │  • Handles writes│         │  • Applies edits│            |
|  │  • Updates edits│          │  • Ready to take│            |
|  │  • Sends to QJM │          │    over         │            |
|  └────────┬────────┘          └────────┬────────┘            |
|           │                            │                     |
|           │  Write                Read │                     |
|           │  Edits                Edits│                     |
|           ▼                            ▼                     |
|  ┌──────────────────────────────────────────┐                |
|  │      Quorum Journal Manager (QJM)        │                |
|  │  ┌──────┐    ┌──────┐    ┌──────┐       │                |
|  │  │ JN 1 │    │ JN 2 │    │ JN 3 │       │                |
|  │  │      │    │      │    │      │       │                |
|  │  │ Edit │    │ Edit │    │ Edit │       │                |
|  │  │ Log  │    │ Log  │    │ Log  │       │                |
|  │  └──────┘    └──────┘    └──────┘       │                |
|  │                                          │                |
|  │  Quorum: Majority must acknowledge       │                |
|  │  (2 out of 3 in this example)            │                |
|  └──────────────────────────────────────────┘                |
|           │                            │                     |
|           ▼                            ▼                     |
|  ┌─────────────────┐          ┌─────────────────┐            |
|  │  ZooKeeper      │          │  ZooKeeper      │            |
|  │  Failover       │◄────────►│  Failover       │            |
|  │  Controller     │  Monitor │  Controller     │            |
|  │  (ZKFC)         │          │  (ZKFC)         │            |
|  └─────────────────┘          └─────────────────┘            |
|           │                            │                     |
|           └──────────┬─────────────────┘                     |
|                      ▼                                       |
|           ┌──────────────────────┐                           |
|           │  ZooKeeper Ensemble  │                           |
|           │  ┌────┐ ┌────┐ ┌────┐│                           |
|           │  │ ZK │ │ ZK │ │ ZK ││                           |
|           │  │ 1  │ │ 2  │ │ 3  ││                           |
|           │  └────┘ └────┘ └────┘│                           |
|           │  (Leader Election)   │                           |
|           └──────────────────────┘                           |
|                                                               |
+---------------------------------------------------------------+

Failover Process:
1. Active NN fails
2. ZKFC detects failure (via health checks)
3. ZKFC releases ZooKeeper lock
4. Standby ZKFC acquires lock
5. Standby promotes itself to Active
6. Reads latest edits from QJM
7. Begins serving requests
8. Total failover time: ~seconds
9. **Epoch Fencing**: To prevent a "Zombie" Active NameNode (one that is slow but not dead) from corrupting the edits, QJM uses **Epoch Numbers**.
    - When a NameNode is promoted, it is assigned a unique, monotonically increasing **Epoch Number** (e.g., Epoch 5).
    - It sends this number to all JournalNodes (JNs).
    - The JNs will reject any write requests from NameNodes with an Epoch < 5.
    - If the old Active NN (Epoch 4) tries to write, it receives an "Unauthorized" error and immediately shuts itself down (fencing).

NameNode HA Configuration

<!-- hdfs-site.xml for NameNode HA -->

<!-- HA Configuration -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
  <description>Logical name for the cluster</description>
</property>

<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
  <description>Unique identifiers for each NameNode</description>
</property>

<!-- NameNode RPC addresses -->
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>namenode1.example.com:8020</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>namenode2.example.com:8020</value>
</property>

<!-- NameNode HTTP addresses -->
<property>
  <name>dfs.namenode.http-address.mycluster.nn1</name>
  <value>namenode1.example.com:50070</value>
</property>

<property>
  <name>dfs.namenode.http-address.mycluster.nn2</name>
  <value>namenode2.example.com:50070</value>
</property>

<!-- Journal Node configuration -->
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://jn1.example.com:8485;jn2.example.com:8485;jn3.example.com:8485/mycluster</value>
  <description>URIs of Journal Nodes</description>
</property>

<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/data/hadoop/journal</value>
  <description>Local path for Journal Node edits</description>
</property>

<!-- Client failover configuration -->
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

<!-- Automatic failover -->
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>

<!-- Fencing configuration (prevents split-brain) -->
<property>
  <name>dfs.ha.fencing.methods</name>
  <value>
    sshfence
    shell(/bin/true)
  </value>
</property>

<property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/hadoop/.ssh/id_rsa</value>
</property>

<!-- ZooKeeper configuration -->
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
</property>

Checksum Verification

// HDFS checksum mechanisms

public class ChecksumVerification {

    /**
     * HDFS Checksum Architecture:
     * - Every 512 bytes of data → 4 bytes CRC32C checksum
     * - Checksums stored separately from data
     * - Verified on read and write
     */

    // 1. Write with checksum verification
    public void writeWithChecksumVerification() throws IOException {
        Configuration conf = new Configuration();

        // Enable checksum verification (enabled by default)
        conf.setBoolean("dfs.client.read.checksum.verify", true);

        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/data/verified-file.txt");

        // Write data - checksums automatically created
        FSDataOutputStream out = fs.create(path);

        byte[] data = "Important data".getBytes();
        out.write(data);

        // Before close, checksums are computed and stored
        out.close();

        System.out.println("File written with checksums");
    }

    // 2. Read with checksum verification
    public void readWithChecksumVerification() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/data/verified-file.txt");

        FSDataInputStream in = fs.open(path);

        try {
            byte[] buffer = new byte[1024];
            int bytesRead;

            // Each read verifies checksums
            while ((bytesRead = in.read(buffer)) != -1) {
                // If checksum fails, IOException thrown
                processData(buffer, bytesRead);
            }

            System.out.println("File read successfully, all checksums valid");

        } catch (ChecksumException e) {
            // Checksum mismatch detected
            System.err.println("Checksum verification failed: " + e.getMessage());

            // HDFS will automatically try another replica
            // and report corrupted block to NameNode
        } finally {
            in.close();
        }
    }

    // 3. Disable checksum for performance (risky)
    public void readWithoutChecksum() throws IOException {
        Configuration conf = new Configuration();
        conf.setBoolean("dfs.client.read.checksum.verify", false);

        FileSystem fs = FileSystem.get(conf);

        // Raw read without checksum verification
        // Faster but no corruption detection
        FSDataInputStream in = fs.open(new Path("/data/file.txt"));
        // ... read data ...
        in.close();
    }

    // 4. Manual checksum verification
    public void manualChecksumCheck() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path dataPath = new Path("/data/file.txt");

        // Get checksum file
        FileChecksum checksum = fs.getFileChecksum(dataPath);

        System.out.println("Algorithm: " + checksum.getAlgorithmName());
        System.out.println("Length: " + checksum.getLength());
        System.out.println("Bytes: " +
            java.util.Arrays.toString(checksum.getBytes()));
    }

    // 5. Handle corrupted blocks
    public void handleCorruption() throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path("/data/possibly-corrupt.txt");

        // Set number of retries for checksum failures
        conf.setInt("dfs.client.read.checksum.retries", 3);

        FSDataInputStream in = fs.open(path);

        try {
            byte[] buffer = new byte[4096];
            in.read(buffer);

        } catch (ChecksumException e) {
            System.err.println("Corruption detected: " + e.getPos());

            // HDFS automatically:
            // 1. Tries other replicas
            // 2. Reports bad block to NameNode
            // 3. Schedules re-replication from good replica
            // 4. Marks corrupted replica for deletion
        }
    }

    private void processData(byte[] buffer, int length) {
        // Process the data
    }
}

MapReduce Fault Tolerance

MapReduce’s fault tolerance model is fundamentally different from HDFS’s because compute failures are handled at the task level, not the data level. The original Google MapReduce paper (2004) reported that a typical MapReduce job at Google would experience multiple worker failures during execution, and the framework was designed to transparently re-execute failed tasks without any user intervention. This “re-execution” approach works because MapReduce tasks are stateless transformations of immutable input data — if a task fails, you simply run it again on the same input split. This is a much simpler model than the checkpoint-and-recover approach used by systems like MPI (Message Passing Interface), where a single node failure often requires restarting the entire computation from the last global checkpoint. The trade-off is that MapReduce sacrifices the ability to handle fine-grained, long-running stateful computations — a limitation that later frameworks like Apache Flink and Spark Streaming addressed with their own checkpointing mechanisms.

Task Failure Handling

+---------------------------------------------------------------+
|           MAPREDUCE TASK FAILURE RECOVERY                     |
+---------------------------------------------------------------+
|                                                               |
|  Scenario 1: Map Task Failure                                |
|  ┌────────────────────────────────────────┐                   |
|  │  Node 1: Running Map Task M1           │                   |
|  │  ┌──────────┐                          │                   |
|  │  │  Map M1  │ ─ CRASH! ─► X            │                   |
|  │  │ Progress │                          │                   |
|  │  │   47%    │                          │                   |
|  │  └──────────┘                          │                   |
|  └────────────────────────────────────────┘                   |
|           │                                                   |
|           ▼                                                   |
|  ┌────────────────────────────────────────┐                   |
|  │  ApplicationMaster Detects Failure     │                   |
|  │  • No heartbeat from task              │                   |
|  │  • Task timeout (10 min default)       │                   |
|  │  • Mark task as FAILED                 │                   |
|  └────────────────────────────────────────┘                   |
|           │                                                   |
|           ▼                                                   |
|  ┌────────────────────────────────────────┐                   |
|  │  Retry Decision:                       │                   |
|  │                                        │                   |
|  │  Attempt 1: FAILED                     │                   |
|  │  Max Attempts: 4 (configurable)        │                   |
|  │  Decision: RETRY on different node     │                   |
|  └────────────────────────────────────────┘                   |
|           │                                                   |
|           ▼                                                   |
|  ┌────────────────────────────────────────┐                   |
|  │  Node 2: Retry Map Task M1             │                   |
|  │  ┌──────────┐                          │                   |
|  │  │  Map M1  │ ─ SUCCESS ─► ✓           │                   |
|  │  │ (Attempt │                          │                   |
|  │  │    2)    │                          │                   |
|  │  └──────────┘                          │                   |
|  └────────────────────────────────────────┘                   |
|                                                               |
|  Scenario 2: Reduce Task Failure                             |
|  ┌────────────────────────────────────────┐                   |
|  │  Reduce Task R1 fails at 85%           │                   |
|  │                                        │                   |
|  │  Recovery:                             │                   |
|  │  1. Fetch phase restarts (get map     │                   |
|  │     outputs again)                     │                   |
|  │  2. Sort phase restarts                │                   |
|  │  3. Reduce phase restarts from 0%      │                   |
|  │                                        │                   |
|  │  Note: Must re-fetch all map outputs   │                   |
|  │  (Map outputs may be on failed nodes)  │                   |
|  └────────────────────────────────────────┘                   |
|                                                               |
+---------------------------------------------------------------+

ApplicationMaster Failure Recovery

+---------------------------------------------------------------+
|        APPLICATIONMASTER FAILURE & RECOVERY                   |
+---------------------------------------------------------------+
|                                                               |
|  Normal Operation:                                            |
|  ┌──────────────────────────────────────────────────┐         |
|  │           ApplicationMaster (AM)                 │         |
|  │  ┌────────────────────────────────────┐          │         |
|  │  │ Job State:                         │          │         |
|  │  │ • Task tracking                    │          │         |
|  │  │ • Progress monitoring              │          │         |
|  │  │ • Resource negotiation             │          │         |
|  │  │ • Intermediate data locations      │          │         |
|  │  └────────────────────────────────────┘          │         |
|  │              │                                   │         |
|  │              ▼ Checkpoint to HDFS               │         |
|  │  ┌────────────────────────────────────┐          │         |
|  │  │ /tmp/hadoop/job_12345/state.dat    │          │         |
|  │  └────────────────────────────────────┘          │         |
|  └──────────────────────────────────────────────────┘         |
|                      │                                        |
|                      │ CRASH!                                 |
|                      ▼                                        |
|  ┌──────────────────────────────────────────────────┐         |
|  │  ResourceManager Detects AM Failure              │         |
|  │  • No heartbeat received                         │         |
|  │  • Timeout threshold exceeded                    │         |
|  └──────────────────────────────────────────────────┘         |
|                      │                                        |
|                      ▼                                        |
|  ┌──────────────────────────────────────────────────┐         |
|  │  Recovery Process:                               │         |
|  │                                                  │         |
|  │  1. Start new AM on different node               │         |
|  │                                                  │         |
|  │  2. New AM reads checkpoint from HDFS:           │         |
|  │     • Completed tasks → Don't re-run             │         |
|  │     • Running tasks → Re-schedule                │         |
|  │     • Failed tasks → Retry                       │         |
|  │     • Pending tasks → Schedule                   │         |
|  │                                                  │         |
|  │  3. New AM reconstructs job state:               │         |
|  │     • Contact running tasks                      │         |
|  │     • Verify task status                         │         |
|  │     • Re-fetch map output locations              │         |
|  │                                                  │         |
|  │  4. Continue job execution                       │         |
|  │                                                  │         |
|  │  Max AM failures: 2 (configurable)               │         |
|  │  If exceeded → Job FAILED                        │         |
|  └──────────────────────────────────────────────────┘         |
|                                                               |
+---------------------------------------------------------------+

Speculative Execution

Speculative execution is Hadoop’s answer to the “straggler problem” — the observation that in a large cluster, a small number of tasks will run significantly slower than the rest due to hardware degradation, resource contention, or other unpredictable factors. Google’s original MapReduce paper identified stragglers as one of the most significant sources of job latency: a single slow task can hold up an entire job that is otherwise 99% complete. The idea of launching redundant work to hedge against slow responses is not unique to MapReduce. DNS resolution uses a similar technique (sending queries to multiple resolvers), and modern microservice architectures use “hedged requests” (sending the same RPC to two backends and taking the first response). Dean and Barroso’s influential 2013 paper “The Tail at Scale” formalized this pattern and showed that issuing redundant requests is often the most cost-effective way to reduce tail latency at scale. In practice, speculative execution in Hadoop trades a modest increase in cluster resource usage (typically 2-5%) for a significant reduction in job completion time.
+---------------------------------------------------------------+
|              SPECULATIVE EXECUTION MECHANISM                  |
+---------------------------------------------------------------+
|                                                               |
|  Problem: Slow Tasks (Stragglers)                             |
|  ┌────────────────────────────────────────┐                   |
|  │  10 Map Tasks Running:                 │                   |
|  │                                        │                   |
|  │  Tasks 1-9:  [████████████] 100% ✓    │                   |
|  │  Task 10:    [███░░░░░░░░░]  25%      │                   |
|  │              (Slow disk, high load)    │                   |
|  │                                        │                   |
|  │  Without speculation:                  │                   |
|  │  • Job waits for Task 10               │                   |
|  │  • Entire job delayed                  │                   |
|  │  • Resources underutilized             │                   |
|  └────────────────────────────────────────┘                   |
|           │                                                   |
|           ▼                                                   |
|  ┌────────────────────────────────────────┐                   |
|  │  Detection: AM monitors progress       │                   |
|  │                                        │                   |
|  │  Average task time: 2 minutes          │                   |
|  │  Task 10 time: 5 minutes (still 25%)   │                   |
|  │                                        │                   |
|  │  Heuristic:                            │                   |
|  │  • Task significantly slower than avg  │                   |
|  │  • Not due to different input size     │                   |
|  │  • Resources available for duplicate   │                   |
|  │                                        │                   |
|  │  Decision: Launch speculative copy     │                   |
|  └────────────────────────────────────────┘                   |
|           │                                                   |
|           ▼                                                   |
|  ┌────────────────────────────────────────┐                   |
|  │  Speculative Execution:                │                   |
|  │                                        │                   |
|  │  Node A:  Task 10    [███░░░░░░░░░]   │                   |
|  │           (Original)      25%          │                   |
|  │                                        │                   |
|  │  Node B:  Task 10    [░░░░░░░░░░░░]   │                   |
|  │           (Backup)        0%           │                   |
|  │                                        │                   |
|  │  Both tasks process same input         │                   |
|  │  First to complete wins                │                   |
|  │  Slower task is killed                 │                   |
|  └────────────────────────────────────────┘                   |
|           │                                                   |
|           ▼                                                   |
|  ┌────────────────────────────────────────┐                   |
|  │  Outcome:                              │                   |
|  │                                        │                   |
|  │  Node B completes first: ✓             │                   |
|  │  Node A task killed                    │                   |
|  │                                        │                   |
|  │  Result: Job completes faster          │                   |
|  │  Cost: Extra resource usage            │                   |
|  │  (Usually worth it for job completion) │                   |
|  └────────────────────────────────────────┘                   |
|                                                               |
+---------------------------------------------------------------+

MapReduce Fault Tolerance Configuration

// MapReduce fault tolerance settings

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class FaultToleranceConfig {

    public static Job configureFaultTolerance() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Fault Tolerant Job");

        // ============================================
        // TASK FAILURE HANDLING
        // ============================================

        // Max attempts for map tasks (default: 4)
        conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 4);

        // Max attempts for reduce tasks (default: 4)
        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 4);

        // Max % of failed maps before job fails (default: 0)
        // 0 means any map failure causes job failure
        // Set to 5 = allow 5% of maps to fail
        conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 0);

        // Max % of failed reduces before job fails
        conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 0);

        // Max failed task attempts before blacklisting node
        conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);

        // ============================================
        // TIMEOUT CONFIGURATION
        // ============================================

        // Task timeout (milliseconds)
        // Kill task if no progress for this duration
        // Default: 600000 (10 minutes)
        conf.setLong(MRJobConfig.TASK_TIMEOUT, 600000);

        // Set to 0 to disable timeout (risky)
        // conf.setLong(MRJobConfig.TASK_TIMEOUT, 0);

        // For long-running tasks, increase timeout
        conf.setLong(MRJobConfig.TASK_TIMEOUT, 1800000); // 30 min

        // ============================================
        // SPECULATIVE EXECUTION
        // ============================================

        // Enable speculative execution for maps
        conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);

        // Enable speculative execution for reduces
        conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, true);

        // Disable speculation for non-idempotent tasks
        // (tasks with side effects)
        // conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
        // conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

        // Speculation threshold (default: 1.0)
        // Launch backup if task is this many times slower than average
        conf.setDouble("mapreduce.job.speculative.slowtaskthreshold", 1.0);

        // ============================================
        // APPLICATION MASTER FAILURE
        // ============================================

        // Max AM attempts (default: 2)
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);

        // For critical jobs, allow more AM retries
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 4);

        // ============================================
        // DATA LOCALITY
        // ============================================

        // Node-local attempts before rack-local
        conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 5);

        // Rack-local attempts before off-rack
        // Helps ensure tasks run near data even with failures

        // ============================================
        // MEMORY SETTINGS (Prevent OOM failures)
        // ============================================

        // Map memory (MB)
        conf.setInt(MRJobConfig.MAP_MEMORY_MB, 2048);

        // Reduce memory (MB)
        conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 4096);

        // Java heap size (as fraction of container memory)
        conf.setFloat(MRJobConfig.MAP_JAVA_OPTS, 0.8f);
        conf.setFloat(MRJobConfig.REDUCE_JAVA_OPTS, 0.8f);

        // ============================================
        // INTERMEDIATE DATA PROTECTION
        // ============================================

        // Number of fetch failures before blacklisting map output
        conf.setInt(MRJobConfig.SHUFFLE_FETCH_FAILURES, 10);

        // Re-run map tasks if reduce cannot fetch output
        // (map node may have failed)

        // ============================================
        // JOB RECOVERY
        // ============================================

        // Enable job recovery (AM restart)
        conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);

        // Store recovery state in HDFS
        conf.set(MRJobConfig.MR_AM_STAGING_DIR,
                 "/tmp/hadoop-yarn/staging");

        return job;
    }

    /**
     * Example: Critical job with aggressive fault tolerance
     */
    public static Job criticalJobConfig() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Critical Job");

        // Allow multiple retries
        conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 8);
        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 8);

        // Longer timeout for complex operations
        conf.setLong(MRJobConfig.TASK_TIMEOUT, 3600000); // 1 hour

        // More AM retry attempts
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 5);

        // Enable speculation
        conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
        conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, true);

        // Extra memory to prevent OOM
        conf.setInt(MRJobConfig.MAP_MEMORY_MB, 4096);
        conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 8192);

        return job;
    }

    /**
     * Example: Fast job with minimal overhead
     */
    public static Job fastJobConfig() throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Fast Job");

        // Fewer retries for fast failure
        conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
        conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 2);

        // Shorter timeout
        conf.setLong(MRJobConfig.TASK_TIMEOUT, 300000); // 5 min

        // Disable speculation (save resources)
        conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
        conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);

        return job;
    }
}

YARN Fault Tolerance

YARN’s fault tolerance represents a significant evolution over the Hadoop 1.x JobTracker model. In the original Hadoop architecture, the JobTracker was both the resource manager and the job coordinator — a single process responsible for scheduling tasks across the entire cluster and tracking the state of every running job. This monolithic design meant that a JobTracker failure killed every running job on the cluster. YARN’s split of responsibilities (ResourceManager for cluster-level resource allocation, ApplicationMaster for per-job coordination) is an application of the single-responsibility principle at the distributed systems level. Each component can fail independently, and each has its own recovery mechanism. This architecture mirrors the separation between Kubernetes’ control plane (kube-scheduler, kube-controller-manager) and per-pod lifecycle management. The work-preserving RM restart feature (added in Hadoop 2.6) was a particularly important milestone: it meant that a ResourceManager restart no longer killed running applications, because the RM could reconstruct its state from the running NodeManagers and ApplicationMasters.

NodeManager Failure Handling

+---------------------------------------------------------------+
|           YARN NODEMANAGER FAILURE HANDLING                   |
+---------------------------------------------------------------+
|                                                               |
|  Healthy Cluster:                                             |
|  ┌─────────────────────────────────────────┐                  |
|  │         ResourceManager (RM)            │                  |
|  │  ┌───────────────────────────┐          │                  |
|  │  │ Node Health Tracker       │          │                  |
|  │  │ • Monitors all NMs        │          │                  |
|  │  │ • Heartbeat every 1 sec   │          │                  |
|  │  └───────────────────────────┘          │                  |
|  └──────────┬──────────────────────────────┘                  |
|             │                                                 |
|        Heartbeats                                             |
|             │                                                 |
|    ┌────────┼────────┬───────────┐                            |
|    ▼        ▼        ▼           ▼                            |
|  ┌────┐  ┌────┐  ┌────┐      ┌────┐                          |
|  │ NM │  │ NM │  │ NM │  ... │ NM │                          |
|  │ 1  │  │ 2  │  │ 3  │      │ N  │                          |
|  └────┘  └────┘  └────┘      └────┘                          |
|    ✓       ✓       ✗           ✓                             |
|                    │                                          |
|                  CRASH!                                       |
|                    │                                          |
|                    ▼                                          |
|  ┌─────────────────────────────────────────┐                  |
|  │  RM Detection (NM3 failed):             │                  |
|  │                                         │                  |
|  │  1. No heartbeat for 10 minutes         │                  |
|  │     (dfs.heartbeat.interval × 10)       │                  |
|  │                                         │                  |
|  │  2. Mark NM3 as LOST                    │                  |
|  │                                         │                  |
|  │  3. Identify affected containers:       │                  |
|  │     • Container C1 (AM for Job1)        │                  |
|  │     • Container C2 (Map task)           │                  |
|  │     • Container C3 (Reduce task)        │                  |
|  └─────────────────────────────────────────┘                  |
|                    │                                          |
|                    ▼                                          |
|  ┌─────────────────────────────────────────┐                  |
|  │  Recovery Actions:                      │                  |
|  │                                         │                  |
|  │  For ApplicationMaster (C1):            │                  |
|  │  ├─ Restart AM on different node        │                  |
|  │  ├─ Restore state from checkpoint       │                  |
|  │  └─ Continue job execution              │                  |
|  │                                         │                  |
|  │  For Map Task (C2):                     │                  |
|  │  ├─ Mark task as FAILED                 │                  |
|  │  ├─ Re-schedule on healthy node         │                  |
|  │  └─ Restart from beginning              │                  |
|  │                                         │                  |
|  │  For Reduce Task (C3):                  │                  |
|  │  ├─ Mark task as FAILED                 │                  |
|  │  ├─ Re-schedule on healthy node         │                  |
|  │  ├─ Re-fetch map outputs                │                  |
|  │  └─ Restart reduce phase                │                  |
|  └─────────────────────────────────────────┘                  |
|                                                               |
+---------------------------------------------------------------+

ResourceManager High Availability

+---------------------------------------------------------------+
|          RESOURCEMANAGER HA ARCHITECTURE                      |
+---------------------------------------------------------------+
|                                                               |
|  Active RM                    Standby RM                      |
|  ┌─────────────────┐         ┌─────────────────┐             |
|  │  • Serves clients│         │  • Monitors     │             |
|  │  • Schedules     │         │  • Warm standby │             |
|  │  • Tracks apps   │         │  • Ready to take│             |
|  │  • Writes state  │         │    over         │             |
|  └────────┬────────┘         └────────┬────────┘             |
|           │                           │                      |
|           │   Write State    Read State                      |
|           │                           │                      |
|           ▼                           ▼                      |
|  ┌──────────────────────────────────────────┐                |
|  │      State Store (ZooKeeper)             │                |
|  │  ┌────────────────────────────────┐      │                |
|  │  │ Application State:             │      │                |
|  │  │ • Running applications         │      │                |
|  │  │ • Completed applications       │      │                |
|  │  │ • Node membership              │      │                |
|  │  │ • Security tokens              │      │                |
|  │  │ • Delegation tokens            │      │                |
|  │  └────────────────────────────────┘      │                |
|  └──────────────────────────────────────────┘                |
|           │                           │                      |
|           │                           │                      |
|           └───────────┬───────────────┘                      |
|                       ▼                                      |
|           ┌──────────────────────┐                           |
|           │  ZooKeeper Ensemble  │                           |
|           │  ┌────┐ ┌────┐ ┌────┐│                           |
|           │  │ ZK │ │ ZK │ │ ZK ││                           |
|           │  │ 1  │ │ 2  │ │ 3  ││                           |
|           │  └────┘ └────┘ └────┘│                           |
|           │  (Active RM election)│                           |
|           └──────────────────────┘                           |
|                                                               |
|  Failover Process:                                            |
|  1. Active RM fails                                           |
|  2. ZK detects missing heartbeat                              |
|  3. Standby RM promoted to Active                             |
|  4. New Active loads state from ZK                            |
|  5. Clients automatically redirect                            |
|  6. Jobs continue without interruption                        |
|                                                               |
|  Failover time: < 1 minute                                    |
|                                                               |
+---------------------------------------------------------------+

YARN HA Configuration

<!-- yarn-site.xml for ResourceManager HA -->

<!-- Enable RM HA -->
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>

<!-- RM cluster ID -->
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>yarn-cluster</value>
</property>

<!-- RM IDs -->
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>

<!-- RM1 addresses -->
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>resourcemanager1.example.com</value>
</property>

<property>
  <name>yarn.resourcemanager.address.rm1</name>
  <value>resourcemanager1.example.com:8032</value>
</property>

<property>
  <name>yarn.resourcemanager.scheduler.address.rm1</name>
  <value>resourcemanager1.example.com:8030</value>
</property>

<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>resourcemanager1.example.com:8088</value>
</property>

<!-- RM2 addresses -->
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>resourcemanager2.example.com</value>
</property>

<property>
  <name>yarn.resourcemanager.address.rm2</name>
  <value>resourcemanager2.example.com:8032</value>
</property>

<property>
  <name>yarn.resourcemanager.scheduler.address.rm2</name>
  <value>resourcemanager2.example.com:8030</value>
</property>

<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>resourcemanager2.example.com:8088</value>
</property>

<!-- ZooKeeper configuration -->
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
</property>

<property>
  <name>yarn.resourcemanager.zk-state-store.parent-path</name>
  <value>/rmstore</value>
</property>

<!-- State store configuration -->
<property>
  <name>yarn.resourcemanager.store.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>

<!-- Enable automatic failover -->
<property>
  <name>yarn.resourcemanager.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>

<!-- Embedded leader election -->
<property>
  <name>yarn.resourcemanager.ha.automatic-failover.embedded</name>
  <value>true</value>
</property>

<!-- Client failover -->
<property>
  <name>yarn.client.failover-proxy-provider</name>
  <value>org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider</value>
</property>

<!-- Retry settings -->
<property>
  <name>yarn.client.failover-max-attempts</name>
  <value>15</value>
</property>

<property>
  <name>yarn.client.failover-sleep-base-ms</name>
  <value>500</value>
</property>

<property>
  <name>yarn.client.failover-sleep-max-ms</name>
  <value>2000</value>
</property>

<!-- Work-preserving RM restart -->
<property>
  <name>yarn.resourcemanager.recovery.enabled</name>
  <value>true</value>
</property>

<property>
  <name>yarn.resourcemanager.work-preserving-recovery.enabled</name>
  <value>true</value>
</property>

<!-- NodeManager recovery -->
<property>
  <name>yarn.nodemanager.recovery.enabled</name>
  <value>true</value>
</property>

<property>
  <name>yarn.nodemanager.recovery.dir</name>
  <value>/var/log/hadoop-yarn/nodemanager/recovery-state</value>
</property>

Building Fault-Tolerant Applications

Designing fault-tolerant applications on Hadoop requires thinking about a property that is deceptively simple to state and surprisingly difficult to achieve in practice: idempotency. An operation is idempotent if executing it multiple times produces the same result as executing it once. This matters in Hadoop because the framework will re-execute tasks on failure, and speculative execution may run duplicate copies of the same task concurrently. If your mapper writes a record to a database on every invocation, a task retry will create duplicate records. This is not a Hadoop-specific concern — it is the central challenge of exactly-once semantics in any distributed system. Kafka addressed it with idempotent producers and transactional writes. Flink uses a two-phase commit protocol with external systems. The patterns shown below (deterministic output paths, upserts instead of inserts, and two-phase commit via OutputCommitter) are foundational techniques that apply far beyond Hadoop.

Idempotent Operations

// Designing idempotent MapReduce operations

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import java.io.IOException;
import java.util.*;

/**
 * Idempotent operations produce the same result
 * when executed multiple times.
 * Critical for fault tolerance with task retries.
 */
public class IdempotentDesign {

    // ============================================
    // GOOD: Idempotent Mapper
    // ============================================

    public static class IdempotentMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Parse input
            String line = value.toString();
            String[] words = line.split("\\s+");

            // Emit each word
            for (String w : words) {
                word.set(w.toLowerCase());
                context.write(word, one);
            }

            // ✓ Idempotent: Same input → Same output
            // ✓ No side effects
            // ✓ No external state modification
            // ✓ Safe to retry
        }
    }

    // ============================================
    // GOOD: Idempotent Reducer
    // ============================================

    public static class IdempotentReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            context.write(key, new IntWritable(sum));

            // ✓ Idempotent: Same input values → Same sum
            // ✓ No external dependencies
            // ✓ Safe to retry
        }
    }

    // ============================================
    // BAD: Non-Idempotent Mapper (Database writes)
    // ============================================

    public static class NonIdempotentMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private java.sql.Connection dbConnection;

        @Override
        protected void setup(Context context) throws IOException {
            // Connect to database
            try {
                dbConnection = java.sql.DriverManager.getConnection(
                    "jdbc:mysql://db.example.com/analytics",
                    "user", "password"
                );
            } catch (Exception e) {
                throw new IOException("DB connection failed", e);
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            try {
                // ✗ BAD: Writing to database
                // If task retries, duplicate records created!
                String sql = "INSERT INTO events (data) VALUES (?)";
                java.sql.PreparedStatement stmt =
                    dbConnection.prepareStatement(sql);
                stmt.setString(1, value.toString());
                stmt.executeUpdate();

                // ✗ Non-idempotent operation
                // ✗ Side effects
                // ✗ Task retry creates duplicates

            } catch (Exception e) {
                throw new IOException("DB write failed", e);
            }
        }
    }

    // ============================================
    // FIXED: Make Database Writes Idempotent
    // ============================================

    public static class IdempotentDatabaseMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private java.sql.Connection dbConnection;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            try {
                // ✓ Use unique key from input
                String recordId = key.toString(); // File offset is unique
                String data = value.toString();

                // ✓ Use REPLACE or INSERT ... ON DUPLICATE KEY UPDATE
                // This makes operation idempotent
                String sql =
                    "REPLACE INTO events (record_id, data) VALUES (?, ?)";

                java.sql.PreparedStatement stmt =
                    dbConnection.prepareStatement(sql);
                stmt.setString(1, recordId);
                stmt.setString(2, data);
                stmt.executeUpdate();

                // ✓ Idempotent: Retry produces same result
                // ✓ No duplicates on retry

            } catch (Exception e) {
                throw new IOException("DB write failed", e);
            }
        }
    }

    // ============================================
    // BAD: Non-Idempotent Counter Update
    // ============================================

    public static class NonIdempotentCounterMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // ✗ BAD: Increment counter in external system
            // If task retries, counter incremented multiple times!
            incrementExternalCounter("records_processed");

            // Process record...
            context.write(new Text("result"), new IntWritable(1));
        }

        private void incrementExternalCounter(String name) {
            // External counter increment (Redis, Memcached, etc.)
            // ✗ Non-idempotent
        }
    }

    // ============================================
    // GOOD: Use Hadoop Counters (Idempotent)
    // ============================================

    public static class IdempotentCounterMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        enum RecordCounters {
            PROCESSED,
            SKIPPED,
            ERRORS
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            try {
                // Process record
                String result = processRecord(value.toString());

                // ✓ Use Hadoop counters
                // Framework handles idempotency during task retries
                context.getCounter(RecordCounters.PROCESSED).increment(1);

                context.write(new Text(result), new IntWritable(1));

            } catch (Exception e) {
                context.getCounter(RecordCounters.ERRORS).increment(1);
            }

            // ✓ Counters are automatically deduplicated on task retry
            // ✓ Idempotent behavior
        }

        private String processRecord(String record) {
            return record.toUpperCase();
        }
    }
}

Handling External Systems

// Safe interaction with external systems

import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.io.*;
import java.io.IOException;
import java.sql.*;

public class ExternalSystemIntegration {

    // ============================================
    // Pattern 1: Write Only in OutputFormat
    // ============================================

    /**
     * Keep mappers/reducers idempotent.
     * Write to external systems only in OutputFormat.
     * OutputFormat is called once per task attempt success.
     */
    public static class SafeDatabaseOutputFormat
            extends OutputFormat<Text, IntWritable> {

        @Override
        public RecordWriter<Text, IntWritable> getRecordWriter(
                TaskAttemptContext context) throws IOException {

            return new DatabaseRecordWriter(context);
        }

        private static class DatabaseRecordWriter
                extends RecordWriter<Text, IntWritable> {

            private Connection connection;
            private PreparedStatement statement;
            private int batchSize = 1000;
            private int recordCount = 0;

            public DatabaseRecordWriter(TaskAttemptContext context)
                    throws IOException {
                try {
                    // Connect to database
                    connection = DriverManager.getConnection(
                        context.getConfiguration().get("db.url"),
                        context.getConfiguration().get("db.user"),
                        context.getConfiguration().get("db.password")
                    );

                    connection.setAutoCommit(false);

                    statement = connection.prepareStatement(
                        "INSERT INTO results (key, value) VALUES (?, ?)"
                    );

                } catch (SQLException e) {
                    throw new IOException("Failed to connect to database", e);
                }
            }

            @Override
            public void write(Text key, IntWritable value)
                    throws IOException {
                try {
                    statement.setString(1, key.toString());
                    statement.setInt(2, value.get());
                    statement.addBatch();

                    recordCount++;

                    if (recordCount % batchSize == 0) {
                        statement.executeBatch();
                        connection.commit();
                    }

                } catch (SQLException e) {
                    throw new IOException("Failed to write record", e);
                }
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException {
                try {
                    // Final batch
                    if (recordCount % batchSize != 0) {
                        statement.executeBatch();
                        connection.commit();
                    }

                    statement.close();
                    connection.close();

                } catch (SQLException e) {
                    throw new IOException("Failed to close database", e);
                }
            }
        }

        @Override
        public void checkOutputSpecs(JobContext context)
                throws IOException {
            // Verify database is accessible
        }

        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException {
            return new DatabaseOutputCommitter();
        }
    }

    // ============================================
    // Pattern 2: Two-Phase Commit
    // ============================================

    /**
     * Write to staging area first, commit in OutputCommitter.
     * Provides atomicity.
     */
    public static class DatabaseOutputCommitter extends OutputCommitter {

        @Override
        public void setupJob(JobContext jobContext) throws IOException {
            // Create staging tables if needed
        }

        @Override
        public void setupTask(TaskAttemptContext taskContext)
                throws IOException {
            // Create task-specific staging table
            String stagingTable = "results_staging_" +
                taskContext.getTaskAttemptID();

            // CREATE TABLE results_staging_taskid (...);
        }

        @Override
        public boolean needsTaskCommit(TaskAttemptContext taskContext)
                throws IOException {
            return true;
        }

        @Override
        public void commitTask(TaskAttemptContext taskContext)
                throws IOException {
            // Move data from staging to final table
            // This is called once per successful task attempt

            String stagingTable = "results_staging_" +
                taskContext.getTaskAttemptID();

            try {
                Connection conn = getConnection(taskContext);

                // Atomic insert from staging
                String sql =
                    "INSERT INTO results SELECT * FROM " + stagingTable;
                conn.createStatement().executeUpdate(sql);

                // Drop staging table
                sql = "DROP TABLE " + stagingTable;
                conn.createStatement().executeUpdate(sql);

                conn.commit();
                conn.close();

            } catch (SQLException e) {
                throw new IOException("Commit failed", e);
            }
        }

        @Override
        public void abortTask(TaskAttemptContext taskContext)
                throws IOException {
            // Drop staging table on failure
            String stagingTable = "results_staging_" +
                taskContext.getTaskAttemptID();

            try {
                Connection conn = getConnection(taskContext);
                String sql = "DROP TABLE IF EXISTS " + stagingTable;
                conn.createStatement().executeUpdate(sql);
                conn.close();

            } catch (SQLException e) {
                // Log but don't throw
            }
        }

        private Connection getConnection(TaskAttemptContext context)
                throws SQLException {
            return DriverManager.getConnection(
                context.getConfiguration().get("db.url"),
                context.getConfiguration().get("db.user"),
                context.getConfiguration().get("db.password")
            );
        }
    }

    // ============================================
    // Pattern 3: Deterministic Output Paths
    // ============================================

    /**
     * Use task attempt ID in output paths to avoid conflicts.
     * Failed attempts write to different locations.
     */
    public static class SafeFileMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private String outputPath;

        @Override
        protected void setup(Context context) throws IOException {
            // Include task attempt ID in path
            TaskAttemptID attemptId = context.getTaskAttemptID();

            // Each task attempt writes to unique location
            outputPath = "/temp/processing/" +
                attemptId.getJobID() + "/" +
                attemptId.getTaskID() + "/" +
                attemptId.getId();

            // If task retries, new attempt writes to different path
            // Old attempt's data can be safely ignored/deleted
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Process and emit
            context.write(new Text("result"), new IntWritable(1));

            // Any side-effect files written to attempt-specific path
            // Failed attempts don't interfere with successful retry
        }
    }
}

Checkpointing Strategies

// Application-level checkpointing for fault tolerance

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import java.io.*;

public class CheckpointingExample {

    // ============================================
    // Pattern 1: Periodic Checkpointing
    // ============================================

    public static class CheckpointedMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Path checkpointPath;
        private FileSystem fs;
        private long lastCheckpoint;
        private int recordsProcessed;
        private int checkpointInterval = 10000; // Records

        @Override
        protected void setup(Context context) throws IOException {
            fs = FileSystem.get(context.getConfiguration());

            TaskAttemptID attemptId = context.getTaskAttemptID();
            checkpointPath = new Path(
                "/checkpoints/" + attemptId.toString()
            );

            // Try to restore from checkpoint
            if (fs.exists(checkpointPath)) {
                restoreCheckpoint();
            }
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Skip if already processed (based on checkpoint)
            if (key.get() <= lastCheckpoint) {
                return; // Already processed
            }

            // Process record
            processRecord(key, value, context);
            recordsProcessed++;

            // Checkpoint periodically
            if (recordsProcessed % checkpointInterval == 0) {
                checkpoint(key.get());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException {
            // Final checkpoint
            checkpoint(lastCheckpoint);

            // Delete checkpoint file
            fs.delete(checkpointPath, false);
        }

        private void processRecord(
                LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Process and emit
            context.write(new Text(value.toString()), new IntWritable(1));
        }

        private void checkpoint(long position) throws IOException {
            // Write checkpoint to HDFS
            FSDataOutputStream out = fs.create(checkpointPath, true);
            out.writeLong(position);
            out.writeInt(recordsProcessed);
            out.close();

            lastCheckpoint = position;
        }

        private void restoreCheckpoint() throws IOException {
            FSDataInputStream in = fs.open(checkpointPath);
            lastCheckpoint = in.readLong();
            recordsProcessed = in.readInt();
            in.close();

            System.out.println("Restored from checkpoint: " +
                "position=" + lastCheckpoint +
                ", records=" + recordsProcessed);
        }
    }

    // ============================================
    // Pattern 2: State Snapshot
    // ============================================

    public static class StatefulReducer
            extends Reducer<Text, IntWritable, Text, IntWritable> {

        private Map<String, Integer> state = new HashMap<>();
        private Path statePath;
        private FileSystem fs;

        @Override
        protected void setup(Context context) throws IOException {
            fs = FileSystem.get(context.getConfiguration());

            statePath = new Path(
                "/state/" + context.getTaskAttemptID() + ".state"
            );

            // Restore state if exists
            if (fs.exists(statePath)) {
                restoreState();
            }
        }

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {

            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }

            // Update state
            state.put(key.toString(), sum);

            // Emit result
            context.write(key, new IntWritable(sum));

            // Checkpoint state every N keys
            if (state.size() % 100 == 0) {
                saveState();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException {
            // Final state save
            saveState();

            // Clean up
            fs.delete(statePath, false);
        }

        private void saveState() throws IOException {
            FSDataOutputStream out = fs.create(statePath, true);
            ObjectOutputStream oos = new ObjectOutputStream(out);
            oos.writeObject(state);
            oos.close();
        }

        @SuppressWarnings("unchecked")
        private void restoreState() throws IOException {
            FSDataInputStream in = fs.open(statePath);
            try {
                ObjectInputStream ois = new ObjectInputStream(in);
                state = (Map<String, Integer>) ois.readObject();
                ois.close();

                System.out.println("Restored state: " +
                    state.size() + " entries");

            } catch (ClassNotFoundException e) {
                throw new IOException("Failed to restore state", e);
            }
        }
    }

    // ============================================
    // Pattern 3: Progress Tracking
    // ============================================

    public static class ProgressTrackingMapper
            extends Mapper<LongWritable, Text, Text, IntWritable> {

        private Path progressPath;
        private FileSystem fs;
        private BufferedWriter progressWriter;
        private int recordCount = 0;

        @Override
        protected void setup(Context context) throws IOException {
            fs = FileSystem.get(context.getConfiguration());

            progressPath = new Path(
                "/progress/" + context.getTaskAttemptID() + ".progress"
            );

            FSDataOutputStream out = fs.create(progressPath);
            progressWriter = new BufferedWriter(
                new OutputStreamWriter(out)
            );
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Process record
            context.write(new Text(value.toString()), new IntWritable(1));

            recordCount++;

            // Log progress every 1000 records
            if (recordCount % 1000 == 0) {
                logProgress(key.get());

                // Report progress to framework (prevents timeout)
                context.progress();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException {
            progressWriter.close();

            // Progress file can be used for debugging/monitoring
            // Delete or keep based on policy
        }

        private void logProgress(long position) throws IOException {
            String progress = String.format(
                "%d,%d,%d%n",
                System.currentTimeMillis(),
                position,
                recordCount
            );
            progressWriter.write(progress);
            progressWriter.flush();
        }
    }
}

Monitoring and Health Checks

Health Check Scripts

#!/bin/bash
# Hadoop cluster health monitoring script

# ============================================
# HDFS Health Checks
# ============================================

check_hdfs_health() {
    echo "=== HDFS Health Check ==="

    # 1. Check NameNode status
    echo "NameNode Status:"
    hdfs dfsadmin -report | grep -A 5 "Live datanodes"

    # 2. Check for under-replicated blocks
    echo -e "\nUnder-Replicated Blocks:"
    under_rep=$(hdfs fsck / | grep "Under-replicated blocks")
    echo "$under_rep"

    if echo "$under_rep" | grep -q "0 "; then
        echo "✓ All blocks properly replicated"
    else
        echo "⚠ WARNING: Under-replicated blocks found"
    fi

    # 3. Check for missing blocks
    echo -e "\nMissing Blocks:"
    missing=$(hdfs fsck / | grep "Missing blocks")
    echo "$missing"

    if echo "$missing" | grep -q "0 "; then
        echo "✓ No missing blocks"
    else
        echo "✗ CRITICAL: Missing blocks detected!"
    fi

    # 4. Check for corrupt blocks
    echo -e "\nCorrupt Blocks:"
    corrupt=$(hdfs fsck / | grep "Corrupt blocks")
    echo "$corrupt"

    if echo "$corrupt" | grep -q "0 "; then
        echo "✓ No corrupt blocks"
    else
        echo "✗ CRITICAL: Corrupt blocks detected!"
    fi

    # 5. Check DataNode health
    echo -e "\nDataNode Health:"
    dead_nodes=$(hdfs dfsadmin -report | grep "Dead datanodes")
    echo "$dead_nodes"

    # 6. Check disk space
    echo -e "\nCluster Capacity:"
    hdfs dfsadmin -report | grep -E "DFS Used%|DFS Remaining"

    capacity=$(hdfs dfsadmin -report | grep "DFS Used%" | awk '{print $3}' | tr -d '%')
    if [ "$capacity" -gt 80 ]; then
        echo "⚠ WARNING: Disk usage above 80%"
    else
        echo "✓ Disk usage healthy"
    fi
}

# ============================================
# YARN Health Checks
# ============================================

check_yarn_health() {
    echo -e "\n=== YARN Health Check ==="

    # 1. Check NodeManager status
    echo "NodeManagers:"
    yarn node -list -all | grep -E "RUNNING|UNHEALTHY|LOST"

    # 2. Check ResourceManager HA status
    echo -e "\nResourceManager HA Status:"
    yarn rmadmin -getAllServiceState

    # 3. Check running applications
    echo -e "\nRunning Applications:"
    yarn application -list -appStates RUNNING | wc -l

    # 4. Check failed applications
    echo -e "\nRecent Failed Applications:"
    yarn application -list -appStates FAILED | head -5

    # 5. Check queue capacity
    echo -e "\nQueue Usage:"
    yarn queue -status default
}

# ============================================
# Service Status Checks
# ============================================

check_service_status() {
    echo -e "\n=== Service Status Check ==="

    services=(
        "hadoop-hdfs-namenode"
        "hadoop-hdfs-datanode"
        "hadoop-yarn-resourcemanager"
        "hadoop-yarn-nodemanager"
    )

    for service in "${services[@]}"; do
        if systemctl is-active --quiet "$service"; then
            echo "✓ $service: RUNNING"
        else
            echo "✗ $service: NOT RUNNING"
        fi
    done
}

# ============================================
# Log Analysis
# ============================================

check_recent_errors() {
    echo -e "\n=== Recent Errors ==="

    # Check NameNode logs
    echo "NameNode Errors (last hour):"
    find /var/log/hadoop-hdfs -name "hadoop-*-namenode-*.log" \
        -mmin -60 -exec grep -i "error\|exception" {} \; | \
        tail -10

    # Check ResourceManager logs
    echo -e "\nResourceManager Errors (last hour):"
    find /var/log/hadoop-yarn -name "yarn-*-resourcemanager-*.log" \
        -mmin -60 -exec grep -i "error\|exception" {} \; | \
        tail -10
}

# ============================================
# Network Connectivity
# ============================================

check_network() {
    echo -e "\n=== Network Connectivity ==="

    # Check NameNode connectivity
    namenode_host="namenode.example.com"
    namenode_port=8020

    if timeout 5 bash -c "cat < /dev/null > /dev/tcp/$namenode_host/$namenode_port"; then
        echo "✓ NameNode reachable ($namenode_host:$namenode_port)"
    else
        echo "✗ Cannot reach NameNode"
    fi

    # Check ResourceManager connectivity
    rm_host="resourcemanager.example.com"
    rm_port=8032

    if timeout 5 bash -c "cat < /dev/null > /dev/tcp/$rm_host/$rm_port"; then
        echo "✓ ResourceManager reachable ($rm_host:$rm_port)"
    else
        echo "✗ Cannot reach ResourceManager"
    fi
}

# ============================================
# Performance Metrics
# ============================================

collect_metrics() {
    echo -e "\n=== Performance Metrics ==="

    # HDFS metrics
    echo "HDFS Metrics:"
    hdfs dfsadmin -report | grep -E "Live datanodes|DFS Used|DFS Remaining"

    # YARN metrics
    echo -e "\nYARN Metrics:"
    yarn node -list | grep "Total Nodes"

    # JVM metrics
    echo -e "\nNameNode JVM Heap:"
    jps | grep NameNode | awk '{print $1}' | xargs -I {} jstat -gc {} | tail -1
}

# ============================================
# Main Execution
# ============================================

main() {
    echo "Hadoop Cluster Health Check"
    echo "Timestamp: $(date)"
    echo "========================================"

    check_hdfs_health
    check_yarn_health
    check_service_status
    check_recent_errors
    check_network
    collect_metrics

    echo -e "\n========================================"
    echo "Health check complete"
}

# Run all checks
main

# Exit with error if critical issues found
if grep -q "CRITICAL" <<< "$(main)"; then
    exit 1
fi

exit 0

Monitoring Configuration

// Programmatic health monitoring

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.protocol.*;

public class ClusterMonitoring {

    /**
     * Monitor HDFS health
     */
    public static class HDFSHealthMonitor {

        private FileSystem fs;
        private DFSClient dfsClient;

        public HDFSHealthMonitor(Configuration conf) throws Exception {
            this.fs = FileSystem.get(conf);
            this.dfsClient = new DFSClient(
                FileSystem.getDefaultUri(conf), conf
            );
        }

        public HealthStatus checkHealth() throws Exception {
            HealthStatus status = new HealthStatus();

            // 1. Check NameNode availability
            try {
                fs.getFileStatus(new Path("/"));
                status.nameNodeAvailable = true;
            } catch (Exception e) {
                status.nameNodeAvailable = false;
                status.addError("NameNode unavailable: " + e.getMessage());
            }

            // 2. Check for missing blocks
            HdfsFileStatus[] files = dfsClient.listPaths("/", new byte[0]);
            for (HdfsFileStatus file : files) {
                checkFileHealth(file.getFullPath(new Path("/")), status);
            }

            // 3. Check DataNode health
            DatanodeInfo[] datanodes = dfsClient.datanodeReport(
                HdfsConstants.DatanodeReportType.ALL
            );

            int liveNodes = 0;
            int deadNodes = 0;

            for (DatanodeInfo dn : datanodes) {
                if (dn.isAlive()) {
                    liveNodes++;
                } else {
                    deadNodes++;
                    status.addWarning("Dead DataNode: " + dn.getName());
                }
            }

            status.liveDataNodes = liveNodes;
            status.deadDataNodes = deadNodes;

            // 4. Check cluster capacity
            FsStatus fsStatus = fs.getStatus();
            long capacity = fsStatus.getCapacity();
            long used = fsStatus.getUsed();
            long remaining = fsStatus.getRemaining();

            double usedPercent = (double) used / capacity * 100;

            status.capacity = capacity;
            status.used = used;
            status.usedPercent = usedPercent;

            if (usedPercent > 90) {
                status.addError("Disk usage critical: " + usedPercent + "%");
            } else if (usedPercent > 80) {
                status.addWarning("Disk usage high: " + usedPercent + "%");
            }

            return status;
        }

        private void checkFileHealth(Path path, HealthStatus status)
                throws Exception {

            FileStatus fileStatus = fs.getFileStatus(path);

            if (fileStatus.isDirectory()) {
                return; // Skip directories
            }

            // Get block locations
            BlockLocation[] blocks = fs.getFileBlockLocations(
                fileStatus, 0, fileStatus.getLen()
            );

            for (BlockLocation block : blocks) {
                int replicas = block.getHosts().length;
                int expectedReplicas = fileStatus.getReplication();

                if (replicas < expectedReplicas) {
                    status.underReplicatedBlocks++;
                    status.addWarning(
                        "Under-replicated: " + path +
                        " (has " + replicas + ", needs " + expectedReplicas + ")"
                    );
                }

                if (replicas == 0) {
                    status.missingBlocks++;
                    status.addError("Missing block: " + path);
                }
            }
        }
    }

    /**
     * Health status data structure
     */
    public static class HealthStatus {
        public boolean nameNodeAvailable;
        public int liveDataNodes;
        public int deadDataNodes;
        public int underReplicatedBlocks;
        public int missingBlocks;
        public long capacity;
        public long used;
        public double usedPercent;

        private List<String> errors = new ArrayList<>();
        private List<String> warnings = new ArrayList<>();

        public void addError(String error) {
            errors.add(error);
        }

        public void addWarning(String warning) {
            warnings.add(warning);
        }

        public boolean isHealthy() {
            return errors.isEmpty() &&
                   underReplicatedBlocks == 0 &&
                   missingBlocks == 0 &&
                   nameNodeAvailable &&
                   deadDataNodes == 0;
        }

        public String getReport() {
            StringBuilder report = new StringBuilder();

            report.append("=== HDFS Health Report ===\n");
            report.append("NameNode: ")
                  .append(nameNodeAvailable ? "AVAILABLE" : "UNAVAILABLE")
                  .append("\n");
            report.append("Live DataNodes: ").append(liveDataNodes).append("\n");
            report.append("Dead DataNodes: ").append(deadDataNodes).append("\n");
            report.append("Under-replicated blocks: ")
                  .append(underReplicatedBlocks).append("\n");
            report.append("Missing blocks: ").append(missingBlocks).append("\n");
            report.append("Capacity: ").append(formatBytes(capacity)).append("\n");
            report.append("Used: ").append(formatBytes(used))
                  .append(" (").append(String.format("%.2f%%", usedPercent))
                  .append(")\n");

            if (!errors.isEmpty()) {
                report.append("\nERRORS:\n");
                for (String error : errors) {
                    report.append("  ✗ ").append(error).append("\n");
                }
            }

            if (!warnings.isEmpty()) {
                report.append("\nWARNINGS:\n");
                for (String warning : warnings) {
                    report.append("  ⚠ ").append(warning).append("\n");
                }
            }

            return report.toString();
        }

        private String formatBytes(long bytes) {
            String[] units = {"B", "KB", "MB", "GB", "TB", "PB"};
            int unit = 0;
            double size = bytes;

            while (size >= 1024 && unit < units.length - 1) {
                size /= 1024;
                unit++;
            }

            return String.format("%.2f %s", size, units[unit]);
        }
    }

    /**
     * Example usage
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        HDFSHealthMonitor monitor = new HDFSHealthMonitor(conf);
        HealthStatus status = monitor.checkHealth();

        System.out.println(status.getReport());

        if (!status.isHealthy()) {
            System.exit(1); // Alert monitoring system
        }
    }
}

Interview Questions

Answer:Detection:
  • NameNode monitors DataNodes via heartbeats (every 3 seconds)
  • After 10 missed heartbeats (30 seconds), DataNode marked as stale
  • After 10 minutes of no heartbeat, DataNode marked as dead
Recovery Process:
  1. Identify affected blocks: NameNode scans metadata to find all blocks that were stored on the failed DataNode
  2. Determine under-replicated blocks: For each affected block, check if it now has fewer replicas than the configured replication factor
  3. Prioritize re-replication:
    • Blocks with 0 replicas (highest priority - data loss risk)
    • Blocks with 1 replica (high priority)
    • Blocks below target replication (normal priority)
  4. Schedule re-replication:
    • Select source DataNode (has healthy replica, low load)
    • Select target DataNode (free space, follows rack-awareness)
    • Issue replication command
  5. Copy blocks: Source DataNode streams block data to target DataNode
  6. Verify and update: Target verifies checksum, reports to NameNode, metadata updated
Example:
Before failure:
Block B1: [DN1, DN2, DN3] (3 replicas) ✓

DN2 fails:
Block B1: [DN1, DN3] (2 replicas) - Under-replicated!

Re-replication:
NameNode: "DN1, copy B1 to DN4"

After recovery:
Block B1: [DN1, DN3, DN4] (3 replicas) ✓
Key Points:
  • No data loss if at least one replica survives
  • Recovery happens automatically
  • Client reads are not affected (can use remaining replicas)
  • Process is gradual to avoid network saturation
Answer:Speculative Execution:
  • Mechanism to handle slow tasks (stragglers)
  • Framework launches duplicate task on different node
  • First task to complete wins, other is killed
  • Improves job completion time at cost of extra resources
How It Works:
10 map tasks running:
Tasks 1-9: Complete in 2 minutes
Task 10: Still at 30% after 5 minutes (slow disk)

Framework detects:
- Task 10 significantly slower than average
- Resources available

Action:
- Launch backup of Task 10 on faster node
- Both tasks process same input
- Faster one completes first
- Slower one killed

Result:
- Job completes in ~6 minutes instead of ~15 minutes
When to Disable:
  1. Non-Idempotent Operations:
    • Tasks with side effects (database writes, API calls)
    • Duplicate execution causes problems
    • Example: Incrementing external counter
  2. High Resource Utilization:
    • Cluster near capacity
    • Extra task copies compete for resources
    • May actually slow down job
  3. Tasks with External Dependencies:
    • Rate-limited API calls
    • License-limited software
    • Shared external resources
  4. Known Data Skew:
    • Some tasks legitimately take longer (processing more data)
    • Speculation wastes resources
    • Better to handle via custom partitioning
  5. Debugging:
    • Investigating task failures
    • Want to see actual failure, not masked by successful backup
Configuration:
// Disable speculation
conf.setBoolean("mapreduce.map.speculative", false);
conf.setBoolean("mapreduce.reduce.speculative", false);

// Adjust speculation threshold
conf.setDouble("mapreduce.job.speculative.slowtaskthreshold", 1.5);
Best Practice: Enable for pure computation, disable for side effects.
Answer:Architecture Components:
  1. Active NameNode: Serves all client requests
  2. Standby NameNode: Hot standby, ready to take over
  3. Quorum Journal Manager (QJM): Shared edit log storage
  4. ZooKeeper: Leader election and fencing
  5. ZooKeeper Failover Controller (ZKFC): Monitors NameNode health
Quorum Journal Manager (QJM):
Role: Shared edit log storage

Components:
- 3+ Journal Nodes (odd number)
- Quorum: Majority must acknowledge writes

Write Process:
Active NN writes edit → QJM

QJM replicates to all Journal Nodes

Wait for majority ACK (2 of 3)

Acknowledge to Active NN

Read Process:
Standby NN reads edits from QJM

Applies edits to its in-memory state

Stays synchronized with Active

Benefits:
- No single point of failure
- Guaranteed durability (quorum)
- Shared storage without NFS
ZooKeeper’s Role:
1. Leader Election:
   - Only one NN can be Active
   - Active holds ephemeral lock in ZK
   - If Active fails, lock released
   - Standby acquires lock, becomes Active

2. Fencing (Prevent Split-Brain):
   - Ensures old Active can't serve requests
   - SSH fence: Kill old NN process
   - Shell fence: Custom script
   - Prevents two Active NNs

3. Health Monitoring:
   - ZKFC monitors NN health
   - Heartbeat checks
   - RPC connectivity tests
   - Initiates failover if unhealthy
Failover Process:
Normal Operation:
Active NN (nn1) ← Client requests

Write edits to QJM

Standby NN (nn2) reads edits from QJM

Failure Detected:
ZKFC detects nn1 failure

ZKFC releases ZK lock

Standby ZKFC acquires lock

Fence old Active (SSH kill)

Standby reads latest edits from QJM

Standby promotes to Active

Begin serving requests

Failover time: Typically < 1 minute
Why Both QJM and ZooKeeper?:
  • QJM: Data plane (edit log storage and replication)
  • ZooKeeper: Control plane (coordination, leader election)
  • Separation of concerns
  • Each specialized for its task
Answer:Why Idempotency Matters:
  • Tasks may be retried on failure
  • Speculative execution runs duplicate tasks
  • Non-idempotent operations cause:
    • Duplicate records
    • Incorrect counters
    • Data corruption
    • Side effect chaos
Idempotent Design Patterns:1. Pure Functions (No Side Effects):
// GOOD: Idempotent
public void map(LongWritable key, Text value, Context context) {
    String word = value.toString().toLowerCase();
    context.write(new Text(word), new IntWritable(1));
    // Same input → Same output
    // No external state modification
}

// BAD: Non-idempotent
public void map(LongWritable key, Text value, Context context) {
    database.insert(value); // Retry = duplicate!
    externalCounter.increment(); // Retry = wrong count!
}
2. Use Hadoop Counters:
// GOOD: Framework handles deduplication
context.getCounter("MyCounters", "PROCESSED").increment(1);

// BAD: External counter
redis.incr("processed_count"); // Not deduplicated on retry
3. Deterministic Output Paths:
// GOOD: Include task attempt ID
String outputPath = "/output/" +
    context.getTaskAttemptID().toString();
// Each retry writes to different location

// BAD: Shared path
String outputPath = "/output/task-" + taskId;
// Retries overwrite each other
4. Database Operations - Use Unique Keys:
// GOOD: REPLACE or UPSERT with unique key
String sql = "REPLACE INTO results (record_id, value) VALUES (?, ?)";
stmt.setString(1, key.toString()); // Unique key from input
stmt.setString(2, value.toString());
// Retry produces same result

// BAD: INSERT without unique key
String sql = "INSERT INTO results (value) VALUES (?)";
// Retry creates duplicate record
5. Write in OutputFormat, Not Map/Reduce:
// GOOD: OutputFormat called once per successful task
public class MyOutputFormat extends OutputFormat {
    public RecordWriter getRecordWriter(TaskAttemptContext context) {
        return new RecordWriter() {
            public void write(K key, V value) {
                // Write to external system
                // Called only for successful task attempt
            }
        };
    }
}

// BAD: Writing in map() or reduce()
// Called multiple times if task retries
6. Two-Phase Commit:
// Write to staging area in task
public void map(K key, V value, Context context) {
    String stagingPath = "/staging/" + context.getTaskAttemptID();
    writeToStaging(stagingPath, value);
}

// Commit in OutputCommitter
public void commitTask(TaskAttemptContext context) {
    // Move from staging to final location
    // Called once per successful task
    String stagingPath = "/staging/" + context.getTaskAttemptID();
    String finalPath = "/output/data";
    fs.rename(stagingPath, finalPath);
}
Testing for Idempotency:
@Test
public void testMapperIdempotency() {
    MyMapper mapper = new MyMapper();
    Context context = mock(Context.class);

    LongWritable key = new LongWritable(1);
    Text value = new Text("test");

    // Run twice with same input
    mapper.map(key, value, context);
    List<Pair<Text, IntWritable>> output1 = captureOutput(context);

    mapper.map(key, value, context);
    List<Pair<Text, IntWritable>> output2 = captureOutput(context);

    // Should produce identical output
    assertEquals(output1, output2);
}
Key Principle: Design tasks as if they will be executed multiple times, because they might be.
Answer:Diagnosis:1. Identify the Problem:
# Check job progress
yarn application -status <application_id>

# Look for:
# - Most reducers at 100%, one at low %
# - One reducer with much higher input size
# - Task timeout errors for specific reducer

# Check task logs
yarn logs -applicationId <app_id> -containerId <container_id>

# Look for:
# - "Task timeout" errors
# - Large shuffle size for one reducer
# - Memory errors (GC overhead, OOM)
2. Analyze Data Distribution:
// Add counters to mapper to see key distribution
public void map(K key, V value, Context context) {
    // Count records per key
    context.getCounter("KeyDistribution", key.toString())
          .increment(1);

    context.write(key, value);
}

// After job, check counters
// Will show which keys have most data
Root Cause: Data skew - one key has disproportionate amount of dataSolutions:1. Custom Partitioner (Salting):
// Add random salt to hot keys
public static class SaltingMapper
        extends Mapper<Text, Text, Text, Text> {

    private Set<String> hotKeys = new HashSet<>(
        Arrays.asList("popular_key1", "popular_key2")
    );
    private Random random = new Random();

    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {

        String keyStr = key.toString();

        if (hotKeys.contains(keyStr)) {
            // Add random salt to distribute across reducers
            int salt = random.nextInt(10); // 10 reducers for hot key
            Text saltedKey = new Text(keyStr + "_salt_" + salt);
            context.write(saltedKey, value);
        } else {
            // Normal keys unchanged
            context.write(key, value);
        }
    }
}

// Desalting reducer
public static class DesaltingReducer
        extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {

        // Process partial results
        String result = processValues(values);

        // Remove salt from key
        String originalKey = key.toString().replaceAll("_salt_\\d+$", "");

        context.write(new Text(originalKey), new Text(result));
    }
}

// Second job to combine partial results
2. Combiner (Reduce Hot Key Volume):
// Add combiner to reduce data volume before shuffle
job.setCombinerClass(MyReducer.class);

// For associative operations (sum, count, max, min)
// Combiner pre-aggregates on map side
// Reduces shuffle data volume
3. Increase Reducer Memory/Timeout:
Configuration conf = new Configuration();

// Increase reducer memory
conf.setInt("mapreduce.reduce.memory.mb", 8192); // 8GB

// Increase task timeout
conf.setLong("mapreduce.task.timeout", 3600000); // 60 min

// Increase JVM heap
conf.set("mapreduce.reduce.java.opts", "-Xmx6g");

// More reducers to distribute load
job.setNumReduceTasks(100); // Increase from default
4. Use Composite Keys (For Joins):
// If skew is from join, use composite key
public static class CompositeKey implements WritableComparable {
    private Text joinKey;
    private Text tableName;

    // Secondary sort ensures order
    // Process in multiple passes
}
5. Separate Hot Keys:
// Process hot keys separately

// Job 1: Filter out hot keys
public void map(Text key, Text value, Context context) {
    if (isHotKey(key)) {
        // Write to separate output
        multipleOutputs.write(key, value, "hot-keys");
    } else {
        context.write(key, value);
    }
}

// Job 2: Process normal keys (distributed)
// Job 3: Process hot keys (single reducer or in-memory)
6. Disable Speculative Execution:
// Prevent wasting resources on backup tasks
// for legitimately slow reducer
conf.setBoolean("mapreduce.reduce.speculative", false);
Prevention:
// Monitor key distribution during development
public static class MonitoringMapper extends Mapper {
    private Map<String, Long> keyCount = new HashMap<>();

    @Override
    protected void map(K key, V value, Context context) {
        keyCount.merge(key.toString(), 1L, Long::sum);
        context.write(key, value);
    }

    @Override
    protected void cleanup(Context context) {
        // Log key distribution
        keyCount.entrySet().stream()
            .sorted((e1, e2) -> e2.getValue().compareTo(e1.getValue()))
            .limit(10)
            .forEach(e ->
                System.out.println("Top key: " + e.getKey() +
                                 " count: " + e.getValue())
            );
    }
}
Best Solution: Depends on use case
  • Aggregation: Use salting + two-stage aggregation
  • Joins: Use map-side join for hot keys
  • Counting: Use combiner
  • One-off: Increase resources/timeout

Summary

Fault tolerance is the cornerstone of Hadoop’s reliability. The system is designed with the assumption that failures are normal, not exceptional. Through sophisticated mechanisms like data replication, task retries, speculative execution, and high availability architectures, Hadoop achieves remarkable resilience. The patterns established by Hadoop’s fault tolerance design — replication for durability, heartbeat-based failure detection, speculative execution for stragglers, and quorum-based consensus for metadata — have become standard building blocks in modern distributed systems. You will find these same patterns in Apache Kafka (ISR-based replication), Apache Cassandra (tunable replication and hinted handoff), Kubernetes (pod health checks and restart policies), and cloud-native databases like CockroachDB (Raft-based replication with automatic rebalancing). Learning Hadoop’s fault tolerance is not just about Hadoop — it is about learning the vocabulary and design patterns of distributed system reliability that transfer to every system you will build or operate. Key Takeaways:
  1. HDFS Protection: Multi-replica storage, rack awareness, and automatic recovery ensure no data loss
  2. MapReduce Resilience: Task-level retries, ApplicationMaster recovery, and speculative execution handle compute failures
  3. YARN Reliability: ResourceManager HA, work-preserving restart, and container recovery maintain cluster availability
  4. Design Principles: Idempotent operations, checkpointing, and proper external system handling make applications fault-tolerant
  5. Monitoring: Continuous health checks and proactive monitoring catch issues before they become critical
Understanding and properly configuring these mechanisms is essential for running production Hadoop clusters at scale.

Interview Deep-Dive

Strong Answer:The fundamental trade-off is storage overhead versus recovery performance. 3x replication stores three complete copies of every block, consuming 200% extra storage (3TB stored for every 1TB of data). Erasure coding with the RS-6-3 policy stores 6 data blocks and 3 parity blocks, consuming only 50% extra storage (1.5TB for every 1TB of data). For a petabyte-scale cluster, switching from replication to erasure coding for cold data can save hundreds of terabytes of disk, which translates directly into fewer servers and lower operational cost.The recovery behavior is fundamentally different. When a node fails with 3x replication, recovery is simple: the NameNode picks a healthy DataNode that has a copy of the block and tells it to send that copy to a new target. This is a straightforward block copy — one network transfer per block, no computation required. It is fast and cheap.With erasure coding, recovery requires reconstruction. If one of the 9 blocks (6 data + 3 parity) is lost, the NameNode must coordinate reading at least 6 of the remaining 8 blocks from their respective DataNodes, perform a matrix multiplication in Galois Field arithmetic to reconstruct the missing block, and write the result to a new DataNode. This means recovery reads 6 blocks worth of data over the network (versus 1 block for replication), uses significant CPU for the mathematical reconstruction, and takes longer to complete. During recovery, degraded reads also require this reconstruction, adding latency to client reads.When to use each: Use 3x replication for hot data — data that is actively read by MapReduce jobs, Spark queries, or HBase. The local read optimization (reading from the same node that stores the block) only works with replication, and the fast recovery ensures that failures do not impact running jobs significantly. Use erasure coding for cold data — archived logs, historical data that is rarely accessed, backup copies. The storage savings are substantial, and the occasional higher-latency read is acceptable for data that is accessed infrequently.The practical guideline many organizations use: data less than 30 days old gets 3x replication (hot tier), data between 30 days and 1 year gets RS-6-3 erasure coding (warm tier), and data older than 1 year gets moved to cloud cold storage (S3 Glacier or equivalent).Follow-up: Can you lose data with erasure coding? Under what conditions?With RS-6-3, you can tolerate the simultaneous loss of any 3 out of 9 blocks. If you lose 4 or more blocks simultaneously (before recovery can reconstruct the missing ones), the data is unrecoverable. This is why rack awareness matters even more with erasure coding — the 9 blocks must be spread across enough racks that a single rack failure (which could take out multiple blocks) does not exceed the tolerance threshold. The HDFS erasure coding block placement policy ensures that no two blocks from the same stripe are placed on the same rack. A correlated failure (like a power outage affecting multiple racks) is the primary risk scenario.
Strong Answer:NameNode HA uses five components working together. The Active NameNode handles all client requests and writes metadata changes (edit log entries) to the Quorum Journal Manager. The Standby NameNode continuously reads these edit log entries from the QJM and applies them to its own in-memory namespace, keeping itself synchronized. The Quorum Journal Manager is a set of 3 or more JournalNodes that store the edit log — writes must be acknowledged by a majority (quorum) before the Active NameNode considers them durable. The ZooKeeper Failover Controller (ZKFC) runs on each NameNode machine and monitors the local NameNode’s health via RPC calls and heartbeats. The ZooKeeper ensemble provides the leader election mechanism — the ZKFC of the Active NameNode holds an ephemeral znode (lock), and if that ZKFC stops sending heartbeats (because its NameNode died), the lock is released and the Standby’s ZKFC can acquire it.Normal failover sequence: Active NameNode crashes. Its ZKFC detects the failure (health check timeout). ZKFC’s ZooKeeper session expires, releasing the ephemeral lock. Standby’s ZKFC acquires the lock. Standby ZKFC initiates fencing — it attempts to SSH into the old Active’s machine and kill the NameNode process (to prevent a zombie Active from corrupting the edit log). Standby reads any remaining edit log entries from QJM to catch up to the latest state. Standby promotes itself to Active and begins serving client requests. Total failover time: typically 10-30 seconds.Now, the failure scenario where HA itself breaks — the split-brain. Imagine a network partition where the Active NameNode is still running and serving clients on one side of the partition, but the ZKFC’s connection to ZooKeeper is severed. ZooKeeper thinks the Active is dead (session timeout), releases the lock, and the Standby promotes itself. Now you have two Active NameNodes. Both are accepting writes. Both are writing to the QJM. This is catastrophic because the edit log becomes inconsistent.Hadoop prevents this through Epoch Fencing in the QJM. When the new Active NameNode is promoted, it is assigned a monotonically increasing epoch number. It sends this epoch to all JournalNodes. The JournalNodes will reject any write with an epoch lower than the current one. So when the old Active (with the old epoch) tries to write to the QJM, its writes are rejected, it receives an error, and it shuts itself down. This is the critical safety mechanism: the QJM acts as a distributed fence, and the epoch number is the proof of authority. Without the QJM’s epoch fencing, NameNode HA would be fundamentally unsafe.Follow-up: What happens if ZooKeeper itself loses a majority of its nodes?If ZooKeeper loses a majority (e.g., 2 out of 3 ZK nodes), it cannot form a quorum and stops serving requests entirely. This means no failover can happen — the ZKFC cannot acquire or release locks. The Active NameNode continues serving normally (it does not depend on ZooKeeper for ongoing operations, only for failover), but if the Active NameNode fails while ZooKeeper is down, there is no automatic failover. The Standby remains standby. This is why ZooKeeper should run on at least 5 nodes in production (tolerating 2 failures) and should never share machines with other high-resource services that might cause OOM kills.
Strong Answer:This is the classic exactly-once delivery problem in distributed systems, and it happens because MapReduce provides at-least-once execution semantics, not exactly-once. When a map or reduce task fails and is retried, the new attempt re-processes the same input from the beginning. If the failed task had already written some records to the external database before crashing, those records persist in the database. The retry writes them again, creating duplicates.The root cause is that the database write and the task completion are not atomic. The task wrote to the database (side effect committed), then crashed before reporting success to the ApplicationMaster. From MapReduce’s perspective, the task never completed, so it must be retried. From the database’s perspective, the records exist.There are three solution patterns, in increasing order of robustness.Pattern 1 — Idempotent writes with natural keys. If each input record has a natural unique identifier, use UPSERT or REPLACE INTO with that identifier as the primary key. When the retry writes the same records, they overwrite the existing ones rather than creating duplicates. This is the simplest solution and works when your data has natural keys. Example: if processing user events, use (user_id, event_timestamp) as the composite key.Pattern 2 — OutputCommitter with staging. Do not write to the final database table during the task. Instead, write to a staging table whose name includes the TaskAttemptID. In the OutputCommitter’s commitTask method (which is called exactly once for the successful attempt), atomically move data from the staging table to the final table. In abortTask (called for failed attempts), drop the staging table. This provides atomicity: either all records from a task attempt appear in the final table, or none do.Pattern 3 — Two-phase commit with transaction IDs. For the strongest guarantees, assign a unique transaction ID to each task attempt (using the TaskAttemptID). Write all records within a database transaction tagged with this ID. In the commit phase, mark the transaction as committed. On retry, first check if the previous attempt’s transaction was committed — if so, skip reprocessing. This is essentially implementing a distributed transaction protocol between MapReduce and the database.The honest truth is that true exactly-once semantics across a distributed computation engine and an external system is extremely difficult. Most production systems settle for “effectively once” — idempotent operations that produce the correct final result even if individual operations execute multiple times. This is the approach taken by Kafka’s exactly-once semantics (which is really idempotent production plus transactional consumption) and Flink’s checkpoint-based approach (which provides exactly-once within the Flink pipeline but requires idempotent sinks for external systems).Follow-up: How does Apache Flink solve this problem differently than MapReduce?Flink uses distributed snapshots (the Chandy-Lamport algorithm) to create consistent checkpoints of the entire pipeline state. When a failure occurs, Flink rolls back to the last checkpoint and replays input from that point. Combined with Kafka’s offset management, this means Flink reprocesses only the records since the last checkpoint. For external sinks, Flink provides a two-phase commit sink that writes to a staging area during processing and commits atomically when a checkpoint completes. This provides end-to-end exactly-once semantics without requiring the application developer to implement idempotency manually. The trade-off is that Flink’s checkpointing adds latency (the checkpoint barrier must propagate through the entire DAG) and requires the source to support replay (Kafka does, a bare TCP socket does not).
Strong Answer:This is a correlated failure scenario — the most dangerous type because it violates the assumption that failures are independent. Let me trace through each layer.HDFS response: The NameNode stops receiving heartbeats from all 40 DataNodes. After the heartbeat timeout (default 10 minutes, but the stale interval of 30 seconds triggers earlier marking), it marks all 40 nodes as dead. It then scans its block map to identify every block that had a replica on those nodes. With the default replication factor of 3 and rack-aware placement (2 replicas on one rack, 1 on another), losing an entire rack means every block that had 2 of its 3 replicas on that rack is now down to 1 replica. Blocks that had only 1 replica on the failed rack still have 2 replicas and are less urgent. The NameNode prioritizes re-replication: blocks with 1 remaining replica get highest priority (one more failure means data loss), blocks with 2 remaining replicas get normal priority. The re-replication creates a burst of cross-rack network traffic as the remaining 960 nodes copy blocks to restore the replication factor. On a petabyte-scale cluster, this can take hours and significantly impacts cluster performance during recovery.YARN response: The ResourceManager marks all 40 NodeManagers as lost. All containers running on those nodes — map tasks, reduce tasks, ApplicationMasters — are killed. For each affected application: if the ApplicationMaster was on a failed node, YARN restarts it on a healthy node (up to the maximum AM attempts). If the ApplicationMaster survived but tasks were on failed nodes, the AM marks those tasks as failed and reschedules them on healthy nodes. Reduce tasks that were in the shuffle phase may need to re-fetch map outputs that were on failed nodes — if the map output nodes are among the 40 dead nodes, those map tasks must be re-executed first.MapReduce job impact: Running jobs are disrupted but not killed (assuming the ApplicationMaster survives or is restarted). Map tasks on failed nodes are retried on other nodes. Reduce tasks that had fetched shuffle data from failed nodes must re-fetch from surviving map output locations or wait for map tasks to be re-run. The total job completion time increases significantly — potentially doubling or more depending on how much work was on the failed rack. Speculative execution helps if it was enabled, but 40 nodes lost simultaneously is beyond what speculation is designed to handle.The critical nuance: if the failed rack happened to contain a JournalNode (for NameNode HA), you lose one of your 3 JournalNodes. The QJM still has a quorum (2 of 3) and continues operating, but you have lost fault tolerance for the edit log. Restoring the JournalNode should be a high-priority task. Similarly, if a ZooKeeper node was on the failed rack, the ZK ensemble loses one member and is running degraded.Follow-up: How would you design the rack layout to minimize the blast radius of this type of failure?The key principle is spreading critical services across racks so that no single rack failure takes down a quorum of any service. Place JournalNodes on 3 different racks. Place ZooKeeper nodes on 3 or 5 different racks. Never put both NameNodes on the same rack. Never put both ResourceManagers on the same rack. For data, the rack-aware placement policy already handles this — but verify that it is correctly configured by checking the topology script. Additionally, consider increasing the replication factor for critical datasets to 4 or 5 so that a rack failure leaves more surviving replicas. Finally, implement monitoring that alerts on rack-level health (aggregating DataNode heartbeat status by rack) so that you detect a rack switch degradation before a full failure.

Next Chapter: Chapter 7 - Performance Optimization - Tuning and optimizing Hadoop for maximum performance.