package org.platform.modules.elasticsearch.clean.ly; |
import java.io.IOException;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.platform.modules.elasticsearch.clean.util.CleanUtil;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
/** |
* 找出同时存在新浪账号和密码的记录,并保存 |
*/ |
public class FindSina { |
public static class Map1 extends Mapper<LongWritable, Text, Text, Text>{ |
|
public void setup(Context context) throws IOException, InterruptedException { |
super .setup(context); |
this .gson = new GsonBuilder().serializeSpecialFloatingPointValues() |
.setDateFormat( "yyyy-MM-dd HH:mm:ss" ).create(); |
} |
|
public Gson gson = null ; |
public String emailReg = "[\\w!#$%&'*+//=?^_`{|}~-]+(?:\\.[\\w!#$%&'*+//=?^_`{|}~-]+)*@sina\\.((com)|(cn))" ; |
@SuppressWarnings ( "unchecked" ) |
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ |
try { |
Map<String, Object> original = gson.fromJson(value.toString(), Map. class ); |
Map<String, Object> map = CleanUtil.replaceSpace(original); |
if (original.containsKey( "email" ) && original.containsKey( "passWord" )){ |
String email = (String)map.get( "email" ); |
String passWord = (String)map.get( "passWord" ); |
if (email.matches(emailReg) && ! "" .equals(passWord) && ! "NA" .equals(passWord) && ! "null" .equals(passWord)){ |
Text emailText = new Text(email); |
Text passWordText = new Text(passWord); |
context.write(emailText, passWordText); |
} |
} |
} catch (Exception e) { |
e.printStackTrace(); |
} |
} |
} |
public static class Reduce extends Reducer<Text, Text, Text, Text>{ |
|
public void reduce(Text key , Iterable<Text> value,Context context) throws IOException, InterruptedException{ |
StringBuffer password = new StringBuffer(); |
//考虑密码重复的记录 |
HashSet<String> set = new HashSet<String>(); |
for (Text str : value){ |
if (!set.contains(str.toString())){ |
if (password.length()== 0 ){ |
password = new StringBuffer(str.toString()); |
} else { |
password.append( "," +str.toString()); |
} |
set.add(str.toString()); |
} else { |
set.add(str.toString()); |
} |
} |
Text passwordText = new Text(password.toString()); |
context.write(key, passwordText); |
} |
} |
|
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{ |
Configuration configuration = new Configuration(); |
configuration.set( "mapred.job.tracker" , "192.168.0.10" ); |
args = new String[]{ "hdfs://192.168.0.10:9000/elasticsearch_original/email/mailbox/20/records-10-m-00008" , "hdfs://192.168.0.10:9000/user/test/out/records-10-m-00008" }; |
String[] inputArgs = new GenericOptionsParser(configuration, args).getRemainingArgs(); |
if (inputArgs.length != 2 ) { |
System.out.println( "error, please input two path. input and output" ); |
System.exit( 2 ); |
} |
Job job = Job.getInstance(configuration, "FindSina" ); |
job.setJarByClass(FindSina. class ); |
|
job.setMapperClass(Map1. class ); |
job.setReducerClass(Reduce. class ); |
|
job.setOutputKeyClass(Text. class ); |
job.setOutputValueClass(Text. class ); |
|
FileInputFormat.addInputPath(job, new Path(inputArgs[ 0 ])); |
FileOutputFormat.setOutputPath(job, new Path(inputArgs[ 1 ])); |
System.exit(job.waitForCompletion( true ) ? 0 : 1 ); |
} |
|
} |