How to sort in MapReduce
马克-to-win @ 马克java社区: Note: hand-rolling your own sort is rarely worth the effort, but if you let the Hadoop MapReduce framework do the sorting for you, the key class must implement the WritableComparable interface. The concrete code is given below. The requirement here is: compare by order id first, then by amount (descending).
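Before the full job, here is a plain-Java preview (no Hadoop needed) of that ordering rule. The class name CompareRuleSketch and the hard-coded records are illustrative only; the comparator body is the same logic that OrderBeanSort.compareTo implements below.
package com;
import java.util.Arrays;
/* Plain-Java sketch of the "id ascending, then amount descending" rule. */
public class CompareRuleSketch {
    public static void main(String[] args) {
        String[][] records = {
                {"o1", "250.0"}, {"o2", "500.0"}, {"o2", "100.0"},
                {"o2", "700.0"}, {"o3", "150.0"}, {"o1", "200.0"}};
        Arrays.sort(records, (a, b) -> {
            int cmp = a[0].compareTo(b[0]);                       // order id, ascending
            if (cmp == 0) {
                return -Double.compare(Double.parseDouble(a[1]),
                                       Double.parseDouble(b[1])); // amount, descending
            }
            return cmp;
        });
        for (String[] r : records) {
            System.out.println(r[0] + "\t" + r[1]);
        }
        // Prints: o1 250.0 / o1 200.0 / o2 700.0 / o2 500.0 / o2 100.0 / o3 150.0
    }
}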
package com;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/* For reference:
   public interface WritableComparable<T> extends Writable, Comparable<T>
   and
   public interface Comparable<T> {
       public int compareTo(T o);
   }
*/
public class OrderBeanSort implements WritableComparable<OrderBeanSort> {
    private String orderId;
    private Double amount;
    private Double average = 0.0;

    public Double getAverage() {
        return average;
    }
    public void setAverage(Double average) {
        this.average = average;
    }
    public OrderBeanSort() {
    }
    public OrderBeanSort(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public void set(String orderId, Double amount) {
        this.orderId = orderId;
        this.amount = amount;
    }
    public String getOrderId() {
        return orderId;
    }
    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }
    public Double getAmount() {
        return amount;
    }
    public void setAmount(Double amount) {
        this.amount = amount;
    }
    /* Serialization: the framework calls write/readFields when shuffling the key. */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(amount);
        out.writeDouble(average);
    }
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.amount = in.readDouble();
        this.average = in.readDouble();
    }
    /* Used when reduce finally writes its output. */
    public String toString() {
        return orderId + "\t" + amount + "\t" + average;
    }
    @Override
    public int compareTo(OrderBeanSort o) {
        /* 马克-to-win: if the orderIds are equal, compare the amounts (negated, i.e. descending);
           otherwise the id comparison decides. Records with the same id therefore end up next to
           each other. Because id is compared first and amount second, the sort order is:
           o1 250.0 0.0
           o1 200.0 0.0
           o2 700.0 0.0
           o2 500.0 0.0
           o2 100.0 0.0
           o3 150.0 0.0
        */
        int cmp = this.getOrderId().compareTo(o.getOrderId());
        if (0 == cmp) {
            return -Double.compare(this.getAmount(), o.getAmount());
        }
        return cmp;
    }
}
package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class SortMapper extends Mapper<LongWritable, Text, OrderBeanSort, NullWritable> {
    OrderBeanSort bean = new OrderBeanSort();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        /* Each input line looks like "o1,p2,250.0": orderId, productId, amount. */
        String line = value.toString();
        String[] fields = line.split(",");
        String orderId = fields[0];
        double amount = Double.parseDouble(fields[2]);
        /* The whole bean is the map output key, so the framework sorts by its compareTo. */
        bean.set(orderId, amount);
        context.write(bean, NullWritable.get());
    }
}
package com;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class SortReducer extends Reducer<OrderBeanSort, NullWritable, OrderBeanSort, NullWritable> {
    protected void reduce(OrderBeanSort key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("reduce key is 马克-to-win @ 马克java社区:" + key.toString());
        /* The original input lines, for reference:
           o1,p2,250.0
           o2,p3,500.0
           o2,p4,100.0
           o2,p5,700.0
           o3,p1,150.0
           o1,p1,200.0
        */
        for (NullWritable val : values) {
            System.out.println("val is " + val.toString());
            System.out.println("inside for key is " + key.toString());
        }
        /* The for-each loop above and the commented-out iterator loop below do the same thing. */
        // Iterator it = values.iterator();
        // while (it.hasNext()) {
        //     System.out.println("inside iterator" + it.next());
        //     System.out.println("inside for key is " + key.toString());
        //     // it.remove();
        // }
        context.write(key, NullWritable.get());
    }
}
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(SortTestMark_to_win.class);
        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);
        job.setMapOutputKeyClass(OrderBeanSort.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(OrderBeanSort.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/serial.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
Console output:
reduce key is 马克-to-win @ 马克java社区:o1 250.0 0.0
val is (null)
inside for key is o1 250.0 0.0
reduce key is 马克-to-win @ 马克java社区:o1 200.0 0.0
val is (null)
inside for key is o1 200.0 0.0
reduce key is 马克-to-win @ 马克java社区:o2 700.0 0.0
val is (null)
inside for key is o2 700.0 0.0
reduce key is 马克-to-win @ 马克java社区:o2 500.0 0.0
val is (null)
inside for key is o2 500.0 0.0
reduce key is 马克-to-win @ 马克java社区:o2 100.0 0.0
val is (null)
inside for key is o2 100.0 0.0
reduce key is 马克-to-win @ 马克java社区:o3 150.0 0.0
val is (null)
inside for key is o3 150.0 0.0
Output file:
o1 250.0 0.0
o1 200.0 0.0
o2 700.0 0.0
o2 500.0 0.0
o2 100.0 0.0
o3 150.0 0.0
Input file:
o1,p2,250.0
o2,p3,500.0
o2,p4,100.0
o2,p5,700.0
o3,p1,150.0
o1,p1,200.0
Analysis:
The six records are first sorted by the MapReduce framework and then handed to reduce one by one, so reduce is called six times. One more point: if the two records had been o1,p2,250.0 and o1,p1,250.0, their keys would be completely equal, so they would enter the same reduce() call together, just like the hello[1,1,1,1] case we saw earlier. Here, however, o1,p2,250.0 and o1,p1,200.0 give different compareTo results, so they enter reduce() in separate calls.
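To make that grouping rule concrete, here is a minimal sketch (not part of the original job; the class name GroupingSketch is made up for illustration) that calls the compareTo defined above directly:
package com;
public class GroupingSketch {
    public static void main(String[] args) {
        OrderBeanSort a = new OrderBeanSort("o1", 250.0);
        OrderBeanSort b = new OrderBeanSort("o1", 250.0); // a hypothetical o1,p1,250.0
        OrderBeanSort c = new OrderBeanSort("o1", 200.0); // the real o1,p1,200.0
        System.out.println(a.compareTo(b)); // 0        -> equal keys: both records would share one reduce() call
        System.out.println(a.compareTo(c)); // non-zero -> different keys: separate reduce() calls
    }
}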