Implementing a Join Algorithm in MapReduce
马克-to-win @ 马克java社区: Requirements:
Order table (order):
order id (id), product id (pid), quantity (number)
1,p2,2
2,p3,3
2,p4,1
2,p5,4
3,p1,5
1,p1,3
Product table (product):
product id (id), product name (pname), price (price)
p1,java,11
p2,c,22
p3,c#,33
p4,python,44
p5,js,66
The data volume is now huge and it all sits in plain text files, so the old SQL approach can no longer be used. We therefore implement the following query with MapReduce:
select o.id order_id, o.number, p.id p_id, p.pname, p.price, o.number * p.price sum from order o join product p on o.pid = p.id
The final result must have exactly as many rows as the order table, namely 6, because product is the "one" side of a one-to-many relationship; each order row simply gets its product columns filled in. For example, order row 1,p2,2 joins product row p2,c,22 and yields sum = 2 * 22 = 44.
The desired final result is:
order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0
Approach (马克-to-win @ 马克java社区): In the map phase we can tell from the file name whether the current line is an order or a product. However, the mapper's context.write and the reducer's input can only use a single value type, fixed in advance by the generic declaration Mapper<LongWritable, Text, Text, UnionBean>, so both orders and products have to travel in one combined bean, UnionBean, which holds the union of their fields. For an order line we fill in the order fields of the UnionBean and leave the rest at default values; for a product line we do the opposite. Everything written to the context is therefore an incomplete UnionBean, but each one carries a product id. One option would be to build the order-product relationship ourselves (it would have to be stored in a HashMap) and look up products by order or orders by product, but either way is cumbersome; this is essentially the idea behind the map-side join discussed later. A better option: since every UnionBean has a product id, use the product id as the key sent to the reducer. In the reducer, treat the UnionBeans whose order fields are already complete as the base records, collect them in an ArrayList, and fill in the product fields they are missing. Also note that order_id equal to 0 marks a product record, and any other value marks an order record. See the code comments for how the values are effectively traversed twice.
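As an aside, here is a minimal sketch of the map-side join alternative mentioned above, assuming the product table is small enough to hold in memory. The class name MapJoinSketchMapper and the hard-coded local path are illustrative only and not part of this article's reduce-side join code; a real job would normally distribute product.txt via job.addCacheFile.

package com;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoinSketchMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    // pid -> {pname, price}, loaded once per map task
    private final Map<String, String[]> products = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Assumption: the product file is read from a fixed local path to keep the sketch short;
        // a real job would usually ship it via the distributed cache (job.addCacheFile(...)).
        try (BufferedReader reader = new BufferedReader(new FileReader("e:/temp/inputJoin/product.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] f = line.split(",");            // pid,pname,price
                products.put(f[0], new String[]{f[1], f[2]});
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] f = value.toString().split(",");        // order line: id,pid,number
        String[] p = products.get(f[1]);                 // look up pname and price by pid
        if (p != null) {
            float sum = Integer.parseInt(f[2]) * Float.parseFloat(p[1]);
            context.write(new Text("order_id=" + f[0] + ", p_id=" + f[1] + ", number=" + f[2]
                    + ", pname=" + p[0] + ", price=" + Float.parseFloat(p[1]) + ", sum=" + sum), NullWritable.get());
        }
    }
}

Because the lookup happens entirely in the map phase, this variant needs no reducer (job.setNumReduceTasks(0)) and only order.txt would be fed to it as input. The rest of this article follows the reduce-side join described above.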
package com;

import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class UnionBean implements Writable {
    private int order_id;
    private String p_id;
    private int number;
    private String pname;
    private float price;

    public UnionBean() {
    }

    public UnionBean(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;
    }

    public void set(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;
    }

    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeUTF(p_id);
        out.writeInt(number);
        out.writeUTF(pname);
        out.writeFloat(price);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readInt();
        this.p_id = in.readUTF();
        this.number = in.readInt();
        this.pname = in.readUTF();
        this.price = in.readFloat();
    }

    @Override
    public String toString() {
        return "order_id=" + order_id +
                ", p_id=" + p_id +
                ", number=" + number +
                ", pname=" + pname +
                ", price=" + price +
                ", sum=" + price * number;
    }
}
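A quick local sanity check (not part of the original article; the class name UnionBeanRoundTripTest is made up for illustration) helps confirm the Writable contract: whatever write() serializes, readFields() must read back in exactly the same order, and the no-argument constructor must stay in place because Hadoop creates the bean by reflection before calling readFields(). A minimal sketch:

package com;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class UnionBeanRoundTripTest {
    public static void main(String[] args) throws IOException {
        UnionBean original = new UnionBean(1, "p2", 2, "c", 22f);
        // Serialize with write() into an in-memory buffer.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));
        // Deserialize into a fresh bean with readFields(), the same way Hadoop does.
        UnionBean copy = new UnionBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        // Expected: order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
        System.out.println(copy);
    }
}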
package com;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class JoinMapper extends Mapper<LongWritable, Text, Text, UnionBean> {
    // Reused across map() calls; safe because context.write serializes the object immediately.
    UnionBean unionBean = new UnionBean();

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String p_id;
        // Read the split information from the context to get the name of the file this line came from.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String fileName = inputSplit.getPath().getName();
        System.out.println(fileName + " is fileName 马克-to-win @ 马克java社区");
        if (fileName.startsWith("order")) {
            String[] fields = line.split(",");
            p_id = fields[1];
            // Give the fields that do not exist in this record default values, to avoid null pointers later.
            /* For example, given the lines p1,java,11 and 3,p1,5 and 1,p1,3, the order lines become 3,p1,5,"",0 and 1,p1,3,"",0. */
            unionBean.set(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]), "", 0);
        } else {
            String[] fields = line.split(",");
            p_id = fields[0];
            // Give the fields that do not exist in this record default values, to avoid null pointers later.
            /* For example, given the lines p1,java,11 and 3,p1,5 and 1,p1,3, the product line becomes 0,p1,0,java,11. */
            unionBean.set(0, p_id, 0, fields[1], Integer.parseInt(fields[2]));
        }
        context.write(new Text(p_id), unionBean);
    }
}
package com;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinReducer extends Reducer<Text, UnionBean, UnionBean, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<UnionBean> values, Context context) throws IOException, InterruptedException {
        UnionBean pBeanMiddle = new UnionBean();
        List<UnionBean> resultList = new ArrayList<>();
        /* Note that the key here is a pid, a single value. Because a reducer's values cannot be iterated twice,
           the first pass collects all the orders (the "many" side of the one-to-many relationship) into an ArrayList,
           while the single product bean (the "one" side) is kept in pBeanMiddle. The "second pass" then runs over the
           ArrayList and copies the product's fields into each order bean. For example, p1,java,11 and 3,p1,5 and
           1,p1,3 produce three UnionBeans (3,p1,5,"",0 and 1,p1,3,"",0 and 0,p1,0,java,11) that arrive in the same
           reduce call: p1,java,11 goes into pBeanMiddle, while 3,p1,5 and 1,p1,3 go into two oBeanMiddle instances,
           i.e. into resultList. */
        for (UnionBean value : values) {
            if (0 == value.getOrder_id()) {
                try {
                    BeanUtils.copyProperties(pBeanMiddle, value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                UnionBean oBeanMiddle = new UnionBean();
                try {
                    BeanUtils.copyProperties(oBeanMiddle, value);
                    /* This is the "many" side of the one-to-many relationship, so a List is needed. */
                    resultList.add(oBeanMiddle);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        for (UnionBean resultBean : resultList) {
            resultBean.setPname(pBeanMiddle.getPname());
            resultBean.setPrice(pBeanMiddle.getPrice());
            context.write(resultBean, NullWritable.get());
        }
    }
}
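One subtlety deserves a note: Hadoop reuses a single UnionBean instance for every element of values, which is why the loop above copies each value with BeanUtils.copyProperties instead of adding value itself to resultList; storing the reference directly would leave the list full of pointers to one object that ends up holding only the last record. As a hedged alternative to BeanUtils, Hadoop's own WritableUtils.clone performs the same deep copy through Writable serialization; inside the else branch above, the try/catch around copyProperties could be replaced by the two lines below (not the article's original code):

// Clone the reused value object via its Writable serialization;
// context.getConfiguration() supplies the required Configuration.
UnionBean oBeanMiddle = org.apache.hadoop.io.WritableUtils.clone(value, context.getConfiguration());
resultList.add(oBeanMiddle);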
package com;

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinTestMark_to_win {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinTestMark_to_win.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(UnionBean.class);
        job.setOutputKeyClass(UnionBean.class);
        job.setOutputValueClass(NullWritable.class);
        // Delete the output directory if it already exists, otherwise the job refuses to start.
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        /* The inputJoin directory contains two files: product.txt and order.txt. */
        FileInputFormat.setInputPaths(job, new Path("e:/temp/inputJoin"));
        /* Alternatively, the addInputPath method used earlier in this chapter works as well: */
        // FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/order.txt"));
        // FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/product.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        boolean success = job.waitForCompletion(true);
        System.out.println("马克-to-win @ 马克java社区 " + (success ? "successful" : "failed"));
        System.exit(success ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* An empty directory simply skips the for loop and falls through to dirFile.delete() below. */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
The file output is as follows (the rows come out grouped by product id because the reducer receives its keys in sorted order):
order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0