Using a Combiner in MapReduce
In the previous chapter's helloworld example, every map task can produce a large amount of local output, and all of it travels over the network to the reducer, which wastes bandwidth. A Combiner solves this problem: it merges the map-side output locally before the shuffle, and it is one of MapReduce's standard optimization techniques.
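Two properties make a combiner safe here. First, the framework treats it purely as an optimization: it may invoke the combiner zero, one, or several times on a map task's output (typically around each spill), so the job must produce the same final result whether or not it runs. Second, the combine operation must be commutative and associative; summing word counts qualifies, while an operation such as averaging would not. Because a combiner has exactly the reducer's interface, it is written as a Reducer subclass and registered on the Job with a single setCombinerClass call, as the full program below shows.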
package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountCombinerMark_to_win {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
/* The signature uses org.apache.hadoop.mapreduce.Mapper.Context and throws java.lang.InterruptedException. To view the Mapper source code, hold Ctrl and click the class; when "Attach Source Code" appears, click External Location/External File and locate the source, which sits under the Source directory. */
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:73203"+key.toString()+" value is "+value.toString()); StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class CombinerClass extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
System.out.println("combiner key is "+key.toString());
int sum = 0;
for (IntWritable val : values)
{
System.out.println("combine val is "+val.toString());
sum += val.get(); // accumulate the count for each occurrence of the same word
}
result.set(sum);
context.write(key, result);
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
System.out.println("reduce key is "+key.toString());
int sum = 0;
for (IntWritable val : values) {
System.out.println("val is "+val.toString());
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "word count"); // Job.getInstance replaces the deprecated new Job(conf, name)
job.setJarByClass(WordCountCombinerMark_to_win.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(CombinerClass.class);
// For word count the reduce logic is commutative and associative,
// so the reducer class itself could also serve as the combiner:
// job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));
System.out.println("mytest hadoop successful");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
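The original text never shows the input file, but its contents can be read off the map log below: README.txt holds three lines, 41 bytes in total with CRLF line endings (hence the key offsets 0, 19, and 31). A hypothetical helper to stage it, assuming the same hdfs://localhost:9000 namenode as the job:
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class StageInput {
    public static void main(String[] args) throws Exception {
        // Write the 41-byte README.txt the job reads; contents inferred from the map log.
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), new Configuration());
        try (FSDataOutputStream out = fs.create(new Path("/README.txt"), true)) {
            out.writeBytes("hello a hello win\r\nhello a to\r\nhello mark");
        }
    }
}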
Output:
mytest hadoop successful
INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO - Total input paths to process : 1
INFO - number of splits:1
INFO - Submitting tokens for job: job_local1963823567_0001
INFO - The url to track the job: http://localhost:8080/
INFO - OutputCommitter set in config null
INFO - Running job: job_local1963823567_0001
INFO - File Output Committer Algorithm version is 1
INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
INFO - Waiting for map tasks
INFO - Starting task: attempt_local1963823567_0001_m_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@af988b
INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
INFO - (EQUATOR) 0 kvi 26214396(104857584)
INFO - mapreduce.task.io.sort.mb: 100
INFO - soft limit at 83886080
INFO - bufstart = 0; bufvoid = 104857600
INFO - kvstart = 26214396; length = 6553600
INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 0 value is hello a hello win
key is 19 value is hello a to
key is 31 value is hello mark
INFO -
INFO - Starting flush of map output
INFO - Spilling map output
INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
combiner key is a
combine val is 1
combine val is 1
combiner key is hello
combine val is 1
combine val is 1
combine val is 1
combine val is 1
combiner key is mark
combine val is 1
combiner key is to
combine val is 1
combiner key is win
combine val is 1
INFO - Finished spill 0
INFO - Task:attempt_local1963823567_0001_m_000000_0 is done. And is in the process of committing
INFO - map
INFO - Task 'attempt_local1963823567_0001_m_000000_0' done.
INFO - Finishing task: attempt_local1963823567_0001_m_000000_0
INFO - map task executor complete.
INFO - Waiting for reduce tasks
INFO - Starting task: attempt_local1963823567_0001_r_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Job job_local1963823567_0001 running in uber mode : false
INFO - map 100% reduce 0%
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@7ec6b2
INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@15fb15a
INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO - attempt_local1963823567_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
INFO - localfetcher#1 about to shuffle output of map attempt_local1963823567_0001_m_000000_0 decomp: 52 len: 56 to MEMORY
INFO - Read 52 bytes from map-output for attempt_local1963823567_0001_m_000000_0
INFO - closeInMemoryFile -> map-output of size: 52, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->52
INFO - EventFetcher is interrupted.. Returning
INFO - 1 / 1 copied.
INFO - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 48 bytes
INFO - Merged 1 segments, 52 bytes to disk to satisfy reduce memory limit
INFO - Merging 1 files, 56 bytes from disk
INFO - Merging 0 segments, 0 bytes from memory into reduce
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 48 bytes
INFO - 1 / 1 copied.
INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is a
val is 2
reduce key is hello
val is 4
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1
INFO - Task:attempt_local1963823567_0001_r_000000_0 is done. And is in the process of committing
INFO - 1 / 1 copied.
INFO - Task attempt_local1963823567_0001_r_000000_0 is allowed to commit now
INFO - Saved output of task 'attempt_local1963823567_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local1963823567_0001_r_000000
INFO - reduce > reduce
INFO - Task 'attempt_local1963823567_0001_r_000000_0' done.
INFO - Finishing task: attempt_local1963823567_0001_r_000000_0
INFO - reduce task executor complete.
INFO - map 100% reduce 100%
INFO - Job job_local1963823567_0001 completed successfully
INFO - Counters: 35
File System Counters
FILE: Number of bytes read=454
FILE: Number of bytes written=604038
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=82
HDFS: Number of bytes written=30
HDFS: Number of read operations=13
HDFS: Number of large read operations=0
HDFS: Number of write operations=4
Map-Reduce Framework
Map input records=3
Map output records=9
Map output bytes=76
Map output materialized bytes=56
Input split bytes=97
Combine input records=9
Combine output records=5
Reduce input groups=5
Reduce shuffle bytes=56
Reduce input records=5
Reduce output records=5
Spilled Records=10
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=50
Total committed heap usage (bytes)=243384320
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=41
File Output Format Counters
Bytes Written=30
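The counters show exactly what the combiner saved. The map task emitted 9 records (Map output records=9, 76 raw bytes), the combiner folded those 9 input records into 5 output records (Combine input records=9, Combine output records=5), and the reducer therefore received only 5 records in 5 groups, 56 materialized bytes instead of the raw map output. Without the combiner, all 9 individual (word, 1) pairs would have crossed the shuffle.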
When two map tasks meet the combiner: the following program is the same word count, but it adds a second input file, README1.txt, so the job creates two splits and runs two map tasks. Each map task's output is combined separately, and the reducer then sums the per-map partial counts.
package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountTwoFileCombiner {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:73203"+key.toString()+" value is "+value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class CombinerClass extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException
{
System.out.println("combiner key is "+key.toString());
int sum = 0;
for (IntWritable val : values)
{
System.out.println("combine val is "+val.toString());
sum += val.get(); // accumulate the count for each occurrence of the same word
}
result.set(sum);
context.write(key, result);
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
System.out.println("reduce key is "+key.toString());
int sum = 0;
for (IntWritable val : values) {
System.out.println("reduce val is "+val.toString());
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountTwoFileCombiner.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(CombinerClass.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README1.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
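The only changes from the first program are the second addInputPath call for README1.txt and the combiner being wired in from the start. As before, the second input file is only visible through the log: README1.txt evidently holds two lines, 28 bytes with CRLF endings (key offsets 0 and 16). A hypothetical staging call, using the same imports and style as the earlier helper:
// Assumed contents of the second input file, inferred from the map log below.
FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:9000"), new Configuration());
try (FSDataOutputStream out = fs.create(new Path("/README1.txt"), true)) {
    out.writeBytes("hello zhangsan\r\nhello a lisi"); // 28 bytes: "hello zhangsan" + CRLF + "hello a lisi"
}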
Output:
INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO - Total input paths to process : 2
INFO - number of splits:2
INFO - Submitting tokens for job: job_local244211701_0001
INFO - The url to track the job: http://localhost:8080/
INFO - Running job: job_local244211701_0001
INFO - OutputCommitter set in config null
INFO - File Output Committer Algorithm version is 1
INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
INFO - Waiting for map tasks
INFO - Starting task: attempt_local244211701_0001_m_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@9a1b89
INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
INFO - (EQUATOR) 0 kvi 26214396(104857584)
INFO - mapreduce.task.io.sort.mb: 100
INFO - soft limit at 83886080
INFO - bufstart = 0; bufvoid = 104857600
INFO - kvstart = 26214396; length = 6553600
INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 0 value is hello a hello win
key is 19 value is hello a to
key is 31 value is hello mark
INFO -
INFO - Starting flush of map output
INFO - Spilling map output
INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
combiner key is a
combine val is 1
combine val is 1
combiner key is hello
combine val is 1
combine val is 1
combine val is 1
combine val is 1
combiner key is mark
combine val is 1
combiner key is to
combine val is 1
combiner key is win
combine val is 1
INFO - Finished spill 0
INFO - Task:attempt_local244211701_0001_m_000000_0 is done. And is in the process of committing
INFO - map
INFO - Task 'attempt_local244211701_0001_m_000000_0' done.
INFO - Finishing task: attempt_local244211701_0001_m_000000_0
INFO - Starting task: attempt_local244211701_0001_m_000001_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@147ad13
INFO - Processing split: hdfs://localhost:9000/README1.txt:0+28
INFO - (EQUATOR) 0 kvi 26214396(104857584)
INFO - mapreduce.task.io.sort.mb: 100
INFO - soft limit at 83886080
INFO - bufstart = 0; bufvoid = 104857600
INFO - kvstart = 26214396; length = 6553600
INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 0 value is hello zhangsan
key is 16 value is hello a lisi
INFO -
INFO - Starting flush of map output
INFO - Spilling map output
INFO - bufstart = 0; bufend = 48; bufvoid = 104857600
INFO - kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
combiner key is a
combine val is 1
combiner key is hello
combine val is 1
combine val is 1
combiner key is lisi
combine val is 1
combiner key is zhangsan
combine val is 1
INFO - Finished spill 0
INFO - Job job_local244211701_0001 running in uber mode : false
INFO - Task:attempt_local244211701_0001_m_000001_0 is done. And is in the process of committing
INFO - map 50% reduce 0%
INFO - map
INFO - Task 'attempt_local244211701_0001_m_000001_0' done.
INFO - Finishing task: attempt_local244211701_0001_m_000001_0
INFO - map task executor complete.
INFO - Waiting for reduce tasks
INFO - Starting task: attempt_local244211701_0001_r_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@1a2d165
INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@c45226
INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO - attempt_local244211701_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
INFO - localfetcher#1 about to shuffle output of map attempt_local244211701_0001_m_000000_0 decomp: 52 len: 56 to MEMORY
INFO - Read 52 bytes from map-output for attempt_local244211701_0001_m_000000_0
INFO - closeInMemoryFile -> map-output of size: 52, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->52
INFO - localfetcher#1 about to shuffle output of map attempt_local244211701_0001_m_000001_0 decomp: 48 len: 52 to MEMORY
INFO - Read 48 bytes from map-output for attempt_local244211701_0001_m_000001_0
INFO - closeInMemoryFile -> map-output of size: 48, inMemoryMapOutputs.size() -> 2, commitMemory -> 52, usedMemory ->100
INFO - EventFetcher is interrupted.. Returning
INFO - 2 / 2 copied.
INFO - finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs
INFO - Merging 2 sorted segments
INFO - Down to the last merge-pass, with 2 segments left of total size: 92 bytes
INFO - Merged 2 segments, 100 bytes to disk to satisfy reduce memory limit
INFO - Merging 1 files, 102 bytes from disk
INFO - Merging 0 segments, 0 bytes from memory into reduce
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 94 bytes
INFO - 2 / 2 copied.
INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is a
reduce val is 1
reduce val is 2
reduce key is hello
reduce val is 4
reduce val is 2
reduce key is lisi
reduce val is 1
reduce key is mark
reduce val is 1
reduce key is to
reduce val is 1
reduce key is win
reduce val is 1
reduce key is zhangsan
reduce val is 1
INFO - Task:attempt_local244211701_0001_r_000000_0 is done. And is in the process of committing
INFO - 2 / 2 copied.
INFO - Task attempt_local244211701_0001_r_000000_0 is allowed to commit now
INFO - Saved output of task 'attempt_local244211701_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local244211701_0001_r_000000
INFO - reduce > reduce
INFO - Task 'attempt_local244211701_0001_r_000000_0' done.
INFO - Finishing task: attempt_local244211701_0001_r_000000_0
INFO - reduce task executor complete.
INFO - map 100% reduce 100%
INFO - Job job_local244211701_0001 completed successfully
INFO - Counters: 35
File System Counters
FILE: Number of bytes read=1512
FILE: Number of bytes written=902094
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=179
HDFS: Number of bytes written=48
HDFS: Number of read operations=28
HDFS: Number of large read operations=0
HDFS: Number of write operations=5
Map-Reduce Framework
Map input records=5
Map output records=14
Map output bytes=124
Map output materialized bytes=108
Input split bytes=195
Combine input records=14
Combine output records=9
Reduce input groups=7
Reduce shuffle bytes=108
Reduce input records=9
Reduce output records=7
Spilled Records=18
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=63
Total committed heap usage (bytes)=460161024
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=69
File Output Format Counters
Bytes Written=48
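With two splits the counters scale accordingly: the two map tasks emitted 14 records in total, each task's combiner ran separately over only its own output (Combine input records=14, Combine output records=9), and the reducer received 9 records in 7 groups. The reduce log above makes the division of labor visible: "hello" arrived as the two per-map partial sums 4 and 2, which the reducer added to the final count of 6, and "a" arrived as 1 and 2, giving 3. A combiner only pre-aggregates within a single map task; aggregation across map tasks always remains the reducer's job.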