通过剖析源码单步调试详解MapReduce分组group遍历
通过剖析源码单步调试详解MapReduce分组group遍历:
马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
马克-to-win @ 马克java社区:mapreduce的group知识点是最难理解的,本小节将通过仔细剖析源码,单步调试,来详解之。
另外注意:数据文件写时一定注意:结尾不能有回车和空格,通过在map里面加断点,F8(resume),一轮一轮,调试一行一行的数据,才发现最后一行数据出毛病了,只有是多了一个换行符的毛病。
现在需求变成:
源文件:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
文件输出结果:(每组中最小的)
o1 200.0
o2 100.0
o3 150.0
package com;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/*一般情况下,OrderBean必须是一个WritableComparable,因为MyGroupComparato的构造函数:super(OrderBean.class, true);要求是个 WritableComparable*/
public class OrderBean implements WritableComparable<OrderBean>{
private String orderId;
private Double amount;
public OrderBean() {
}
public OrderBean(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public void set(String orderId, Double amount) {
this.orderId = orderId;
this.amount = amount;
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getAmount() {
return amount;
}
public void setAmount(Double amount) {
this.amount = amount;
}
@Override
public int compareTo(OrderBean o) {
/*这里必须加下面这段比较规则(下面的4 句话),这样就成为1)o1,250,2)o1, 200, 3)o2, 700,4) o2,500,5)o2,100, 6) o3,150,
准确的这6 个顺序的进入reduce, 才可以分组。如果像下面一样return 45;则顺序是乱的。则进入的顺序和源文件一样,是1)o1,250,2)o2,500, 3)o2, 100,4)o2,700, 5) o3,150, 6)o1, 200,这时就错了,
分组那里全都乱了。因为有一个nextKeyIsSame来一个一个key测试是否为一个组的, 所以同样的key必须排在一起。输出和源文件很像,成了4组, 完全是不对的。
o1 250.0
o2 700.0
o3 150.0
o1 200.0
原因见视频。
那为什么要-Double.compare(this.getAmount(), o.getAmount());呢? 为什么也要比较一下amount呢?因为 你的需求是找出每个key最小的amount,
所以在reduce中什么都不干, 只要遍历一下就行, 自然轮一圈, 输出的就是最小的。比如如果reduce中什么都不干, 也不遍历, 输出的就是最大的。
如果没有-Double.compare(this.getAmount(), o.getAmount());key能挨一块, 但后边的amount的顺序就是不定的了。
但是如果需求改了,只要把一组的全部输出就行, 那我们就可以不用加 return -Double.compare(this.getAmount(), o.getAmount());这句话。
结论就是:OrderBean里面compareTo就是把是一组的对象都排在一起。需要按顺序就按,没必要就没必要。
*/
int cmp = this.getOrderId().compareTo(o.getOrderId());
if (0 == cmp) {
return -Double.compare(this.getAmount(), o.getAmount());
}
return cmp;
}
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(amount);
}
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.amount = in.readDouble();
}
/*最后reduce输出时用这里。*/
public String toString() {
return orderId + "\t" + amount;
}
}
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
OrderBean bean = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
String orderId = fields[0];
double amount = Double.parseDouble(fields[2]);
bean.set(orderId, amount);
context.write(bean, NullWritable.get());
}
}
package com;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
System.out.println("reduce key is 马克-to-win @ 马克java社区:"+key.toString());
/*
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
马克-to-win @ 马克java社区: 用如下的方法作group的遍历,断点加在这for (NullWritable val : values) {,按F5step into几次就会发现ReduceContextImpl,对于以上这组数据,第一group数据是:o1,250.0和o1,200.0, Mapreduce会把第一个key:o1,250.0,传给输入参数key指针,同时也会传给context指向的ReduceContextImpl 的实例的一个属性指针, 也叫key。马克-to-win @ 马克java社区:这样当ReduceContextImpl的Key改变了,这边的key也会变。另外,ReduceContextImpl里面有个属性nextKeyIsSame, 对于第一组(实际上任何一组处理方式都相同)有两个值这种情况,nextKeyIsSame的初值会变成真。而对于第一组只有一个值这种情况,nextKeyIsSame的初值会变成假。这是由mapreduce智能识别的。马克-to-win @ 马克java社区:当F5,单步调试时,(注意要反复按F5,因为for迭代会在ReduceContextImpl中变成Iteraor语句,具体不深入研究),由于nextKeyIsSame是真,所以可以第二次进入For循环,在ReduceContextImpl中执行nextKeyValue(),用 input.getKey()得到下一个key,之后把值给了ReduceContextImpl的key属性,(这时我们for中的key值也变了)之后nextKey = input.getKey();还会紧接着
取出下一个key,注意这时可是o2了,已经不是同一个key了,之后 nextKeyIsSame = comparator.compare(currentRawKey..., nextKey....)==0, 会调用我们的MyGroupComparator的compare方法,返回-1,不是0 ,这样就会断定o1和o2不是一个key。nextKeyIsSame就变成false了。马克-to-win @ 马克java社区:循环到for的下一轮,判断进不进去时,又一次进入ReduceContextImpl,由于nextKeyIsSame是 false,所以就不进入for循环了,在这个过程中,大家也看到,在遍历迭代过程中,key的值也改变了。而且也看到了我们的MyGroupComparator的 compare是怎么被调用的。 */
(购买完整教程)
System.out.println("val is "+val.toString());
System.out.println("inside for key is "+key.toString());
}
/*上面一段和下面一段代码的作用相同*/
// Iterator it= values.iterator();
// while(it.hasNext())
// {
// System.out.println("inside iterator"+it.next());
// System.out.println("inside for key is "+key.toString());
// // it.remove();
// }
context.write(key, NullWritable.get());
}
}
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class GroupTestMark_to_win {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(GroupTest.class);
job.setMapperClass(GroupMapper.class);
job.setReducerClass(GroupReducer.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
/*setGroupingComparatorClass的参数必须是个RawComparator,
而我们的MyGroupComparator extends WritableComparator
而WritableComparator implements RawComparator
而interface RawComparator<T> extends Comparator<T>,实现compare方法*/
job.setGroupingComparatorClass(MyGroupComparator.class);
File file = new File("e:/temp/output");
if (file.exists() && file.isDirectory()) {
deleteFile(file);
}
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
public static boolean deleteFile(File dirFile) {
if (!dirFile.exists()) {
return false;
}
if (dirFile.isFile()) {
return dirFile.delete();
} else { /* 空目录就不进入for循环了, 进入到下一句dirFile.delete(); */
for (File file : dirFile.listFiles()) {
deleteFile(file);
}
}
return dirFile.delete();
}
}
package com;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroupComparator extends WritableComparator{
protected MyGroupComparator() {
/*下面的话必须要有, 否则报空指针异常, true指底层代码中指是否需要要创建key的实例,一般为true,在我们这种用法的情况下,为false就会空指针。没必要关心细节 */
super(OrderBean.class, true);
}
/* 下面这段话是从reducer摘录出来的: MyGroupComparator的compare方法,返回-1,不是0 ,这样就会断定o1和o2不是一个key。nextKeyIsSame就变成
false了。循环到for的下一轮,判断进不进去时,又一次进入ReduceContextImpl,由于
nextKeyIsSame是false, 所以就不进入for循环了,在这个过程中,大家也看到,在遍历迭代过
程中,key的值也改变了。而且也看到了我们的MyGroupComparator的compare是怎么被调用的。
可以判断出:想让谁和谁一组一起进入reduce,就返回0。
如果我们的程序变成:return 0;则输出 结果是:o3 150.0, 为什么? 听我视频
*/
public int compare(WritableComparable a, WritableComparable b) {
OrderBean bean1 = (OrderBean) a;
OrderBean bean2 = (OrderBean) b;
return bean1.getOrderId().compareTo(bean2.getOrderId());
// return 0;
}
}
输出结果:
reduce key is 马克-to-win @ 马克java社区:o1 250.0
val is (null)
inside for key is o1 250.0
val is (null)
inside for key is o1 200.0
reduce key is 马克-to-win @ 马克java社区:o2 700.0
val is (null)
inside for key is o2 700.0
val is (null)
inside for key is o2 500.0
val is (null)
inside for key is o2 100.0
INFO - Job job_local1313255223_0001 running in uber mode : false
reduce key is 马克-to-win @ 马克java社区:o3 150.0
val is (null)
inside for key is o3 150.0
文件输出结果:(每组中最小的)
o1 200.0
o2 100.0
o3 150.09