// [java] code library
package org.platform.modules.elasticsearch.clean.ly;
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.platform.modules.elasticsearch.clean.util.CleanUtil;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/**
* Finds the records that contain both a Sina e-mail account and a password, and saves them.
*/
public class FindSina {
/**
 * Mapper that reads one JSON record per input line and emits
 * {@code (email, passWord)} for every record whose {@code email} fully matches
 * {@link #emailReg} and whose {@code passWord} is a usable value.
 *
 * <p>Records that cannot be parsed or cleaned are skipped and counted; failures
 * while writing output are propagated to the framework instead of being hidden.
 */
public static class Map1 extends Mapper<LongWritable, Text, Text, Text>{
    /** Counter group used for the bookkeeping counters of this mapper. */
    private static final String COUNTER_GROUP = "FindSina";

    /** JSON parser; created in {@link #setup(Context)}. */
    public Gson gson = null;

    /** Regular expression an e-mail value must match in full to be emitted. */
    public String emailReg = "[\\w!#$%&'*+//=?^_`{|}~-]+(?:\\.[\\w!#$%&'*+//=?^_`{|}~-]+)*@sina\\.((com)|(cn))";

    /** Compiled form of {@link #emailReg}; see {@link #emailPattern()}. */
    private Pattern emailPattern = null;

    /**
     * Creates the Gson instance used to parse input records.
     *
     * @param context task context supplied by the framework
     */
    public void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        this.gson = new GsonBuilder().serializeSpecialFloatingPointValues()
                .setDateFormat("yyyy-MM-dd HH:mm:ss").create();
    }

    /**
     * Parses one JSON record and writes {@code (email, passWord)} when both
     * fields are present and acceptable.
     *
     * @param key     input key (not used)
     * @param value   one JSON document as text
     * @param context task context used for output and counters
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if writing the output pair is interrupted
     */
    @SuppressWarnings("unchecked")
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{
        Map<String, Object> original;
        Map<String, Object> map;
        try {
            original = gson.fromJson(value.toString(), Map.class);
            if (original == null) {
                // Nothing was parsed from this line, so there is nothing to inspect.
                return;
            }
            map = CleanUtil.replaceSpace(original);
        } catch (RuntimeException e) {
            // Best effort: one bad record must not fail the task. The record itself
            // is not printed because it may contain credentials.
            context.getCounter(COUNTER_GROUP, "MALFORMED_RECORDS").increment(1);
            System.err.println("FindSina: skipping malformed record: " + e);
            return;
        }

        if (map == null || !original.containsKey("email") || !original.containsKey("passWord")) {
            return;
        }

        // Values come from untyped JSON, so they may be null or non-String.
        Object emailValue = map.get("email");
        Object passWordValue = map.get("passWord");
        if (!(emailValue instanceof String) || !(passWordValue instanceof String)) {
            return;
        }

        String email = (String) emailValue;
        String passWord = (String) passWordValue;
        if (!emailPattern().matcher(email).matches() || isPlaceholder(passWord)) {
            return;
        }

        // Deliberately outside the try block: an output failure must reach the framework.
        context.write(new Text(email), new Text(passWord));
    }

    /**
     * Returns the compiled form of {@link #emailReg}, compiling it on first use
     * and again whenever the public field has been changed.
     */
    private Pattern emailPattern() {
        if (emailPattern == null || !emailPattern.pattern().equals(emailReg)) {
            emailPattern = Pattern.compile(emailReg);
        }
        return emailPattern;
    }

    /** Tells whether the password is one of the values treated as "no password". */
    private static boolean isPlaceholder(String passWord) {
        return "".equals(passWord) || "NA".equals(passWord) || "null".equals(passWord);
    }
}
/**
 * Reducer that collapses all values received for one key into a single
 * comma-separated value, keeping each distinct value once.
 */
public static class Reduce extends Reducer<Text, Text, Text, Text>{
    /**
     * Writes {@code key} together with its distinct values joined by commas,
     * in the order in which they are first encountered.
     *
     * @param key     the grouping key
     * @param value   all values emitted for {@code key}
     * @param context task context used for output
     */
    public void reduce(Text key , Iterable<Text> value,Context context) throws IOException, InterruptedException{
        // Account for records that repeat the same password: remember what was already appended.
        HashSet<String> seen = new HashSet<String>();
        StringBuilder joined = new StringBuilder();
        for (Text item : value) {
            String candidate = item.toString();
            if (!seen.add(candidate)) {
                // Already appended once; skip the duplicate.
                continue;
            }
            // The separator is only needed once the buffer holds some text.
            if (joined.length() > 0) {
                joined.append(",");
            }
            joined.append(candidate);
        }
        context.write(key, new Text(joined.toString()));
    }
}
/**
 * Configures and runs the FindSina job, then exits with 0 on success and 1 on failure.
 *
 * <p>After generic Hadoop options are removed, exactly two arguments are expected:
 * the input path and the output path. When no paths are given at all, the built-in
 * default paths are used. Any other argument count prints an error and exits with 2.
 *
 * @param args generic Hadoop options followed by the input and output paths
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
    // Paths used when the caller supplies none.
    final String defaultInput = "hdfs://192.168.0.10:9000/elasticsearch_original/email/mailbox/20/records-10-m-00008";
    final String defaultOutput = "hdfs://192.168.0.10:9000/user/test/out/records-10-m-00008";

    Configuration configuration = new Configuration();
    // NOTE(review): "mapred.job.tracker" looks like a legacy property name - confirm it is still honoured by the target cluster.
    configuration.set("mapred.job.tracker", "192.168.0.10");

    String[] inputArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
    if (inputArgs.length == 0) {
        // Fall back to the defaults instead of overwriting whatever the caller passed.
        inputArgs = new String[]{defaultInput, defaultOutput};
    }
    if (inputArgs.length != 2) {
        System.err.println("error, please input two path. input and output");
        System.exit(2);
    }

    Job job = Job.getInstance(configuration, "FindSina");
    job.setJarByClass(FindSina.class);
    job.setMapperClass(Map1.class);
    job.setReducerClass(Reduce.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(inputArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(inputArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}