MapReduce当中寻找用户间的共同好友

马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
寻找用户间的共同好友


马克-to-win @ 马克java社区:下面我们给出一个经典的案例:寻找用户间的共同好友。(有意思的是:网上讨论这个案例的虽多,但都有这那的错误,不是数据错就是程序错, 总有同学和我比对,实际和我的是不一样的)马克-to-win @ 马克java社区:下面给出用户的好友关系列表(注意是单向的, 腾讯QQ单向好友的算法经常改动,曾经有一段时间,是这样规定的。我可以让你加我为单向好友,这样的话我的好友列表当中没有你,但你的好友列表当中却有我,对我的好处就是可以节省我的资源,那一段儿时间规定单向好友可以5000甚至更多,但是双向好友最多只能加500个。于是单向好友就非常流行 ),每一行代表一个用户和他的好友列表。



A:B,C,D,F,E,O
B:A,C,E,F
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J




马克-to-win @ 马克java社区:防盗版实名手机尾号:73203。现在需要找出用户间的共同好友。目测:A:B,C,D,F,E,O。 A的好友有这些人。 B:A,C,E,F, B的好友有这些人。A和B的共同好友是C,E,F。其实如果不用大数据的思维,很简单,知道:A:B,C,D,F,E,O。而且B:A,C,E,F得出A -B:  C,E,F, 这算法并不难。之后再穷尽一下。(A-B,A-C,A-D。。。。之后再B-C,B-D。。。。二维数组就可以搞定)。但如果数据量大的话,内存就崩了,为什么? 因为假设字母有非常多,整个走一遍, 才能知道A到底和谁有关系,A-B,A-C,A-D, 所以走一遍只能处理一个字母,想走第2遍,需要把第一轮的数据整个存在一个hashmap中,崩溃怎么办? 那B怎么办?所以这种方法不行.

马克-to-win @ 马克java社区:A-B:  C,E,F  这结论用大数据来做是这样:第一步:要知道C在A的好友列表,C也在B的好友列表。这就要知道C到底在多少人的好友列表?答案是:C    H-K-B-A-G-E-F(即C在这许多人的好友列表),怎么做,见下?第二步,有了如上的数据,就好做穷尽,AB好友列表都有C。AE好友列表页都有 C。以AB为键,马上会发现 AB好友列表也都有E,而且也都有F。这样,最终的结论就有了:A -B:  C,E,F。

马克-to-win @ 马克java社区:这件事用一个MapReduce不行,得用两个,才能搞定。思路是这样:

马克-to-win @ 马克java社区:在第一个mapper中:读一行A:B,C,D,F,E,O(A的好友列表有这些,反过来拆开)    输出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>,(前者在后者的好友列表)再读一行B:A,C,E,F ,输出<A,B> <C,B> <E,B> <F,B>,

马克-to-win @ 马克java社区:之后在第一个Reducer中:key相同的会分到一组,例如:<C,A><C,B><C,E><C,F><C,G>......(还是前者在后者的好友列表)
Key:C
value: [ A, B, E, F, G ]

马克-to-win @ 马克java社区:输出为:C    H-K-B-A-G-E-F。什么意思呢? C在H-K-B-A-G-E-F的好友列表。
有一个疑问:原始数据:C:F,A,D,I意味着c的单向好友列表有这四个人。二者有区别, 注意单向问题。


最后整体输出为 :

A    F-H-D-G-B-K-C-I-O
B    E-A-F-J
C    H-K-B-A-G-E-F
D    E-C-L-K-A-H-G-F
E    G-F-H-L-M-D-B-A
F    B-M-L-A-G-D-C
G    M
H    O
I    C-O
J    O
L    D-E
M    E-F
O    A-H-I-J-F

马克-to-win @ 马克java社区:之后在第二个mapper中,拿一行举例:输入是:    A    F-H-D-G-B-K-C-I-O 意味着:A在F-H-D-G-B-K-C-I-O好友列表。
经过遍历,穷尽,希望输出结果是:
B-C     A      (意味着,B和C都有A这个好友。蓝色)
B-D     A
B-F     A
B-G     A
B-H     A
B-I     A
B-K     A
B-O     A
C-D     A
C-F     A
C-G     A
C-H     A
C-I     A
C-K     A
C-O     A
D-F     A
D-G     A
D-H     A
D-I     A
D-K     A
D-O     A
F-G     A
F-H     A
F-I     A
F-K     A
F-O     A
G-H     A
G-I     A
G-K     A
G-O     A
H-I     A
H-K     A
H-O     A
I-K     A
I-O     A
K-O     A
A-E     B
A-F     B
A-J     B
E-F     B
E-J     B
。。。。。

G-L     F
G-M     F
L-M     F
C-O     I
D-E     L
E-F     M
A-F     O
A-H     O
A-I     O
A-J     O
F-H     O
F-I     O
F-J     O
H-I     O
H-J     O
I-J     O



在第二个mapper中:
读入数据,key相同的在一组  <A-B,C><A-B,E><A-B,F>
最后输出的结果是:A-B   C,E,F,  代表C,E,F都在A和B的好友列表里,A和B 都有好友C,E,F




总结一下:第一步,找出都有谁有A这个好友,第二步,找出都有谁和谁同时有A这个好友,第三步,找出都有谁和谁,同时有谁谁谁这些好友。这也正是需求。

package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CommonFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
    Text k = new Text();
    Text v = new Text();
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(":");
        String user = fields[0];
        String[] friends = fields[1].split(",");
(购买完整教程)
            k.set(friend);
            v.set(user);
/*读一行A:B,C,D,F,E,O(A的好友有这些,反过来拆开,这些人中的每一个都是A的好友)
输出<B,A> <C,A> <D,A> <F,A> <E,A> <O,A>*/
            context.write(k, v);
        }
    }
}




package com;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CommonFriendsReducer extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/*输入为比如:<C,A><C,B><C,E><C,F><C,G>*/      
        StringBuffer sb = new StringBuffer();
        for (Text value : values) {
            sb.append(value).append("-");
        }
        // 去掉最后一个字符(-)
        sb.deleteCharAt(sb.length() - 1);
        v.set(sb.toString());
/*输出为 C    H-K-B-A-G-E-F*/      
        context.write(key, v);
    }
}




package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CommonFriendsTestMark_to_win {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CommonFriendsTest.class);
        job.setMapperClass(CommonFriendsMapper.class);
        job.setReducerClass(CommonFriendsReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/friend.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("马克-to-win @马克java社区 successful");
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 :1);
    }
  
    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 马克-to-win @ 马克java社区:防盗版实名手机尾号:73203 空目录就不进入for循环了, 进入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}



输入文件:

A:B,C,D,F,E,O
B:A,C,E,F
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J




输出:

A    F-H-D-G-B-K-C-I-O
B    E-A-F-J
C    H-K-B-A-G-E-F
D    E-C-L-K-A-H-G-F
E    G-F-H-L-M-D-B-A
F    B-M-L-A-G-D-C
G    M
H    O
I    C-O
J    O
L    D-E
M    E-F
O    A-H-I-J-F





把第一个Mapreduce的输出作为输入。

package com;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
import java.util.Arrays;
public class CommonFriendsMapper2 extends Mapper<LongWritable, Text, Text, Text>{
    Text k = new Text();
    Text v = new Text();
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
       /* 马克-to-win @ 马克java社区:输入是:    A    F-H-D-G-B-K-C-I-O(默认分隔符\t)意味着:F-H-D-G-B-K-C-I-O都有A这个好友。
        或者最后输入是:     O    A-H-I-J-F
        */
        String line = value.toString();
        String[] fields = line.split("\t");
        String user = fields[0];
        String[] friends = fields[1].split("-");
/*比如前面的F-H-D-G-B-K-C-I-O*/      
        Arrays.sort(friends);
/*变成B-C-D-F-G-H-I-K-O*/   
/*最后一行变成:A-F-H-I-J*/       
/*遍历*/      
        for (int i = 0; i < friends.length - 1; i++) {
            for (int j = i + 1; j < friends.length; j++) {
                k.set(friends[i] + "-" + friends[j]);
                v.set(user);
                System.out.println(friends[i] + "-" + friends[j]+" \t"+user);
/*马克-to-win @ 马克java社区:输出结果是:意味着,B和C都有A这个好友。
B-C     A
B-D     A
B-F     A
B-G     A
B-H     A
B-I     A
B-K     A
B-O     A
C-D     A
C-F     A
C-G     A
C-H     A
C-I     A
C-K     A
C-O     A
D-F     A
D-G     A
D-H     A
D-I     A
D-K     A
D-O     A
F-G     A
F-H     A
F-I     A
F-K     A
F-O     A
G-H     A
G-I     A
G-K     A
G-O     A
H-I     A
H-K     A
H-O     A
I-K     A
I-O     A
K-O     A
A-E     B
A-F     B
A-J     B
E-F     B
E-J     B
。。。。。

G-L     F
G-M     F
L-M     F
C-O     I
D-E     L
E-F     M
A-F     O
A-H     O
A-I     O
A-J     O
F-H     O
F-I     O
F-J     O
H-I     O
H-J     O
I-J     O
*/
              
                context.write(k, v);
            }
        }
    }
}




package com;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CommonFriendsReducer2 extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
/*读入数据,key相同的在一组
<A-B,C><A-B,E><A-B,F>
*/      
        StringBuffer sb = new StringBuffer();
        for (Text value : values) {
            sb.append(value).append(",");
        }
        /*删除最后一个,*/
        sb.deleteCharAt(sb.length() - 1);
        v.set(sb.toString());
/*最后输出的结果是:A-B   C,E,F,  代表A和B 都有好友C,E,F*/      
        context.write(key, v);
    }
}






package com;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CommonFriendsTest2Mark_to_win {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(CommonFriendsTest2.class);
        job.setMapperClass(CommonFriendsMapper2.class);
        job.setReducerClass(CommonFriendsReducer2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/friend1.txt"));
        FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
        System.out.println("马克-to-win @马克java社区 successful");
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 :1);
    }
  
    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* 空目录就不进入for循环了, 进入到下一句dirFile.delete(); */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}



输出结果是:

A-B    C,E,F
A-C    D,F
A-D    E,F
A-E    C,D,B
A-F    C,O,D,B,E
A-G    F,C,D,E
A-H    C,D,E,O
A-I    O
A-J    O,B
A-K    D,C
A-L    E,D,F
A-M    F,E
B-C    F,A
B-D    F,E,A
B-E    C
B-F    C,A,E
B-G    E,C,F,A
B-H    C,A,E
B-I    A
B-K    C,A
B-L    F,E
B-M    E,F
B-O    A
C-D    F,A
C-E    D
C-F    D,A
C-G    F,D,A
C-H    D,A
C-I    A
C-K    A,D
C-L    F,D
C-M    F
C-O    A,I
D-E    L
D-F    A,E
D-G    F,A,E
D-H    A,E
D-I    A
D-K    A
D-L    E,F
D-M    F,E
D-O    A
E-F    D,M,C,B
E-G    C,D
E-H    C,D
E-J    B
E-K    C,D
E-L    D
F-G    D,C,E,A
F-H    A,C,O,D,E
F-I    O,A
F-J    B,O
F-K    C,D,A
F-L    E,D
F-M    E
F-O    A
G-H    C,D,E,A
G-I    A
G-K    C,A,D
G-L    D,E,F
G-M    E,F
G-O    A
H-I    A,O
H-J    O
H-K    A,C,D
H-L    E,D
H-M    E
H-O    A
I-J    O
I-K    A
I-O    A
K-L    D
K-O    A
L-M    F,E