Using DBOutputFormat to Write MapReduce Results to MySQL

The task: use DBOutputFormat to write the results a MapReduce job produces into MySQL.
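Before the job runs, the target table has to exist in MySQL. Below is a minimal setup sketch; the database name (test), credentials (root/1234), table name (hadoop), and the VARCHAR(50) column width are assumptions that mirror the DBConfiguration.configureDB and DBOutputFormat.setOutput calls in the code further down. The MySQL JDBC driver (mysql-connector-java) must be on the classpath here, just as it must be when the job itself runs.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateHadoopTable {
    public static void main(String[] args) throws Exception {
        // Connection settings mirror DBConfiguration.configureDB() in the job below.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "1234");
             Statement st = conn.createStatement()) {
            // Column names and order must match
            // DBOutputFormat.setOutput(job, "hadoop", "name", "age").
            st.executeUpdate("CREATE TABLE IF NOT EXISTS hadoop (name VARCHAR(50), age INT)");
        }
    }
}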



package com;
import java.io.File;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class mysqlTest {
    public static class MytableWritable implements DBWritable {
        String name;
        int age;
 
        public MytableWritable() {
        }
 
        public MytableWritable(String name, int age) {
            this.name = name;
            this.age = age;
        }
 
        public void write(PreparedStatement statement) throws SQLException {
            System.out.println("here a flag");
/*DBOutputFormat.setOutput(job, "hadoop", "name", "age"); 下面的 1 和2都是参照以上的代码来的*/          
            statement.setString(1, this.name);
            statement.setInt(2, this.age);
        }

        public void readFields(ResultSet resultSet) throws SQLException {
            /* Nothing is read from the database here, so the body can stay empty.
               DBWritable is an interface, so the method must still be implemented. */
//            System.out.println("readFields ResultSet");
//            this.name = resultSet.getString(1);
//            this.age = resultSet.getInt(2);
        }
        /* The following toString() is not used by this program. */
        public String toString() {
            return new String(this.name + " " + this.age);
        }
    }    
  
    public static class PassThroughMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        /* Input data:
           o1abc    45
           o2kkk    77
        */
        protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static class DBInsertReducer extends Reducer<LongWritable, Text, MytableWritable, MytableWritable> {
        protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            /* Note: the key arrives here exactly as the map emitted it; it is
               still the byte offset of the line in the input file. */
            StringBuilder value = new StringBuilder();
            for(Text text : values){
                System.out.println("text is "+text);
                value.append(text);
                System.out.println("value is "+value);
            }
            System.out.println("for 外面了");
            String[] fieldArr = value.toString().split("\t");
            String name = fieldArr[0].trim();
            int age = 0;
            try{
               age = Integer.parseInt(fieldArr[1].trim());
            }catch(NumberFormatException e){
                // a non-numeric age field falls back to 0
            }
            // DBOutputFormat uses only the key; the value may be null
            context.write(new MytableWritable(name, age), null);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://localhost:3306/test", "root", "1234");
        conf.set("name", "马克-to-win");
        Job job = new Job(conf, "word count");
        job.setJarByClass(mysqlTest.class);
        job.setMapperClass(PassThroughMapper.class);
        job.setReducerClass(DBInsertReducer.class);

        /* Job.setOutputFormatClass(Class<? extends OutputFormat> cls)
           sets the OutputFormat implementation the job will use. */
        job.setOutputFormatClass(DBOutputFormat.class);    
        /* DBOutputFormat.setOutput(Job job, String tableName, String... fieldNames)
           names the table to insert into and the fields of that table. */
        DBOutputFormat.setOutput(job, "hadoop", "name", "age");
        File file = new File("e:/temp/output");
        if (file.exists() && file.isDirectory()) {
            deleteFile(file);
        }
        FileInputFormat.setInputPaths(job, new Path("e:/temp/input/mysql.txt"));
        System.out.println("mytest hadoop successful");
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static boolean deleteFile(File dirFile) {
        if (!dirFile.exists()) {
            return false;
        }
        if (dirFile.isFile()) {
            return dirFile.delete();
        } else { /* for an empty directory the loop body never runs and we fall through to dirFile.delete() below */
            for (File file : dirFile.listFiles()) {
                deleteFile(file);
            }
        }
        return dirFile.delete();
    }
}
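For reference, DBOutputFormat builds the INSERT statement from the table and field names handed to setOutput; MytableWritable.write() then fills the two placeholders. Below is a small sketch that prints the generated SQL. It assumes your Hadoop version exposes the public constructQuery helper on org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; the class name ShowInsertSql is just for illustration.

package com;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

public class ShowInsertSql {
    public static void main(String[] args) {
        DBOutputFormat<mysqlTest.MytableWritable, NullWritable> format = new DBOutputFormat<>();
        // Should print something like: INSERT INTO hadoop (name,age) VALUES (?,?);
        System.out.println(format.constructQuery("hadoop", new String[] { "name", "age" }));
    }
}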




Input file:

o1abc    45
o2kkk    77


Output in the database (table test.hadoop), assuming the two input columns are tab-separated as the reducer's split("\t") expects:

name    age
o1abc   45
o2kkk   77
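To double-check the result, query the table directly. A minimal sketch using the same assumed connection settings as above (the class name CheckHadoopTable is just for illustration):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckHadoopTable {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/test", "root", "1234");
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT name, age FROM hadoop")) {
            while (rs.next()) {
                System.out.println(rs.getString("name") + "\t" + rs.getInt("age"));
            }
        }
    }
}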