MapReduce is the distributed data processing framework that sits at the heart of Hadoop. Inspired by Google’s MapReduce paper, it provides a simple yet powerful programming model that abstracts away the complexities of parallel programming, fault tolerance, and data distribution.
Chapter Goals:
Understand the MapReduce programming model
Master map and reduce phases in detail
Learn the complete execution flow from job submission to completion
Explore shuffle and sort mechanisms
Study fault tolerance and speculative execution
Compare Hadoop MapReduce with Google’s original implementation
MAP FUNCTION SIGNATURE
─────────────────────
map(K1 key, V1 value) → list<K2, V2>

Input:
• K1: Input key type (e.g., line number, file offset)
• V1: Input value type (e.g., line of text)

Output:
• Emit zero or more (K2, V2) pairs
• K2: Intermediate key (determines grouping)
• V2: Intermediate value

EXAMPLE: Word Count Map
───────────────────────
Input: (offset, "hello world hello")

Process:
1. Split into words: ["hello", "world", "hello"]
2. Emit for each word:
   emit("hello", 1)
   emit("world", 1)
   emit("hello", 1)

Output: [("hello", 1), ("world", 1), ("hello", 1)]

KEY INSIGHTS:
────────────
• Maps are stateless and independent
• Each map processes ONE input record
• Maps run in parallel across cluster
• No communication between mappers
• Can emit 0, 1, or many outputs per input
Map in Java (Hadoop API):
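import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split("\\s+");
        for (String w : words) {
            // split() yields an empty first token when the line starts with whitespace
            if (w.isEmpty()) {
                continue;
            }
            word.set(w);
            context.write(word, one); // emit(word, 1)
        }
    }
}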
Reduce: Aggregate Values per Key
REDUCE FUNCTION SIGNATURE
────────────────────────
reduce(K2 key, Iterator<V2> values) → list<K3, V3>

Input:
• K2: Intermediate key (from map)
• values: ALL values for this key (grouped by framework)

Output:
• Emit zero or more (K3, V3) pairs
• Typically K3 = K2, V3 is aggregated result

EXAMPLE: Word Count Reduce
──────────────────────────
Input: ("hello", [1, 1, 1, 1, 1])
Process:
1. Sum all values: 1 + 1 + 1 + 1 + 1 = 5
2. Emit result: emit("hello", 5)
Output: ("hello", 5)

Another Input: ("world", [1, 1])
Process:
1. Sum: 1 + 1 = 2
2. Emit: emit("world", 2)
Output: ("world", 2)

KEY INSIGHTS:
────────────
• Reducer receives ALL values for a key
• Keys are sorted before reduce
• Reducers run in parallel (different keys)
• Each reducer processes multiple keys
• Values arrive as an iterator (may not fit in memory)
Reduce in Java (Hadoop API):
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
Full Word Count MapReduce Job
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    // Map Phase
    public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    // Reduce Phase
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    // Main: Configure and submit job
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));    // input directory
        FileOutputFormat.setOutputPath(job, new Path(args[1]));  // output directory (must not exist)
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
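To see the programming model without a cluster, the three phases can be imitated in plain Java. This is only a sketch under simplifying assumptions: `LocalWordCount` and its methods are hypothetical names, everything runs in one thread, and a `TreeMap` stands in for the framework's sort-and-group step. It does not use the Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process imitation of map -> shuffle -> reduce.
final class LocalWordCount {

    // "Map": turn one input line into (word, 1) pairs.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : line.trim().split("\\s+")) {
            if (!w.isEmpty()) {
                out.add(Map.entry(w, 1));
            }
        }
        return out;
    }

    // "Shuffle": group values by key; TreeMap keeps the keys sorted.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            grouped.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return grouped;
    }

    // "Reduce": sum the values that belong to one key.
    static int reduce(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Run all three phases over a list of input lines.
    static Map<String, Integer> run(List<String> lines) {
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String line : lines) {
            intermediate.addAll(map(line));
        }
        Map<String, Integer> result = new TreeMap<>();
        shuffle(intermediate).forEach((k, vs) -> result.put(k, reduce(vs)));
        return result;
    }
}
```

Calling `LocalWordCount.run(List.of("hello world hello"))` yields a map with `hello` counted twice and `world` once, mirroring the word count example.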
PARTITIONING ALGORITHM
─────────────────────
For each emitted (key, value) from mapper:
partitionId = hash(key) % numReducers

Example with 3 Reducers:
───────────────────────
Mapper emits:
("apple", 1)  → hash("apple") % 3 = 2  → Reducer 2
("banana", 1) → hash("banana") % 3 = 0 → Reducer 0
("cherry", 1) → hash("cherry") % 3 = 1 → Reducer 1
("apple", 1)  → hash("apple") % 3 = 2  → Reducer 2
("banana", 1) → hash("banana") % 3 = 0 → Reducer 0

KEY PROPERTY: Deterministic Hashing
───────────────────────────────────
• Same key always goes to same reducer
• All values for a key end up together
• Enables grouping in reduce phase
(This is plain hash-modulo partitioning, not "consistent hashing" in the distributed-systems sense: changing numReducers remaps most keys.)

CUSTOM PARTITIONER:
──────────────────
Default partitioner is HashPartitioner, which computes
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks.
You can provide custom logic:

public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Example: partition by first letter
        String s = key.toString();
        if (s.isEmpty()) {
            return 0;
        }
        char firstLetter = Character.toLowerCase(s.charAt(0));
        // floorMod keeps the result in [0, numPartitions) even for characters
        // that sort before 'a' (digits, punctuation)
        return Math.floorMod(firstLetter - 'a', numPartitions);
    }
}

This ensures all words starting with the same letter go to the same reducer.

LOAD BALANCING CONCERN:
──────────────────────
• Hash may create unbalanced partitions (skew)
• Some reducers get more data than others
• Solution: Better partitioning or use Combiners
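The hash-modulo rule has one practical wrinkle in Java: `hashCode()` can be negative, and `%` then returns a negative remainder. A minimal sketch of the usual fix, clearing the sign bit before taking the modulus, follows. `PartitionSketch` and `partitionFor` are hypothetical names for illustration.

```java
// Hypothetical helper showing hash-modulo partitioning with the sign bit cleared.
final class PartitionSketch {

    // Clear the sign bit, then take the modulus, so the result is in [0, numReducers).
    static int partitionFor(int hash, int numReducers) {
        return (hash & Integer.MAX_VALUE) % numReducers;
    }

    // Convenience overload for string keys.
    static int partitionFor(String key, int numReducers) {
        return partitionFor(key.hashCode(), numReducers);
    }
}
```

Without the mask, a key whose hash is `Integer.MIN_VALUE` would produce a negative partition ID, which is not a valid reducer index.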
In-Memory Buffer and Spill Files
MAP OUTPUT BUFFER
────────────────
Configuration: mapreduce.task.io.sort.mb = 100 (default 100MB; older name: io.sort.mb)

┌─────────────────────────────────────┐
│ Circular Memory Buffer              │
│ (100MB)                             │
├─────────────────────────────────────┤
│ (key1, val1, partition1, metadata)  │
│ (key2, val2, partition2, metadata)  │
│ (key3, val3, partition3, metadata)  │
│ ...                                 │
└─────────────────────────────────────┘
        │
        │ When 80% full (configurable)
        ↓

BACKGROUND SPILL TO DISK
───────────────────────
1. Sort by (partition, key):
   - Primary sort: partition ID
   - Secondary sort: key
2. Optionally run Combiner (mini-reduce):
   - Local aggregation to reduce data
   - E.g., for word count: ("hello",[1,1,1]) → ("hello",3)
3. Write sorted partitions to disk:

   spill_0.out:
   ┌──────────────────────────────┐
   │ Partition 0 (sorted by key)  │
   │ ("apple", 2)                 │
   │ ("banana", 1)                │
   ├──────────────────────────────┤
   │ Partition 1 (sorted by key)  │
   │ ("cherry", 3)                │
   │ ("date", 1)                  │
   ├──────────────────────────────┤
   │ Partition 2 (sorted by key)  │
   │ ("elderberry", 1)            │
   │ ("fig", 2)                   │
   └──────────────────────────────┘

MULTIPLE SPILLS:
───────────────
If map produces lots of output:
- spill_0.out
- spill_1.out
- spill_2.out
- ...
These must be merged before shuffle.

FINAL MERGE:
───────────
Merge all spill files into single output file:
- Multi-way merge (up to mapreduce.task.io.sort.factor files at once; older name: io.sort.factor)
- Maintains sort order
- Run combiner again if configured
- Result: Single sorted file per map task
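The spill sort order, partition first and key second, can be expressed as a two-level comparator. The sketch below assumes a hypothetical in-memory record type `Rec`; the real buffer sorts serialized bytes through index metadata, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of one buffered map output record and the spill ordering.
final class SpillSortSketch {

    record Rec(int partition, String key, int value) {}

    // Primary sort: partition ID. Secondary sort: key.
    static List<Rec> sortForSpill(List<Rec> buffer) {
        List<Rec> sorted = new ArrayList<>(buffer);
        sorted.sort(Comparator.comparingInt(Rec::partition).thenComparing(Rec::key));
        return sorted;
    }
}
```

After sorting, every partition occupies one contiguous run of the spill file, which is what lets a reducer later fetch exactly its own slice.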
Combiner: Local Reduce for Optimization
COMBINER OPTIMIZATION
────────────────────
Problem:
• Map emits many duplicate keys
• Lots of network transfer during shuffle
• Reducers receive massive input

Example (Word Count without Combiner):
─────────────────────────────────────
Mapper 1 output:
("hello", 1), ("hello", 1), ("hello", 1), ("world", 1)
Mapper 2 output:
("hello", 1), ("hello", 1), ("world", 1), ("world", 1)
Shuffle transfers: 8 (key, value) pairs

Solution: COMBINER (local aggregation)
──────────────────────────────────────
Combiner runs on map node BEFORE shuffle:
Mapper 1 output after combiner:
("hello", 3), ("world", 1)
Mapper 2 output after combiner:
("hello", 2), ("world", 2)
Shuffle transfers: 4 (key, value) pairs
→ 50% reduction!

COMBINER FUNCTION:
─────────────────
Often SAME as reducer function:

public class WordCountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
}

// Configure job
job.setCombinerClass(WordCountCombiner.class);

WHEN TO USE COMBINER:
────────────────────
✓ Associative and commutative operations:
  - Sum, count, max, min
  - reduce(reduce(A, B), C) = reduce(A, reduce(B, C))
✗ NOT suitable for:
  - Average (need count and sum separately)
  - Median, percentiles
  - Operations where order matters

COMBINER GUARANTEES:
───────────────────
• May run 0, 1, or multiple times
• Framework decides when to run
• Your code must work with or without combiner
• Must produce same result regardless
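The note that an average needs count and sum separately can be made concrete. A naive combiner that averages locally gives the wrong answer, but carrying a (sum, count) pair through the combiner and dividing only at the end stays correct. The sketch below uses a hypothetical `SumCount` record in plain Java rather than a Hadoop `Writable`.

```java
import java.util.List;

// Hypothetical (sum, count) carrier that makes averaging combiner-safe.
final class AverageCombinerSketch {

    record SumCount(long sum, long count) {
        // Component-wise addition is associative and commutative.
        SumCount plus(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        // Divide only once, at the very end.
        double average() {
            return (double) sum / count;
        }
    }

    // The same merge step can serve as both combiner and reducer logic.
    static SumCount merge(List<SumCount> parts) {
        SumCount acc = new SumCount(0, 0);
        for (SumCount p : parts) {
            acc = acc.plus(p);
        }
        return acc;
    }
}
```

For values [1, 2, 3] on one mapper and [10] on another, merging (6, 3) with (10, 1) gives (16, 4) and an average of 4.0, whereas averaging the two local averages (2.0 and 10.0) would give 6.0.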
SHUFFLE PHASE (from Reducer's perspective)
─────────────────────────────────────────
Reducer Knows:
• Which mappers to fetch from (all of them)
• Which partition to fetch (based on reducer ID)

FETCH PROTOCOL:
──────────────
For each completed map task:
1. Reducer sends HTTP GET request:
   http://tasktracker1:50060/mapOutput?
     jobId=job_202401241200_0001&
     mapId=task_0001_m_000001&
     reduce=2
2. TaskTracker serves partition 2 from map output
3. Reducer saves to local disk or memory

FETCH TIMELINE:
──────────────
Reducers start fetching while maps are still running; they do not wait for all maps to finish.
By default reducers are scheduled once 5% of map tasks have completed
(mapreduce.job.reduce.slowstart.completedmaps = 0.05).

Timeline:
───────────────────────────────────────────────────
Map 1 done → Reduce starts fetching from Map 1
Map 2 done → Reduce fetches from Map 2
...
Map N done → Reduce fetches from Map N
All maps done → Reduce proceeds to merge/sort
───────────────────────────────────────────────────

FETCH STRATEGIES:
────────────────
Memory Buffer:
• Small map outputs kept in memory (the buffer defaults to 70% of reduce heap)
• Fast access, no disk I/O
Disk Spill:
• Large outputs spilled to disk
• Merged later with on-disk merge

Configuration:
• mapreduce.reduce.shuffle.parallelcopies = 5
  (fetch from 5 mappers simultaneously)
• mapreduce.reduce.shuffle.input.buffer.percent = 0.70
  (share of reduce heap used to hold fetched map outputs)
• mapreduce.reduce.shuffle.memory.limit.percent = 0.25
  (a single map output larger than 25% of that buffer is written straight to disk)

FETCH FAILURES:
──────────────
If fetch fails:
• Retry same TaskTracker (transient error)
• After N retries, inform JobTracker
• JobTracker may re-run map task elsewhere
The Critical Data Transfer Phase:The shuffle is often the bottleneck in MapReduce jobs. Understanding its internals is key to performance optimization.
SHUFFLE INTERNALS: MAP-SIDE
────────────────────────────
1. Memory Buffer (io.sort.mb, default 100MB):
- Circular buffer where (key,value) pairs are initially written
- When buffer reaches `io.sort.spill.percent` (default 80%), a "spill thread" kicks in
- Spill sorts by (partition, key) using quicksort
- If combiner is configured, it runs here to reduce output

2. Spill to Disk:
- Sorted data is written to a numbered spill file (for example `spill0.out`; exact names vary by Hadoop version)
- Each spill file contains data for all partitions, but sorted within each partition
- Multiple spills may occur if the map task processes large amounts of data

3. Final Merge:
- Before the map task finishes, all spill files are merged into a single output file
- This is a multi-way merge, respecting the `io.sort.factor` (default 10) limit on simultaneous merges
- The final file is named `map-00001.out` and stored on the local disk of the node that ran the map task (not in HDFS)

4. Index File:
- Alongside the output file, an index file `map-00001.out.index` is created
- This index maps each partition ID to its location and length in the output file
- Crucial for the reduce-side fetch process

SHUFFLE INTERNALS: REDUCE-SIDE
──────────────────────────────
1. Fetch Initiation:
- The reduce task starts fetching map outputs while other map tasks are still running
- It connects to the HTTP server running on each TaskTracker that ran a map task
- Fetches are performed in parallel, up to `mapreduce.reduce.shuffle.parallelcopies` (default 5)

2. In-Memory Buffering:
- Fetched data is kept in a memory buffer (size controlled by `mapreduce.reduce.shuffle.input.buffer.percent`)
- When the buffer fills past its merge threshold, its contents are merged and spilled to disk

3. On-Disk Merge:
- Multiple fetched files are merged on the reducer's local disk
- This uses the same multi-way merge algorithm as the map-side final merge
- Result is a single sorted stream per reduce task

4. Final Sort and Group:
- All data is sorted by key (each map output arrives sorted, and the merge preserves that order)
- Values for each key are grouped together
- The user's reduce function is called for each key-group

PERFORMANCE BOTTLENECKS:
──────────────────────
- Network Bandwidth: Shuffle is often network-bound
- Disk I/O: Writing/reading spill files can be slow
- Memory Pressure: Both map and reduce sides need adequate heap
- Skewed Keys: A few keys with disproportionately large value lists can slow the entire reduce task
Merging All Map Outputs
MERGE PHASE
──────────
Reducer has fetched outputs from ALL mappers:
In Memory:
• 20 small files (each sorted)
On Disk:
• 50 larger files (each sorted)
Goal: Merge into single sorted stream

MULTI-LEVEL MERGE:
─────────────────
Configuration: io.sort.factor = 10
(merge up to 10 files at once)

Round 1: Merge on-disk files
┌─────────────────────────────┐
│ Merge files 1-10  → merged1 │
│ Merge files 11-20 → merged2 │
│ Merge files 21-30 → merged3 │
│ Merge files 31-40 → merged4 │
│ Merge files 41-50 → merged5 │
└─────────────────────────────┘

Round 2:
┌─────────────────────────────┐
│ Merge merged1-merged5       │
│ + in-memory files           │
│ → Final sorted stream       │
└─────────────────────────────┘

MERGE ALGORITHM (K-way merge):
──────────────────────────────
Use min-heap to efficiently merge K sorted files:

Input Files:
File 1: [1, 5, 9, 13]
File 2: [2, 6, 10, 14]
File 3: [3, 7, 11, 15]
File 4: [4, 8, 12, 16]

Min-Heap (track next element from each file):
┌───────────────────────────┐
│ (1, file1) ← root (min)   │
│ (2, file2)                │
│ (3, file3)                │
│ (4, file4)                │
└───────────────────────────┘

Algorithm:
1. Pop min from heap (1, file1)
2. Output: 1
3. Read next from file1 (5), insert to heap
4. Repeat...

Result: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]

Time Complexity: O(N log K)
where N = total elements, K = number of files

FINAL SORTED OUTPUT:
───────────────────
(key, [values]) grouped by key:
("apple", [1, 2, 1, 3, 1])
("banana", [1, 1, 4, 1])
("cherry", [2, 1, 1])
...
Ready for reduce() function!
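The K-way merge steps above can be sketched with `java.util.PriorityQueue` as the min-heap. This is an illustrative in-memory version over integer lists; `KWayMergeSketch` and `Head` are hypothetical names, and a real merge would stream records from files instead of holding them in lists.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical in-memory K-way merge over already-sorted runs.
final class KWayMergeSketch {

    // Heap entry: the current value, which run it came from, and its position there.
    record Head(int value, int source, int index) {}

    static List<Integer> merge(List<List<Integer>> sortedRuns) {
        PriorityQueue<Head> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a.value(), b.value()));

        // Seed the heap with the first element of every non-empty run.
        for (int s = 0; s < sortedRuns.size(); s++) {
            if (!sortedRuns.get(s).isEmpty()) {
                heap.add(new Head(sortedRuns.get(s).get(0), s, 0));
            }
        }

        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            Head h = heap.poll();                    // 1. pop the minimum
            out.add(h.value());                      // 2. output it
            int next = h.index() + 1;
            List<Integer> run = sortedRuns.get(h.source());
            if (next < run.size()) {                 // 3. refill from the same run
                heap.add(new Head(run.get(next), h.source(), next));
            }
        }
        return out;
    }
}
```

The heap never holds more than K entries, one per run, which is where the O(N log K) bound comes from.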
MAP TASK MEMORY:
───────────────
mapreduce.map.memory.mb = 1024
(1GB physical memory for map container)
mapreduce.map.java.opts = -Xmx800m
(800MB JVM heap for map task)
Why less than 1024?
→ Leave room for off-heap memory, OS overhead

REDUCE TASK MEMORY:
──────────────────
mapreduce.reduce.memory.mb = 2048
(2GB physical memory for reduce container)
mapreduce.reduce.java.opts = -Xmx1600m
(1.6GB JVM heap for reduce task)
Reducers often need more memory (shuffle, sort)

SORT BUFFER:
───────────
mapreduce.task.io.sort.mb = 100
(100MB for map output buffer)
Larger buffer → fewer spills → better performance
But: Takes memory from heap

mapreduce.task.io.sort.factor = 10
(Merge up to 10 files at once)
Higher factor → fewer merge rounds

SHUFFLE BUFFER:
──────────────
mapreduce.reduce.shuffle.input.buffer.percent = 0.70
(Use 70% of reduce heap for shuffle)
mapreduce.reduce.shuffle.merge.percent = 0.66
(Spill to disk when 66% full)

RECOMMENDATIONS:
───────────────
Small Jobs (< 100GB):
• map.memory.mb: 1024
• reduce.memory.mb: 2048
Medium Jobs (100GB - 1TB):
• map.memory.mb: 1536
• reduce.memory.mb: 3072
Large Jobs (> 1TB):
• map.memory.mb: 2048
• reduce.memory.mb: 4096
• sort.mb: 200
Reducing I/O with Compression
WHY COMPRESS:
────────────
✓ Less disk I/O (spills, shuffle)
✓ Less network transfer
✓ Faster job completion
✗ CPU overhead (compression/decompression)

COMPRESSION CODECS:
──────────────────
┌──────────────┬──────────┬──────────┬────────────┐
│ Codec        │ Ratio    │ Speed    │ Splittable │
├──────────────┼──────────┼──────────┼────────────┤
│ Gzip         │ High     │ Slow     │ No         │
│ Bzip2        │ Highest  │ Slowest  │ Yes        │
│ LZO          │ Medium   │ Fast     │ Yes*       │
│ Snappy       │ Medium   │ Fastest  │ No         │
└──────────────┴──────────┴──────────┴────────────┘
* LZO requires indexing for splittability

WHERE TO COMPRESS:
─────────────────
1. Input Files:
   - Reduces map input I/O
   - Must be splittable or small files
   - Configure: InputFormat handles automatically

2. Map Output (Intermediate):
   - Reduces shuffle traffic (BIG win!)
   - CPU overhead acceptable
   Configuration:
   mapreduce.map.output.compress=true
   mapreduce.map.output.compress.codec=
     org.apache.hadoop.io.compress.SnappyCodec

3. Final Output:
   - Saves HDFS space
   - Slows down if consumed by non-Hadoop tools
   Configuration:
   mapreduce.output.fileoutputformat.compress=true
   mapreduce.output.fileoutputformat.compress.codec=
     org.apache.hadoop.io.compress.GzipCodec

RECOMMENDATION:
──────────────
Compress intermediate data (map output) by default:
→ Often a 30-50% reduction in shuffle time, depending on the workload
→ Minimal CPU overhead with Snappy/LZO

Example:
────────
Configuration conf = new Configuration();
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
              SnappyCodec.class, CompressionCodec.class);

COMPRESSION IMPACT:
──────────────────
Uncompressed shuffle: 100 GB
Snappy compressed: 40 GB
Network time (1 Gbps): 800s → 320s (60% less transfer time)
(100 GB = 800 Gb, so 800 s at 1 Gbps; 40 GB = 320 Gb, so 320 s)
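The transfer-time arithmetic is easy to check with a small helper. The sketch assumes 1 GB = 8 Gb and an ideal link with no protocol overhead; `ShuffleTimeSketch` is a hypothetical name. Under those assumptions 100 GB over a 1 Gbps link takes about 800 seconds and 40 GB about 320 seconds.

```java
// Hypothetical helper for back-of-envelope shuffle transfer times.
final class ShuffleTimeSketch {

    // Seconds to move `gigabytes` over a link of `gigabitsPerSecond`,
    // assuming 1 GB = 8 Gb and ignoring protocol overhead.
    static double transferSeconds(double gigabytes, double gigabitsPerSecond) {
        return gigabytes * 8.0 / gigabitsPerSecond;
    }
}
```

The ratio between the two cases depends only on the compression ratio, so the relative saving (60% here) holds at any link speed.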
Local Aggregation
COMBINER EFFECTIVENESS:
──────────────────────
Word Count Example:
──────────────────
WITHOUT Combiner:
Map output: 10 million (word, 1) pairs
Shuffle: 10 million pairs transferred
Reduce input: 10 million pairs

WITH Combiner:
Map output: 10 million (word, 1) pairs
After combiner: 100,000 (word, count) pairs
Shuffle: 100,000 pairs transferred (99% reduction!)
Reduce input: 100,000 pairs

WHEN COMBINER HELPS:
───────────────────
High Reduction Factor:
• Many duplicate keys
• Simple aggregation (sum, count, max)
• Word count, click counting, log aggregation
Low Reduction Factor:
• Few duplicate keys
• Unique IDs, timestamps
• Combiner overhead not worth it

COMBINER USAGE PATTERNS:
───────────────────────
1. Same as Reducer (common):
   job.setCombinerClass(MyReducer.class);
2. Different from Reducer:
   job.setCombinerClass(MyCombiner.class);
   job.setReducerClass(MyReducer.class);
3. In-Mapper Combining (advanced):
   // Maintain HashMap in mapper
   // Emit aggregated results in cleanup()

CONFIGURATION:
─────────────
mapreduce.job.combine.class = MyReducer

COMBINER FREQUENCY:
──────────────────
Combiner runs:
• During spills (map side)
• During merge (map side)
• Possibly during shuffle merge (reduce side)
→ May run 0, 1, or multiple times!
→ Must be commutative and associative, so that applying it any number of times leaves the final result unchanged
Balancing Reducer Load
PARTITIONING STRATEGIES:
───────────────────────
1. Hash Partitioning (Default):
   partition = hash(key) % numReducers
   Pros: Simple, uniform for random keys
   Cons: Poor for skewed keys

2. Range Partitioning:
   partition based on key range
   Example:
   • Reducer 0: A-F
   • Reducer 1: G-M
   • Reducer 2: N-S
   • Reducer 3: T-Z
   Pros: Output is globally sorted
   Cons: Requires sampling, skew issues

3. Custom Partitioning:

   public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
       @Override
       public int getPartition(Text key, IntWritable value, int numPartitions) {
           // Partition by first letter; empty keys go to partition 0
           String s = key.toString().toLowerCase();
           if (s.isEmpty()) {
               return 0;
           }
           char first = s.charAt(0);
           if (first >= 'a' && first <= 'm') {
               return 0 % numPartitions;
           } else {
               return 1 % numPartitions;
           }
       }
   }

SKEW PROBLEM:
────────────
Unbalanced partitions:
Reducer 0: 1 GB (finishes fast)
Reducer 1: 10 GB (straggles)
Reducer 2: 2 GB
Reducer 3: 1 GB
Job completion time = slowest reducer

SKEW SOLUTIONS:
──────────────
1. Better Partitioner:
   • Sample data first
   • Learn key distribution
   • Assign ranges to balance load
2. Increase Number of Reducers:
   • More parallelism
   • Smaller data per reducer
   • job.setNumReduceTasks(20);
3. Salting:
   • Add random suffix to skewed keys
   • key → key_0, key_1, key_2, ...
   • Process in two MapReduce rounds
4. Combiners:
   • Reduce data before shuffle
   • Less data to partition
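Salting is the least obvious of these fixes, so here is a plain-Java sketch of the two rounds for a counting job. It is illustrative only: `SaltingSketch` is a hypothetical name, the salt is derived from the record position so the example is deterministic (real jobs often pick it at random), and two `HashMap`s stand in for the two MapReduce rounds.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical two-round count with salted keys.
final class SaltingSketch {

    // Round 1 key: spread one key over `salts` sub-keys (key_0, key_1, ...).
    static String salt(String key, int recordIndex, int salts) {
        return key + "_" + (recordIndex % salts);
    }

    // Round 2 key: strip the suffix to recover the original key.
    static String unsalt(String saltedKey) {
        return saltedKey.substring(0, saltedKey.lastIndexOf('_'));
    }

    static Map<String, Integer> countWithSalting(List<String> keys, int salts) {
        // Round 1: partial counts per salted key (a hot key now spans several reducers).
        Map<String, Integer> round1 = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            round1.merge(salt(keys.get(i), i, salts), 1, Integer::sum);
        }
        // Round 2: combine the partial counts under the original key.
        Map<String, Integer> round2 = new HashMap<>();
        round1.forEach((k, v) -> round2.merge(unsalt(k), v, Integer::sum));
        return round2;
    }
}
```

The second round is cheap because it sees at most `salts` partial results per key, no matter how many raw records the hot key had.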
PATTERN: Filter
GOAL: Keep only records matching criteria
Use Case: Extract error logs from all logs

MAP FUNCTION:
────────────
map(offset, line):
  if line.contains("ERROR"):
    emit(line, null)
  // Don't emit non-error lines

REDUCE FUNCTION:
───────────────
NONE (map-only job)
job.setNumReduceTasks(0);

EXAMPLE:
───────
Input:
2024-01-24 10:00:00 INFO Started service
2024-01-24 10:00:05 ERROR Connection failed
2024-01-24 10:00:10 INFO Retrying...
2024-01-24 10:00:15 ERROR Timeout

Output:
2024-01-24 10:00:05 ERROR Connection failed
2024-01-24 10:00:15 ERROR Timeout

OPTIMIZATION:
────────────
• No reducer needed (map-only)
• Each mapper writes directly to HDFS
• Very efficient for simple filtering
Top N
Find Top N Items
PATTERN: Top N
GOAL: Find N largest/smallest items
Use Case: Top 10 most frequent words

APPROACH 1: In-Mapper Top N
───────────────────────────
map(offset, line):
  words = line.split()
  for word in words:
    localCounts[word] += 1

// In cleanup():
topN = localCounts.topN(10)
for (word, count) in topN:
  emit(word, count)

reduce(word, counts):
  totalCount = sum(counts)
  emit(word, totalCount)

Final: Sort all reducer output, take top 10
Caveat: the result is approximate. A word that is frequent overall but never in any single mapper's local top 10 is dropped before the reducer sees it. Emitting all local counts (not only the top 10) keeps the result exact.

APPROACH 2: Two-Stage
─────────────────────
Job 1: Word Count
  → Output: All (word, count) pairs
Job 2: Sort and Top N
  → Mapper: emit(count, word) // flip key-value
  → Reducer: emit top N

APPROACH 3: Single Reducer
──────────────────────────
map(offset, line):
  // Normal word count map
  for word in line.split():
    emit(word, 1)

reduce(word, counts):
  emit(sum(counts), word) // Emit (count, word) instead of (word, count)

Configuration:
• Set numReducers = 1
• Reducer sees every word, so it can track the N largest counts across reduce() calls
• Emit top N

TRADE-OFFS:
──────────
Approach 1: Less data shuffled, but complex (and approximate if mappers truncate)
Approach 2: Simpler, but two jobs
Approach 3: Simple, but single reducer bottleneck
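All three approaches end with the same step: picking the N largest counts from a set of (word, count) pairs. A bounded min-heap does this without sorting everything. The sketch below is plain Java with a hypothetical `TopNSketch` class; in a real job this logic would sit in the single reducer (or in a mapper's `cleanup()`).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical top-N selection with a size-bounded min-heap.
final class TopNSketch {

    static List<Map.Entry<String, Integer>> topN(Map<String, Integer> counts, int n) {
        Comparator<Map.Entry<String, Integer>> byCount = Map.Entry.comparingByValue();

        // Min-heap on count: the root is always the weakest of the current top N.
        PriorityQueue<Map.Entry<String, Integer>> heap = new PriorityQueue<>(byCount);
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            heap.add(e);
            if (heap.size() > n) {
                heap.poll(); // evict the smallest count
            }
        }

        // Return the survivors in descending order of count.
        List<Map.Entry<String, Integer>> result = new ArrayList<>(heap);
        result.sort(byCount.reversed());
        return result;
    }
}
```

The heap never grows beyond N + 1 entries, so memory use is independent of the vocabulary size.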
Join
Joining Two Datasets
PATTERN: Join
GOAL: Combine records from two datasets
Use Case: Join users with their orders

Dataset 1 (users.txt):
user123,Alice
user456,Bob

Dataset 2 (orders.txt):
order001,user123,100
order002,user456,200
order003,user123,50

REDUCE-SIDE JOIN:
────────────────
map(filename, line):
  if filename == "users.txt":
    userId, name = line.split(',')
    emit(userId, ("USER", name))
  else: // orders.txt
    orderId, userId, amount = line.split(',')
    emit(userId, ("ORDER", orderId, amount))

reduce(userId, values):
  userName = null
  orders = []
  for value in values:
    if value[0] == "USER":
      userName = value[1]
    else: // ORDER
      orders.append(value)
  for order in orders:
    emit(userId, userName, order)

Output:
user123,Alice,order001,100
user123,Alice,order003,50
user456,Bob,order002,200

MAP-SIDE JOIN (if one dataset is small):
───────────────────────────────────────
Setup:
• Load small dataset (users) into memory
• Use Distributed Cache

map(offset, line):
  // Parse order
  orderId, userId, amount = line.split(',')
  // Lookup user in memory
  userName = usersMap.get(userId)
  emit(userId, userName + "," + orderId + "," + amount)

No Reducer Needed!

TRADE-OFFS:
──────────
Reduce-Side Join:
✓ Works for any dataset size
✗ Expensive shuffle
✗ All data must go through reduce
Map-Side Join:
✓ Very efficient (no shuffle)
✓ Happens in map phase
✗ Requires small dataset
✗ Dataset must fit in memory
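The reduce-side join pseudocode can be traced in plain Java. This is a sketch, not Hadoop code: `ReduceSideJoinSketch` is a hypothetical class, a `TreeMap` plays the role of the shuffle (grouping tagged records by `userId`), and the loop over the groups plays the role of `reduce()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical single-process trace of a reduce-side join.
final class ReduceSideJoinSketch {

    static List<String> join(List<String> userLines, List<String> orderLines) {
        // "Map + shuffle": tag each record and group it under its userId.
        Map<String, List<String[]>> grouped = new TreeMap<>();
        for (String line : userLines) {
            String[] f = line.split(",");                 // userId,name
            grouped.computeIfAbsent(f[0], k -> new ArrayList<>())
                   .add(new String[] {"USER", f[1]});
        }
        for (String line : orderLines) {
            String[] f = line.split(",");                 // orderId,userId,amount
            grouped.computeIfAbsent(f[1], k -> new ArrayList<>())
                   .add(new String[] {"ORDER", f[0], f[2]});
        }

        // "Reduce": per userId, find the user record, then emit one row per order.
        List<String> out = new ArrayList<>();
        grouped.forEach((userId, values) -> {
            String name = null;
            List<String[]> orders = new ArrayList<>();
            for (String[] v : values) {
                if (v[0].equals("USER")) {
                    name = v[1];
                } else {
                    orders.add(v);
                }
            }
            for (String[] o : orders) {
                out.add(userId + "," + name + "," + o[1] + "," + o[2]);
            }
        });
        return out;
    }
}
```

Note that the reducer has to buffer the orders until it has seen the user record, because values arrive in no guaranteed order. That buffering is one reason the secondary sort pattern is often combined with reduce-side joins.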
Secondary Sort
Control Value Order in Reduce
PATTERN: Secondary Sort
GOAL: Control order of values arriving at reducer
Use Case: Process user events in chronological order

PROBLEM:
───────
Default: Reducer receives values in ARBITRARY order

reduce(userId, events):
  // events might arrive as: [event3, event1, event2]
  // Want chronological: [event1, event2, event3]

SOLUTION: Composite Key
──────────────────────
1. Create composite key: (userId, timestamp)

2. Custom Partitioner:
   // Partition by userId only
   partition = hash(userId) % numReducers

3. Custom Comparator:
   // Sort by userId, then timestamp
   compare(key1, key2):
     if key1.userId != key2.userId:
       return compare(key1.userId, key2.userId)
     else:
       return compare(key1.timestamp, key2.timestamp)

4. Grouping Comparator:
   // Group by userId only
   compare(key1, key2):
     return compare(key1.userId, key2.userId)

IMPLEMENTATION:
──────────────
public class CompositeKey implements WritableComparable<CompositeKey> {
    String userId;
    long timestamp;

    // Serialization used when the key is written to spill files and sent over the network
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userId);
        out.writeLong(timestamp);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        userId = in.readUTF();
        timestamp = in.readLong();
    }

    @Override
    public int compareTo(CompositeKey other) {
        int cmp = userId.compareTo(other.userId);
        if (cmp != 0) return cmp;
        return Long.compare(timestamp, other.timestamp);
    }
}

job.setPartitionerClass(UserPartitioner.class);
job.setSortComparatorClass(CompositeKeyComparator.class);
job.setGroupingComparatorClass(UserGroupingComparator.class);

RESULT:
──────
Reducer receives:
userId: user123
values: [event1@t1, event2@t2, event3@t3] // sorted!
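The interplay of the sort comparator and the grouping comparator is easier to see outside Hadoop. In the sketch below (plain Java, hypothetical names `SecondarySortSketch` and `EventKey`), keys are sorted by (userId, timestamp) and then grouped by userId only, which is what the three `job.set...Class` calls arrange inside the framework.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory imitation of secondary sort.
final class SecondarySortSketch {

    record EventKey(String userId, long timestamp) {}

    // Sort comparator: userId first, then timestamp.
    static final Comparator<EventKey> SORT =
        Comparator.comparing(EventKey::userId).thenComparingLong(EventKey::timestamp);

    // Grouping step: after sorting, keys that share a userId feed one reduce() call.
    static Map<String, List<Long>> sortAndGroup(List<EventKey> keys) {
        List<EventKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        Map<String, List<Long>> groups = new LinkedHashMap<>();
        for (EventKey k : sorted) {
            groups.computeIfAbsent(k.userId(), u -> new ArrayList<>()).add(k.timestamp());
        }
        return groups;
    }
}
```

Because the sort happens on the full composite key but grouping ignores the timestamp, each group's values come out already in chronological order with no buffering in the reducer.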
Distributed Cache
Sharing Read-Only Data
PATTERN: Distributed Cache
GOAL: Share small files across all mappers/reducers
Use Case: Load lookup table in map-side join

PROBLEM:
───────
Need small dataset available to all tasks:
• IP → Country mapping
• User ID → Name mapping
• Product catalog

Don't want to:
• Embed in JAR (inflexible)
• Read from HDFS for each record (slow)

SOLUTION: Distributed Cache
──────────────────────────
1. Upload file to HDFS:
   hdfs dfs -put users.txt /shared/users.txt

2. Add to job configuration:
   job.addCacheFile(new URI("/shared/users.txt"));

3. In mapper setup, load into memory:

   public class MyMapper extends Mapper {
       Map<String, String> usersMap = new HashMap<>();

       @Override
       protected void setup(Context context) throws IOException, InterruptedException {
           // Read cached file (getLocalCacheFiles() is deprecated in Hadoop 2+;
           // newer code uses context.getCacheFiles())
           Path[] cacheFiles = context.getLocalCacheFiles();
           try (BufferedReader reader = new BufferedReader(
                   new FileReader(cacheFiles[0].toString()))) {
               String line;
               while ((line = reader.readLine()) != null) {
                   String[] parts = line.split(",");
                   usersMap.put(parts[0], parts[1]);
               }
           }
       }

       @Override
       public void map(K key, V value, Context context) {
           // Use usersMap for lookups
           String userId = ...;
           String userName = usersMap.get(userId);
           ...
       }
   }

DISTRIBUTED CACHE PROPERTIES:
────────────────────────────
✓ Files copied to each node once
✓ Cached on local disk
✓ Read-only (no modifications)
✓ Efficient for small files (< 100MB)
✗ Not for large datasets
✗ All tasks share same copy (no per-task data)

USE CASES:
─────────
• Lookup tables
• Configuration files
• Small dimension tables (star schema)
• Stop words lists
• Machine learning models
Google's Choice (C++):
─────────────────────
• Full control over performance
• Tight integration with internal systems
• Systems programming expertise available
• Don't need cross-platform compatibility

Hadoop's Choice (Java):
──────────────────────
• Write once, run anywhere (JVM)
• Larger developer community
• Easier to attract contributors
• Faster development cycle
• Good-enough performance

Result:
──────
Hadoop's Java choice enabled:
✓ Rapid adoption across diverse environments
✓ Rich ecosystem (Hive, Pig, HBase in Java)
✓ Lower barrier to entry
✗ Some performance overhead (GC pauses)
Community vs Control
Google's Approach:
─────────────────
• Publish papers, keep code internal
• Full control over evolution
• Optimize for Google's workloads
• No backward compatibility burden

Hadoop's Approach:
─────────────────
• Open source from day one
• Community-driven development
• Serve diverse use cases
• Maintain backward compatibility

Impact:
──────
• Hadoop became industry standard
• Google's ideas reached millions
• Ecosystem innovations (Spark, Hive)
• Multiple commercial distributions
Public vs Private Evolution
Google's Evolution (Opaque):
───────────────────────────
MapReduce (2004)
 ↓
FlumeJava (pipeline API over MapReduce, internal)
 ↓
MillWheel (streaming)
 ↓
Cloud Dataflow (public product)

MapReduce itself has likely been deprecated internally and replaced by newer systems.

Hadoop's Evolution (Transparent):
─────────────────────────────────
MapReduce (2006)
 ↓
YARN (2012) - Generic resource management
 ↓
Multiple frameworks: Spark, Flink, Tez
 ↓
Modern: Spark dominates, MapReduce legacy

Every step documented and community-driven.

Lessons:
───────
• Open evolution allows external innovation
• Community found MapReduce limitations
• Led to better alternatives (Spark)
• Google's internal evolution hidden
Simple Model, Powerful Abstraction: Map and reduce are simple functions, but framework handles all complexity (distribution, fault tolerance, optimization)
Shuffle is the Bottleneck: Most optimization focuses on reducing shuffle data (combiners, compression, partitioning)
Data Locality is Critical: Moving computation to data (not vice versa) is fundamental to MapReduce efficiency
Fault Tolerance via Stateless Tasks: Re-execution works because tasks are stateless and deterministic
Speculative Execution Handles Stragglers: Don’t wait for slow tasks—run duplicates and take first result
Map-Only Jobs for Simple Cases: Not everything needs reduce—filtering, transformation can be map-only
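Combiners are Cheap Performance: If your reduce is associative and commutative and keys repeat within a map's output, a combiner usually pays for its small CPU cost many times over in reduced shuffle traffic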
Java Enabled the Ecosystem: Performance trade-off was worth it for portability and community growth
Best Practice: Keep speculative execution enabled for most production workloads, but ensure tasks are idempotent.
System Design: Design a MapReduce job to find top 10 most frequent words
Expected Answer:

Approach 1: Two-Job Pipeline

Job 1: Word Count
Map: (offset, line) → emit(word, 1) for each word
Reduce: (word, [1,1,1,...]) → emit(word, count)
Output: (word, count) pairs in HDFS

Job 2: Top 10
Map: (word, count) → emit(count, word) // flip key-value
Reduce:
 - Single reducer (numReduceTasks=1)
 - Sort all (count, word) pairs descending
 - Emit top 10
Approach 2: Single Job with In-Mapper Aggregation
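Map:
  setup():
    Initialize HashMap<String, Integer> localCounts
    Initialize PriorityQueue<Pair> topN (min-heap on count, size 10)
  map(offset, line):
    for word in line.split():
      localCounts[word] += 1
  cleanup():
    for (word, count) in localCounts:
      if topN.size() < 10 or count > topN.peek().count:
        topN.add((count, word))
        if topN.size() > 10:
          topN.poll()
    for (count, word) in topN:
      emit(count, word)

Reduce:
  - Single reducer
  - Sum the partial counts of words emitted by more than one mapper
  - Maintain global topN heap
  - Emit final top 10

Note: truncating to a local top 10 makes the result approximate. A word that is frequent overall but never in any one mapper's top 10 is dropped before the reducer sees it.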
Trade-offs:
Approach 1: Simpler, reusable word count, but two jobs
Approach 2: More efficient, less shuffle data, but more complex
Optimizations:
Use combiner in Approach 1 to reduce shuffle
Consider top 100 per mapper, then top 10 in reducer (reduce network)
If only need approximate top 10, use sampling
Deep Dive: How would you optimize a MapReduce job that's running slowly?
Expected Answer:
1. Identify Bottleneck
Check job counters and logs:
Map time vs reduce time vs shuffle time
Data skew (some reducers much slower)
Spill counts (too many disk writes)
GC time (memory pressure)
2. Map Phase Optimization
Input Splits: Ensure 1 split = 1 block for locality
Combiner: Add combiner to reduce map output
Compression: Compress map output (Snappy)
Memory: Increase sort buffer (io.sort.mb)
Avoid Small Files: Combine small files before processing
3. Shuffle Optimization
Compression: Always compress intermediate data
Combiner: Reduces shuffle volume dramatically
Fetch Parallelism: Increase parallel copies
Memory: Increase shuffle buffer percentage
4. Reduce Phase Optimization
Number of Reducers:
Too few: Reducers become bottleneck
Too many: Overhead, small output files
Rule of thumb: 0.95 or 1.75 × (nodes × max containers per node)
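The rule of thumb is simple arithmetic, sketched below with a hypothetical helper. With the 0.95 factor all reducers can start in a single wave as the maps finish; with 1.75, faster nodes finish their first round and pick up a second wave, which balances load better. The cluster sizes in the usage note are made-up inputs for illustration.

```java
// Hypothetical helper for the reducer-count rule of thumb.
final class ReducerCountSketch {

    // factor is 0.95 (single wave) or 1.75 (two waves, better load balancing).
    static int suggestedReducers(int nodes, int maxContainersPerNode, double factor) {
        return (int) Math.round(factor * (nodes * maxContainersPerNode));
    }
}
```

For an assumed cluster of 10 nodes with 8 containers each, this suggests 76 reducers with the 0.95 factor and 140 with 1.75. The result would then be passed to `job.setNumReduceTasks(...)`.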
Generic resource management for multiple frameworks
ApplicationMaster pattern
How YARN enables Spark, Flink, and other frameworks
We’ve mastered MapReduce, the original Hadoop processing model. Next, we’ll see how YARN generalized resource management to support any distributed application, not just MapReduce.
The shuffle phase is often the biggest bottleneck in MapReduce. Walk me through why, and how you would optimize it.
Strong Answer:
The shuffle is expensive because it is the only phase that requires all-to-all network communication. Every reducer must pull its partition from every mapper, so with M mappers and R reducers there are up to M * R fetches. For a job with 10,000 mappers and 1,000 reducers, that is 10 million network transfers. Each transfer involves reading from the mapper's local disk, network transmission, and writing to the reducer's local disk. The network and disk I/O during shuffle often dominates the total job runtime.
Optimization strategies:
First, use combiners aggressively. A combiner runs a local reduce on each mapper's output before the shuffle, reducing the volume of data that must be transferred. For word count, a combiner can reduce “hello:1, hello:1, hello:1” to “hello:3” on the mapper side. When a few keys dominate the input, this can cut shuffle data by 60-90%. It is only safe when the aggregation is commutative and associative (sum, count, max), because Hadoop may run the combiner zero, one, or several times.
Second, compress intermediate data. Snappy or LZO compression on mapper output reduces both disk I/O (spill files are smaller) and network transfer (less data to shuffle). The CPU cost of compression is almost always justified by the I/O savings.
Third, tune the number of reducers. Too few reducers means each one processes too much data (straggler risk). Too many means excessive overhead from scheduling and from creating many small output files. A common heuristic is to set the reducer count so that each reducer processes 1-2 GB of data.
Fourth, address key skew. If one key has disproportionately many values (the “celebrity problem”), the reducer that owns it becomes a straggler. Solutions include salting the key (appending a random suffix so the key is spread across multiple reducers, then merging the partial results) or using a two-pass approach (a first pass to count, a second pass to redistribute).
Follow-up: Why did Spark largely replace MapReduce for most workloads?
Spark eliminated the two biggest performance problems with MapReduce. First, MapReduce materializes all intermediate data to disk between the map and reduce phases. Spark can keep intermediate data in memory (RDDs/DataFrames), which can be 10-100x faster for iterative algorithms that reprocess the same data. Second, MapReduce offers only one map and one reduce per job, so a multi-stage pipeline (filter -> join -> aggregate) becomes a chain of MapReduce jobs, each with its own shuffle and each writing its output to HDFS before the next one starts. Spark executes the entire pipeline as a single DAG with pipelined stages, which can reduce the number of shuffles and eliminates the unnecessary disk writes between jobs.
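The combiner and key-salting strategies from the answer above can be sketched in plain Java, without any Hadoop classes. This is a toy model under stated assumptions, not the Hadoop implementation: the class ShuffleSketch and all of its methods are hypothetical names, the partition formula follows the one used by Hadoop's default HashPartitioner, and the salt is derived from a record index rather than a random number so that the result is reproducible.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java model of two shuffle optimizations. No Hadoop classes are used.
class ShuffleSketch {

    // Combiner: sum counts locally so one mapper ships one record per
    // distinct word instead of one record per occurrence.
    static Map<String, Integer> combine(List<String> mapperWords) {
        Map<String, Integer> partialCounts = new TreeMap<>();
        for (String word : mapperWords) {
            partialCounts.merge(word, 1, Integer::sum);
        }
        return partialCounts;
    }

    // Hash partitioning: a given key always lands on the same reducer.
    static int partition(String key, int numReducers) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Salting: append a suffix in [0, saltBuckets) so one hot key becomes
    // several distinct keys. recordIndex stands in for a random number
    // (an assumption made to keep the example deterministic).
    static String salt(String key, int recordIndex, int saltBuckets) {
        return key + "#" + (recordIndex % saltBuckets);
    }

    // Set of reducers that receive at least one record of the hot key.
    static Set<Integer> reducersHit(String hotKey, int records,
                                    int saltBuckets, int numReducers) {
        Set<Integer> reducers = new HashSet<>();
        for (int i = 0; i < records; i++) {
            reducers.add(partition(salt(hotKey, i, saltBuckets), numReducers));
        }
        return reducers;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "world", "hello", "hello");
        System.out.println("records shuffled without combiner: " + words.size());
        System.out.println("records shuffled with combiner:    " + combine(words).size());
        System.out.println("reducers hit, unsalted hot key: "
                + reducersHit("celebrity", 1000, 1, 4).size());
        System.out.println("reducers hit, 4 salt buckets:   "
                + reducersHit("celebrity", 1000, 4, 4).size());
    }
}
```

With one salt bucket every record of the hot key goes to a single reducer, while four buckets spread it over four reducers in this setup. In general, salted keys can still collide on the same reducer, and the partial results per salted key need a second aggregation step that strips the suffix.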
Explain speculative execution in MapReduce. When does it help, and when does it actually hurt performance?
Strong Answer:
Speculative execution detects “straggler” tasks (tasks that are running significantly slower than the average) and launches duplicate copies on different nodes. Whichever copy finishes first is used, and the other is killed. This mitigates the “tail latency” problem where one slow task delays an entire job.
It helps when stragglers are caused by transient issues: a node with a degraded disk, temporary network congestion, or competing workloads on a shared cluster. In these cases, the duplicate task on a healthy node finishes quickly, and the overall job completes much sooner.
It hurts in three scenarios. First, when the slowness is caused by data skew rather than hardware problems. If one map task has 10x more data than the others, launching a speculative copy does not help, because the copy has the same amount of data to process. Second, when the cluster is fully utilized. Speculative tasks consume resources that could be used for other jobs. On a busy cluster, speculative execution can cause a cascade where every job launches speculative tasks, consuming resources and making all jobs slower. Third, for tasks with side effects. If a map task writes to an external system (database, message queue), the speculative copy may cause duplicate writes unless those writes are idempotent.
By default, Hadoop enables speculative execution for both map and reduce tasks (mapreduce.map.speculative and mapreduce.reduce.speculative both default to true). Reduce-side speculation is the one operators disable more often, because a duplicate reduce task is more expensive to run: it must fetch its entire shuffle input again. In practice, many production clusters disable speculative execution and instead invest in better cluster monitoring to identify and fix hardware problems proactively.
Follow-up: How would you detect whether speculative execution is helping or hurting on a production cluster?
Monitor two metrics: the speculative task launch rate and the speculative task kill rate. If most speculative tasks are being killed (meaning the original task finished first anyway), speculative execution is wasting resources. If speculative tasks are winning frequently, it is saving time. Also compare total job runtime with and without speculative execution on representative workloads. The break-even point is where the resource cost of the speculative tasks equals the value of the time saved on stragglers.
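The two metrics from the follow-up can be computed with a few lines of Java once the outcome of each speculative attempt is known. The sketch below is illustrative only: SpeculationReport and Attempt are hypothetical types, not part of any Hadoop API, and the attempt records are assumed to have been extracted from job history data by some other means.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical report over speculative attempts. Not a Hadoop API.
class SpeculationReport {

    // One speculative (duplicate) attempt and how it ended.
    static final class Attempt {
        final double runSeconds; // time the duplicate ran before finishing or being killed
        final boolean won;       // true if it finished before the original attempt

        Attempt(double runSeconds, boolean won) {
            this.runSeconds = runSeconds;
            this.won = won;
        }
    }

    // Fraction of speculative attempts that beat the original task.
    static double winRate(List<Attempt> attempts) {
        if (attempts.isEmpty()) {
            return 0.0;
        }
        long wins = attempts.stream().filter(a -> a.won).count();
        return (double) wins / attempts.size();
    }

    // Task-seconds spent on duplicates that were killed: pure overhead.
    static double wastedSeconds(List<Attempt> attempts) {
        return attempts.stream()
                .filter(a -> !a.won)
                .mapToDouble(a -> a.runSeconds)
                .sum();
    }

    public static void main(String[] args) {
        List<Attempt> attempts = Arrays.asList(
                new Attempt(40.0, true),
                new Attempt(120.0, false),
                new Attempt(90.0, false),
                new Attempt(30.0, true));
        System.out.println("win rate: " + winRate(attempts));
        System.out.println("wasted task-seconds: " + wastedSeconds(attempts));
    }
}
```

A low win rate combined with a large number of wasted task-seconds suggests that speculation is costing more than it saves. Where to draw that line depends on how the cluster values resources against job latency.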
MapReduce forces everything into a map-then-reduce pattern. What workloads does this fit poorly, and what alternatives emerged?
Strong Answer:
MapReduce fits poorly for three categories of workloads.
First, iterative algorithms (machine learning, graph processing). PageRank, for example, requires multiple passes over the data, and each iteration is a separate MapReduce job that reads from and writes to HDFS. The I/O overhead of reading and writing the full dataset for each iteration can make MapReduce 10-100x slower than in-memory frameworks for iterative workloads.
Second, interactive queries. A SQL query like "SELECT COUNT(*) FROM sales WHERE region = 'US'" should take seconds, not minutes. MapReduce has high job startup latency (allocating containers, initializing JVMs, scheduling tasks) that makes sub-minute queries impractical.
Third, stream processing. MapReduce is fundamentally batch-oriented: it processes a fixed input dataset and produces a fixed output. For continuous event streams (click streams, sensor data, log monitoring), you need a framework that processes records as they arrive.
Alternatives that emerged: Apache Spark (in-memory iterative processing with DAG execution), Apache Tez (DAG-based job execution that eliminates unnecessary disk materializations between stages), Apache Flink (true stream processing with event-time semantics), and Presto/Trino (distributed SQL engine for interactive queries). Spark, Tez, and Flink can all run on YARN, sharing the same cluster resources as MapReduce. Presto/Trino is typically deployed as its own cluster of coordinator and worker processes that reads the same data.
Follow-up: Given all these alternatives, is there any workload where MapReduce is still the best choice in 2026?
MapReduce is still reasonable for simple, one-pass ETL jobs on very large datasets where the startup overhead is negligible compared to the processing time. For example, consider a job that reads 100 TB of raw logs, filters and transforms each record independently, and writes the output to HDFS. This is an embarrassingly parallel workload where map-only MapReduce jobs (no reduce phase needed) work well and can be simpler to debug than Spark. But in practice, most organizations have standardized on Spark because it handles both simple and complex workloads, and the operational cost of maintaining two processing frameworks is not worth the marginal simplicity of MapReduce for simple jobs.
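The per-iteration I/O argument made for iterative algorithms above can be checked with a back-of-the-envelope model. This is a rough sketch under simplifying assumptions, not a benchmark: IterationIoModel is a hypothetical class, each iteration's output is assumed to be as large as its input, the dataset is assumed to fit in cluster memory for the in-memory case, and shuffle spills and HDFS replication are ignored.

```java
// Back-of-the-envelope I/O model. All numbers are illustrative assumptions.
class IterationIoModel {

    // Chained MapReduce jobs: every iteration reads its input from HDFS and
    // writes its output back, so the dataset crosses the disk twice per pass.
    static double mapReduceTerabytes(double datasetTb, int iterations) {
        return iterations * 2.0 * datasetTb;
    }

    // In-memory framework (assuming the dataset fits in cluster memory):
    // one initial read and one final write, regardless of iteration count.
    static double inMemoryTerabytes(double datasetTb) {
        return 2.0 * datasetTb;
    }

    public static void main(String[] args) {
        double datasetTb = 1.0; // assumed dataset size
        int iterations = 20;    // assumed number of passes, e.g. PageRank rounds
        double mr = mapReduceTerabytes(datasetTb, iterations);
        double mem = inMemoryTerabytes(datasetTb);
        System.out.println("MapReduce disk traffic (TB): " + mr);
        System.out.println("In-memory disk traffic (TB): " + mem);
        System.out.println("ratio: " + (mr / mem));
    }
}
```

Under these assumptions the HDFS traffic grows linearly with the number of iterations for chained MapReduce jobs and stays constant for the in-memory case, which is the shape of the gap described in the answer. Real speedups also depend on CPU cost, shuffle volume, and memory pressure.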