Fault tolerance is at the heart of GFS’s design, and this chapter is arguably the most practically valuable for any engineer who will operate production distributed systems. Built for commodity hardware where failures are the norm, not the exception, GFS employs multiple layers of redundancy and recovery mechanisms. What makes GFS’s approach noteworthy is not any single fault tolerance technique (replication, checksumming, and heartbeats all existed before GFS) but the way these techniques are composed into a cohesive, self-healing system that requires minimal human intervention. Many earlier storage systems relied on operators to intervene manually when failures occurred. GFS demonstrated that a well-designed system could handle the vast majority of failure scenarios automatically, an insight that fed into Site Reliability Engineering (SRE) practices at Google and influenced the broader DevOps movement. This chapter explores how GFS handles failures at every level, from disk corruption to datacenter outages.
FAILURE STATISTICS (Google's 2003 Experience)
─────────────────────────────────────────────
Cluster: 1000 commodity machines

DAILY FAILURES:
──────────────
Component          Frequency   MTBF        Impact
─────────────────────────────────────────────────────────────────
Disk failure       2-4/day     ~1 year     1 chunk lost
Machine crash      1-2/day     ~3 years    All chunks inaccessible
Network issue      1-3/day     Varies      Partial cluster partition
Power supply       ~1/day      ~2 years    Machine down
Memory error       ~5/week     Varies      Corruption or crash

MONTHLY FAILURES:
────────────────
Network switch     ~1/month    ~10 years   Rack isolated
Cooling problem    ~1/month    Varies      Rack overheating
Operator error     ~2/month    N/A         Configuration issues

YEARLY FAILURES:
───────────────
Rack power         ~1/year     N/A         Entire rack down
Datacenter event   Rare        N/A         Regional outage

IMPLICATIONS:
────────────
1. Component failures are CONTINUOUS
   • At least one failure per day
   • Multiple concurrent failures common
   • Must handle gracefully

2. MTBF for cluster << individual component
   • Single machine: ~3 years
   • 1000 machines: ~1 day
   • "Failures are the norm, not the exception"

3. Fast failure detection required
   • Heartbeat: 30-60 seconds
   • Multiple missed heartbeats: declare dead
   • False positives acceptable (re-replication cheap)

4. Automatic recovery essential
   • No manual intervention for common failures
   • Self-healing system
   • Monitoring and alerting for trends
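The "3 years per machine, about 1 day per cluster" figure follows from simple arithmetic. The sketch below assumes failures are independent and arrive at a constant rate, which is a simplification; the function names are illustrative only.

```python
HOURS_PER_YEAR = 365 * 24


def cluster_mtbf_hours(component_mtbf_years: float, count: int) -> float:
    """Expected hours between failures across `count` identical components.

    Assumes independent failures at a constant rate, so the cluster-wide
    failure rate is simply `count` times the per-component rate.
    """
    return component_mtbf_years * HOURS_PER_YEAR / count


def expected_failures_per_day(component_mtbf_years: float, count: int) -> float:
    """Average number of failures per day across the whole cluster."""
    return 24 / cluster_mtbf_hours(component_mtbf_years, count)


if __name__ == "__main__":
    # 1000 machines, each with an MTBF of about 3 years.
    print(f"{cluster_mtbf_hours(3, 1000):.1f} hours between machine crashes")
    print(f"{expected_failures_per_day(3, 1000):.2f} machine crashes per day")
```

With these inputs the cluster sees a machine crash a little more often than once every 27 hours, which is why the design treats failure as routine.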
REPLICA PLACEMENT ALGORITHM
──────────────────────────
Goal: Maximize reliability and bandwidth

When creating new chunk replica:
────────────────────────────────
1. SELECT FIRST REPLICA
   ──────────────────
   Factors considered:
   • Below-average disk utilization
   • Limit recent chunk creations (new chunks → writes soon)
   • Spread across racks

   Algorithm:
   ─────────
   candidates = filter_servers(
       disk_usage < average,
       recent_creations < threshold
   )
   selected = pick_random(candidates)

2. SELECT SECOND REPLICA
   ─────────────────────
   Must be on DIFFERENT RACK from first

   Why different rack?
   • Rack failure tolerance
   • Switch failure tolerance
   • Power/cooling failure tolerance

   ┌─────────────┐   ┌─────────────┐
   │ Rack A      │   │ Rack B      │
   │ Replica 1   │   │ Replica 2   │
   └─────────────┘   └─────────────┘
   Rack fails? Still have copy

   Algorithm:
   ─────────
   candidates = filter_servers(
       rack != rack_of_replica1,
       disk_usage < average
   )
   selected = pick_random(candidates)

3. SELECT THIRD REPLICA
   ────────────────────
   Two strategies:

   Strategy A: Same rack as an existing replica
   ────────────────────────────────────────────
   ┌─────────────┐   ┌─────────────┐
   │ Rack A      │   │ Rack B      │
   │ Replica 1   │   │ Replica 2   │
   │ Replica 3   │   │             │
   └─────────────┘   └─────────────┘
   Benefits:
   • 2 in Rack A: intra-rack bandwidth
   • Faster replica recovery
   • Cheaper network for writes

   Strategy B: Third rack
   ──────────────────────
   ┌──────┐   ┌──────┐   ┌──────┐
   │Rack A│   │Rack B│   │Rack C│
   │Rep 1 │   │Rep 2 │   │Rep 3 │
   └──────┘   └──────┘   └──────┘
   Benefits:
   • Maximum reliability
   • Tolerates 2 rack failures
   • Used for critical data

   GFS typically uses Strategy A:
   • 2 in one rack, 1 in another
   • Balance reliability and bandwidth

EXAMPLE EXECUTION:
─────────────────
Creating chunk 0x1a2b:

Step 1: Pick first replica
──────────────────────────
All servers:
cs-1 (Rack A): 45% full, 2 recent creations ✗ (recent creations)
cs-2 (Rack A): 78% full, 5 recent creations ✗ (high util)
cs-3 (Rack B): 34% full, 1 recent creation  ✓
cs-4 (Rack B): 56% full, 0 recent creations ✓
cs-5 (Rack C): 90% full, 1 recent creation  ✗ (high util)

Candidates: cs-3, cs-4
Selected: cs-3 (random)

Step 2: Pick second replica (different rack)
────────────────────────────────────────────
Must be != Rack B
Candidates:
cs-1 (Rack A): 45% full ✓
cs-5 (Rack C): 90% full ✗
Selected: cs-1

Step 3: Pick third replica
──────────────────────────
Strategy A: share a rack with an existing replica (here cs-1, Rack A)
Candidates in Rack A:
cs-2 (Rack A): 78% full ✗
cs-6 (Rack A): 42% full ✓
Selected: cs-6

Final placement:
────────────────
Chunk 0x1a2b:
• Replica 1: cs-3 (Rack B)
• Replica 2: cs-1 (Rack A)
• Replica 3: cs-6 (Rack A)

Properties:
• Survives any single rack failure (Rack A or Rack B)
• 2 in Rack A for intra-rack bandwidth
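The three selection steps can be sketched as one function. This is a minimal illustration under stated assumptions, not GFS's actual code: the `Server` record, the `creation_limit` value, and the choice to apply the below-average disk filter to all three picks are hypothetical, and the third replica follows Strategy A by sharing a rack with one of the first two.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class Server:
    name: str
    rack: str
    disk_usage: float      # fraction of disk in use, 0.0 to 1.0
    recent_creations: int  # chunks created on this server recently


def place_replicas(servers, creation_limit=2, rng=random):
    """Pick three servers: two racks, two replicas sharing one rack.

    Raises IndexError if any step has no eligible candidate.
    """
    average = sum(s.disk_usage for s in servers) / len(servers)
    below_average = [s for s in servers if s.disk_usage < average]

    # Step 1: lightly used disk and few recent creations.
    first = rng.choice(
        [s for s in below_average if s.recent_creations < creation_limit]
    )
    # Step 2: must sit on a different rack from the first replica.
    second = rng.choice([s for s in below_average if s.rack != first.rack])
    # Step 3 (Strategy A): share a rack with one of the existing replicas.
    taken = {first.name, second.name}
    third = rng.choice(
        [
            s
            for s in below_average
            if s.name not in taken and s.rack in (first.rack, second.rack)
        ]
    )
    return [first, second, third]


if __name__ == "__main__":
    cluster = [
        Server("cs-1", "A", 0.45, 2),
        Server("cs-2", "A", 0.78, 5),
        Server("cs-3", "B", 0.34, 1),
        Server("cs-4", "B", 0.56, 0),
        Server("cs-5", "C", 0.90, 1),
        Server("cs-6", "A", 0.42, 0),
    ]
    for replica in place_replicas(cluster):
        print(replica.name, replica.rack)
```

Because the picks are random among eligible candidates, repeated runs give different but always rack-diverse placements.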
Restoring Replica Count:
RE-REPLICATION WORKFLOW
──────────────────────
Trigger: Replica count falls below target

CAUSES:
──────
1. Chunkserver failure (most common)
2. Disk failure
3. Corruption detected
4. Replica unreachable
5. Replica version is stale

DETECTION:
─────────
Master tracks replica count for each chunk

Heartbeat from chunkservers:
───────────────────────────
Every 30-60 seconds:
cs-5 → Master: "I'm alive, I have chunks: [0x1a2b, 0x2c3d, ...]"
Master updates: chunk_replicas[0x1a2b] = [..., cs-5, ...]

When chunkserver fails:
──────────────────────
cs-5 misses 3 heartbeats (90-180 seconds)
Master marks cs-5 as dead:
    dead_servers.add(cs-5)
Master scans all chunks:
    for chunk in all_chunks:
        if cs-5 in chunk.replicas:
            chunk.replicas.remove(cs-5)
            if len(chunk.replicas) < REPLICATION_GOAL:
                schedule_re_replication(chunk)

PRIORITIZATION:
──────────────
Not all chunks re-replicated immediately
Prioritize by urgency:

Priority Levels:
───────────────
P0 (CRITICAL): 0 live replicas
• Nothing left to copy from: chunk is unavailable, and lost unless a replica returns
• Alert operators; re-replicate as soon as any valid replica reappears
• Rare (usually have 1+ replica)

P1 (URGENT): 1 replica
• One failure away from data loss
• High priority
• Re-replicate within minutes

P2 (HIGH): 2 replicas (target is 3)
• Below target but safe
• Re-replicate within hours
• Most common case

P3 (NORMAL): 3+ replicas
• Above or at target
• May reduce replicas if too many

Additionally boost priority:
──────────────────────────
• Recently accessed (hot data)
• Blocking client operations
• Important system files

Priority queue:
──────────────
re_replication_queue = PriorityQueue()

Chunk 0x1a2b: 1 replica, recently accessed
→ Priority: P1 + hot data bonus = 1000
Chunk 0x2c3d: 2 replicas, cold data
→ Priority: P2 = 100
Chunk 0x3e4f: 2 replicas, blocking write
→ Priority: P2 + blocking bonus = 500

Process queue in order: 0x1a2b, 0x3e4f, 0x2c3d

EXECUTION:
─────────
For each chunk in priority queue:

1. Select source chunkserver
   ──────────────────────
   • Must have valid replica
   • Prefer low load
   • Prefer near destination

2. Select destination chunkserver
   ──────────────────────────────
   • Same criteria as new placement
   • Respect rack diversity
   • Balance load

3. Issue clone command
   ──────────────────
   Master → destination: "Clone chunk 0x1a2b from cs-3"
   Destination → source: "Send me chunk 0x1a2b"
   Source → destination: [64MB of chunk data + checksums]
   Destination:
   • Writes to disk
   • Verifies checksums
   • Reports to master

4. Update metadata
   ──────────────
   Master updates: chunk_replicas[0x1a2b].add(destination)

5. Mark complete
   ────────────
   Chunk now back to 3 replicas ✓

RATE LIMITING:
─────────────
Master limits concurrent re-replications:

Limits:
──────
• Max concurrent: 50-100 cluster-wide
• Max per chunkserver: 1-2 as source
• Max per chunkserver: 1-2 as destination
• Throttle if cluster load high

Why limit?
─────────
• Avoid network saturation
• Avoid overwhelming chunkservers
• Background process, not urgent
• Gradual restoration OK

Typical rates:
─────────────
• P1 chunks: ~10 minutes to restore
• P2 chunks: ~hours to restore
• Full cluster re-balance: ~days

EXAMPLE SCENARIO:
────────────────
cs-12 fails (had 10,000 chunks)

Master scans chunks on cs-12:
────────────────────────────
Chunk 0x1111: Now 1 replica → P1 (urgent)
Chunk 0x2222: Now 2 replicas → P2 (high)
Chunk 0x3333: Now 2 replicas → P2
...
(10,000 chunks affected)

Priority queue:
──────────────
P1: 100 chunks (1 replica remaining)
P2: 9,900 chunks (2 replicas remaining)

Re-replication timeline:
───────────────────────
T+0:     Failure detected
T+2min:  P1 chunks start re-replicating
T+15min: P1 chunks complete (100 chunks)
T+30min: P2 chunks at 50% (~5,000 done)
T+2hr:   P2 chunks at 90% (~9,000 done)
T+4hr:   All chunks restored to 3 replicas

During this time:
────────────────
• All data accessible (1+ replica for every chunk)
• Read performance slightly degraded
• Write performance normal
• No data loss
• Automatic recovery
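The prioritization rules can be sketched with a binary heap. This is an illustrative sketch, not the master's real data structure: it swaps the additive scores shown above (1000, 500, 100) for a lexicographic sort key, which yields the same ordering for the three example chunks. The class and function names are hypothetical.

```python
import heapq
from itertools import count

REPLICATION_GOAL = 3


def urgency(live_replicas, blocking_client=False, recently_accessed=False):
    """Sort key: the smallest tuple is repaired first.

    Fewer live replicas always wins; blocking a client beats being hot.
    """
    return (live_replicas, not blocking_client, not recently_accessed)


class ReReplicationQueue:
    def __init__(self):
        self._heap = []
        self._tiebreak = count()  # keeps insertion order among equal keys

    def add(self, chunk, live_replicas, **flags):
        """Queue a chunk; returns False if there is nothing to do."""
        # 0 replicas: no source to copy from. At or above goal: not needed.
        if live_replicas == 0 or live_replicas >= REPLICATION_GOAL:
            return False
        key = urgency(live_replicas, **flags)
        heapq.heappush(self._heap, (key, next(self._tiebreak), chunk))
        return True

    def pop(self):
        """Return the most urgent chunk handle."""
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


if __name__ == "__main__":
    queue = ReReplicationQueue()
    queue.add("0x2c3d", 2)
    queue.add("0x3e4f", 2, blocking_client=True)
    queue.add("0x1a2b", 1, recently_accessed=True)
    while queue:
        print(queue.pop())
```

A tuple key avoids having to tune bonus values so that a hot two-replica chunk can never outrank a cold one-replica chunk.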
Load Balancing:
CHUNK REBALANCING
────────────────
Goal: Even distribution of chunks across servers

TRIGGERS:
────────
1. New chunkserver added
   • Has 0% disk usage
   • Should receive new chunks
   • Gradually fill to average

2. Uneven distribution detected
   • Some servers > 80% full
   • Some servers < 40% full
   • Rebalance to average

3. Performance imbalance
   • Some servers high load
   • Some servers idle
   • Migrate hot chunks

REBALANCING ALGORITHM:
─────────────────────
Background process runs periodically:

def rebalance_chunks():
    # Calculate statistics
    avg_usage = calculate_average_disk_usage()
    avg_load = calculate_average_load()

    # Find imbalanced servers
    overloaded = find_servers(disk_usage > avg_usage + THRESHOLD)
    underloaded = find_servers(disk_usage < avg_usage - THRESHOLD)

    if not overloaded or not underloaded:
        return  # Balanced

    # Select chunks to move
    for src in overloaded:
        for dst in underloaded:
            # Pick chunk to migrate
            chunk = select_chunk_to_move(
                src, dst,
                prefer_cold_chunks=True,
                respect_rack_diversity=True
            )
            if chunk:
                migrate_chunk(chunk, src, dst)
            if servers_balanced():
                return

EXAMPLE:
───────
Cluster state:
─────────────
cs-1: 90% full (overloaded)
cs-2: 85% full (overloaded)
cs-3: 40% full (underloaded)
cs-4: 45% full (underloaded)
cs-5: 35% full (underloaded)

Average: 59% full
Threshold: ±10 percentage points (balanced range: 49-69%)
Overloaded: cs-1, cs-2
Underloaded: cs-3, cs-4, cs-5

Migration plan (representative chunks):
──────────────────────────────────────
Move chunks from cs-1:
• Chunk 0x1111 → cs-3
• Chunk 0x2222 → cs-4
• Chunk 0x3333 → cs-5
Move chunks from cs-2:
• Chunk 0x4444 → cs-3
• Chunk 0x5555 → cs-5

After migration:
───────────────
cs-1: 68% full (balanced)
cs-2: 66% full (balanced)
cs-3: 54% full (balanced)
cs-4: 54% full (balanced)
cs-5: 53% full (balanced)
All within threshold ✓ (average still 59%)

CONSTRAINTS:
───────────
When migrating, must maintain:

1. Rack diversity
   • Don't move to same rack
   • Maintain cross-rack replicas

2. Replication count
   • Don't reduce below target
   • Temporarily may have 4 replicas
   • Delete old after new confirmed

3. Rate limits
   • Background process
   • Low priority
   • Don't impact foreground I/O
   • Typical: 1-5 chunks/minute

4. Load considerations
   • Don't overload destination
   • Avoid moving hot chunks
   • Prefer cold chunks for migration
While GFS is self-contained for data, its master election is often described as coordinated by an external distributed locking service called Chubby (the original 2003 paper refers only to monitoring infrastructure outside GFS that starts a new master process). Delegating leader election to a separate, purpose-built coordination service is a pattern that has been widely adopted in modern distributed systems. Just as GFS relied on Chubby, Hadoop HDFS relies on ZooKeeper, Kafka relies on ZooKeeper (or its newer KRaft protocol), and Kubernetes relies on etcd. The key insight is that leader election and distributed consensus are hard problems that benefit from being solved once in a dedicated, well-tested component rather than being reimplemented in every system that needs them.
Master Election: When the master starts, it attempts to acquire a specific lock in Chubby. Only one process can hold this lock at a time, becoming the “Primary Master”.
Monitoring: If the Primary Master crashes, its Chubby session expires and the lock is released.
Failover: A waiting “Shadow Master” or a new process detects the released lock, acquires it, and promotes itself to Primary Master.
Why this matters: It prevents Split-Brain scenarios where two masters think they are in charge, which would lead to catastrophic metadata corruption.
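The acquire, expire, and take-over cycle can be illustrated with an in-memory stand-in for the lock service. This sketch is not Chubby's API: `FakeLockService`, its methods, and the 10-second session TTL are hypothetical, and time is passed in explicitly so the behavior is deterministic.

```python
class FakeLockService:
    """In-memory stand-in for a lock service with session leases."""

    def __init__(self, session_ttl=10.0):
        self.session_ttl = session_ttl
        self.holder = None
        self.expires_at = 0.0

    def try_acquire(self, who, now):
        """Take the lock if it is free or the holder's session has expired."""
        if self.holder is not None and now >= self.expires_at:
            self.holder = None  # session expired, so the lock is released
        if self.holder is None:
            self.holder = who
            self.expires_at = now + self.session_ttl
            return True
        return self.holder == who

    def keep_alive(self, who, now):
        """Extend the session; fails once the session has already lapsed."""
        if self.holder == who and now < self.expires_at:
            self.expires_at = now + self.session_ttl
            return True
        return False


if __name__ == "__main__":
    lock = FakeLockService(session_ttl=10.0)
    print(lock.try_acquire("master-a", now=0.0))   # primary takes the lock
    print(lock.try_acquire("shadow-b", now=5.0))   # shadow must wait
    print(lock.keep_alive("master-a", now=8.0))    # session extended to t=18
    # master-a crashes here and sends no further keep-alives.
    print(lock.try_acquire("shadow-b", now=18.0))  # shadow becomes primary
```

A real deployment also has to make sure an old primary stops acting once its session lapses; this sketch only models the lock itself.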
GFS chooses to verify data integrity only at the endpoints (Chunkservers) rather than during every hop in the network. This is an application of the “end-to-end argument” in systems design, a foundational principle articulated by Saltzer, Reed, and Clark in their 1984 paper. The argument states that reliability checks at intermediate points in a system do not eliminate the need for end-to-end checks, so adding intermediate checks primarily adds overhead without proportional benefit. GFS applies this principle pragmatically: verifying data at every network hop during replication would consume significant CPU and bandwidth, while end-to-end verification at the destination chunkserver catches the same errors at a fraction of the cost.
The Decision: GFS does not compare checksums between replicas during write operations.
The Reason: Network bandwidth is expensive. Comparing 64MB of data across 3 replicas every time a chunk is copied would consume massive aggregate bandwidth.
The Risk: If a chunk is corrupted during a copy from one server to another, the destination might store corrupted data.
The Mitigation: The chunk’s checksums travel with the data, and the destination chunkserver verifies each block against them after receiving it and again on every later read. If a mismatch is found, the chunkserver reports it and the master schedules a repair from a different, healthy replica.
Chunkservers don’t just wait for reads. They cycle through inactive chunks in the background to detect bit rot on aging disks. This “Scrubbing” ensures that a rarely-accessed archive doesn’t slowly decay into unreadability.
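Read-time verification and background scrubbing both reduce to the same per-block check. The sketch below assumes 64KB blocks and uses `zlib.crc32` from the Python standard library as the 32-bit checksum; the function names and the in-memory `chunks` mapping are illustrative, not how a chunkserver stores data.

```python
import zlib

BLOCK_SIZE = 64 * 1024  # 64KB checksum blocks


def block_checksums(data: bytes) -> list[int]:
    """One CRC32 per 64KB block of the chunk."""
    return [
        zlib.crc32(data[offset:offset + BLOCK_SIZE])
        for offset in range(0, len(data), BLOCK_SIZE)
    ]


def corrupted_blocks(data: bytes, stored: list[int]) -> list[int]:
    """Indexes of blocks whose recomputed checksum differs from the stored one."""
    fresh = block_checksums(data)
    if len(fresh) != len(stored):
        raise ValueError("block count does not match stored checksums")
    return [i for i, (new, old) in enumerate(zip(fresh, stored)) if new != old]


def scrub(chunks: dict[str, tuple[bytes, list[int]]]) -> dict[str, list[int]]:
    """Background pass: verify every chunk, report only the damaged ones."""
    report = {}
    for handle, (data, stored) in chunks.items():
        bad = corrupted_blocks(data, stored)
        if bad:
            report[handle] = bad
    return report


if __name__ == "__main__":
    original = bytes(range(256)) * 1024          # 256KB, so 4 blocks
    checksums = block_checksums(original)
    damaged = bytearray(original)
    damaged[2 * BLOCK_SIZE + 7] ^= 0x01          # flip one bit in block 2
    print(scrub({"0x1a2b": (original, checksums),
                 "0x2c3d": (bytes(damaged), checksums)}))
```

Checking per block rather than per chunk means a single bad sector only invalidates 64KB, and a read touching other blocks is unaffected.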
OPERATION LOG REPLICATION
────────────────────────
Every metadata mutation is logged and replicated

Operation Log Write:
───────────────────
Client → Master: create("/data/file1")

Master process:
──────────────
1. Create log entry:
   ┌──────────────────────────────┐
   │ Timestamp: 1234567890        │
   │ Operation: CREATE_FILE       │
   │ Path: /data/file1            │
   │ Owner: user_x                │
   │ Permissions: 0644            │
   └──────────────────────────────┘

2. Write to local disk:
   append_to_log(log_entry)

3. Replicate to shadow masters:
   for shadow in shadow_masters:
       shadow.replicate_log(log_entry)

4. Replicate to remote disks:
   for remote_disk in remote_disks:
       remote_disk.write_log(log_entry)

5. Wait for ACKs from ALL:
   wait_for_acks([shadows, remote_disks])

6. If all ACK:
   apply_to_memory(log_entry)
   return SUCCESS to client

7. If any fail:
   retry or abort

Guarantees:
──────────
• Client sees success → operation is durable
• Replicated to multiple machines
• Different failure domains
• Can reconstruct from any copy
• Shadow masters stay synchronized

Replication Locations:
─────────────────────
• Primary master local disk
• Shadow master 1 (same datacenter)
• Shadow master 2 (same datacenter)
• Remote disk 1 (different datacenter)
• Remote disk 2 (different datacenter)
→ Can survive datacenter failure!
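The ordering that matters here (log locally, replicate, wait for every acknowledgement, and only then apply and reply) can be sketched in a few lines. `LogReplica` and `Master` are hypothetical in-memory stand-ins, and the retry or abort handling from step 7 is reduced to reporting failure.

```python
class LogReplica:
    """Stand-in for a shadow master or remote disk holding a log copy."""

    def __init__(self, name, healthy=True):
        self.name = name
        self.healthy = healthy
        self.entries = []

    def append(self, entry) -> bool:
        """Store the entry and acknowledge, or fail if unhealthy."""
        if not self.healthy:
            return False
        self.entries.append(entry)
        return True


class Master:
    def __init__(self, replicas):
        self.local_log = []
        self.replicas = replicas
        self.namespace = {}

    def create_file(self, path, owner) -> bool:
        entry = {"op": "CREATE_FILE", "path": path, "owner": owner}
        self.local_log.append(entry)                       # local disk first
        acks = [replica.append(entry) for replica in self.replicas]
        if not all(acks):
            # Simplification: a real master would retry or abort and clean up.
            return False
        self.namespace[path] = {"owner": owner}            # apply after ACKs
        return True                                        # client sees success


if __name__ == "__main__":
    replicas = [LogReplica("shadow-1"), LogReplica("remote-1")]
    master = Master(replicas)
    print(master.create_file("/data/file1", "user_x"))
    replicas[1].healthy = False
    print(master.create_file("/data/file2", "user_x"))
```

Applying the mutation only after all acknowledgements is what lets a client treat a success reply as a durability guarantee.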
2. Shadow Masters
SHADOW MASTER OPERATION
──────────────────────
Purpose: Read-only replicas for failover

Shadow master continuously:
─────────────────────────
1. Receive operation log entries
   (replicated from primary)

2. Apply to local state:
   replay_operation(log_entry)

3. Build same in-memory metadata:
   • Namespace
   • Chunk handles
   • File metadata

4. Poll chunkservers:
   (just like primary, but read-only)

5. Serve read requests:
   • File listings
   • Chunk location queries
   • Metadata lookups

6. Cannot serve:
   • Writes (redirect to primary)
   • Lease grants
   • Chunk creation
   • Mutations

Lag Behind Primary:
──────────────────
Shadow typically lags by seconds:
• Primary applies immediately
• Replication: ~100ms
• Shadow applies: ~1 second

Acceptable for reads:
• Metadata queries slightly stale
• Client cache may be stale anyway
• Eventually consistent

Failover Process:
────────────────
Primary master fails:

T+0:   Primary crashes
T+10s: Monitoring detects (missed heartbeats)
T+15s: External coordination service (e.g., Chubby) triggers failover
T+20s: Promote shadow to primary:
       1. Stop accepting reads
       2. Apply any pending log entries
       3. Enable write mode
       4. Start granting leases
       5. Begin accepting writes
T+30s: Update DNS/configuration
       Clients redirect to new primary
T+60s: System fully operational

Downtime: ~1 minute
Data loss: None (log replicated)
3. Fast Recovery
MASTER RECOVERY PROCESS
──────────────────────
Master starts (after crash or restart)

Phase 1: Load Checkpoint
────────────────────────
• Load latest checkpoint from disk
• Compact B-tree format
• Contains:
  - Full namespace
  - Chunk handles
  - Version numbers
  - File metadata
• Size: <1 GB typically
• Load time: 1-2 seconds

After Phase 1:
• Namespace in memory
• File → chunk mappings known
• But: chunk locations unknown

Phase 2: Replay Operation Log
─────────────────────────────
• Read log entries since checkpoint
• Apply each operation in order
• Updates in-memory state
• Typical: 100K-1M operations
• Replay rate: 100K ops/second
• Time: 1-10 seconds

After Phase 2:
• Namespace fully updated
• All metadata current
• Still: chunk locations unknown

Phase 3: Chunk Location Discovery
─────────────────────────────────
Master → all chunkservers: "Tell me what chunks you have"
Each chunkserver → Master: "I have: [chunk_handle, version] list"

Example response from cs-5:
──────────────────────────
[
  (0x1a2b, version=3),
  (0x2c3d, version=5),
  (0x3e4f, version=2),
  ...
  (10,000 chunks total)
]

Master builds location map:
──────────────────────────
chunk_locations = {}
for chunk, version in chunkserver_report:
    if version >= expected_version[chunk]:
        # Create the set on first sight of this chunk
        chunk_locations.setdefault(chunk, set()).add(chunkserver)
    else:
        # Stale replica, mark for deletion
        mark_garbage(chunkserver, chunk)

Poll time:
• 1000 chunkservers
• Each reports 10K chunks
• Network transfer: 10-30 seconds
• Processing: few seconds

After Phase 3:
• Master knows all chunk locations
• Can serve requests
• Stale replicas identified

Phase 4: Resume Operations
──────────────────────────
Master now ready:
• Accepts client requests
• Grants leases
• Creates chunks
• Fully operational

Total recovery time:
───────────────────
Phase 1: 1-2 seconds
Phase 2: 1-10 seconds
Phase 3: 10-30 seconds
Phase 4: Ready
Total: 15-45 seconds typically
Maximum: ~60 seconds

During recovery:
───────────────
• Shadow masters serve reads
• Writes queued or failed
• Clients retry automatically
• Minimal user impact
CHECKPOINT MECHANISM
───────────────────
Purpose: Faster recovery than replaying full log

Checkpoint Creation:
───────────────────
Background thread (while master operational):

1. Master switches to new log file:
   current_log = new_log_file()

2. Background thread snapshots state:
   snapshot = serialize_namespace()
   • All files and directories
   • Chunk handles
   • Version numbers
   • Metadata

3. Write compact B-tree to disk:
   write_checkpoint(snapshot)

4. Doesn't block ongoing operations!
   Master continues serving requests

5. Complete:
   • Old log file can be deleted
   • Checkpoint is new baseline

Checkpoint Frequency:
────────────────────
Trigger conditions:
• Log file reaches size limit (e.g., 64MB)
• Time since last checkpoint (e.g., 1 hour)
• Master startup (after crash)

Typical: Every 1-2 hours in steady state

Checkpoint Format:
─────────────────
Compact B-tree (on disk):
• Efficient prefix compression
• Fast random access
• Small size (<1 GB for 100M chunks)
• Quick to load (1-2 seconds)

Recovery with Checkpoints:
──────────────────────────
Without checkpoint:
• Must replay entire history
• Millions of operations
• Minutes to hours

With checkpoint:
• Load checkpoint (seconds)
• Replay since checkpoint (seconds)
• Total: <1 minute

Example Timeline:
────────────────
T0:    Master creates checkpoint_A
T0-T1: Operations logged to log_B
T1:    Master creates checkpoint_B (log_B incorporated)
T1-T2: Operations logged to log_C
T2:    Master crashes

Recovery:
─────────
• Load checkpoint_B (latest)
• Replay log_C (since checkpoint_B)
• Fast recovery!

Without checkpoints:
───────────────────
• Replay all logs since system start
• Could be days/weeks of operations
• Impractical
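The equivalence that makes checkpoints safe (checkpoint plus log tail gives the same state as replaying the whole log) can be shown with a toy namespace. The operation names other than CREATE_FILE, the dictionary layout, and the function names are assumptions for illustration; a deep copy stands in for the on-disk B-tree.

```python
import copy


def apply(namespace, entry):
    """Apply one logged mutation to the in-memory namespace."""
    op, path = entry["op"], entry["path"]
    if op == "CREATE_FILE":
        namespace[path] = {"chunks": []}
    elif op == "ADD_CHUNK":
        namespace[path]["chunks"].append(entry["handle"])
    elif op == "DELETE_FILE":
        namespace.pop(path, None)
    else:
        raise ValueError(f"unknown operation: {op}")


def take_checkpoint(namespace, log):
    """Snapshot the state and remember how much of the log it covers."""
    return {"state": copy.deepcopy(namespace), "log_position": len(log)}


def recover(checkpoint, log):
    """Load the checkpoint, then replay only the entries logged after it."""
    namespace = copy.deepcopy(checkpoint["state"])
    for entry in log[checkpoint["log_position"]:]:
        apply(namespace, entry)
    return namespace


if __name__ == "__main__":
    entries = [
        {"op": "CREATE_FILE", "path": "/a"},
        {"op": "ADD_CHUNK", "path": "/a", "handle": "0x1a2b"},
        {"op": "CREATE_FILE", "path": "/b"},
        {"op": "DELETE_FILE", "path": "/a"},
    ]
    log, live = [], {}
    for entry in entries[:2]:
        log.append(entry)
        apply(live, entry)
    checkpoint = take_checkpoint(live, log)
    for entry in entries[2:]:
        log.append(entry)
        apply(live, entry)
    print(recover(checkpoint, log) == live)
```

Recovery cost then depends on the number of entries since the last checkpoint rather than on the age of the cluster.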
CHUNKSERVER FAILURE HANDLING
───────────────────────────
Scenario: cs-12 crashes

Detection:
─────────
T+0:   cs-12 crashes
T+30s: Master expects heartbeat, none arrives
T+60s: Master expects heartbeat, none arrives
T+90s: Master declares cs-12 dead (3 missed heartbeats)

Master Action:
─────────────
1. Mark cs-12 as dead:
   dead_servers.add(cs-12)

2. Scan all chunks:
   for chunk in all_chunks:
       if cs-12 in chunk.replicas:
           chunk.replicas.remove(cs-12)
           if len(chunk.replicas) < 3:
               schedule_re_replication(chunk)

3. Identify affected chunks:
   • 10,000 chunks on cs-12
   • All now under-replicated

4. Prioritize re-replication:
   • 1 replica: URGENT
   • 2 replicas: NORMAL
   • Already covered above

5. Execute re-replication:
   (covered in re-replication section)

Client Impact:
─────────────
Reads from cs-12:
• Fail immediately (connection error)
• Client retries with different replica
• Transparent to application
• Slight latency spike

Writes to cs-12:
• Primary (cs-12) is dead
• Lease expires (60 seconds max)
• Master grants lease to different replica
• Client retries, succeeds

Recovery if cs-12 Comes Back:
─────────────────────────────
Scenario: cs-12 was network partition, not dead

T+300s: cs-12 reconnects
cs-12 → Master: Heartbeat with chunk list

Master checks:
─────────────
For each chunk cs-12 reports:
1. Is version stale?
   → Mark for garbage collection
2. Is chunk already re-replicated?
   → Now have 4 replicas
   → Delete one (rebalancing)
3. Is chunk still under-replicated?
   → Count cs-12's copy
   → May cancel pending re-replication

False Positive Handling:
───────────────────────
Network partition caused false positive:
• cs-12 was alive, just unreachable
• Master re-replicated chunks
• Now have extra copies
• Rebalancing cleans up

Cost of false positive:
• Extra network bandwidth
• Extra disk space temporarily
• Acceptable (safety first)
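The detection rule (declare a server dead after three missed 30-second heartbeats, and welcome it back if it reappears) can be sketched as a small timeout tracker. `FailureDetector` and its method names are hypothetical, and timestamps are passed in explicitly so the example is deterministic.

```python
class FailureDetector:
    """Tracks last-heartbeat times and flags servers that go silent."""

    def __init__(self, interval=30.0, missed_limit=3):
        self.timeout = interval * missed_limit  # 90 seconds by default
        self.last_seen = {}
        self.dead = set()

    def heartbeat(self, server, now):
        """Record a heartbeat; returns True if the server was presumed dead."""
        self.last_seen[server] = now
        was_dead = server in self.dead
        self.dead.discard(server)
        return was_dead

    def sweep(self, now):
        """Return servers that have just crossed the timeout."""
        newly_dead = [
            server
            for server, seen in self.last_seen.items()
            if server not in self.dead and now - seen >= self.timeout
        ]
        self.dead.update(newly_dead)
        return newly_dead


if __name__ == "__main__":
    detector = FailureDetector()
    detector.heartbeat("cs-12", now=0.0)
    detector.heartbeat("cs-3", now=0.0)
    detector.heartbeat("cs-3", now=60.0)       # cs-12 has gone quiet
    print(detector.sweep(now=90.0))            # cs-12 declared dead
    print(detector.heartbeat("cs-12", now=300.0))  # partition healed
```

The `True` returned by the late heartbeat is the signal that would trigger the reconciliation described above: check versions, count the returning replicas, and trim any extras.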
Disk Failure
Disk Corruption or Failure:
DISK FAILURE HANDLING
────────────────────
Scenario: Disk fails on cs-5

Detection:
─────────
Method 1: Read Error
───────────────────
Client reads chunk 0x1a2b from cs-5
cs-5 attempts read from disk
→ I/O error from disk
cs-5 → Master: "Chunk 0x1a2b is unreadable (I/O error)"
cs-5 → Client: "ERROR: Cannot read chunk"

Method 2: Checksum Mismatch
──────────────────────────
cs-5 reads block from disk
Computes checksum
Compares with stored checksum
→ MISMATCH (silent corruption)
cs-5 → Master: "Chunk 0x1a2b is corrupted"
cs-5 → Client: "ERROR: Corruption detected"

Method 3: Background Scrubbing
──────────────────────────────
cs-5 background task verifies chunks
Detects corruption before client reads
cs-5 → Master: "Chunk 0x1a2b is corrupted"

Master Response:
───────────────
1. Mark cs-5's replica as corrupted:
   chunk_replicas[0x1a2b].mark_corrupted(cs-5)

2. Schedule re-replication:
   re_replication_queue.add(
       chunk=0x1a2b,
       priority=HIGH
   )

3. Select source (good replica):
   source = pick_random([cs-12, cs-23])
   (Other replicas of 0x1a2b)

4. Select destination:
   destination = pick_server(
       low_load=True,
       different_rack=True
   )

5. Execute re-replication:
   (standard process)

6. After new replica confirmed:
   Master → cs-5: "Delete chunk 0x1a2b"

Client Handling:
───────────────
Client receives error from cs-5
Client retries with different replica (cs-12)
cs-12 serves data successfully
Transparent to application

Multiple Disk Failures:
──────────────────────
Scenario: Disk fails on cs-5 (has 1000 chunks)

cs-5 → Master: "Chunks [0x1111, 0x2222, ..., 1000 total] failed"

Master:
• Marks all 1000 as corrupted on cs-5
• Schedules 1000 re-replications
• High priority (data at risk)
• Executes over hours

Entire Chunkserver Disk Fails:
─────────────────────────────
If all chunks on cs-5 fail:
• Equivalent to chunkserver failure
• All chunks re-replicated
• cs-5 may be taken out of service
• Human intervention to replace disk
Network Partition
Network Isolation:
NETWORK PARTITION HANDLING
─────────────────────────
Scenario: Network switch fails, isolating Rack A

Partition:
─────────
┌─────────────────────┐
│ Master + Most       │
│ of cluster          │
│ (Rack B, C, D)      │
└─────────────────────┘
          ✗
          ✗  Network partition
          ✗
┌─────────────────────┐
│ Rack A              │
│ (Isolated)          │
│ cs-1, cs-2, cs-3    │
└─────────────────────┘

Master Perspective:
──────────────────
Rack A chunkservers miss heartbeats
Master declares them dead
Schedules re-replication for all chunks

Rack A Perspective:
──────────────────
Cannot reach master
Leases expire (60 seconds)
Cannot serve writes (no primary)
Can still serve reads to clients that can reach it and have chunk locations cached

When Partition Heals:
────────────────────
T+0:    Partition begins
T+90s:  Master declares Rack A dead
T+120s: Master begins re-replication
T+300s: Partition heals
        Rack A reconnects

Rack A → Master: Heartbeats with chunk lists

Master discovers:
────────────────
1. Rack A servers were alive (false positive)
2. Some chunks already re-replicated (4 replicas now)
3. Some chunks in progress
4. Some chunks not yet started

Master reconciles:
─────────────────
• Cancel pending re-replications
• Keep extra replicas temporarily
• Rebalancing removes extras later
• Update chunk location maps
• Resume normal operations

Split-Brain Prevention:
──────────────────────
Can Rack A serve writes during partition?

NO! Here's why:
──────────────
1. Leases expire after 60 seconds
2. Cannot renew lease (no master contact)
3. Cannot grant new leases
4. Cannot become primary
5. Writes fail

Primary authority requires master contact
→ No split-brain possible

Clients:
• Writes fail (lease expired)
• Retry → master redirects to working replicas
• Reads succeed from whichever replicas the client can reach (Rack A or other racks)

Cost of Partition:
─────────────────
• Write unavailable for affected chunks (during partition)
• Unnecessary re-replication (bandwidth, storage waste)
• Extra cleanup after healing
• Acceptable (safety first)
Master Failure
Most Critical Failure:
MASTER FAILURE HANDLING
──────────────────────
Scenario: Primary master crashes

Impact:
──────
• Cannot create files
• Cannot delete files
• Cannot grant leases
• Cannot get chunk locations (if not cached)
• WRITES BLOCKED
• Reads continue (if metadata cached)

Detection:
─────────
T+0:   Master crashes
T+10s: Monitoring service detects (heartbeat failure)

Failover Process:
────────────────
T+15s: External coordination service (e.g., Chubby lock service) triggers failover
T+20s: Promote shadow master:
       1. Stop accepting read requests
       2. Apply any pending log entries
       3. Flush in-memory state
       4. Switch to write mode:
          • Enable lease grants
          • Enable chunk creation
          • Enable writes
T+30s: Update DNS entry:
       gfs-master.google.com → new_master_ip
T+45s: Clients detect change:
       • Cached master IP fails
       • DNS lookup gets new IP
       • Retry operations
T+60s: System fully operational

During Failover (T+0 to T+60s):
───────────────────────────────
Reads:
• Shadow masters serve reads
• Slightly stale (seconds old)
• Most clients have cached metadata anyway

Writes:
• Fail (no primary master)
• Clients retry
• Eventually succeed when new master ready

New File Operations:
• Fail during failover
• Succeed after new master ready

Data Loss:
─────────
Operation log replicated → NO DATA LOSS
• All mutations in operation log
• Replicated before client sees success
• New master has full log
• Replays and continues

Recovery Details:
────────────────
New master startup:
(See "Fast Recovery" section)
1. Load checkpoint
2. Replay log
3. Poll chunkservers
4. Resume operations
Total: ~60 seconds

If Shadow Also Fails:
────────────────────
Both primary and shadow crash:
• Start new master from scratch
• Load latest checkpoint
• Replay full operation log
• Poll all chunkservers
• Takes longer (~few minutes)
• But still recoverable
• No data loss

Disaster: All Masters Gone
──────────────────────────
Extremely rare (datacenter failure):
• Operation logs in multiple datacenters
• Load from remote datacenter
• Start master in different location
• Recovery takes longer (10-30 minutes)
• Still no data loss (logs persistent)

Client Behavior During Failover:
────────────────────────────────
Built-in retry logic:

import time

class MasterUnreachable(Exception):
    """Raised when the master cannot be contacted."""

def exponential_backoff(attempt, base=0.1, cap=10.0):
    # Assumed values: 0.1s doubling per attempt, capped at 10s
    return min(cap, base * 2 ** attempt)

def gfs_operation_with_retry(op):
    max_retries = 10
    for i in range(max_retries):
        try:
            return op()
        except MasterUnreachable:
            time.sleep(exponential_backoff(i))
            continue  # Retry
    raise Exception("Master unavailable")

Transparent to most applications
Brief unavailability acceptable
Master polls all chunkservers: “What chunks do you have?”
Each chunkserver reports:
Chunk handles
Version numbers
Master builds in-memory location map
Identifies stale replicas (version mismatch)
Marks stale replicas for deletion
Phase 4: Resume Operations (immediate):
Master now has:
Complete namespace
All chunk locations
Version information
Begins accepting requests:
Grants leases
Serves metadata queries
Creates chunks
Fully operational
Why Fast?:
Checkpoint eliminates long log replay
All metadata in memory (no disk I/O for operations)
Chunk locations discovered in parallel
Simple, single-master design
Data Loss Prevention:
Operation log replicated before client sees success
Multiple copies (shadows, remote disks)
Different failure domains/datacenters
If master fails, log intact
Replay gives exact state
During Recovery:
Shadow masters serve read requests
Writes blocked temporarily
Clients retry automatically
Minimal user impact
Total downtime: 30-60 seconds typical, acceptable for Google’s workload.
System Design: Design a more fault-tolerant master for GFS
Expected Answer: Several approaches can improve GFS master fault tolerance beyond shadow masters.
Approach 1: Multi-Master with Consensus (like Colossus):
Multiple active masters using Paxos/Raft
Shared replicated state machine
Any master can handle requests
Benefits:
No failover delay (masters always available)
Higher read throughput (load balanced)
Better fault tolerance (majority quorum)
Challenges:
Complex consensus protocol
Higher write latency (consensus overhead)
More difficult to implement/debug
Approach 2: Metadata Sharding:
Partition namespace across multiple masters
Each master handles subset of files
Example: Hash(filename) % num_masters
Benefits:
Scales metadata capacity
Scales throughput
Failure affects subset
Challenges:
Cross-shard operations complex
Rebalancing difficult
Client routing logic
Approach 3: Hierarchical Masters:
Root master for namespace
Leaf masters for chunk management
Separate concerns
Benefits:
Scales chunk operations
Root master simpler (less load)
Challenges:
Two-level coordination
Partial failures complex
Approach 4: Active-Active Masters:
Multiple masters with optimistic concurrency
Eventually consistent
Conflict resolution protocol
Benefits:
No failover needed
Higher availability
Challenges:
Consistency corner cases
Conflict resolution complexity
Recommendation for GFS Workload:
Use Approach 1 (Multi-Master with Consensus):
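Colossus (GFS successor) is often cited as moving in this direction, although public descriptions emphasize a distributed metadata service rather than a single Paxos group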
Paxos provides strong consistency
Multiple masters for availability
Acceptable latency increase (metadata ops less frequent)
Worth complexity for zero-downtime failover
Implementation Details:
5-7 master servers (quorum=3-4)
Consensus on operation log
Replicated state machine
Any master can serve requests
Automatic leader election
Client tries multiple masters
Trade-offs:
Complexity: Higher (consensus protocol)
Performance: Slightly lower write latency
Availability: Much higher (no failover delay)
Consistency: Stronger (linearizable)
For GFS in 2003, a single master with shadows was the correct choice because of its simplicity. For modern systems, multi-master with consensus is standard (Spanner, CockroachDB, etcd).
GFS re-replicates chunks when a replica is lost. Walk me through how the master prioritizes which chunks to re-replicate first.
Strong Answer: When the master detects a chunkserver failure (via missed heartbeats), it identifies all chunks that were stored on that server and adds them to a re-replication queue. The queue is priority-sorted by three factors.
First, current replication count. A chunk with only one surviving replica is far more urgent than a chunk with two. One more failure would cause permanent data loss for a single-replica chunk, so these are replicated first.
Second, whether the chunk is blocking client operations. If a client is actively writing to a chunk that just lost a replica, that chunk gets priority to restore full replication and unblock the write pipeline.
Third, recency of access. Hot chunks (recently read or written) are prioritized over cold chunks because losing them has higher operational impact.
The master also throttles re-replication to avoid overwhelming the cluster. If a rack with 100 chunkservers fails, thousands of chunks need re-replication simultaneously. Without throttling, the re-replication traffic would saturate the network and interfere with normal client operations. GFS limits the number of concurrent re-replications per chunkserver (both as source and destination) and chooses destinations that preserve rack diversity.
Follow-up: What happens if re-replication cannot keep up and a second failure occurs before the first is fully repaired?
This is the nightmare scenario: cascading failures. If a chunk drops to zero replicas before re-replication completes, the data is permanently lost. GFS mitigates this by allowing higher replication factors for critical data (for example, 5 replicas instead of 3) and by limiting how many new chunks land on any one chunkserver in a short window, so that a freshly added, mostly empty server does not end up holding a disproportionate share of new replicas. In practice, an operator would track the “under-replicated chunk count” as a critical SLI and alert when it exceeds thresholds that indicate re-replication is falling behind.
How does GFS handle silent data corruption -- bit rot where the disk returns wrong data without reporting an error?
Strong Answer:

GFS defends against silent corruption at two levels. First, every 64KB block within a chunk has a CRC32 checksum that is verified on every read. If the computed checksum does not match the stored checksum, the chunkserver returns an error to the client and reports the corruption to the master, and the client reads from a different replica. The master then schedules re-replication from a healthy replica and, once a valid new replica is in place, instructs the chunkserver that reported the mismatch to delete its corrupted replica.

Second, background scrubbing. Chunkservers periodically scan all stored chunks and verify their checksums even when no reads are happening. This catches corruption in cold data that might not be read for weeks or months. Without scrubbing, corruption could go undetected until the remaining healthy replicas have been lost to other failures, at which point the data is unrecoverable.

The combination of read-time verification and background scrubbing provides defense in depth. Read-time verification catches corruption immediately when it affects active data. Background scrubbing catches corruption in the long tail of cold data.

Follow-up: Can CRC32 miss a corruption? Under what circumstances?

Yes. For random corruption, CRC32 fails to notice an error with a probability of roughly 1 in 4 billion (2^-32). It is guaranteed to detect every single-bit flip and every burst error of up to 32 bits, which covers the errors typical of disk hardware failures. However, CRC32 is not designed to detect adversarial modifications. A sophisticated adversary could craft data that has the same CRC32 as the original. To protect against malicious tampering, you would need a cryptographic hash (SHA-256) or a MAC (HMAC). GFS did not need this because its threat model was hardware failure, not attackers. Modern systems that need tamper detection (like blockchain storage or compliance-critical archives) use cryptographic hashes at the cost of higher CPU overhead.
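The two levels of checking in this answer can be illustrated with Python's standard `zlib.crc32`. This is a sketch under simplifying assumptions: the chunk is held in memory as `bytes`, the checksums live in a plain list, and the helper names (`block_checksums`, `read_range`, `corrupted_blocks`) are hypothetical rather than taken from GFS.

```python
import zlib

BLOCK_SIZE = 64 * 1024  # 64KB checksum blocks, as described above


def block_checksums(chunk):
    """Compute one CRC32 per 64KB block of the chunk."""
    return [zlib.crc32(chunk[i:i + BLOCK_SIZE])
            for i in range(0, len(chunk), BLOCK_SIZE)]


def read_range(chunk, stored, offset, length):
    """Read-time check: verify only the blocks that overlap the request."""
    first = offset // BLOCK_SIZE
    last = (offset + length - 1) // BLOCK_SIZE
    for i in range(first, last + 1):
        block = chunk[i * BLOCK_SIZE:(i + 1) * BLOCK_SIZE]
        if zlib.crc32(block) != stored[i]:
            # A real chunkserver would also report this to the master.
            raise IOError(f"checksum mismatch in block {i}")
    return chunk[offset:offset + length]


def corrupted_blocks(chunk, stored):
    """Background scrub: recheck every block, return the bad block indexes."""
    current = block_checksums(chunk)
    return [i for i, (now, then) in enumerate(zip(current, stored))
            if now != then]


data = bytearray(b"x" * (3 * BLOCK_SIZE + 100))  # four blocks, last one short
stored = block_checksums(bytes(data))
data[2 * BLOCK_SIZE + 7] ^= 0x01                 # simulate a single bit flip
print(corrupted_blocks(bytes(data), stored))     # [2]
```

A read that touches only blocks 0 or 1 still succeeds here, which is why the scrubber matters: the flipped bit in block 2 stays invisible to read-time checks until something actually reads that block.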
The GFS master is a single point of failure. How does GFS handle master crashes, and what is the recovery time?
Strong Answer:

GFS master recovery has three phases.

Phase one: load the latest checkpoint, which is a compact B-tree-like representation of the full namespace state. This takes seconds because the checkpoint is designed to be mapped directly into memory.

Phase two: replay the operation log since the checkpoint. The log contains all metadata mutations (file creates, deletes, chunk allocations) and is replayed sequentially. Assuming a replay rate of about 100K operations per second, replaying a million operations takes about 10 seconds.

Phase three: poll all chunkservers for their chunk inventories. Since chunk locations are not persisted (they are reconstructed from chunkserver reports), the master must contact every chunkserver and rebuild its location map. This is the slowest phase and can take minutes in a large cluster.

Total recovery time: typically 30 seconds to a few minutes, depending on cluster size. During this time, no metadata operations can be served, which means no new file opens and no new writes (though existing writes with valid leases can continue until the lease expires).

GFS also has shadow masters: read-only replicas of the master that replay a copy of the operation log and can serve slightly stale metadata reads during master downtime. They lag slightly behind the primary master but provide availability for read-heavy workloads.

Follow-up: Modern HDFS solved this with active/standby NameNode HA. Why did GFS not do the same initially?

In 2003, the engineering cost of implementing a hot standby with automatic failover was high, and a recovery time on the order of a minute was acceptable for Google's batch workloads. MapReduce jobs could simply retry failed metadata operations. HDFS added NameNode HA in the Hadoop 2.0 line (around 2012) because the Hadoop ecosystem had evolved to include interactive query engines (Hive, Impala) and real-time systems (HBase) that could not tolerate minutes of metadata unavailability.

The progression from single master to active/standby to distributed metadata (Colossus) is a natural evolution driven by changing workload requirements.
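The three recovery phases from the answer above can be sketched in a few lines. Everything about the data layout here is an assumption made for illustration: the checkpoint is a dict with a `last_seq` marker, log records are `(seq, op, path, arg)` tuples, and chunkserver reports are a dict of server name to chunk handles. None of this reflects GFS's real on-disk formats.

```python
def recover_master(checkpoint, op_log, chunkserver_reports):
    # Phase 1: start from the checkpointed namespace (path -> chunk handles).
    namespace = {path: list(handles)
                 for path, handles in checkpoint["namespace"].items()}

    # Phase 2: replay, in order, only the records newer than the checkpoint.
    for seq, op, path, arg in op_log:
        if seq <= checkpoint["last_seq"]:
            continue
        if op == "create":
            namespace[path] = []
        elif op == "delete":
            namespace.pop(path, None)
        elif op == "add_chunk":
            namespace[path].append(arg)

    # Phase 3: chunk locations are never persisted, so rebuild them from
    # what each chunkserver says it holds.
    known = {h for handles in namespace.values() for h in handles}
    locations = {h: set() for h in known}
    for server, handles in chunkserver_reports.items():
        for h in handles:
            if h in known:  # handles missing from the namespace are orphans
                locations[h].add(server)
    return namespace, locations


def replay_seconds(num_ops, ops_per_second):
    """Back-of-the-envelope estimate for phase two."""
    return num_ops / ops_per_second


checkpoint = {"last_seq": 2, "namespace": {"/a": ["c1"]}}
op_log = [
    (1, "create", "/a", None),
    (2, "add_chunk", "/a", "c1"),
    (3, "create", "/b", None),
    (4, "add_chunk", "/b", "c2"),
    (5, "delete", "/a", None),
]
reports = {"cs1": ["c1", "c2"], "cs2": ["c2"]}

namespace, locations = recover_master(checkpoint, op_log, reports)
print(namespace)                           # {'/b': ['c2']}
print(replay_seconds(1_000_000, 100_000))  # 10.0
```

The sketch also shows why phase three dominates in practice: phases one and two touch only local state, while phase three needs a response from every chunkserver before the location map is complete.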