Chunkservers are the workhorses of GFS, storing actual data and serving client requests. While the master gets most of the attention in system design discussions, the chunkserver design contains equally important lessons. The decision to store chunks as plain Linux files (rather than using a custom block device or raw disk access) dramatically simplified implementation and debugging — an operator could use standard Linux tools like ls, df, and strace to inspect the storage layer. The data flow optimizations, particularly the pipelined chain replication and network-topology-aware routing, are textbook examples of how to squeeze maximum throughput out of commodity hardware. These same techniques appear in modern systems: chain replication is used in HDFS and Azure Storage, and topology-aware data placement is standard in every major cloud provider’s infrastructure. This chapter explores how chunks are stored, how data flows through the system, and how GFS ensures data integrity despite commodity hardware failures.
Chapter Goals:
Understand chunk storage implementation
Master the replication pipeline and data flow optimization
Learn detailed read, write, and record append flows
Explore checksumming and data integrity mechanisms
Each chunkserver is a simple Linux process storing chunks as regular files. This simplicity was intentional and strategic. By leveraging the host operating system’s file system (ext3/ext4) rather than managing raw disks directly, GFS avoided reinventing buffer management, journaling, and disk scheduling. The trade-off was some performance overhead from the extra file system layer, but the engineering velocity gained from simpler code, easier debugging, and leveraging years of Linux kernel optimization more than compensated. This is a practical lesson worth remembering: sometimes the best engineering decision is to not build something.
CHECKSUM STRUCTURE─────────────────Each chunk divided into 64KB blocks:┌─────────────────────────────────────────┐│ Chunk (64MB max) │├─────────────────────────────────────────┤│ Block 0 (64KB) → Checksum 0 (4 bytes) ││ Block 1 (64KB) → Checksum 1 (4 bytes) ││ Block 2 (64KB) → Checksum 2 (4 bytes) ││ ... ││ Block 1023 (64KB) → Checksum 1023 │└─────────────────────────────────────────┘Checksum = 32-bit CRC32For full 64MB chunk:• 1024 blocks• 1024 checksums• 4KB checksum metadataStored: In-memory + on diskCHECKSUM COMPUTATION:────────────────────On Write:─────────def write_block(block_index, data): checksum = crc32(data) write_data_to_disk(block_index, data) update_checksum(block_index, checksum) persist_checksum()On Read:────────def read_block(block_index): data = read_data_from_disk(block_index) expected_checksum = get_checksum(block_index) actual_checksum = crc32(data) if actual_checksum != expected_checksum: report_corruption() read_from_different_replica() else: return dataWHY 64KB BLOCKS?───────────────Trade-off analysis:Smaller blocks (e.g., 4KB):✓ Detect corruption precisely✓ Re-read smaller amount✗ More checksums (higher overhead)✗ More CPU for checksum computationLarger blocks (e.g., 1MB):✓ Fewer checksums✓ Less CPU overhead✗ Must re-read larger amount on corruption✗ Less precise error detection64KB: Sweet spot• 0.006% overhead (4KB per 64MB)• Fast CRC32 computation• Reasonable re-read size• Matches common I/O sizes
Detecting Stale Replicas:
VERSION NUMBER SYSTEM────────────────────Each chunk has version numberPurpose: Detect stale replicasLifecycle:──────────1. Chunk Created: version = 12. Master Grants Lease: version++ (now version = 2) All current replicas updated to 23. Replica Was Down: Server was offline during lease grant Its version still = 1 (STALE!)4. Detection: Server comes back online Heartbeat: "I have chunk X, version 1" Master: "Current version is 2, yours is stale" → Mark for garbage collectionEXAMPLE SCENARIO:────────────────Chunk 0x1a2b, initially version 1Replicas: [cs-5, cs-12, cs-23]Time T0: All have version 1────────────────────────────cs-5: version 1 ✓cs-12: version 1 ✓cs-23: version 1 ✓Time T1: cs-23 goes offline───────────────────────────cs-23 crashes, network partition, etc.Time T2: Client writes to chunk───────────────────────────────Master grants lease → version++New version: 2Master updates:cs-5: version 2 ✓cs-12: version 2 ✓cs-23: offline (still has version 1)Time T3: cs-23 comes back online────────────────────────────────cs-23 heartbeat: "I have chunk 0x1a2b, version 1"Master checks:• Current version: 2• cs-23 version: 1• STALE REPLICA!Master → cs-23: "Delete chunk 0x1a2b (stale)"Master schedules re-replication:• Copy from cs-5 or cs-12 (version 2)• To new chunkserver• Maintain 3 replicasPERSISTENCE:───────────Version stored:• In master's operation log (persistent)• In chunkserver's metadata (persistent)• In master's memory (volatile)After master restart:• Loads version from operation log• Chunkservers report their versions• Master validates and cleans up stale replicas
Globally Unique Identifier:
CHUNK HANDLE───────────64-bit globally unique identifierGeneration: Master assigns at chunk creationProperties:──────────• Unique across entire GFS cluster• Never reused• Identifies chunk forever• Used as filename on chunkserversFormat:───────0x1a2b3c4d5e6f7g8h│ ││ └─ Random/sequential component└──── Timestamp component (optional)Mapping:────────Master maintains:File → List of chunk handles/data/file1 → [0x1a2b..., 0x2c3d..., 0x3e4f...]Each chunk handle → Metadata0x1a2b... → { version: 5, replicas: [cs-5, cs-12, cs-23], primary: cs-5, lease_expiration: T+60}Chunkserver stores:Chunk handle → Chunk file0x1a2b... → /gfs/chunks/0x1a2b3c4d5e6f7g8hBenefits:────────✓ Simple mapping✓ No path dependencies✓ Easy to move chunks between servers✓ No name conflicts✓ Efficient lookup
GFS decouples data flow from control flow and optimizes for network topology. This section describes what is arguably GFS’s most clever performance optimization. The pipeline push mechanism achieves near-theoretical-maximum bandwidth utilization by exploiting full-duplex network links: each chunkserver in the chain simultaneously receives data from its predecessor and forwards to its successor. The key mathematical insight is that total transfer time for N replicas approaches the time for a single transfer (plus small propagation delays), rather than scaling linearly with replica count.
The “Record Append” is GFS’s most unique operation and arguably the single feature that made MapReduce practical at scale. It guarantees that data is appended atomically at least once. Without record append, thousands of MapReduce map tasks writing to shared intermediate files would need distributed locking or external coordination — a prohibitive overhead. Record append solved this by letting the primary chunkserver choose the offset, eliminating the need for clients to coordinate. The “at least once” semantics (with possible duplicates) were acceptable because MapReduce was already designed to be idempotent. This co-design between GFS and its primary consumer is a masterclass in API design for distributed systems.
Beyond checking data during reads, chunkservers perform background “scrubbing” to detect silent data corruption (bit rot) in rarely accessed chunks. This is critically important because disk drives can silently corrupt data without reporting errors — a phenomenon known as “bit rot” or “silent data corruption.” Studies by CERN and others have shown that undetected bit errors occur at rates of roughly 1 in 10^14 to 10^15 bits, which means a petabyte-scale cluster will encounter silent corruption regularly. Without proactive scrubbing, corruption in cold data could go undetected for months or years, until the data is finally read and discovered to be unrecoverable.
COMPLETE WRITE OPERATION────────────────────────CLIENT WRITE REQUEST:────────────────────gfs.write("/data/file1", offset=50MB, data=1MB)PHASE 1: LEASE ACQUISITION──────────────────────────Client → Master: "Need to write to /data/file1, chunk 0"Master checks: 1. Chunk 0 exists? Yes 2. Has current primary? No 3. Select primary: cs-5 (lowest load) 4. Grant lease to cs-5 (60 seconds) 5. Increment version: 4 → 5Master → cs-5, cs-12, cs-23: "Increment version to 5"All chunkservers ACKMaster → Client: { chunk_handle: 0x1a2b..., version: 5, primary: cs-5, secondaries: [cs-12, cs-23], lease_expiration: T+60 }PHASE 2: DATA PUSH─────────────────Client forms chain: cs-5 → cs-12 → cs-23(based on network topology)Client → cs-5: { type: DATA_PUSH, chunk_handle: 0x1a2b..., data_id: uuid_123, // Unique ID for this data data: [1MB payload] }cs-5: 1. Buffer data in memory (don't write to disk yet!) 2. Forward to cs-12 while receiving from client → cs-12: DATA_PUSH with same datacs-12: 1. Buffer data in memory 2. Forward to cs-23 → cs-23: DATA_PUSH with same datacs-23: 1. Buffer data in memory 2. Send ACK to cs-12cs-12 → ACK to cs-5cs-5 → ACK to clientClient now knows all replicas have buffered the dataPHASE 3: WRITE COMMAND─────────────────────Client → Primary (cs-5): { type: WRITE_COMMAND, chunk_handle: 0x1a2b..., data_id: uuid_123, // Reference to buffered data offset: 50MB, length: 1MB }PRIMARY (cs-5) PROCESSING:─────────────────────────1. Assign serial number: (If multiple concurrent writes, serialize them) serial_number = next_serial++ // e.g., 422. Apply write to local disk: fd = open("/gfs/chunks/0x1a2b...") lseek(fd, 50MB) write(fd, buffered_data[uuid_123])3. Update checksums: start_block = 50MB / 64KB end_block = 51MB / 64KB for block in [start_block..end_block]: checksum[block] = crc32(data[block])4. Forward write command to secondaries: cs-5 → cs-12, cs-23: { type: APPLY_WRITE, chunk_handle: 0x1a2b..., data_id: uuid_123, offset: 50MB, length: 1MB, serial_number: 42 }SECONDARIES (cs-12, cs-23):──────────────────────────1. Apply in serial number order: (Wait if earlier serial numbers not yet applied)2. Write to disk: Same as primary3. Update checksums: Same as primary4. Send ACK to primary: cs-12 → cs-5: ACK(serial_number=42) cs-23 → cs-5: ACK(serial_number=42)PRIMARY RECEIVES ACKS:─────────────────────If ALL secondaries ACK: cs-5 → Client: SUCCESSIf ANY secondary fails: cs-5 → Client: ERROR (Client retries entire operation)Client writes are atomic: all replicas or noneFAILURE SCENARIOS:─────────────────Scenario 1: cs-23 fails during data push────────────────────────────────────────• Client doesn't receive ACK from all replicas• Client retries entire write• Master may detect cs-23 failure• Master re-replicates chunk elsewhereScenario 2: cs-12 fails during apply────────────────────────────────────• Primary doesn't receive ACK from cs-12• Primary returns ERROR to client• Client retries• Chunk now inconsistent: - cs-5: has write - cs-12: may or may not have write - cs-23: has write• But: All replicas have same version (5)• Next write will succeed and create defined region• GFS consistency model allows this (relaxed)Scenario 3: Primary fails after apply─────────────────────────────────────• Primary wrote locally and sent to secondaries• Crashed before receiving ACKs• Client sees timeout• Client retries• Master's lease expires (60 sec max wait)• Master grants lease to different replica• New primary has the data (it was applied)• Client retry may create duplicate (application handles)
SUCCESSFUL RECORD APPEND───────────────────────CLIENT REQUEST:──────────────offset = gfs.record_append("/data/file1", data=100KB)CLIENT → MASTER:───────────────"Need to append to /data/file1"Master returns last chunk (let's say chunk 5):{ chunk_handle: 0x1a2b..., version: 3, primary: cs-5, secondaries: [cs-12, cs-23], current_size: 60MB // Approximate}DATA PUSH PHASE:───────────────(Same as write - push to all replicas)Client → cs-5 → cs-12 → cs-23Data buffered in memory on allAPPEND COMMAND:──────────────Client → Primary (cs-5):{ type: RECORD_APPEND, chunk_handle: 0x1a2b..., data_id: uuid_456, length: 100KB}PRIMARY PROCESSING:──────────────────1. Check current offset: current_offset = get_chunk_size(0x1a2b...) current_offset = 60MB2. Check if record fits: new_offset = 60MB + 100KB if new_offset <= 64MB: # Fits! Proceed else: # Doesn't fit, pad and use next chunk3. Assign offset: append_offset = 60MB4. Assign serial number: serial = next_serial++5. Apply locally: fd = open("/gfs/chunks/0x1a2b...") lseek(fd, 60MB) write(fd, buffered_data[uuid_456])6. Update checksums7. Forward to secondaries: cs-5 → cs-12, cs-23: { type: APPLY_APPEND, data_id: uuid_456, offset: 60MB, // SAME offset for all! length: 100KB, serial: serial }SECONDARIES APPLY:─────────────────cs-12: lseek(60MB) write(data) update_checksums() ACK to primarycs-23: SamePRIMARY → CLIENT:────────────────{ status: SUCCESS, offset: 60MB}Client now knows record is at offset 60MBAll replicas identical!
APPEND AT CHUNK BOUNDARY───────────────────────Current chunk state:• Chunk 5: 63.9MB used (almost full)• Want to append: 200KBCheck: 63.9MB + 200KB = 64.1MB > 64MB→ Doesn't fit in current chunk!PRIMARY ACTION:──────────────1. Pad current chunk to 64MB: padding = 64MB - 63.9MB = 100KB write(padding_bytes) // All zeros or random2. All secondaries do same: cs-5 → cs-12, cs-23: { type: PAD_TO_END, chunk: 0x1a2b..., padding: 100KB }3. Tell client to retry with next chunk: Primary → Client: { status: RETRY_NEXT_CHUNK, next_chunk: 6 }CLIENT RETRIES:──────────────Client → Master: "Need chunk 6 of /data/file1"Master: • Creates chunk 6 if doesn't exist • Assigns chunk handle, selects replicas • Returns metadataClient retries append with chunk 6→ Succeeds at offset 0 of new chunkRESULT:──────Chunk 5: 63.9MB data + 100KB padding = 64MB (full)Chunk 6: 200KB record at offset 0Application sees:• Record at chunk 6, offset 0• Knows to skip padding when reading• Uses record markers/checksums to identify data
APPEND WITH FAILURES───────────────────Scenario: Secondary fails during appendCLIENT REQUEST:──────────────gfs.record_append("/data/file1", data=100KB)DATA PUSH:─────────Client → cs-5 → cs-12 → cs-23cs-5: Buffered ✓cs-12: Buffered ✓cs-23: CRASH (network partition, disk failure, etc.)Client receives ACK from cs-5, cs-12 onlyAPPEND COMMAND:──────────────Client → Primary (cs-5): RECORD_APPEND(data_id, 100KB)PRIMARY APPLIES:───────────────cs-5: • Assigns offset: 60MB • Writes locally at 60MB ✓cs-5 → cs-12: APPLY_APPEND(offset=60MB)cs-12: • Writes at 60MB ✓ • ACKs to primary ✓cs-5 → cs-23: APPLY_APPEND(offset=60MB)cs-23: • TIMEOUT (offline!)PRIMARY RECEIVES:────────────────ACKs: cs-12 ✓, cs-23 ✗NOT all replicas ACKed!Primary → Client: ERROR: Append failedCURRENT STATE:─────────────cs-5: Has record at 60MB ✓cs-12: Has record at 60MB ✓cs-23: Doesn't have record ✗Chunk is now INCONSISTENT!CLIENT RETRIES:──────────────Client retries entire operation:1. Re-push data to all replicas (cs-23 may be back, or replaced)2. Primary assigns NEW offset: Now at 60.1MB (after previous record)3. Applies to all replicas4. SUCCESSRESULT:──────Chunk contents:┌──────────────────────────────┐│ ... ││ 60.0MB: Record data (100KB) │ ← First attempt (on cs-5, cs-12)│ 60.1MB: Record data (100KB) │ ← Retry (on all replicas)│ ... │└──────────────────────────────┘• DUPLICATE at 60.0MB (only on cs-5, cs-12)• Successful record at 60.1MB (on all)• Client receives offset 60.1MB• Application will see duplicate when readingAPPLICATION HANDLING:────────────────────Reader scans file:for record in read_file("/data/file1"): if not valid_checksum(record): skip // Inconsistent region (60.0MB on cs-23) if seen_id(record.id): skip // Duplicate (60.0MB duplicate) process(record) // Process 60.1MB recordApplication de-duplicates based on unique IDsApplication skips inconsistent regionsGFS guarantees:• At least once delivery• May have duplicates• May have inconsistent regions• Application handles this
MULTIPLE CONCURRENT APPENDS──────────────────────────Scenario: 3 clients append simultaneouslyCLIENT A: append(data_A, 50KB)CLIENT B: append(data_B, 75KB)CLIENT C: append(data_C, 100KB)All three:1. Push data to replicas2. Send append command to primaryPRIMARY SERIALIZES:──────────────────Primary receives three append requests:Request queue:1. Append(data_A, 50KB) - arrived first2. Append(data_B, 75KB) - arrived second3. Append(data_C, 100KB) - arrived thirdPrimary processes IN ORDER:Step 1: Process data_A──────────────────────current_offset = 60MBassign offset: 60MBwrite data_A at 60MBforward to secondaries: "write at 60MB"new_offset = 60MB + 50KB = 60.05MBStep 2: Process data_B──────────────────────current_offset = 60.05MBassign offset: 60.05MBwrite data_B at 60.05MBforward to secondaries: "write at 60.05MB"new_offset = 60.05MB + 75KB = 60.125MBStep 3: Process data_C──────────────────────current_offset = 60.125MBassign offset: 60.125MBwrite data_C at 60.125MBforward to secondaries: "write at 60.125MB"new_offset = 60.125MB + 100KB = 60.225MBRESULT:──────All replicas have identical contents:┌────────────────────────────────┐│ ... ││ 60.000MB: data_A (50KB) ││ 60.050MB: data_B (75KB) ││ 60.125MB: data_C (100KB) ││ 60.225MB: <next append here> │└────────────────────────────────┘Clients receive:• Client A: offset 60.000MB• Client B: offset 60.050MB• Client C: offset 60.125MBKEY POINT:─────────Primary serializes ALL appends→ Consistent order across all replicas→ No distributed consensus needed→ Simple, fast, scalableThroughput:• Limited by primary's processing• ~1000s of appends/second possible• Perfect for MapReduce intermediate data
READ PATH WITH CORRUPTION:─────────────────────────Client reads from cs-5:1. Chunkserver reads block from disk2. Computes checksum3. Compares with stored checksum4. MISMATCH! Corruption detectedChunkserver → Client: ERROR: Data corruptedClient response: 1. Try different replica (cs-12) 2. Report corruption to master 3. Return correct data to applicationMaster action: 1. Mark cs-5's copy as corrupted 2. Schedule re-replication from cs-12 3. Eventually delete corrupted copy from cs-5 4. Restore to 3 replicasEXAMPLE:───────Read request: chunk 0x1a2b, offset 5MB, length 1MBcs-5 reads blocks [80..95] (5MB/64KB to 6MB/64KB)Block 87: checksum mismatch!Expected: 0xabcd1234Actual: 0xabcd1235 ← One bit differentError returned to clientClient retries:cs-12 → Checksum OK → Return dataMaster notified:• cs-5 chunk 0x1a2b: CORRUPTED• Re-replicate from cs-12 or cs-23• Delete from cs-5 after re-replication
Scrubbing
Proactive Corruption Detection:
BACKGROUND SCRUBBING───────────────────Chunkserver runs background task:def scrubbing_task(): while True: for chunk in all_chunks: if chunk.idle() and low_load(): verify_chunk(chunk) sleep(random(60, 300)) # 1-5 minutesdef verify_chunk(chunk_handle): for block_index in range(num_blocks): data = read_block(chunk_handle, block_index) expected = get_checksum(block_index) actual = crc32(data) if expected != actual: # Found corruption! log_error(...) report_to_master(chunk_handle, block_index) # Don't fix locally, let master coordinateBENEFITS:────────• Detect corruption before client reads• Find rarely-read data corruption• Catch disk degradation early• Reduce read errors for clientsSCHEDULING:──────────• Low priority (idle time only)• Rate limited (1-2 chunks/minute)• Doesn't impact foreground I/O• Full cluster scrub: weeks/monthsMASTER RESPONSE:───────────────Chunkserver: "Chunk 0x1a2b is corrupted"Master: 1. Verify chunk has other good replicas 2. Increment re-replication priority 3. Schedule re-replication 4. Mark corrupted replica for deletion 5. Eventually delete from reporting chunkserver
Write-Time Optimization
Append Checksum Optimization:
APPEND CHECKSUM HANDLING───────────────────────Problem: Last block may be partialExample:────────Chunk size: 5.5MBBlocks: 0-87 (88 blocks)Last block (87): Only 32KB used (not full 64KB)Append: 100KB dataNaive approach:──────────────1. Read last block (32KB existing data)2. Append new data (100KB)3. Compute checksum for modified blocks4. Write back→ Extra read I/O! Slow!GFS Optimization:────────────────1. Don't read last block2. Append data starting at block boundary3. Pad if necessaryExample:────────Block 87: 32KB data + 32KB paddingBlock 88: 64KB new data (part of 100KB)Block 89: 36KB new data + 28KB unusedTrade-off:• Some wasted space (padding)• No extra read I/O• Faster appends• Acceptable for append-heavy workloadCHECKSUM INCREMENTAL UPDATE:───────────────────────────For record append:1. Primary assigns offset2. Writes data3. Computes checksums for new blocks only4. Sends checksums to secondaries with data5. Secondaries verify and storeNo need to re-checksum entire chunk→ Fast, scalable
Expected Answer:GFS uses pipelined data transfer to maximize network bandwidth utilization:Without Pipelining (Sequential):
Client sends to replica 1, waits for completion
Then sends to replica 2, waits
Then sends to replica 3
Time: 3× transfer time
Only one network link active at a time
With Pipelining:
Client sends to replica 1
Replica 1 forwards to replica 2 while still receiving from client
Replica 2 forwards to replica 3 while receiving from replica 1
All network links active simultaneously
Time: ~1× transfer time (plus small propagation delay)
Benefits:
3× speedup for 3 replicas
Fully utilizes network topology
Each link operates at full bandwidth
Essential for GFS’s high throughput goals
GFS chains replicas by network distance (closest first) to minimize cross-rack transfers and optimize pipelining.
Intermediate: How does record append handle concurrent writers?
Expected Answer:Record append enables multiple clients to append to the same file concurrently without coordination:Process:
Data Push: Each client pushes data to all replicas independently
Append Request: Each client sends append command to primary (not to a specific offset!)
Primary Serializes: Primary assigns offset for each append in the order received
Sequential Application: Primary applies appends sequentially, each at its assigned offset
Replica Coordination: Primary tells secondaries exact offset for each append
Offset Return: Primary returns assigned offset to each client
Key Properties:
Primary acts as serialization point (no distributed consensus needed)
All replicas apply appends in same order at same offsets
Each client learns where its record was placed
No locking or coordination between clients required
Enables thousands of concurrent appenders
Failure Handling:
If append fails on any replica, client retries
Retry may create duplicate at different offset
Application de-duplicates using unique record IDs
At-least-once delivery guaranteed
Perfect for MapReduce where thousands of mappers append to shared output files.
Advanced: Explain GFS's checksum strategy and trade-offs
Expected Answer:GFS uses 32-bit CRC32 checksums on 64KB blocks:Design:
Each 64MB chunk divided into 1024 × 64KB blocks
Each block has 32-bit checksum (4KB metadata per chunk)
Checksums stored in memory and on disk
Verified on every read
Why 64KB blocks?Smaller blocks (4KB):
✓ More precise corruption detection
✗ More checksums (higher memory overhead)
✗ More CPU for computation
✗ Higher metadata storage
Larger blocks (1MB):
✓ Less overhead
✗ Must re-read more data on corruption
✗ Less precise detection
64KB: Balance of precision and overheadWrite Optimization:
For appends: Don’t re-read last partial block
Pad to block boundary instead
Trade space for speed (append-optimized workload)
Detection Timing:
Read-time: Every read verified
Scrubbing: Background task checks idle chunks
Replication: Verified during chunk copy
Corruption Response:
Read from different replica
Report to master
Master schedules re-replication
Corrupted replica deleted
Trade-offs:
0.006% space overhead (acceptable)
CPU cost negligible (modern CRC32 instructions)
False negative: ~1 in 4 billion (acceptable for commodity hardware)
Catches virtually all bit flips, disk corruption, memory errors
System Design: How would you optimize GFS for small random writes?
Expected Answer:GFS is optimized for large sequential writes/appends. For small random writes, several optimizations possible:1. Client-Side Buffering:
Buffer multiple small writes in client
Batch into larger writes (e.g., 1MB)
Flush periodically or when buffer full
Trade-off: Latency for throughput, memory usage
2. Log-Structured Approach:
Append small writes to log file
Background compaction merges into final locations
Like LevelDB/RocksDB approach
Trade-off: Complexity, write amplification
3. Write Coalescing at Chunkserver:
Chunkserver buffers writes in memory
Coalesces overlapping/adjacent writes
Flushes in batches
Trade-off: Durability concerns, memory usage
4. Smaller Chunks:
Reduce from 64MB to 4-8MB
Less internal fragmentation
More metadata overhead (acceptable for small files)
Trade-off: More master load, more metadata
5. Separate Tier:
Small-write tier with different chunk size/strategy
Large-write tier with current 64MB chunks
Route based on access pattern
Trade-off: System complexity, two code paths
6. Hybrid Storage:
Small writes to SSD/NVME (low latency)
Large sequential to HDD (high throughput)
Background migration based on patterns
Trade-off: Cost, complexity
Real-World:
Colossus (GFS successor) uses metadata sharding and smaller chunks
HDFS has append-only model similar to GFS
Modern systems like Ceph use different strategies entirely
For truly random small writes, key-value stores (Bigtable, HBase) better fit
Walk me through GFS pipelined replication. Why is it faster than sending data to all replicas in parallel?
Strong Answer:In GFS, when a client writes data, it pushes the data to the nearest chunkserver in a chain. That chunkserver simultaneously writes to its local disk and forwards the data to the next chunkserver in the pipeline, which does the same. This is chain replication, not parallel fan-out.The key insight is network bandwidth utilization. Suppose each machine has 100 Mbps full-duplex bandwidth. With parallel fan-out, the client must send three copies of the data, splitting its outbound bandwidth three ways — each replica gets roughly 33 Mbps. Total time to replicate B bytes is 3B / 100 Mbps.With pipelining, the client sends data at full 100 Mbps to the first chunkserver. That chunkserver forwards at full 100 Mbps to the second. The second forwards to the third. Because the forwarding starts as soon as the first bytes arrive (not after the entire write is buffered), the pipeline latency is approximately B / 100 Mbps + 2 * network_latency, rather than 3B / 100 Mbps. For a 64MB chunk on a 100 Mbps network, pipelining takes roughly 5.2 seconds versus 15.6 seconds for parallel fan-out — a 3x speedup.Additionally, GFS routes the pipeline through the network topology to minimize cross-rack hops. The client sends to the nearest replica, which forwards to the next nearest, and so on. This ensures the most expensive network hops (cross-rack) happen only once.Follow-up: What happens to the pipeline if the second chunkserver in the chain crashes mid-write?The client detects the failure when it does not receive an ACK from the pipeline within the timeout. It reports the error to the master. The master selects a new chunkserver to replace the failed one and instructs the client to re-push data to a new pipeline. The chunk version number is incremented so the partially-written replica on the failed server (if it recovers later) is detected as stale and garbage collected. The write is not considered successful until all replicas have acknowledged.
GFS uses 32-bit CRC checksums on 64KB blocks. Walk me through the integrity verification process and its trade-offs.
Strong Answer:Every 64MB chunk is divided into 1,024 blocks of 64KB each. Each block has a 32-bit CRC32 checksum stored separately in memory and on disk. On every read, the chunkserver computes the CRC32 of the data read from disk and compares it to the stored checksum. If they do not match, the chunkserver reports corruption to the master and returns an error to the client. The client then reads from a different replica while the master schedules re-replication from a healthy copy.The trade-offs of this design: The 64KB block size is a sweet spot. Smaller blocks (4KB) would detect corruption more precisely and require re-reading less data on a mismatch, but would increase checksum storage overhead (16x more checksums per chunk) and CPU overhead for checksum computation. Larger blocks (1MB) would reduce overhead but mean that any corruption requires re-reading a larger amount. At 64KB, the checksum overhead is 4KB per 64MB chunk (0.006%), and a single corrupted block requires re-reading only 64KB from another replica.For appends, there is an important optimization: since appends always go to the end of the chunk, the checksum for the last partial block can be incrementally updated rather than recomputed from scratch. For writes to the middle of a chunk (which are rare in GFS workloads), the entire block checksum must be recomputed.Background scrubbing is the other critical piece: chunkservers periodically verify checksums of all stored chunks even when no reads are happening, to detect bit rot (silent corruption from aging disks) before a second replica fails.Follow-up: Why CRC32 and not a cryptographic hash like SHA-256?CRC32 is purely an integrity check, not a security mechanism. Its advantage is speed: CRC32 can be computed at memory bandwidth speeds (multiple GB/s on modern hardware) and is often hardware-accelerated. SHA-256 is roughly 10-50x slower. Since GFS is checking for accidental corruption (bit flips, disk errors), not malicious tampering, the weaker collision resistance of CRC32 is perfectly acceptable. The probability of a random corruption producing a valid CRC32 is 1 in 4 billion, which is sufficient for detecting hardware errors.
Chunks are stored as plain Linux files on chunkservers. Why not use a custom block device or raw disk access?
Strong Answer:Storing chunks as regular Linux files on ext3/ext4 was a deliberate simplicity decision. It meant that operators could use standard Linux tools (ls, df, du, strace) to inspect the storage layer. Debugging a distributed system is hard enough without also needing custom tools to read the on-disk format. It also meant leveraging decades of Linux kernel optimization for buffer management, disk scheduling, and journaling without reimplementing any of it.The trade-off was some performance overhead from the extra file system layer. Raw disk access would eliminate the file system metadata overhead and give more predictable I/O latency. But GFS workloads are throughput-oriented (large sequential reads and writes), which is exactly what Linux file systems are optimized for. The overhead of the file system layer is minimal for sequential I/O patterns.Sparse file allocation was another benefit: a chunk file can be created at 64MB nominal size but only consume disk space proportional to the actual data written. This means a 1MB file stored in a 64MB chunk only uses 1MB of disk space, not 64MB.This is a practical engineering lesson worth remembering: sometimes the best architecture decision is to not build something. By reusing the Linux file system, GFS avoided months of development and years of bug fixes in a custom storage layer.Follow-up: When would raw disk access be the right choice instead?For latency-sensitive workloads where you need deterministic I/O performance. Databases like RocksDB and ScyllaDB sometimes bypass the file system to control exactly when and how data hits the disk, eliminating the unpredictability of the OS page cache and journaling. Also, for very high-throughput systems that need to squeeze every last MB/s out of the hardware, the file system overhead (inode lookups, journaling writes) becomes measurable. But for GFS in 2003, the throughput target was achievable with ext3, so the simplicity trade-off was clearly worth it.