通过剖析源码单步调试详解MapReduce分组group遍历

通过剖析源码单步调试详解MapReduce分组group遍历:
马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
马克-to-win @ 马克java社区:mapreduce的group知识点是最难理解的,本小节将通过仔细剖析源码,单步调试,来详解之。

另外注意:数据文件写时一定注意:结尾不能有回车和空格,通过在map里面加断点,F8(resume),一轮一轮,调试一行一行的数据,才发现最后一行数据出毛病了,只有是多了一个换行符的毛病。



现在需求变成:
源文件:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0


文件输出结果:(每组中最小的)
o1    200.0
o2    100.0
o3    150.0


package com;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/*一般情况下,OrderBean必须是一个WritableComparable,因为MyGroupComparato的构造函数:super(OrderBean.class, true);要求是个 WritableComparable*/

public class OrderBean implements WritableComparable<OrderBean>{
    private String orderId;
    private Double amount;

    public OrderBean() {
    }

    public OrderBean(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public void set(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    @Override
    public int compareTo(OrderBean o) {
/*这里必须加下面这段比较规则(下面的4 句话),这样就成为1)o1,250,2)o1, 200, 3)o2, 700,4) o2,500,5)o2,100, 6) o3,150,
  准确的这6 个顺序的进入reduce, 才可以分组。如果像下面一样return 45;则顺序是乱的。则进入的顺序和源文件一样,是1)o1,250,2)o2,500, 3)o2, 100,4)o2,700, 5) o3,150, 6)o1, 200,这时就错了,
分组那里全都乱了。因为有一个nextKeyIsSame来一个一个key测试是否为一个组的, 所以同样的key必须排在一起。输出和源文件很像,成了4组, 完全是不对的。

o1    250.0
o2    700.0
o3    150.0
o1    200.0

原因见视频。




那为什么要-Double.compare(this.getAmount(), o.getAmount());呢? 为什么也要比较一下amount呢?因为 你的需求是找出每个key最小的amount,
所以在reduce中什么都不干, 只要遍历一下就行, 自然轮一圈, 输出的就是最小的。比如如果reduce中什么都不干, 也不遍历, 输出的就是最大的。
如果没有-Double.compare(this.getAmount(), o.getAmount());key能挨一块, 但后边的amount的顺序就是不定的了。

但是如果需求改了,只要把一组的全部输出就行, 那我们就可以不用加  return -Double.compare(this.getAmount(), o.getAmount());这句话。

结论就是:OrderBean里面compareTo就是把是一组的对象都排在一起。需要按顺序就按,没必要就没必要。
*/
        int cmp = this.getOrderId().compareTo(o.getOrderId());
        if (0 == cmp) {
            return -Double.compare(this.getAmount(), o.getAmount());
        }
        return cmp;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
    }

    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.amount = in.readDouble();
    }
/*最后reduce输出时用这里。*/
    public String toString() {
        return orderId + "\t" + amount;
    }
}



package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
    OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        String orderId = fields[0];
        double amount = Double.parseDouble(fields[2]);
        bean.set(orderId, amount);
        context.write(bean, NullWritable.get());
    }
}




package com;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;

public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{

    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce key is 马克-to-win @ 马克java社区:"+key.toString());
/*
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
 




马克-to-win @ 马克java社区: 用如下的方法作group的遍历,断点加在这for (NullWritable val : values) {,按F5step into几次就会发现ReduceContextImpl,对于以上这组数据,第一group数据是:o1,250.0和o1,200.0, Mapreduce会把第一个key:o1,250.0,传给输入参数key指针,同时也会传给context指向的ReduceContextImpl 的实例的一个属性指针, 也叫key。马克-to-win @ 马克java社区:这样当ReduceContextImpl的Key改变了,这边的key也会变。另外,ReduceContextImpl里面有个属性nextKeyIsSame, 对于第一组(实际上任何一组处理方式都相同)有两个值这种情况,nextKeyIsSame的初值会变成真。而对于第一组只有一个值这种情况,nextKeyIsSame的初值会变成假。这是由mapreduce智能识别的。马克-to-win @ 马克java社区:当F5,单步调试时,(注意要反复按F5,因为for迭代会在ReduceContextImpl中变成Iteraor语句,具体不深入研究),由于nextKeyIsSame是真,所以可以第二次进入For循环,在ReduceContextImpl中执行nextKeyValue(),用 input.getKey()得到下一个key,之后把值给了ReduceContextImpl的key属性,(这时我们for中的key值也变了)之后nextKey = input.getKey();还会紧接着
 取出下一个key,注意这时可是o2了,已经不是同一个key了,之后 nextKeyIsSame = comparator.compare(currentRawKey..., nextKey....)==0, 会调用我们的MyGroupComparator的compare方法,返回-1,不是0 ,这样就会断定o1和o2不是一个key。nextKeyIsSame就变成false了。马克-to-win @ 马克java社区:循环到for的下一轮,判断进不进去时,又一次进入ReduceContextImpl,由于nextKeyIsSame是 false,所以就不进入for循环了,在这个过程中,大家也看到,在遍历迭代过程中,key的值也改变了。而且也看到了我们的MyGroupComparator的 compare是怎么被调用的。  */      
(购买完整教程)
            System.out.println("val is "+val.toString());
            System.out.println("inside for key is "+key.toString());
        }
/*上面一段和下面一段代码的作用相同*/
//        Iterator it= values.iterator();
//        while(it.hasNext())
//        {
//            System.out.println("inside iterator"+it.next());
//            System.out.println("inside for key is "+key.toString());
//     //         it.remove();
//        }
        context.write(key, NullWritable.get());
    }
}



package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GroupTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(GroupTest.class);
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReducer.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
/*setGroupingComparatorClass的参数必须是个RawComparator,
而我们的MyGroupComparator extends WritableComparator
而WritableComparator implements RawComparator
而interface RawComparator<T> extends Comparator<T>,实现compare方法*/      
        job.setGroupingComparatorClass(MyGroupComparator.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
  
    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 空目录就不进入for循环了, 进入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}



package com;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroupComparator extends WritableComparator{
    protected MyGroupComparator() {
/*下面的话必须要有, 否则报空指针异常, true指底层代码中指是否需要要创建key的实例,一般为true,在我们这种用法的情况下,为false就会空指针。没必要关心细节 */  
        super(OrderBean.class, true);
    }
/* 下面这段话是从reducer摘录出来的: MyGroupComparator的compare方法,返回-1,不是0 ,这样就会断定o1和o2不是一个key。nextKeyIsSame就变成
 false了。循环到for的下一轮,判断进不进去时,又一次进入ReduceContextImpl,由于
 nextKeyIsSame是false, 所以就不进入for循环了,在这个过程中,大家也看到,在遍历迭代过
 程中,key的值也改变了。而且也看到了我们的MyGroupComparator的compare是怎么被调用的。
 可以判断出:想让谁和谁一组一起进入reduce,就返回0。
 
 如果我们的程序变成:return  0;则输出 结果是:o3    150.0, 为什么? 听我视频
   */
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        return bean1.getOrderId().compareTo(bean2.getOrderId());
     //   return  0;
    }
}

输出结果:
reduce key is 马克-to-win @ 马克java社区:o1    250.0
val is (null)
inside for key is o1    250.0
val is (null)
inside for key is o1    200.0
reduce key is 马克-to-win @ 马克java社区:o2    700.0
val is (null)
inside for key is o2    700.0
val is (null)
inside for key is o2    500.0
val is (null)
inside for key is o2    100.0
 INFO - Job job_local1313255223_0001 running in uber mode : false
reduce key is 马克-to-win @ 马克java社区:o3    150.0
val is (null)
inside for key is o3    150.0



文件输出结果:(每组中最小的)
o1    200.0
o2    100.0
o3    150.09