import org.apache.spark.Partitioner; |
import org.apache.spark.SparkConf; |
import org.apache.spark.api.java.JavaPairRDD; |
import org.apache.spark.api.java.JavaRDD; |
import org.apache.spark.api.java.JavaSparkContext; |
import org.apache.spark.api.java.function.FlatMapFunction; |
import org.apache.spark.api.java.function.Function2; |
import org.apache.spark.api.java.function.PairFunction; |
import scala.Tuple2; |
|
import java.util.ArrayList; |
import java.util.Arrays; |
import java.util.Iterator; |
import java.util.List; |
import java.util.Scanner; |
//Driver |
public class SparkWordCount1{ |
public static void main(String[] args) { |
SparkConf sparkConf = new SparkConf(). |
setMaster( "local" ). |
setAppName( "wordcount" ); |
|
sparkConf.set( "spark.default.parallelism" , "4" ); //设置分区数 默认分区器是Hash分区器 |
JavaSparkContext ctx = new JavaSparkContext(sparkConf); |
ctx.setLogLevel( "ERROR" ); |
|
|
//final JavaRDD<String> linesRdd = ctx.textFile(args[0]); |
ArrayList<String> lines = new ArrayList<String>(); |
lines.add( "A" ); |
lines.add( "D" ); |
lines.add( "C" ); |
lines.add( "F" ); |
lines.add( "E" ); |
// lines.add("Hello Java Hi Ok"); |
// lines.add("Ok No House Hello"); |
// lines.add("Yes I Like Java"); |
// lines.add("No I Dislike Java"); |
|
|
JavaRDD<String> linesRdd = ctx.parallelize(lines, 4 ); //干预分区数 |
|
|
System.out.println( "linesRdd part num:" + linesRdd.getNumPartitions()); |
System.out.println( "linesRdd partitioner:" + linesRdd.partitioner()); |
System.out.println( "linesRdd:" +linesRdd.glom().collect()); |
|
|
JavaRDD<String> words = linesRdd.flatMap( |
(s) -> Arrays.asList(s.split( " " )).iterator()); |
System.out.println( "words part num:" + words.getNumPartitions()); |
System.out.println( "words partitioner:" + words.partitioner()); |
System.out.println( "words:" + words.glom().collect()); |
|
JavaPairRDD<String, Integer> ones = words.mapToPair( |
s-> new Tuple2<String, Integer>(s, 1 )); |
|
//ones.repartition(5); //第一种人为干预分区数 , 优先级高于前面的 |
|
|
|
//第二种人为干预分区数 , 优先级高于前面的 |
ones = ones.partitionBy( new Partitioner() { |
|
@Override |
public int numPartitions() { |
return 3 ; |
} |
|
@Override |
public int getPartition(Object key) { |
// TODO Auto-generated method stub |
int hc = key.hashCode(); |
int index = hc % numPartitions(); |
|
return index; |
} |
}); |
|
System.out.println( "ones part num:" + ones.getNumPartitions()); |
System.out.println( "ones partitioner:" + ones.partitioner()); |
System.out.println( "ones:" +ones.glom().collect()); |
|
|
|
|
|
JavaPairRDD<String, Integer> counts = ones.reduceByKey((x,y)->x+y); |
System.out.println( "counts part num:" + counts.getNumPartitions()); |
System.out.println( "counts partitioner:" + counts.partitioner()); |
System.out.println( "counts:" + counts.glom().collect()); |
|
|
//List<Tuple2<String, Integer>> results = counts.collect(); |
//System.out.println(results.toString()); |
|
//Scanner sc =new Scanner(System.in); |
//sc.next(); |
|
|
|
//counts.foreach(System.out::println); |
//cs.close(); |
ctx.close(); |
} |
} |