Skip to main content

Documentation Index

Fetch the complete documentation index at: https://resources.devweekends.com/llms.txt

Use this file to discover all available pages before exploring further.

Chapter 4: Chunkservers and Data Flow

Chunkservers are the workhorses of GFS, storing actual data and serving client requests. While the master gets most of the attention in system design discussions, the chunkserver design contains equally important lessons. The decision to store chunks as plain Linux files (rather than using a custom block device or raw disk access) dramatically simplified implementation and debugging — an operator could use standard Linux tools like ls, df, and strace to inspect the storage layer. The data flow optimizations, particularly the pipelined chain replication and network-topology-aware routing, are textbook examples of how to squeeze maximum throughput out of commodity hardware. These same techniques appear in modern systems: chain replication is used in HDFS and Azure Storage, and topology-aware data placement is standard in every major cloud provider’s infrastructure. This chapter explores how chunks are stored, how data flows through the system, and how GFS ensures data integrity despite commodity hardware failures.
Chapter Goals:
  • Understand chunk storage implementation
  • Master the replication pipeline and data flow optimization
  • Learn detailed read, write, and record append flows
  • Explore checksumming and data integrity mechanisms
  • Grasp failure handling at the data layer

Chunk Storage

Each chunkserver is a simple Linux process storing chunks as regular files. This simplicity was intentional and strategic. By leveraging the host operating system’s file system (ext3/ext4) rather than managing raw disks directly, GFS avoided reinventing buffer management, journaling, and disk scheduling. The trade-off was some performance overhead from the extra file system layer, but the engineering velocity gained from simpler code, easier debugging, and leveraging years of Linux kernel optimization more than compensated. This is a practical lesson worth remembering: sometimes the best engineering decision is to not build something.

Physical Storage Layout

CHUNKSERVER DISK LAYOUT
──────────────────────

Linux Filesystem (ext3/ext4):
/
├── ...
├── gfs/
│   ├── chunks/
│   │   ├── 0x1a2b3c4d5e6f7g8h      (64MB max)
│   │   ├── 0x2b3c4d5e6f7g8h9i      (64MB max)
│   │   ├── 0x3c4d5e6f7g8h9i0j      (22MB actual)
│   │   ├── ...
│   │   └── 0xaabbccddeeff1122      (64MB max)
│   │
│   └── metadata/
│       ├── checksums.db            (chunk → checksum map)
│       └── versions.db             (chunk → version map)

└── ...

Chunk File Structure:
────────────────────

Each chunk = Linux file named by chunk handle

Properties:
• Sparse allocation (only actual data uses disk)
• Can grow up to 64MB
• Standard Linux file operations
• Checksummed in 64KB blocks

Example:
────────
Chunk handle: 0x1a2b3c4d5e6f7g8h
File path: /gfs/chunks/0x1a2b3c4d5e6f7g8h
Size on disk: 45MB (even though max is 64MB)
Blocks: 45MB / 64KB = 720 blocks
Checksums: 720 × 4 bytes = 2.8KB


SPARSE ALLOCATION:
─────────────────

Small file (1MB) in 64MB chunk:

Without sparse allocation:
• Allocates 64MB on disk
• Wastes 63MB
• 98.4% waste!

With sparse allocation:
• Allocates ~1MB on disk
• Only actual data uses space
• Linux handles efficiently

Implementation:
───────────────
fd = open("/gfs/chunks/0x1a2b...", O_CREAT)
ftruncate(fd, 64MB)  // Set max size
write(fd, data, len)  // Only allocates used blocks

Chunk Metadata

Data Integrity Protection:
CHECKSUM STRUCTURE
─────────────────

Each chunk divided into 64KB blocks:

┌─────────────────────────────────────────┐
│ Chunk (64MB max)                        │
├─────────────────────────────────────────┤
│ Block 0 (64KB)  → Checksum 0 (4 bytes)  │
│ Block 1 (64KB)  → Checksum 1 (4 bytes)  │
│ Block 2 (64KB)  → Checksum 2 (4 bytes)  │
│ ...                                     │
│ Block 1023 (64KB) → Checksum 1023       │
└─────────────────────────────────────────┘

Checksum = 32-bit CRC32

For full 64MB chunk:
• 1024 blocks
• 1024 checksums
• 4KB checksum metadata

Stored: In-memory + on disk


CHECKSUM COMPUTATION:
────────────────────

On Write:
─────────
def write_block(block_index, data):
    checksum = crc32(data)
    write_data_to_disk(block_index, data)
    update_checksum(block_index, checksum)
    persist_checksum()

On Read:
────────
def read_block(block_index):
    data = read_data_from_disk(block_index)
    expected_checksum = get_checksum(block_index)
    actual_checksum = crc32(data)

    if actual_checksum != expected_checksum:
        report_corruption()
        read_from_different_replica()
    else:
        return data


WHY 64KB BLOCKS?
───────────────

Trade-off analysis:

Smaller blocks (e.g., 4KB):
✓ Detect corruption precisely
✓ Re-read smaller amount
✗ More checksums (higher overhead)
✗ More CPU for checksum computation

Larger blocks (e.g., 1MB):
✓ Fewer checksums
✓ Less CPU overhead
✗ Must re-read larger amount on corruption
✗ Less precise error detection

64KB: Sweet spot
• 0.006% overhead (4KB per 64MB)
• Fast CRC32 computation
• Reasonable re-read size
• Matches common I/O sizes

Data Flow Optimization

GFS decouples data flow from control flow and optimizes for network topology. This section describes what is arguably GFS’s most clever performance optimization. The pipeline push mechanism achieves near-theoretical-maximum bandwidth utilization by exploiting full-duplex network links: each chunkserver in the chain simultaneously receives data from its predecessor and forwards to its successor. The key mathematical insight is that total transfer time for N replicas approaches the time for a single transfer (plus small propagation delays), rather than scaling linearly with replica count.

Pipelined Data Push

PIPELINED TRANSFER
─────────────────

Goal: Fully utilize network bandwidth

Problem with Sequential Transfer:
─────────────────────────────────

Client sends to all replicas sequentially:

Client → cs-1 (100MB) → done
      → cs-2 (100MB) → done
      → cs-3 (100MB) → done

Time: 3 × transfer_time
Network: Only one link used at a time
Utilization: 33%


GFS Solution: Pipeline + Chain
──────────────────────────────

Client sends to closest replica
Each replica forwards while receiving

Client ──→ cs-1 ──→ cs-2 ──→ cs-3

Timing:
───────

Time 0s:    Client → cs-1 (start)
Time 0.1s:  Client → cs-1 (continue)
            cs-1 → cs-2 (start)

Time 0.2s:  Client → cs-1 (continue)
            cs-1 → cs-2 (continue)
            cs-2 → cs-3 (start)

Time 1.0s:  Client → cs-1 (done)
Time 1.1s:  cs-1 → cs-2 (done)
Time 1.2s:  cs-2 → cs-3 (done)

Total time: ~1.2s (vs 3.0s sequential!)

Bandwidth utilization:
• Client → cs-1 link: 100%
• cs-1 → cs-2 link: 100%
• cs-2 → cs-3 link: 100%
• ALL LINKS UTILIZED SIMULTANEOUSLY!


PIPELINING IMPLEMENTATION:
─────────────────────────

Buffer size: 64KB (one checksum block)

Client:
───────
buffer = []
for each 64KB chunk in data:
    send_to_chunkserver_1(chunk)
    # Don't wait for ACK
    # Send next chunk immediately

Chunkserver:
───────────
while receiving:
    chunk = receive_64KB()
    write_to_disk(chunk)
    forward_to_next_chunkserver(chunk)
    # Parallel: write and forward!

Last chunkserver:
────────────────
    chunk = receive_64KB()
    write_to_disk(chunk)
    # No forwarding needed

All send ACK when fully received

Network Topology Awareness

Optimal Chain Formation:
CHAIN ORDERING
─────────────

Replicas: cs-1 (rack A), cs-2 (rack B), cs-3 (rack A)
Client: In rack A

Bad Chain:
──────────
Client (rack A) → cs-2 (rack B) → cs-1 (rack A) → cs-3 (rack A)
          ^               ^               ^
     cross-rack     cross-rack      same-rack

• 2 cross-rack transfers
• Slow uplink saturated twice
• Poor performance

Good Chain:
───────────
Client (rack A) → cs-1 (rack A) → cs-2 (rack B) → cs-3 (rack A)
          ^               ^               ^
     same-rack      cross-rack     cross-rack

• 1 same-rack transfer first (fast)
• Then cross-rack in parallel
• Better utilization

Best Chain (if possible):
────────────────────────
Client (rack A) → cs-1 (rack A) → cs-3 (rack A) → cs-2 (rack B)
          ^               ^               ^
     same-rack       same-rack      cross-rack

• 2 same-rack transfers
• Only 1 cross-rack (at end)
• Optimal performance


DISTANCE METRIC:
───────────────

Distance calculation:

def network_distance(node1, node2):
    if same_machine(node1, node2):
        return 0
    elif same_rack(node1, node2):
        return 1
    elif same_datacenter(node1, node2):
        return 2
    else:
        return 3

Chaining algorithm:
──────────────────
1. Start with closest replica to client
2. Each replica forwards to closest remaining
3. Forms efficient chain automatically

Read Operation

Reads are straightforward—client talks directly to chunkserver.

Read Flow Detailed

1

Application Read Request

APPLICATION:
───────────

data = gfs.read("/data/file1", offset=100MB, length=1MB)

GFS CLIENT LIBRARY:
──────────────────

1. Convert offset to chunk index:
   offset 100MB ÷ 64MB/chunk = chunk 1
   within-chunk offset = 100MB % 64MB = 36MB

2. Check if read spans multiple chunks:
   start: chunk 1, offset 36MB
   end: 36MB + 1MB = 37MB (still in chunk 1)
   → Single chunk read

3. Check metadata cache:
   cache.lookup("/data/file1", chunk_index=1)
2

Metadata Lookup

CACHE MISS → Contact Master:
────────────────────────────

Client → Master:
  "Get chunk locations for /data/file1, chunks [1, 2, 3]"
  (Prefetch multiple chunks for sequential reads)

Master → Client:
  Chunk 1:
    handle: 0x1a2b3c4d5e6f7g8h
    version: 5
    locations: [cs-5, cs-12, cs-23]

  Chunk 2:
    handle: 0x2b3c4d5e6f7g8h9i
    version: 3
    locations: [cs-7, cs-15, cs-21]

  Chunk 3:
    handle: 0x3c4d5e6f7g8h9i0j
    version: 2
    locations: [cs-3, cs-9, cs-18]

Client caches this metadata (timeout: few minutes)
3

Chunkserver Selection

SELECT CLOSEST CHUNKSERVER:
──────────────────────────

Chunk 1 locations: [cs-5, cs-12, cs-23]

Client calculates network distance:
cs-5:  same rack → distance 1
cs-12: different rack → distance 2
cs-23: different rack → distance 2

Select: cs-5 (closest)

Fallback strategy:
If cs-5 fails → try cs-12
If cs-12 fails → try cs-23
If all fail → report error to master
4

Data Transfer

CLIENT → CHUNKSERVER-5:
──────────────────────

Request:
────────
{
    chunk_handle: 0x1a2b3c4d5e6f7g8h,
    chunk_version: 5,
    offset: 36MB,
    length: 1MB
}

CHUNKSERVER-5 PROCESSING:
────────────────────────

1. Verify chunk version:
   stored_version = get_version(0x1a2b...)
   if stored_version != 5:
       return ERROR_STALE_CHUNK

2. Read from disk:
   fd = open("/gfs/chunks/0x1a2b...")
   lseek(fd, 36MB)
   data = read(fd, 1MB)

3. Verify checksums:
   start_block = 36MB / 64KB = block 576
   end_block = 37MB / 64KB = block 592
   for block in [576..592]:
       expected = get_checksum(block)
       actual = crc32(data[block])
       if expected != actual:
           return ERROR_CORRUPTED

4. Return data:
   send_to_client(data)
5

Client Validation

CLIENT RECEIVES DATA:
────────────────────

1. Verify length:
   if len(received) != requested_length:
       retry_from_different_replica()

2. Optional: Re-verify checksums
   (paranoid mode)

3. Return to application:
   return data

Error handling:
──────────────

If chunkserver reports corruption:
1. Try different replica (cs-12 or cs-23)
2. Report corruption to master
3. Master schedules re-replication from good replica
4. Master marks corrupted replica for deletion

Read Optimizations

Metadata Caching

Reduce Master Load:
  • Cache chunk locations for minutes
  • Batch requests for multiple chunks
  • Prefetch for sequential reads
  • Cache hit rate: 95%+
  • Master only sees 5% of reads

Replica Selection

Network Optimization:
  • Choose closest chunkserver
  • Reduces latency and bandwidth
  • Load balance across replicas
  • Automatic failover to backup
  • Reduces cross-rack traffic

Large Reads

Throughput Focus:
  • Optimize for 1MB+ reads
  • Amortize connection setup
  • Pipeline multiple chunks
  • Sustained 100+ MB/s per client
  • Linear scaling with clients

Checksum Validation

Data Integrity:
  • Verify checksums on read
  • Detect disk corruption
  • Silent data corruption caught
  • Transparent retry from replica
  • Report to master for repair

Record Append: Failure and Duplication

The “Record Append” is GFS’s most unique operation and arguably the single feature that made MapReduce practical at scale. It guarantees that data is appended atomically at least once. Without record append, thousands of MapReduce map tasks writing to shared intermediate files would need distributed locking or external coordination — a prohibitive overhead. Record append solved this by letting the primary chunkserver choose the offset, eliminating the need for clients to coordinate. The “at least once” semantics (with possible duplicates) were acceptable because MapReduce was already designed to be idempotent. This co-design between GFS and its primary consumer is a masterclass in API design for distributed systems.

Handling Append Failures

What happens if an append succeeds on the primary and one secondary, but fails on another?
  1. The Failure: If any replica fails the append, the primary returns an error to the client.
  2. The Inconsistency: At this point, the replicas are inconsistent. Some have the record, others don’t.
  3. The Retry: The client retries the operation.
  4. The Resolution:
    • The primary picks a new offset for the retry.
    • It tells all replicas (including those that succeeded the first time) to write the data at this new offset.
    • The original “successful” data in the first attempt now exists at an offset that is effectively “garbage” or “padding” in the failed replicas.
Result:
  • At-least-once: The record is guaranteed to be present in all replicas at the same offset.
  • Duplicates: Some replicas may contain the record multiple times (once at the failed offset, once at the retry offset).
  • Undefined regions: The failed offset contains partial or duplicate data, which the client library must filter out.

Data Integrity: The Scrubbing Process

Beyond checking data during reads, chunkservers perform background “scrubbing” to detect silent data corruption (bit rot) in rarely accessed chunks. This is critically important because disk drives can silently corrupt data without reporting errors — a phenomenon known as “bit rot” or “silent data corruption.” Studies by CERN and others have shown that undetected bit errors occur at rates of roughly 1 in 10^14 to 10^15 bits, which means a petabyte-scale cluster will encounter silent corruption regularly. Without proactive scrubbing, corruption in cold data could go undetected for months or years, until the data is finally read and discovered to be unrecoverable.

Background Scanning

  • Process: A background thread continuously cycles through all chunks stored on the server.
  • Action: It reads each 64KB block, computes the CRC32, and compares it to the stored checksum.
  • Detection: If corruption is found, the chunkserver notifies the master.
  • Repair: The master treats this as a replica loss and initiates a re-replication from a healthy replica.
This process ensures that the target replication count (e.g., 3x) represents verified, healthy copies, not just filenames on a disk.

Write Operation

Writes are more complex, involving multiple replicas and consistency guarantees.

Write Flow Detailed

COMPLETE WRITE OPERATION
────────────────────────

CLIENT WRITE REQUEST:
────────────────────

gfs.write("/data/file1", offset=50MB, data=1MB)

PHASE 1: LEASE ACQUISITION
──────────────────────────

Client → Master:
  "Need to write to /data/file1, chunk 0"

Master checks:
  1. Chunk 0 exists? Yes
  2. Has current primary? No
  3. Select primary: cs-5 (lowest load)
  4. Grant lease to cs-5 (60 seconds)
  5. Increment version: 4 → 5

Master → cs-5, cs-12, cs-23:
  "Increment version to 5"

All chunkservers ACK

Master → Client:
  {
    chunk_handle: 0x1a2b...,
    version: 5,
    primary: cs-5,
    secondaries: [cs-12, cs-23],
    lease_expiration: T+60
  }

PHASE 2: DATA PUSH
─────────────────

Client forms chain: cs-5 → cs-12 → cs-23
(based on network topology)

Client → cs-5:
  {
    type: DATA_PUSH,
    chunk_handle: 0x1a2b...,
    data_id: uuid_123,  // Unique ID for this data
    data: [1MB payload]
  }

cs-5:
  1. Buffer data in memory (don't write to disk yet!)
  2. Forward to cs-12 while receiving from client

  → cs-12: DATA_PUSH with same data

cs-12:
  1. Buffer data in memory
  2. Forward to cs-23

  → cs-23: DATA_PUSH with same data

cs-23:
  1. Buffer data in memory
  2. Send ACK to cs-12

cs-12 → ACK to cs-5
cs-5 → ACK to client

Client now knows all replicas have buffered the data

PHASE 3: WRITE COMMAND
─────────────────────

Client → Primary (cs-5):
  {
    type: WRITE_COMMAND,
    chunk_handle: 0x1a2b...,
    data_id: uuid_123,  // Reference to buffered data
    offset: 50MB,
    length: 1MB
  }

PRIMARY (cs-5) PROCESSING:
─────────────────────────

1. Assign serial number:
   (If multiple concurrent writes, serialize them)
   serial_number = next_serial++  // e.g., 42

2. Apply write to local disk:
   fd = open("/gfs/chunks/0x1a2b...")
   lseek(fd, 50MB)
   write(fd, buffered_data[uuid_123])

3. Update checksums:
   start_block = 50MB / 64KB
   end_block = 51MB / 64KB
   for block in [start_block..end_block]:
       checksum[block] = crc32(data[block])

4. Forward write command to secondaries:

   cs-5 → cs-12, cs-23:
     {
       type: APPLY_WRITE,
       chunk_handle: 0x1a2b...,
       data_id: uuid_123,
       offset: 50MB,
       length: 1MB,
       serial_number: 42
     }

SECONDARIES (cs-12, cs-23):
──────────────────────────

1. Apply in serial number order:
   (Wait if earlier serial numbers not yet applied)

2. Write to disk:
   Same as primary

3. Update checksums:
   Same as primary

4. Send ACK to primary:
   cs-12 → cs-5: ACK(serial_number=42)
   cs-23 → cs-5: ACK(serial_number=42)

PRIMARY RECEIVES ACKS:
─────────────────────

If ALL secondaries ACK:
  cs-5 → Client: SUCCESS

If ANY secondary fails:
  cs-5 → Client: ERROR
  (Client retries entire operation)

Client writes are atomic: all replicas or none

FAILURE SCENARIOS:
─────────────────

Scenario 1: cs-23 fails during data push
────────────────────────────────────────
• Client doesn't receive ACK from all replicas
• Client retries entire write
• Master may detect cs-23 failure
• Master re-replicates chunk elsewhere

Scenario 2: cs-12 fails during apply
────────────────────────────────────
• Primary doesn't receive ACK from cs-12
• Primary returns ERROR to client
• Client retries
• Chunk now inconsistent:
  - cs-5: has write
  - cs-12: may or may not have write
  - cs-23: has write
• But: All replicas have same version (5)
• Next write will succeed and create defined region
• GFS consistency model allows this (relaxed)

Scenario 3: Primary fails after apply
─────────────────────────────────────
• Primary wrote locally and sent to secondaries
• Crashed before receiving ACKs
• Client sees timeout
• Client retries
• Master's lease expires (60 sec max wait)
• Master grants lease to different replica
• New primary has the data (it was applied)
• Client retry may create duplicate (application handles)

Record Append Operation

The atomic record append is GFS’s signature feature.

Record Append Flow

SUCCESSFUL RECORD APPEND
───────────────────────

CLIENT REQUEST:
──────────────

offset = gfs.record_append("/data/file1", data=100KB)

CLIENT → MASTER:
───────────────

"Need to append to /data/file1"

Master returns last chunk (let's say chunk 5):
{
  chunk_handle: 0x1a2b...,
  version: 3,
  primary: cs-5,
  secondaries: [cs-12, cs-23],
  current_size: 60MB  // Approximate
}

DATA PUSH PHASE:
───────────────

(Same as write - push to all replicas)

Client → cs-5 → cs-12 → cs-23
Data buffered in memory on all

APPEND COMMAND:
──────────────

Client → Primary (cs-5):
{
  type: RECORD_APPEND,
  chunk_handle: 0x1a2b...,
  data_id: uuid_456,
  length: 100KB
}

PRIMARY PROCESSING:
──────────────────

1. Check current offset:
   current_offset = get_chunk_size(0x1a2b...)
   current_offset = 60MB

2. Check if record fits:
   new_offset = 60MB + 100KB
   if new_offset <= 64MB:
       # Fits! Proceed
   else:
       # Doesn't fit, pad and use next chunk

3. Assign offset:
   append_offset = 60MB

4. Assign serial number:
   serial = next_serial++

5. Apply locally:
   fd = open("/gfs/chunks/0x1a2b...")
   lseek(fd, 60MB)
   write(fd, buffered_data[uuid_456])

6. Update checksums

7. Forward to secondaries:
   cs-5 → cs-12, cs-23:
   {
     type: APPLY_APPEND,
     data_id: uuid_456,
     offset: 60MB,  // SAME offset for all!
     length: 100KB,
     serial: serial
   }

SECONDARIES APPLY:
─────────────────

cs-12:
  lseek(60MB)
  write(data)
  update_checksums()
  ACK to primary

cs-23:
  Same

PRIMARY → CLIENT:
────────────────

{
  status: SUCCESS,
  offset: 60MB
}

Client now knows record is at offset 60MB
All replicas identical!

Data Integrity

GFS uses checksumming to detect data corruption from disk/memory/network errors.

Checksum Implementation

CHECKSUM SYSTEM
──────────────

Data Structure (per chunk):
──────────────────────────

chunk_checksums = {
    chunk_handle: {
        block_0: checksum_0,    # 32-bit CRC32
        block_1: checksum_1,
        ...
        block_1023: checksum_1023
    }
}

For 64MB chunk:
• 1024 blocks of 64KB each
• 1024 checksums of 4 bytes each
• Total: 4KB metadata per chunk


CHECKSUM COMPUTATION:
────────────────────

CRC32 algorithm (fast, good detection)

On Write:
─────────

def write_block(chunk_handle, block_index, data):
    # Write data
    offset = block_index * 64KB
    fd = open(f"/gfs/chunks/{chunk_handle}")
    lseek(fd, offset)
    write(fd, data)

    # Compute and store checksum
    checksum = crc32(data)
    chunk_checksums[chunk_handle][block_index] = checksum

    # Persist checksum
    save_checksums_to_disk()

On Read:
────────

def read_block(chunk_handle, block_index):
    # Read data
    offset = block_index * 64KB
    fd = open(f"/gfs/chunks/{chunk_handle}")
    lseek(fd, offset)
    data = read(fd, 64KB)

    # Verify checksum
    expected = chunk_checksums[chunk_handle][block_index]
    actual = crc32(data)

    if expected != actual:
        # CORRUPTION DETECTED!
        log_error(f"Corruption in chunk {chunk_handle}, block {block_index}")
        report_to_master(chunk_handle)
        raise CorruptionError()

    return data


CHECKSUM STORAGE:
────────────────

In memory: Hash map for fast access
On disk: Append-only log

File: /gfs/metadata/checksums.db
Format:
───────
chunk_handle | block_index | checksum
0x1a2b...    | 0           | 0xabcd1234
0x1a2b...    | 1           | 0xef567890
...

Loaded into memory at startup
Updated on every write
Periodically compacted


CORRUPTION DETECTION RATE:
─────────────────────────

CRC32 properties:
• 32-bit checksum
• Detects all single-bit errors
• Detects all double-bit errors
• Detects bursts up to 32 bits
• Miss rate: ~1 in 4 billion for random corruption

In practice:
• Disk bit flips: Detected 99.9999%+
• Memory corruption: Detected reliably
• Network corruption: Also caught by TCP checksums
• Enough for GFS's reliability requirements

Corruption Handling

Detecting Corruption on Read:
READ PATH WITH CORRUPTION:
─────────────────────────

Client reads from cs-5:

1. Chunkserver reads block from disk
2. Computes checksum
3. Compares with stored checksum
4. MISMATCH! Corruption detected

Chunkserver → Client:
  ERROR: Data corrupted

Client response:
  1. Try different replica (cs-12)
  2. Report corruption to master
  3. Return correct data to application

Master action:
  1. Mark cs-5's copy as corrupted
  2. Schedule re-replication from cs-12
  3. Eventually delete corrupted copy from cs-5
  4. Restore to 3 replicas


EXAMPLE:
───────

Read request: chunk 0x1a2b, offset 5MB, length 1MB

cs-5 reads blocks [80..95] (5MB/64KB to 6MB/64KB)

Block 87: checksum mismatch!
Expected: 0xabcd1234
Actual:   0xabcd1235  ← One bit different

Error returned to client

Client retries:
cs-12 → Checksum OK → Return data

Master notified:
• cs-5 chunk 0x1a2b: CORRUPTED
• Re-replicate from cs-12 or cs-23
• Delete from cs-5 after re-replication
Proactive Corruption Detection:
BACKGROUND SCRUBBING
───────────────────

Chunkserver runs background task:

def scrubbing_task():
    while True:
        for chunk in all_chunks:
            if chunk.idle() and low_load():
                verify_chunk(chunk)
        sleep(random(60, 300))  # 1-5 minutes

def verify_chunk(chunk_handle):
    for block_index in range(num_blocks):
        data = read_block(chunk_handle, block_index)
        expected = get_checksum(block_index)
        actual = crc32(data)

        if expected != actual:
            # Found corruption!
            log_error(...)
            report_to_master(chunk_handle, block_index)
            # Don't fix locally, let master coordinate


BENEFITS:
────────

• Detect corruption before client reads
• Find rarely-read data corruption
• Catch disk degradation early
• Reduce read errors for clients

SCHEDULING:
──────────

• Low priority (idle time only)
• Rate limited (1-2 chunks/minute)
• Doesn't impact foreground I/O
• Full cluster scrub: weeks/months

MASTER RESPONSE:
───────────────

Chunkserver: "Chunk 0x1a2b is corrupted"

Master:
  1. Verify chunk has other good replicas
  2. Increment re-replication priority
  3. Schedule re-replication
  4. Mark corrupted replica for deletion
  5. Eventually delete from reporting chunkserver
Append Checksum Optimization:
APPEND CHECKSUM HANDLING
───────────────────────

Problem: Last block may be partial

Example:
────────
Chunk size: 5.5MB
Blocks: 0-87 (88 blocks)
Last block (87): Only 32KB used (not full 64KB)

Append: 100KB data

Naive approach:
──────────────
1. Read last block (32KB existing data)
2. Append new data (100KB)
3. Compute checksum for modified blocks
4. Write back

→ Extra read I/O! Slow!

GFS Optimization:
────────────────

1. Don't read last block
2. Append data starting at block boundary
3. Pad if necessary

Example:
────────
Block 87: 32KB data + 32KB padding
Block 88: 64KB new data (part of 100KB)
Block 89: 36KB new data + 28KB unused

Trade-off:
• Some wasted space (padding)
• No extra read I/O
• Faster appends
• Acceptable for append-heavy workload


CHECKSUM INCREMENTAL UPDATE:
───────────────────────────

For record append:

1. Primary assigns offset
2. Writes data
3. Computes checksums for new blocks only
4. Sends checksums to secondaries with data
5. Secondaries verify and store

No need to re-checksum entire chunk
→ Fast, scalable

Interview Questions

Expected Answer:GFS uses pipelined data transfer to maximize network bandwidth utilization:Without Pipelining (Sequential):
  • Client sends to replica 1, waits for completion
  • Then sends to replica 2, waits
  • Then sends to replica 3
  • Time: 3× transfer time
  • Only one network link active at a time
With Pipelining:
  • Client sends to replica 1
  • Replica 1 forwards to replica 2 while still receiving from client
  • Replica 2 forwards to replica 3 while receiving from replica 1
  • All network links active simultaneously
  • Time: ~1× transfer time (plus small propagation delay)
Benefits:
  • 3× speedup for 3 replicas
  • Fully utilizes network topology
  • Each link operates at full bandwidth
  • Essential for GFS’s high throughput goals
GFS chains replicas by network distance (closest first) to minimize cross-rack transfers and optimize pipelining.
Expected Answer:Record append enables multiple clients to append to the same file concurrently without coordination:Process:
  1. Data Push: Each client pushes data to all replicas independently
  2. Append Request: Each client sends append command to primary (not to a specific offset!)
  3. Primary Serializes: Primary assigns offset for each append in the order received
  4. Sequential Application: Primary applies appends sequentially, each at its assigned offset
  5. Replica Coordination: Primary tells secondaries exact offset for each append
  6. Offset Return: Primary returns assigned offset to each client
Key Properties:
  • Primary acts as serialization point (no distributed consensus needed)
  • All replicas apply appends in same order at same offsets
  • Each client learns where its record was placed
  • No locking or coordination between clients required
  • Enables thousands of concurrent appenders
Failure Handling:
  • If append fails on any replica, client retries
  • Retry may create duplicate at different offset
  • Application de-duplicates using unique record IDs
  • At-least-once delivery guaranteed
Perfect for MapReduce where thousands of mappers append to shared output files.
Expected Answer:GFS uses 32-bit CRC32 checksums on 64KB blocks:Design:
  • Each 64MB chunk divided into 1024 × 64KB blocks
  • Each block has 32-bit checksum (4KB metadata per chunk)
  • Checksums stored in memory and on disk
  • Verified on every read
Why 64KB blocks?Smaller blocks (4KB):
  • ✓ More precise corruption detection
  • ✗ More checksums (higher memory overhead)
  • ✗ More CPU for computation
  • ✗ Higher metadata storage
Larger blocks (1MB):
  • ✓ Less overhead
  • ✗ Must re-read more data on corruption
  • ✗ Less precise detection
64KB: Balance of precision and overheadWrite Optimization:
  • For appends: Don’t re-read last partial block
  • Pad to block boundary instead
  • Trade space for speed (append-optimized workload)
Detection Timing:
  1. Read-time: Every read verified
  2. Scrubbing: Background task checks idle chunks
  3. Replication: Verified during chunk copy
Corruption Response:
  • Read from different replica
  • Report to master
  • Master schedules re-replication
  • Corrupted replica deleted
Trade-offs:
  • 0.006% space overhead (acceptable)
  • CPU cost negligible (modern CRC32 instructions)
  • False negative: ~1 in 4 billion (acceptable for commodity hardware)
  • Catches virtually all bit flips, disk corruption, memory errors
Expected Answer:GFS is optimized for large sequential writes/appends. For small random writes, several optimizations possible:1. Client-Side Buffering:
  • Buffer multiple small writes in client
  • Batch into larger writes (e.g., 1MB)
  • Flush periodically or when buffer full
  • Trade-off: Latency for throughput, memory usage
2. Log-Structured Approach:
  • Append small writes to log file
  • Background compaction merges into final locations
  • Like LevelDB/RocksDB approach
  • Trade-off: Complexity, write amplification
3. Write Coalescing at Chunkserver:
  • Chunkserver buffers writes in memory
  • Coalesces overlapping/adjacent writes
  • Flushes in batches
  • Trade-off: Durability concerns, memory usage
4. Smaller Chunks:
  • Reduce from 64MB to 4-8MB
  • Less internal fragmentation
  • More metadata overhead (acceptable for small files)
  • Trade-off: More master load, more metadata
5. Separate Tier:
  • Small-write tier with different chunk size/strategy
  • Large-write tier with current 64MB chunks
  • Route based on access pattern
  • Trade-off: System complexity, two code paths
6. Hybrid Storage:
  • Small writes to SSD/NVME (low latency)
  • Large sequential to HDD (high throughput)
  • Background migration based on patterns
  • Trade-off: Cost, complexity
Real-World:
  • Colossus (GFS successor) uses metadata sharding and smaller chunks
  • HDFS has append-only model similar to GFS
  • Modern systems like Ceph use different strategies entirely
  • For truly random small writes, key-value stores (Bigtable, HBase) better fit

Key Takeaways

Chunkserver & Data Flow Summary:
  1. Simple Storage: Chunks as regular Linux files with sparse allocation
  2. Pipelined Replication: Chain forwarding maximizes network bandwidth
  3. Network-Aware: Topology-aware chaining reduces cross-rack traffic
  4. Checksums: 64KB block granularity balances precision and overhead
  5. Record Append: Primary serializes concurrent appends without locks
  6. Corruption Handling: Read-time verification + background scrubbing
  7. Write Optimization: Avoid re-reading partial blocks on append
  8. At-Least-Once: Retries may create duplicates, application handles

Up Next

In Chapter 5: Consistency Model, we’ll explore:
  • GFS’s relaxed consistency guarantees
  • Defined vs undefined vs inconsistent regions
  • Atomic record append semantics in detail
  • How applications handle the consistency model
  • Implications for distributed systems design
We’ve seen how data flows—now we’ll understand what guarantees GFS provides to applications.

Interview Deep-Dive

Strong Answer:In GFS, when a client writes data, it pushes the data to the nearest chunkserver in a chain. That chunkserver simultaneously writes to its local disk and forwards the data to the next chunkserver in the pipeline, which does the same. This is chain replication, not parallel fan-out.The key insight is network bandwidth utilization. Suppose each machine has 100 Mbps full-duplex bandwidth. With parallel fan-out, the client must send three copies of the data, splitting its outbound bandwidth three ways — each replica gets roughly 33 Mbps. Total time to replicate B bytes is 3B / 100 Mbps.With pipelining, the client sends data at full 100 Mbps to the first chunkserver. That chunkserver forwards at full 100 Mbps to the second. The second forwards to the third. Because the forwarding starts as soon as the first bytes arrive (not after the entire write is buffered), the pipeline latency is approximately B / 100 Mbps + 2 * network_latency, rather than 3B / 100 Mbps. For a 64MB chunk on a 100 Mbps network, pipelining takes roughly 5.2 seconds versus 15.6 seconds for parallel fan-out — a 3x speedup.Additionally, GFS routes the pipeline through the network topology to minimize cross-rack hops. The client sends to the nearest replica, which forwards to the next nearest, and so on. This ensures the most expensive network hops (cross-rack) happen only once.Follow-up: What happens to the pipeline if the second chunkserver in the chain crashes mid-write?The client detects the failure when it does not receive an ACK from the pipeline within the timeout. It reports the error to the master. The master selects a new chunkserver to replace the failed one and instructs the client to re-push data to a new pipeline. The chunk version number is incremented so the partially-written replica on the failed server (if it recovers later) is detected as stale and garbage collected. The write is not considered successful until all replicas have acknowledged.
Strong Answer:Every 64MB chunk is divided into 1,024 blocks of 64KB each. Each block has a 32-bit CRC32 checksum stored separately in memory and on disk. On every read, the chunkserver computes the CRC32 of the data read from disk and compares it to the stored checksum. If they do not match, the chunkserver reports corruption to the master and returns an error to the client. The client then reads from a different replica while the master schedules re-replication from a healthy copy.The trade-offs of this design: The 64KB block size is a sweet spot. Smaller blocks (4KB) would detect corruption more precisely and require re-reading less data on a mismatch, but would increase checksum storage overhead (16x more checksums per chunk) and CPU overhead for checksum computation. Larger blocks (1MB) would reduce overhead but mean that any corruption requires re-reading a larger amount. At 64KB, the checksum overhead is 4KB per 64MB chunk (0.006%), and a single corrupted block requires re-reading only 64KB from another replica.For appends, there is an important optimization: since appends always go to the end of the chunk, the checksum for the last partial block can be incrementally updated rather than recomputed from scratch. For writes to the middle of a chunk (which are rare in GFS workloads), the entire block checksum must be recomputed.Background scrubbing is the other critical piece: chunkservers periodically verify checksums of all stored chunks even when no reads are happening, to detect bit rot (silent corruption from aging disks) before a second replica fails.Follow-up: Why CRC32 and not a cryptographic hash like SHA-256?CRC32 is purely an integrity check, not a security mechanism. Its advantage is speed: CRC32 can be computed at memory bandwidth speeds (multiple GB/s on modern hardware) and is often hardware-accelerated. SHA-256 is roughly 10-50x slower. Since GFS is checking for accidental corruption (bit flips, disk errors), not malicious tampering, the weaker collision resistance of CRC32 is perfectly acceptable. The probability of a random corruption producing a valid CRC32 is 1 in 4 billion, which is sufficient for detecting hardware errors.
Strong Answer:Storing chunks as regular Linux files on ext3/ext4 was a deliberate simplicity decision. It meant that operators could use standard Linux tools (ls, df, du, strace) to inspect the storage layer. Debugging a distributed system is hard enough without also needing custom tools to read the on-disk format. It also meant leveraging decades of Linux kernel optimization for buffer management, disk scheduling, and journaling without reimplementing any of it.The trade-off was some performance overhead from the extra file system layer. Raw disk access would eliminate the file system metadata overhead and give more predictable I/O latency. But GFS workloads are throughput-oriented (large sequential reads and writes), which is exactly what Linux file systems are optimized for. The overhead of the file system layer is minimal for sequential I/O patterns.Sparse file allocation was another benefit: a chunk file can be created at 64MB nominal size but only consume disk space proportional to the actual data written. This means a 1MB file stored in a 64MB chunk only uses 1MB of disk space, not 64MB.This is a practical engineering lesson worth remembering: sometimes the best architecture decision is to not build something. By reusing the Linux file system, GFS avoided months of development and years of bug fixes in a custom storage layer.Follow-up: When would raw disk access be the right choice instead?For latency-sensitive workloads where you need deterministic I/O performance. Databases like RocksDB and ScyllaDB sometimes bypass the file system to control exactly when and how data hits the disk, eliminating the unpredictability of the OS page cache and journaling. Also, for very high-throughput systems that need to squeeze every last MB/s out of the hardware, the file system overhead (inode lookups, journaling writes) becomes measurable. But for GFS in 2003, the throughput target was achievable with ext3, so the simplicity trade-off was clearly worth it.