MapReduce基础
本章讲了围绕着Mapreduce知识点的相关14个问题,学过后可以基本胜任MapReduce编程工作。
MapReduce的输入文件是两个
对于MapReduce程序,如何输入文件是两个文件?
马 克-to-win @ 马克java社区:这一小节,我们将继续第一章大数据入门的HelloWorld例子做进一步的研究。看一看split有两个的这种情况,这里,同时我们研究如何输入文件是两个文件。
马克- to-win:马克 java社区:防盗版实名手机尾号: 73203。
package com;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCountTwoFile {
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
System.out.println("key is 马克-to-win @ 马克java社区:防盗版实名手机尾号:73203"+key.toString()+" value is "+value.toString());
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
System.out.println("reduce key is "+key.toString());
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
Job job = new Job(conf, "word count");
job.setJarByClass(WordCountTwoFile.class);
job.setMapperClass(TokenizerMapper.class);
// job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/README.txt"));
(购买完整教程)
FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output2"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
当然如果程序不硬编码的话,用上一章的配置运行参数方法也可以运行。
右击项目名称,选择“run as”-“run configurations”,在“Arguments”里加入三个参数
hdfs://localhost:9000/README.txt
hdfs://localhost:9000/README1.txt
hdfs://localhost:9000/output2
然后点击“Run”即可运行。
6)结果查看
打开新生成的part-r-00000文件:
a 3
hello 6
lisi 1
mark 1
to 1
win 1
zhangsan 1
里面给出了两个文件中每个字的出现次数。
源文件readme.txt:
hello a hello win
hello a to
hello mark
源文件readme1.txt:
hello zhangsan
hello a lisi
执行结果是:
otherArgs is hdfs://localhost:9000/README.txthdfs://localhost:9000/README1.txt
mytest hadoop successful
INFO - session.id is deprecated. Instead, use dfs.metrics.session-id
INFO - Initializing JVM Metrics with processName=JobTracker, sessionId=
WARN - No job jar file set. User classes may not be found. See Job or Job#setJar(String).
INFO - Total input paths to process : 2
INFO - number of splits:2
INFO - Submitting tokens for job: job_local358187217_0001
INFO - The url to track the job: http://localhost:8080/
INFO - Running job: job_local358187217_0001
INFO - OutputCommitter set in config null
INFO - File Output Committer Algorithm version is 1
INFO - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
INFO - Waiting for map tasks
INFO - Starting task: attempt_local358187217_0001_m_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@af988b
INFO - Processing split: hdfs://localhost:9000/README.txt:0+41
INFO - Job job_local358187217_0001 running in uber mode : false
INFO - map 0% reduce 0%
INFO - (EQUATOR) 0 kvi 26214396(104857584)
INFO - mapreduce.task.io.sort.mb: 100
INFO - soft limit at 83886080
INFO - bufstart = 0; bufvoid = 104857600
INFO - kvstart = 26214396; length = 6553600
INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 马克-to-win @ 马克java社区:0 value is hello a hello win
key is 马克-to-win @ 马克java社区:19 value is hello a to
key is 马克-to-win @ 马克java社区:31 value is hello mark
INFO -
INFO - Starting flush of map output
INFO - Spilling map output
INFO - bufstart = 0; bufend = 76; bufvoid = 104857600
INFO - kvstart = 26214396(104857584); kvend = 26214364(104857456); length = 33/6553600
INFO - Finished spill 0
INFO - Task:attempt_local358187217_0001_m_000000_0 is done. And is in the process of committing
INFO - map
INFO - Task 'attempt_local358187217_0001_m_000000_0' done.
INFO - Finishing task: attempt_local358187217_0001_m_000000_0
INFO - Starting task: attempt_local358187217_0001_m_000001_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@45618a
INFO - Processing split: hdfs://localhost:9000/README1.txt:0+28
INFO - (EQUATOR) 0 kvi 26214396(104857584)
INFO - mapreduce.task.io.sort.mb: 100
INFO - soft limit at 83886080
INFO - bufstart = 0; bufvoid = 104857600
INFO - kvstart = 26214396; length = 6553600
INFO - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
key is 0 value is hello zhangsan
key is 16 value is hello a lisi
INFO -
INFO - Starting flush of map output
INFO - Spilling map output
INFO - bufstart = 0; bufend = 48; bufvoid = 104857600
INFO - kvstart = 26214396(104857584); kvend = 26214380(104857520); length = 17/6553600
INFO - Finished spill 0
INFO - Task:attempt_local358187217_0001_m_000001_0 is done. And is in the process of committing
INFO - map
INFO - Task 'attempt_local358187217_0001_m_000001_0' done.
INFO - Finishing task: attempt_local358187217_0001_m_000001_0
INFO - map task executor complete.
INFO - Waiting for reduce tasks
INFO - Starting task: attempt_local358187217_0001_r_000000_0
INFO - File Output Committer Algorithm version is 1
INFO - ProcfsBasedProcessTree currently is supported only on Linux.
INFO - Using ResourceCalculatorProcessTree : org.apache.hadoop.yarn.util.WindowsBasedProcessTree@743b96
INFO - Using ShuffleConsumerPlugin: org.apache.hadoop.mapreduce.task.reduce.Shuffle@12ec0de
INFO - MergerManager: memoryLimit=181665792, maxSingleShuffleLimit=45416448, mergeThreshold=119899424, ioSortFactor=10, memToMemMergeOutputsThreshold=10
INFO - attempt_local358187217_0001_r_000000_0 Thread started: EventFetcher for fetching Map Completion Events
INFO - localfetcher#1 about to shuffle output of map attempt_local358187217_0001_m_000001_0 decomp: 60 len: 64 to MEMORY
INFO - Read 60 bytes from map-output for attempt_local358187217_0001_m_000001_0
INFO - closeInMemoryFile -> map-output of size: 60, inMemoryMapOutputs.size() -> 1, commitMemory -> 0, usedMemory ->60
INFO - localfetcher#1 about to shuffle output of map attempt_local358187217_0001_m_000000_0 decomp: 96 len: 100 to MEMORY
INFO - Read 96 bytes from map-output for attempt_local358187217_0001_m_000000_0
INFO - closeInMemoryFile -> map-output of size: 96, inMemoryMapOutputs.size() -> 2, commitMemory -> 60, usedMemory ->156
INFO - EventFetcher is interrupted.. Returning
INFO - 2 / 2 copied.
INFO - finalMerge called with 2 in-memory map-outputs and 0 on-disk map-outputs
INFO - Merging 2 sorted segments
INFO - Down to the last merge-pass, with 2 segments left of total size: 148 bytes
INFO - Merged 2 segments, 156 bytes to disk to satisfy reduce memory limit
INFO - Merging 1 files, 158 bytes from disk
INFO - Merging 0 segments, 0 bytes from memory into reduce
INFO - Merging 1 sorted segments
INFO - Down to the last merge-pass, with 1 segments left of total size: 150 bytes
INFO - 2 / 2 copied.
INFO - mapred.skip.on is deprecated. Instead, use mapreduce.job.skiprecords
reduce key is a
val is 1
val is 1
val is 1
reduce key is hello
val is 1
val is 1
val is 1
val is 1
val is 1
val is 1
reduce key is lisi
val is 1
reduce key is mark
val is 1
reduce key is to
val is 1
reduce key is win
val is 1
reduce key is zhangsan
val is 1
INFO - map 100% reduce 0%
INFO - Task:attempt_local358187217_0001_r_000000_0 is done. And is in the process of committing
INFO - 2 / 2 copied.
INFO - Task attempt_local358187217_0001_r_000000_0 is allowed to commit now
INFO - Saved output of task 'attempt_local358187217_0001_r_000000_0' to hdfs://localhost:9000/output13/_temporary/0/task_local358187217_0001_r_000000
INFO - reduce > reduce
INFO - Task 'attempt_local358187217_0001_r_000000_0' done.
INFO - Finishing task: attempt_local358187217_0001_r_000000_0
INFO - reduce task executor complete.
INFO - map 100% reduce 100%
INFO - Job job_local358187217_0001 completed successfully
INFO - Counters: 35
File System Counters
FILE: Number of bytes read=1624
FILE: Number of bytes written=900830
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=179
HDFS: Number of bytes written=48
HDFS: Number of read operations=28
HDFS: Number of large read operations=0
HDFS: Number of write operations=5
Map-Reduce Framework
Map input records=5
Map output records=14
Map output bytes=124
Map output materialized bytes=164
Input split bytes=195
Combine input records=0
Combine output records=0
Reduce input groups=7
Reduce shuffle bytes=164
Reduce input records=14
Reduce output records=7
Spilled Records=28
Shuffled Maps =2
Failed Shuffles=0
Merged Map outputs=2
GC time elapsed (ms)=65
Total committed heap usage (bytes)=461156352
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=69
File Output Format Counters
Bytes Written=48