Using Partitioner in MapReduce

How to use Partitioner:
Suppose the requirement changes: the output must now go into two files, split by the first letter of each key across the 26 letters of the alphabet, with keys starting with the first 13 letters in one file and the rest in the other. This is where the partition technique comes in. The number of partitions is the same as the number of reduce tasks.
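Concretely, with two reduce tasks the boundary is 26 / 2 = 13 letters: keys whose first letter falls in 'a'..'m' go to reduce task 0 and keys starting with 'n'..'z' go to reduce task 1. A minimal sketch of that rule in plain Java (no Hadoop types; the value 2 here is an assumption that matches the job configuration below):

public class SplitRuleSketch {
    public static void main(String[] args) {
        int numPartitions = 2;                        // assumed, matches job.setNumReduceTasks(2) below
        int lettersPerPartition = 26 / numPartitions; // 13
        for (char c = 'a'; c <= 'z'; c++) {
            // same test the custom partitioner below applies to the key's first letter
            int partition = (c - 'a') < lettersPerPartition ? 0 : 1;
            System.out.println(c + " -> partition " + partition);
        }
    }
}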



package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
 
public class WordCountPartitionerMarktw {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is 马克-to-win @ 马克java社区:"+key.toString()+" value is "+value.toString());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    /* Partitioner<KEY, VALUE>: the total number of partitions is the same as
       the number of reduce tasks for the job. Hence this controls which of the
       reduce tasks the intermediate key (and hence the record) is sent for
       reduction.
         @param key the key to be partitioned.
         @param value the entry value.
         @param numPartitions the total number of partitions.
         @return the partition number for the <code>key</code>. */
    // The class declaration was elided in the original; the name MyPartitioner is assumed here.
    public static class MyPartitioner extends Partitioner<Text, IntWritable> {
        // numPartitions is taken from job.setNumReduceTasks() in main()
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            // 26 letters / 2 reduce tasks = 13, so first letters 'a'..'m' map to
            // partition 0 and 'n'..'z' map to partition 1
            int distancemark_to_win = 26 / numPartitions;
            int result;
            if ((key.charAt(0) - 'a') < distancemark_to_win)
                result = 0;
            else
                result = 1;
            return result;
        }
    }
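
    /* For comparison: if no custom partitioner is set, Hadoop falls back to
       HashPartitioner, which assigns a key by its hash code modulo the number
       of reduce tasks, so keys are spread by hash rather than alphabetically.
       A sketch of that default behavior (not used by this job): */
    public static class HashStylePartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }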

     
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is 马克-to-win @ 马克java社区:"+key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println("val is "+val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCountPartitionerMarktw.class);
        job.setNumReduceTasks(2); // use two reduce tasks
        job.setPartitionerClass(MyPartitioner.class); // register the custom partitioner defined above
        job.setMapperClass(TokenizerMapper.class);
 //       job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));

        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Two output files are produced. One contains:
a    2
hello    4
mark    1

The other contains:
to    1
win    1
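
The assignment can be sanity-checked without submitting a job by calling getPartition directly. A quick sketch (it assumes the partitioner class is named MyPartitioner, as reconstructed above, and passes 2 to match job.setNumReduceTasks(2)):

package com;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionerCheck {
    public static void main(String[] args) {
        // MyPartitioner is the custom partitioner defined inside WordCountPartitionerMarktw
        WordCountPartitionerMarktw.MyPartitioner p = new WordCountPartitionerMarktw.MyPartitioner();
        for (String k : new String[] {"a", "hello", "mark", "to", "win"}) {
            // 2 matches job.setNumReduceTasks(2) in the driver
            System.out.println(k + " -> partition " + p.getPartition(new Text(k), new IntWritable(1), 2));
        }
        // Expected: a, hello, mark -> 0; to, win -> 1, matching the two files above
    }
}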




Console output:
mytest hadoop successful
 INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
 INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
 WARN - No job jar file set.  User classes may not be found. See Job or Job#setJar(String).
 INFO - Total input paths to process : 1
 INFO - number of splits:1
 INFO - Submitting tokens for job: job_local2073634211_0001
 INFO - The url to track the job: http://localhost:8080/
 INFO - Running job: job_local2073634211_0001
 INFO - OutputCommitter set in config null
 INFO - File Output Committer Algorithm version is 1
 INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
 INFO - Waiting for map tasks
 INFO - Starting task: attempt_local2073634211_0001_m_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO - Job job_local2073634211_0001 running in uber mode : false
 INFO -  map 0% reduce 0%
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@1e1dd75
 INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
 INFO - (EQUATOR) 0 kvi 26214396(104857584)
 INFO - mapreduce.task.io.sort.mb: 100
 INFO - soft limit at 83886080
 INFO - bufstart = 0; bufvoid = 104857600
 INFO - kvstart = 26214396; length = 6553600
 INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 马克-to-win @ 马克java社区:0 value is hello a hello win
key is 马克-to-win @ 马克java社区:19 value is hello a to
key is 马克-to-win @ 马克java社区:31 value is hello mark
 INFO -
 INFO - Starting flush of map output
 INFO - Spilling map output
 INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
 INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
 INFO - Finished spill 0
 INFO - Task:attempt_local2073634211_0001_m_000000_0 is done. And is in the process of committing
 INFO - map
 INFO - Task 'attempt_local2073634211_0001_m_000000_0' done.
 INFO - Finishing task: attempt_local2073634211_0001_m_000000_0
 INFO - map task executor complete.
 INFO - Waiting for reduce tasks
 INFO - Starting task: attempt_local2073634211_0001_r_000000_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@c0b866
 INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@1b495c9
 INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
 INFO - attempt_local2073634211_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
 INFO - localfetcher#1 about to shuffle output of map attempt_local2073634211_0001_m_000000_0 decomp: 77 len: 81 to MEMORY
 INFO - Read 77 bytes from map-output for attempt_local2073634211_0001_m_000000_0
 INFO - closeInMemoryFile -> map-output of size: 77, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->77
 INFO - EventFetcher is interrupted.. Returning
 INFO - 1 / 1 copied.
 INFO - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 73 bytes
 INFO - Merged 1 segments, 77 bytes to disk to satisfy reduce memory limit
 INFO - Merging 1 files, 81 bytes from disk
 INFO - Merging 0 segments, 0 bytes from memory into reduce
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 73 bytes
 INFO - 1 / 1 copied.
 INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is 马克-to-win @ 马克java社区:a
val is 1
val is 1
reduce key is 马克-to-win @ 马克java社区:hello
val is 1
val is 1
val is 1
val is 1
reduce key is 马克-to-win @ 马克java社区:mark
val is 1
 INFO -  map 100% reduce 0%
 INFO - Task:attempt_local2073634211_0001_r_000000_0 is done. And is in the process of committing
 INFO - 1 / 1 copied.
 INFO - Task attempt_local2073634211_0001_r_000000_0 is allowed to commit now
 INFO - Saved output of task 'attempt_local2073634211_0001_r_000000_0' to hdfs://localhost:9000/output2/_temporary/0/task_local2073634211_0001_r_000000
 INFO - reduce > reduce
 INFO - Task 'attempt_local2073634211_0001_r_000000_0' done.
 INFO - Finishing task: attempt_local2073634211_0001_r_000000_0
 INFO - Starting task: attempt_local2073634211_0001_r_000001_0
 INFO - File Output Committer Algorithm version is 1
 INFO - ProcfsBasedProcessTree currently is supported only on Linux.
 INFO -  map 100% reduce 100%
 INFO -  Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@dec177
 INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@d0506e
 INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
 INFO - localfetcher#2 about to shuffle output of map attempt_local2073634211_0001_m_000000_0 decomp: 21 len: 25 to MEMORY
 INFO - attempt_local2073634211_0001_r_000001_0 Thread started: EventFetcher for fetching Map Completion Events
 INFO - Read 21 bytes from map-output for attempt_local2073634211_0001_m_000000_0
 INFO - closeInMemoryFile -> map-output of size: 21, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->21
 INFO - EventFetcher is interrupted.. Returning
 INFO - 1 / 1 copied.
 INFO - finalMerge called with 1 in-memory map-outputs and 0 on-disk map-outputs
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 16 bytes
 INFO - Merged 1 segments, 21 bytes to disk to satisfy reduce memory limit
 INFO - Merging 1 files, 25 bytes from disk
 INFO - Merging 0 segments, 0 bytes from memory into reduce
 INFO - Merging 1 sorted segments
 INFO - Down to the last merge-pass, with 1 segments left of total size: 16 bytes
 INFO - 1 / 1 copied.
reduce key is 马克-to-win @ 马克java社区:to
val is 1
reduce key is 马克-to-win @ 马克java社区:win
val is 1
 INFO -  map 100% reduce 50%
 INFO - Task:attempt_local2073634211_0001_r_000001_0 is done. And is in the process of committing
 INFO - 1 / 1 copied.
 INFO - Task attempt_local2073634211_0001_r_000001_0 is allowed to commit now
 INFO - Saved output of task 'attempt_local2073634211_0001_r_000001_0' to hdfs://localhost:9000/output2/_temporary/0/task_local2073634211_0001_r_000001
 INFO - reduce > reduce
 INFO - Task 'attempt_local2073634211_0001_r_000001_0' done.
 INFO - Finishing task: attempt_local2073634211_0001_r_000001_0
 INFO - reduce task executor complete.
 INFO -  map 100% reduce 100%
 INFO - Job job_local2073634211_0001 completed successfully
 INFO - Counters: 35
    File System Counters
        FILE: Number of bytes read=1057
        FILE: Number of bytes written=906358
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=123
        HDFS: Number of bytes written=49
        HDFS: Number of read operations=24
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=9
    Map-Reduce Framework
        Map input records=3
        Map output records=9
        Map output bytes=76
        Map output materialized bytes=106
        Input split bytes=97
        Combine input records=0
        Combine output records=0
        Reduce input groups=5
        Reduce shuffle bytes=106
        Reduce input records=9
        Reduce output records=5
        Spilled Records=18
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=60
        Total committed heap usage (bytes)=365457408
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=41
    File Output Format Counters
        Bytes Written=30