How to sort in MapReduce


马克-to-win @ 马克java社区: Note: implementing the sort yourself is more trouble than it is worth. If you want the Hadoop MapReduce framework to do the sorting for you, the key must implement the WritableComparable interface; the concrete steps are shown below. The requirement here is to compare by orderId first, then by amount.






package com;

import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/* For reference:
   public interface WritableComparable<T> extends Writable, Comparable<T>
   and
   public interface Comparable<T> {
       public int compareTo(T o);
   }
*/
public class OrderBeanSort implements WritableComparable<OrderBeanSort>{
    private String orderId;
    private Double amount;
    private Double average=0.0;

    public Double getAverage() {
        return average;
    }

    public void setAverage(Double average) {
        this.average = average;
    }

    public OrderBeanSort() {
    }

    public OrderBeanSort(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public void set(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
        out.writeDouble(average);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.amount = in.readDouble();
        this.average = in.readDouble();
    }
/* Used when the reducer finally writes its output. */
    @Override
    public String toString() {
        return orderId + "\t" + amount+ "\t" +average;
    }
  
    @Override
    public int compareTo(OrderBeanSort o) {
        /* 马克-to-win: if the orderIds are equal, compare by amount; otherwise the
orderId comparison decides, which keeps records with the same orderId adjacent.
Because orderId is compared first and amount second, the data ends up ordered
like this:
o1    250.0    0.0
o1    200.0    0.0
o2    700.0    0.0
o2    500.0    0.0
o2    100.0    0.0
o3    150.0    0.0
*/
        int cmp = this.getOrderId().compareTo(o.getOrderId());
        if (0 == cmp) {
            return -Double.compare(this.getAmount(), o.getAmount()); // negated: amount descending
        }
        return cmp;
    }
}
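
Before wiring the bean into a job, it can help to see the ordering compareTo produces on its own. The following is a minimal sketch (the class name SortOrderDemo is invented for illustration) that sorts the six sample records with plain java.util.Arrays.sort; in the real job the shuffle phase performs this sort.

package com;

import java.util.Arrays;

public class SortOrderDemo {
    public static void main(String[] args) {
        OrderBeanSort[] beans = {
                new OrderBeanSort("o1", 250.0),
                new OrderBeanSort("o2", 500.0),
                new OrderBeanSort("o2", 100.0),
                new OrderBeanSort("o2", 700.0),
                new OrderBeanSort("o3", 150.0),
                new OrderBeanSort("o1", 200.0)
        };
        // Arrays.sort uses compareTo: orderId ascending, then amount descending.
        Arrays.sort(beans);
        for (OrderBeanSort b : beans) {
            System.out.println(b); // prints in the same order as the job's output file
        }
    }
}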



package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text,OrderBeanSort, NullWritable>{
    OrderBeanSort bean = new OrderBeanSort();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
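        // Each input line looks like "o1,p2,250.0": orderId, product id, amount.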
        String line = value.toString();
        String[] fields = line.split(",");
        String orderId = fields[0];
        double amount = Double.parseDouble(fields[2]);
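        // The single bean instance (the field above) is reused across map() calls;
        // this is safe because context.write() serializes the current field values immediately.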
        bean.set(orderId, amount);
        context.write(bean,NullWritable.get());
    }
}



package com;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class SortReducer extends Reducer<OrderBeanSort, NullWritable, OrderBeanSort, NullWritable>{

    @Override
    protected void reduce(OrderBeanSort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce key is 马克-to-win @ 马克java社区:"+key.toString());
/* For reference, the input records are:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
*/
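        // With this input no two keys compare as equal, so each reduce call sees
        // exactly one NullWritable value, as the console output below confirms.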
        for (NullWritable val : values) {
            System.out.println("val is "+val.toString());
            System.out.println("inside for key is "+key.toString());
        }
/* The loop above and the commented-out loop below do the same thing. */
//        Iterator it= values.iterator();
//        while(it.hasNext())
//        {
//            System.out.println("inside iterator"+it.next());
//            System.out.println("inside for key is "+key.toString());
//     //         it.remove();
//        }
        context.write(key, NullWritable.get());
    }
}





package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SortTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortTestMark_to_win.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(OrderBeanSort.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBeanSort.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
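
(A note on the driver: the input path e:/temp/input/serial.txt and output path e:/temp/output are hard-coded local Windows paths, so the job is presumably being run in local mode from the IDE; on a cluster these would normally come from command-line arguments.)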


Console output:
reduce key is 马克-to-win @ 马克java社区:o1    250.0    0.0
val is (null)
inside for key is o1    250.0    0.0
reduce key is 马克-to-win @ 马克java社区:o1    200.0    0.0
val is (null)
inside for key is o1    200.0    0.0
reduce key is 马克-to-win @ 马克java社区:o2    700.0    0.0
val is (null)
inside for key is o2    700.0    0.0
reduce key is 马克-to-win @ 马克java社区:o2    500.0    0.0
val is (null)
inside for key is o2    500.0    0.0
reduce key is 马克-to-win @ 马克java社区:o2    100.0    0.0
val is (null)
inside for key is o2    100.0    0.0
reduce key is 马克-to-win @ 马克java社区:o3    150.0    0.0
val is (null)
inside for key is o3    150.0    0.0


Output file:
o1    250.0    0.0
o1    200.0    0.0
o2    700.0    0.0
o2    500.0    0.0
o2    100.0    0.0
o3    150.0    0.0

Input file:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0


Analysis:

The six records are first sorted inside the MapReduce framework and then fed into reduce one by one, so reduce is called six times. One extra remark: if the input contained the two records o1,p2,250.0 and o1,p1,250.0, their keys would be completely identical (the product field is not part of the key), so they would enter the reduce method together, just like the earlier hello[1,1,1,1] case. The actual records o1,p2,250.0 and o1,p1,200.0, however, produce a non-zero compareTo result, so they enter the reduce method separately, as the sketch below illustrates.
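
To make that last point concrete, here is a minimal plain-Java sketch of the grouping rule. The class name GroupingDemo is invented for illustration, and the loop only mimics what the framework does by default: group boundaries on the sorted key stream are determined by the key's compareTo (via the default grouping comparator), and a new reduce() call starts whenever compareTo reports a difference.

package com;

import java.util.Arrays;

public class GroupingDemo {
    public static void main(String[] args) {
        OrderBeanSort[] sorted = {
                new OrderBeanSort("o1", 250.0),
                new OrderBeanSort("o1", 250.0), // compares equal to the record above
                new OrderBeanSort("o1", 200.0)
        };
        Arrays.sort(sorted);
        int reduceCalls = 0;
        for (int i = 0; i < sorted.length; i++) {
            // a new group (a new reduce() invocation) starts whenever the key differs
            if (i == 0 || sorted[i - 1].compareTo(sorted[i]) != 0) {
                reduceCalls++;
            }
        }
        System.out.println("reduce would be called " + reduceCalls + " times"); // prints 2
    }
}

Because the two (o1, 250.0) keys compare as equal, they share one group, so the sketch reports two reduce calls for three records.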