如何做大表和大表的关联?
马克-to-win @ 马克java社区:如何做大表和大表的关联? 对于大表和大表的关联: 1.reducejoin可以解决关联问题,但不完美,有数据倾斜的可能,如前所述。 2.思路:将其中一个大表进行切分,成多个小表再进行关联。
马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
package com;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> dictMap = new HashMap<>();
Text k = new Text();
protected void setup(Context context) throws IOException, InterruptedException {
String path = context.getCacheFiles()[0].getPath();
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
String line;
while (StringUtils.isNotEmpty(line = br.readLine())) {
String[] fields = line.split(",");
dictMap.put(fields[0], fields[1]+","+fields[2]);
}
br.close();
}
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String orderLine = value.toString();
String[] fields = orderLine.split(",");
String pNameprice = dictMap.get(fields[1]);
String[] fieldspNameprice = pNameprice.split(",");
k.set(orderLine + "," + fieldspNameprice[0]+","+fieldspNameprice[1]);
context.write(k, NullWritable.get());
}
}
package com;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.File;
import java.net.URI;
public class MapJoinTestMark_to_win {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoinTest.class);
job.setMapperClass(MapJoinMapper.class);
// 这里省略reduce(因为map端已经完成)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
File file = new File("e:/temp/output");
if (file.exists() && file.isDirectory()) {
deleteFile(file);
}
FileInputFormat.setInputPaths(job, new Path("e:/temp/input/order.txt"));
FileOutputFormat.setOutputPath(job, new Path("e:/temp/output"));
System.out.println("马克-to-win @马克java社区 successful");
/* 马克-to-win @ 马克java社区:Add a file to be localized:这样缓存hdfs或文件系统的文件到task运行节点的local了,
uri: The uri of the cache to be localized:比如:o.addCacheFile(new URI("hdfs://url:port/filename"));或者本地文件:new URI("file:/E:/temp/input/product.txt")
public void addCacheFile(URI uri) {
*/
job.addCacheFile(new URI("file:/E:/temp/input/product.txt"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static boolean deleteFile(File dirFile) {
if (!dirFile.exists()) {
return false;
}
if (dirFile.isFile()) {
return dirFile.delete();
} else { /* 空目录就不进入for循环了, 进入到下一句dirFile.delete(); */
for (File file : dirFile.listFiles()) {
deleteFile(file);
}
}
return dirFile.delete();
}
}
文件输出结果:
1,p1,3,java,11
1,p2,2,c,22
2,p3,3,c#,33
2,p4,1,python,44
2,p5,4,js,66
3,p1,5,java,11