Usage of cleanup in MapReduce's reduce phase
The topN we solved earlier was topN within each group, for example the smallest item inside each order. But when you need to compare across all keys and pick an overall topN, you must use cleanup. (Beginners may skip this caveat: use cleanup with care. What if the data behind all those keys is huge? A plain Map map = new HashMap(); will run out of memory. In that case, consider a multi-pass MapReduce job or a smarter algorithm: every time a key finishes processing, discard entries you no longer need, for example drop whatever has already fallen out of the topN, freeing space as you go; see the sketch right after this paragraph.)
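To make that pruning idea concrete, here is a minimal sketch, not part of the example that follows. The class name BoundedTopNReducer and the cutoff N = 3 are made up for illustration, and for simplicity two words with the same count overwrite each other in the TreeMap, which a real job would have to handle.

import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class BoundedTopNReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private static final int N = 3; // assumed cutoff for this sketch
    // TreeMap keeps its keys (the counts) sorted, so the smallest entry is cheap to evict
    private TreeMap<Integer, String> topN = new TreeMap<Integer, String>();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        topN.put(sum, key.toString()); // simplification: words with equal counts collide
        if (topN.size() > N) {
            // evict the smallest count immediately, so at most N entries stay in memory
            topN.remove(topN.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // walk from the largest count down and emit the survivors
        for (Map.Entry<Integer, String> e : topN.descendingMap().entrySet()) {
            context.write(new Text(e.getValue()), new IntWritable(e.getKey()));
        }
    }
}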
From here on we cover some special usages. As we know, map runs once per input line, and reduce runs once per key, that is, once per group. But what if the requirement is to do some processing only after all the data has been seen, and to produce output afterwards? That is where setup and cleanup come in: like a servlet's init and destroy, each of them runs exactly once. Both the Mapper and the Reducer have setup and cleanup, and they work the same way, so we will only use reduce as the example; a bare skeleton of the hooks follows.
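For orientation only (the real example comes later), this skeleton shows where each hook fires in the org.apache.hadoop.mapreduce API; the class name LifecycleReducer is made up:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LifecycleReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // runs once, before the first reduce() call (the counterpart of servlet init())
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // runs once per key (per group)
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // runs once, after the last reduce() call (the counterpart of servlet destroy())
    }
}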
This means the final filtering and output step has to live in cleanup. In the earlier examples, output happened line by line (for map) or group by group (for reduce); with cleanup we can first gather all the data, work on it with ordinary Java algorithms, and filter and write the output at the very end. Below we use this to solve the topN problem.
Using wordcount as the example again, we find the three words with the highest counts.
package com;

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ReduceSetupTestMark_to_win {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable one = new IntWritable(1);
        private Text word = new Text();

        /* contents of the input file cleanup.txt:
           hello a hello win
           hello a to
           hello mark
        */
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("key is " + key.toString() + " value is " + value.toString());
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // accumulate every key's total here instead of writing output per group;
        // the actual output happens once, in cleanup()
        private Map<String, Integer> map = new HashMap<String, Integer>();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("reduce key is " + key.toString());
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println("val is " + val.toString());
                sum += val.get();
            }
            map.put(key.toString(), sum);
            // the standard wordcount reducer would write here:
            // context.write(key, new IntWritable(sum));
        }
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            /* copy the entries into a List so we can sort them; ArrayList's constructor
               public ArrayList(Collection<? extends E> c) builds the list straight
               from map.entrySet() */
            List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>(map.entrySet());
            Collections.sort(list, new Comparator<Map.Entry<String, Integer>>() {
                public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
                    // negate the natural order to sort counts from largest to smallest
                    return -1 * (o1.getValue().compareTo(o2.getValue()));
                }
            });
            // Math.min guards against inputs with fewer than three distinct words
            for (int i = 0; i < Math.min(3, list.size()); i++) {
                context.write(new Text(list.get(i).getKey()), new IntWritable(list.get(i).getValue()));
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count"); // non-deprecated replacement for new Job(conf, ...)
        job.setJarByClass(ReduceSetupTestMark_to_win.class); // must reference the enclosing class
        job.setMapperClass(TokenizerMapper.class);
        // no combiner: this reducer emits nothing from reduce(), so it cannot double as one
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // the job fails if the output directory already exists, so remove it first
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/cleanup.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* an empty directory skips the loop and falls through to dirFile.delete() below */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
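One caveat about the driver: deleting the output directory with java.io.File only works while the job runs against the local filesystem. On a cluster the output path lives in HDFS, so the deletion should go through Hadoop's own FileSystem API instead. A minimal sketch, assuming the default filesystem from conf and requiring import org.apache.hadoop.fs.FileSystem:

Path outputPath = new Path("e:/temp/output");
FileSystem fs = FileSystem.get(conf); // resolves to the filesystem named in conf
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true); // true deletes the directory recursively
}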
Console output:
reduce key is a
val is 1
val is 1
reduce key is hello
val is 1
val is 1
val is 1
val is 1
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1
Contents of the output file:
hello 4
a 2
to 1

(to, mark, and win are all tied at a count of 1, so which of them takes third place depends only on where the sort happens to leave the tied entries.)