MapReduce当中全局变量的用法

马克-to-win @ 马克java社区:防盗版实名手机尾号:73203。如想传递变量,程序可以在main函数中,利用Congfiguraion类的set函数将一些简单的数据结构放到到Congfiguraion中,map或reduce task任务启动的过程中(比如setup函数)通过Configuration类的get函数读取即可。

一切的代码和上面都一样,只是加了一个全局变量的特性。



package com;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GlobalTestMark_to_win {

    public static class TokenizerMapper extends Mapper<Object, Text, Text, DoubleWritable> {
        String name;
        protected void setup(Context context)
                throws IOException, InterruptedException {
                //从全局配置获取配置参数
                Configuration conf = context.getConfiguration();
                name = conf.get("name"); //这样就拿到了,但不能设置。想设置用其他方法。这里不深究了
        }
/*
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
*/
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            System.out.println("name is "+name+"key is " + key.toString() + " value is " + value.toString());
            String line = value.toString();
            String[] fields = line.split(",");
            String orderId = fields[0];
            double amount = Double.parseDouble(fields[2]);
            DoubleWritable amountDouble = new DoubleWritable(amount);
            context.write(new Text(orderId), amountDouble);
        }
    }

    public static class IntSumReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        String name1;
        DoubleWritable resultDouble = new DoubleWritable(0.0);
        protected void setup(Context context)
                throws IOException, InterruptedException {
                //从全局配置获取配置参数
                Configuration conf = context.getConfiguration();
                name1 = conf.get("name"); //这样就拿到了
        }
        public void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            System.out.println("name1 is "+name1+"reduce key is " + key.toString());
            double max = Double.MIN_VALUE;
            for (DoubleWritable v2 : values) {
                if (v2.get() > max) {
                    max = v2.get();
                }
            }
            resultDouble.set(max);
            context.write(key, resultDouble);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("name", "马克-to-win");
        Job job = new Job(conf, "word count");
        job.setJarByClass(GlobalTestMark_to_win.class);
        job.setMapperClass(TokenizerMapper.class);
        // job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 空目录就不进入for循环了, 进入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }

}





map里的输出是:

name is 马克-to-winkey is 0 value is o1,p2,250.0
name is 马克-to-winkey is 13 value is o2,p3,500.0
name is 马克-to-winkey is 26 value is o2,p4,100.0
name is 马克-to-winkey is 39 value is o2,p5,700.0
name is 马克-to-winkey is 52 value is o3,p1,150.0
name is 马克-to-winkey is 65 value is o1,p1,200.0

reduce里的输出是:
name1 is 马克-to-winreduce key is o1
name1 is 马克-to-winreduce key is o2
name1 is 马克-to-winreduce key is o3