Using the Partitioner in MapReduce
Suppose the requirement changes: the output must now go into two files, split by the first letter of each key over the 26 letters of the alphabet, with the first 13 letters (a through m) in one file and the rest (n through z) in the other. This is where the partitioning technique comes in. The number of partitions is always the same as the number of reduce tasks. With two reducers, each bucket covers 26 / 2 = 13 letters, so a key like "hello" ('h' - 'a' = 7 < 13) goes to partition 0, while "win" ('w' - 'a' = 22 >= 13) goes to partition 1.
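Before the full listing, here is a minimal standalone sketch of that bucket arithmetic (the class name AlphabetSplitDemo is illustrative only and is not part of the program below):

public class AlphabetSplitDemo {
    public static void main(String[] args) {
        int numPartitions = 2;
        int distance = 26 / numPartitions; // 13 letters per bucket
        for (char c = 'a'; c <= 'z'; c++)
            System.out.println(c + " -> partition " + ((c - 'a') < distance ? 0 : 1));
        // prints a..m -> partition 0 and n..z -> partition 1
    }
}

The full program: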
package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountPartitionerMarktw {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("key is 马克-to-win @ 马克java社区:"+key.toString()+" value is "+value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
/* Partitioner<KEY, VALUE>: The total number of partitions
   is the same as the number of reduce tasks for the job. Hence this controls
   which of the reduce tasks the intermediate key (and hence the
   record) is sent for reduction.
   @param key the key to be partitioned.
   @param value the entry value.
   @param numPartitions the total number of partitions.
   @return the partition number for the <code>key</code>.
*/
public static class MyPartitioner extends Partitioner<Text, IntWritable> {
// class declaration reconstructed so the code compiles; "MyPartitioner" is an assumed name, since the original text elides this line
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
// the value of numPartitions comes from job.setNumReduceTasks() in main
int distancemark_to_win = 26 / numPartitions; // with 2 reducers: 26 / 2 = 13
int result;
if ((key.charAt(0) - 'a') < distancemark_to_win)
result = 0; // first letter in a..m
else
result = 1; // first letter in n..z
return result;
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
System.out.println("reduce key is 马克-to-win @ 马克java社区:"+key.toString());
int sum = 0;
for (IntWritable val : values) {
System.out.println("val is "+val.toString());
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "word count");
job.setJarByClass(WordCountPartitionerMarktw.class);
job.setNumReduceTasks(2); // use two reduce tasks, one per partition
job.setPartitionerClass(MyPartitioner.class); // reconstructed: the original elides this line; it registers the partitioner defined above
job.setMapperClass(TokenizerMapper.class);
// job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));
System.out.println("mytest hadoop successful");
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
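To sanity-check which output file each word will land in, one can call the partitioner directly (a sketch that assumes the MyPartitioner nested class reconstructed above; PartitionerCheck is an illustrative name):

package com;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        WordCountPartitionerMarktw.MyPartitioner p = new WordCountPartitionerMarktw.MyPartitioner();
        IntWritable one = new IntWritable(1);
        for (String w : new String[] { "a", "hello", "mark", "to", "win" })
            System.out.println(w + " -> partition " + p.getPartition(new Text(w), one, 2));
        // a, hello, mark -> partition 0; to, win -> partition 1
    }
}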
Two output files are generated. One (part-r-00000) contains:
a 2
hello 4
mark 1
The other (part-r-00001) contains:
to 1
win 1
Console output:
mytest hadoop successful
INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO - Total input paths to process : 1
INFO - number of splits:1
INFO - Submitting tokens for job: job_local2073634211_0001
INFO - The url to track the job: http://localhost:8080/
INFO - Running job: job_local2073634211_0001
INFO - OutputCommitter set in config null
INFO - File Output Committer Algorithm version is 1
INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
INFO - Waiting for map tasks
INFO - Starting task: attempt_local2073634211_0001_m_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Job job_local2073634211_0001 running in uber mode : false
INFO - map 0% reduce 0%
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@1e1dd75
INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
INFO - (EQUATOR) 0 kvi 26214396(104857584)
INFO - mapreduce.task.io.sort.mb: 100
INFO - soft limit at 83886080
INFO - bufstart = 0; bufvoid = 104857600
INFO - kvstart = 26214396; length = 6553600
INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 马克-to-win @ 马克java社区:0 value is hello a hello win
key is 马克-to-win @ 马克java社区:19 value is hello a to
key is 马克-to-win @ 马克java社区:31 value is hello mark
INFO -
INFO - Starting flush of map output
INFO - Spilling map output
INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
INFO - Finished spill 0
INFO - Task:attempt_local2073634211_0001_m_000000_0 is done. And is in the process of committing
INFO - map
INFO - Task 'attempt_local2073634211_0001_m_000000_0' done.
INFO - Finishing task: attempt_local2073634211_0001_m_000000_0
INFO - map task executor complete.
INFO - Waiting for reduce tasks
INFO - Starting task: attempt_local2073634211_0001_r_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@c0b866
INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@1b495c9
INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO - attempt_local2073634211_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
INFO - localfetcher#1 about to shuffle output of map attempt_local2073634211_0001_m_000000_0 decomp: 77 len: 81 to MEMORY
INFO - Read 77 bytes from map-output for attempt_local2073634211_0001_m_000000_0
INFO - closeInMemoryFile -> map-output of size: 77, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->77
INFO - EventFetcher is interrupted.. Returning
INFO - 1 / 1 copied.
INFO - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 73 bytes
INFO - Merged 1 segments, 77 bytes to disk to satisfy reduce memory limit
INFO - Merging 1 files, 81 bytes from disk
INFO - Merging 0 segments, 0 bytes from memory into reduce
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 73 bytes
INFO - 1 / 1 copied.
INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is 马克-to-win @ 马克java社区:a
val is 1
val is 1
reduce key is 马克-to-win @ 马克java社区:hello
val is 1
val is 1
val is 1
val is 1
reduce key is 马克-to-win @ 马克java社区:mark
val is 1
INFO - map 100% reduce 0%
INFO - Task:attempt_local2073634211_0001_r_000000_0 is done. And is in the process of committing
INFO - 1 / 1 copied.
INFO - Task attempt_local2073634211_0001_r_000000_0 is allowed to commit now
INFO - Saved output of task 'attempt_local2073634211_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local2073634211_0001_r_000000
INFO - reduce > reduce
INFO - Task 'attempt_local2073634211_0001_r_000000_0' done.
INFO - Finishing task: attempt_local2073634211_0001_r_000000_0
INFO - Starting task: attempt_local2073634211_0001_r_000001_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - map 100% reduce 100%
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@dec177
INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@d0506e
INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO - localfetcher#2 about to shuffle output of map attempt_local2073634211_0001_m_000000_0 decomp: 21 len: 25 to MEMORY
INFO - attempt_local2073634211_0001_r_000001_0 Thread started: EventFetcher for fetching Map Completion Events
INFO - Read 21 bytes from map-output for attempt_local2073634211_0001_m_000000_0
INFO - closeInMemoryFile -> map-output of size: 21, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->21
INFO - EventFetcher is interrupted.. Returning
INFO - 1 / 1 copied.
INFO - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 16 bytes
INFO - Merged 1 segments, 21 bytes to disk to satisfy reduce memory limit
INFO - Merging 1 files, 25 bytes from disk
INFO - Merging 0 segments, 0 bytes from memory into reduce
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 16 bytes
INFO - 1 / 1 copied.
reduce key is 马克-to-win @ 马克java社区:to
val is 1
reduce key is 马克-to-win @ 马克java社区:win
val is 1
INFO - map 100% reduce 50%
INFO - Task:attempt_local2073634211_0001_r_000001_0 is done. And is in the process of committing
INFO - 1 / 1 copied.
INFO - Task attempt_local2073634211_0001_r_000001_0 is allowed to commit now
INFO - Saved output of task 'attempt_local2073634211_0001_r_000001_0' to hdfs://localhost:9000/output2/_temporary/0/task_local2073634211_0001_r_000001
INFO - reduce > reduce
INFO - Task 'attempt_local2073634211_0001_r_000001_0' done.
INFO - Finishing task: attempt_local2073634211_0001_r_000001_0
INFO - reduce task executor complete.
INFO - map 100% reduce 100%
INFO - Job job_local2073634211_0001 completed successfully
INFO - Counters: 35
File System Counters
FILE: Number of bytes read=1057
FILE: Number of bytes written=906358
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=123
HDFS: Number of bytes written=49
HDFS: Number of read operations=24
HDFS: Number of large read operations=0
HDFS: Number of write operations=9
Map-Reduce Framework
Map input records=3
Map output records=9
Map output bytes=76
Map output materialized bytes=106
Input split bytes=97
Combine input records=0
Combine output records=0
Reduce input groups=5
Reduce shuffle bytes=106
Reduce input records=9
Reduce output records=5
Spilled Records=18
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=60
Total committed heap usage (bytes)=365457408
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=41
File Output Format Counters
Bytes Written=30