Implementing a Join Algorithm in MapReduce

Requirement:



Order table (order):
order id (id), product id (pid), quantity (number)
1,p2,2
2,p3,3
2,p4,1
2,p5,4
3,p1,5
1,p1,3

Product table (product):
product id (id), product name (pname), price (price)

p1,java,11
p2,c,22
p3,c#,33
p4,python,44
p5,js,66

The data volume is now huge and it all lives in plain text files, so the old SQL approach no longer works. We implement the equivalent of the following query with a big-data method:

select o.id order_id, o.number, p.id, p.pname, p.price, o.number*p.price sum from order o join product p on o.pid = p.id

The final result must contain exactly as many rows as the order table, i.e. 6, because product is the "one" side of a one-to-many relationship; each order row just gets its product columns filled in.

The expected final result is:

order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0

Approach: In the map phase we can tell from the file name whether the current record is an order or a product. But because the mapper's context.write and the reducer's input must share a single value format (fixed up front in the generic declaration Mapper<LongWritable,Text,Text,UnionBean>), both orders and products have to travel in one union of their attributes, UnionBean. For an order record we fill in the order fields of UnionBean and pad the rest with default values; a product record is handled the same way. The UnionBeans written to the context are therefore all incomplete, but every one of them carries a product id. One option is to materialize the order-product relationship ourselves (it would have to be stored in a HashMap) and look up the product for each order, or the orders for each product, but both directions are cumbersome; this is essentially the map-side join idea, sketched right after this paragraph. A better option: since every UnionBean has a product id, use it as the key sent to the reducer. In the reducer, take the UnionBeans whose order fields are complete as the base, collect them in an ArrayList, and fill in the product fields they are missing. Note that an order_id of 0 marks a product record, and any other value marks an order record. For how to effectively traverse the values twice, see the code.
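For reference, here is a minimal sketch of the map-side join variant mentioned above. It is not part of the reduce-side solution that follows; it assumes the small product table is shipped to every mapper (for example via job.addCacheFile(...)) and that the job's input path points only at order.txt. The class name MapJoinMapper and the working-directory file name product.txt are illustrative assumptions.

package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Map-side join sketch: the whole join happens in the map phase, no reducer needed.
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private final Map<String, String[]> products = new HashMap<>(); // pid -> {pname, price}

    @Override
    protected void setup(Context context) throws IOException {
        // Assumption: product.txt was distributed to each mapper's working
        // directory, e.g. with job.addCacheFile(new URI(".../product.txt")).
        try (BufferedReader reader = new BufferedReader(new FileReader("product.txt"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] fields = line.split(",");
                products.put(fields[0], new String[]{fields[1], fields[2]});
            }
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split(","); // an order line: id,pid,number
        String[] product = products.get(fields[1]);    // look up the "one" side in memory
        if (product != null) {
            float sum = Integer.parseInt(fields[2]) * Float.parseFloat(product[1]);
            context.write(new Text("order_id=" + fields[0] + ", p_id=" + fields[1]
                    + ", number=" + fields[2] + ", pname=" + product[0]
                    + ", price=" + product[1] + ", sum=" + sum), NullWritable.get());
        }
    }
}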




package com;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/* UnionBean is the union of all order and product attributes; it implements
   Writable so it can serve as the single map-output value type. */
public class UnionBean implements Writable{
    private int order_id;
    private String p_id;
    private int number;
    private String pname;
    private float price;

    public UnionBean() {
    }

    public UnionBean(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;

    }
    public void set(int order_id, String p_id, int number, String pname, float price) {
        this.order_id = order_id;
        this.p_id = p_id;
        this.number = number;
        this.pname = pname;
        this.price = price;

    }
    public int getOrder_id() {
        return order_id;
    }

    public void setOrder_id(int order_id) {
        this.order_id = order_id;
    }

    public String getP_id() {
        return p_id;
    }

    public void setP_id(String p_id) {
        this.p_id = p_id;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getPname() {
        return pname;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // the field order here must match readFields() below exactly
        out.writeInt(order_id);
        out.writeUTF(p_id);
        out.writeInt(number);
        out.writeUTF(pname);
        out.writeFloat(price);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readInt();
        this.p_id = in.readUTF();
        this.number = in.readInt();
        this.pname = in.readUTF();
        this.price = in.readFloat();

    }

    @Override
    public String toString() {
        return  "order_id=" + order_id  +
                ", p_id=" + p_id +
                ", number=" + number +
                ", pname=" + pname +
                ", price=" + price +
                ", sum=" + price*number;
    }
}
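Before wiring UnionBean into a job it can be worth checking that write() and readFields() agree on field order. A minimal round-trip sketch (the class name UnionBeanRoundTrip is made up for illustration):

package com;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Serialize one bean, deserialize into another, and compare the printed output.
public class UnionBeanRoundTrip {
    public static void main(String[] args) throws Exception {
        UnionBean original = new UnionBean(1, "p2", 2, "c", 22f);

        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        UnionBean copy = new UnionBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // both lines should be identical; a mismatch between write() and
        // readFields() would corrupt the fields and show up here immediately
        System.out.println(original);
        System.out.println(copy);
    }
}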



package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;

public class JoinMapper extends Mapper<LongWritable,Text,Text,UnionBean>{
    UnionBean unionBean = new UnionBean(); // one reusable instance; context.write serializes it immediately, so reuse is safe

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String p_id;
        // read the input split from the context to learn which file this record came from
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String fileName = inputSplit.getPath().getName();
        System.out.println(fileName + " is the current file name");
        if (fileName.startsWith("order")) {
            String[] fields = line.split(",");
            p_id = fields[1];
            // to avoid null pointers, give the product fields that are absent here default values
/*  e.g. the order lines 3,p1,5 and 1,p1,3 become 3,p1,5,"",0 and 1,p1,3,"",0  */
            unionBean.set(Integer.parseInt(fields[0]), fields[1], Integer.parseInt(fields[2]), "", 0);
        } else {
            String[] fields = line.split(",");
            p_id = fields[0];
            // to avoid null pointers, give the order fields that are absent here default values
/* e.g. the product line p1,java,11 becomes 0,p1,0,java,11 */
            unionBean.set(0, p_id, 0, fields[1], Integer.parseInt(fields[2]));
        }
        context.write(new Text(p_id), unionBean);
    }
}
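To make the shuffle concrete, this is what the mapper emits for the sample input (derived from the logic above; the braces list the UnionBean fields in the order order_id, p_id, number, pname, price):

(p2, {1, p2, 2, "", 0})        from order line 1,p2,2
(p3, {2, p3, 3, "", 0})        from order line 2,p3,3
(p4, {2, p4, 1, "", 0})        from order line 2,p4,1
(p5, {2, p5, 4, "", 0})        from order line 2,p5,4
(p1, {3, p1, 5, "", 0})        from order line 3,p1,5
(p1, {1, p1, 3, "", 0})        from order line 1,p1,3
(p1, {0, p1, 0, java, 11})     from product line p1,java,11
(p2, {0, p2, 0, c, 22})        from product line p2,c,22
(p3, {0, p3, 0, c#, 33})       from product line p3,c#,33
(p4, {0, p4, 0, python, 44})   from product line p4,python,44
(p5, {0, p5, 0, js, 66})       from product line p5,js,66

After the shuffle, all values sharing a key arrive at the same reduce call; for key p1 that is the three beans discussed in the reducer comment below.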



package com;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class JoinReducer extends Reducer<Text, UnionBean, UnionBean, NullWritable>{
    @Override
    protected void reduce(Text key, Iterable<UnionBean> values, Context context) throws IOException, InterruptedException {
        UnionBean pBeanMiddle = new UnionBean(); // holds the product ("one") side for this key
        List<UnionBean> resultList = new ArrayList<>(); // collects the order ("many") side




/* Note: the key here is a pid, a single value. Because the reducer's value iterator cannot be traversed twice, the first pass collects all the Order beans (the "many" side of the one-to-many) in an ArrayList, while the Product bean, of which there is only one instance per key (the "one" side), is kept in a single variable. The second traversal then runs over the ArrayList, copying the Product's attributes into each Order bean. Copying is essential: Hadoop reuses one UnionBean instance across the iteration, so storing the bare reference would leave the list full of copies of the last record. For example, p1,java,11 together with 3,p1,5 and 1,p1,3 yields three UnionBeans (3,p1,5,"",0 and 1,p1,3,"",0 and 0,p1,0,java,11) that enter reduce together: p1,java,11 goes into pBeanMiddle, while 3,p1,5 and 1,p1,3 each become an oBeanMiddle, i.e. an entry of resultList. */
        for (UnionBean value : values) {
            if (0 == value.getOrder_id()) { // order_id == 0 marks a product record
                try {            
                    BeanUtils.copyProperties(pBeanMiddle, value);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } else {
                UnionBean oBeanMiddle = new UnionBean();
                try {
                    BeanUtils.copyProperties(oBeanMiddle, value);
/* this is the "many" side of the one-to-many, hence the List */
                    resultList.add(oBeanMiddle);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        for (UnionBean resultBean : resultList) { // second pass: fill in the product fields
            resultBean.setPname(pBeanMiddle.getPname());
            resultBean.setPrice(pBeanMiddle.getPrice());
            context.write(resultBean, NullWritable.get());
        }
    }
}
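The BeanUtils.copyProperties calls pull in commons-beanutils just to clone the reused value object. A minimal sketch of the same copy done with the bean's own set() method and constructor, which would drop the extra dependency (a hypothetical replacement for the loop body above):

        // inside the for (UnionBean value : values) loop, instead of BeanUtils:
        if (0 == value.getOrder_id()) {
            // product record: keep a private copy of the "one" side
            pBeanMiddle.set(value.getOrder_id(), value.getP_id(), value.getNumber(),
                    value.getPname(), value.getPrice());
        } else {
            // order record: copy before storing, because Hadoop reuses the value instance
            resultList.add(new UnionBean(value.getOrder_id(), value.getP_id(),
                    value.getNumber(), value.getPname(), value.getPrice()));
        }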


package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JoinTestMark_to_win {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(JoinTestMark_to_win.class);
        job.setMapperClass(JoinMapper.class);
        job.setReducerClass(JoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(UnionBean.class);
        job.setOutputKeyClass(UnionBean.class);
        job.setOutputValueClass(NullWritable.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
/* inputJoin holds two files: product.txt and order.txt */
        FileInputFormat.setInputPaths(job, new Path("e:/temp/inputJoin"));

/* alternatively, the method used earlier in this chapter works too: */
  //      FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/order.txt"));
  //      FileInputFormat.addInputPath(job, new Path("e:/temp/inputJoin/product.txt"));

        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("马克-to-win @马克java社区 successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* an empty directory skips the for loop and falls straight through to dirFile.delete() below */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
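The java.io.File cleanup above only works on the local file system. A minimal sketch of the same check through Hadoop's FileSystem API, which would also work when the output lives on HDFS (the helper class OutputCleaner is made up for illustration):

package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;

// Delete the job's output directory through Hadoop's FileSystem API instead
// of java.io.File, so local runs and HDFS runs share the same code path.
public class OutputCleaner {
    public static void deleteIfExists(Configuration conf, String dir) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fs.exists(path)) {
            fs.delete(path, true); // true = delete recursively
        }
    }
}

In main() this would replace the File/deleteFile block with a single call: OutputCleaner.deleteIfExists(conf, "e:/temp/output");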

The output file contains:

order_id=1, p_id=p1, number=3, pname=java, price=11.0, sum=33.0
order_id=3, p_id=p1, number=5, pname=java, price=11.0, sum=55.0
order_id=1, p_id=p2, number=2, pname=c, price=22.0, sum=44.0
order_id=2, p_id=p3, number=3, pname=c#, price=33.0, sum=99.0
order_id=2, p_id=p4, number=1, pname=python, price=44.0, sum=44.0
order_id=2, p_id=p5, number=4, pname=js, price=66.0, sum=264.0