
import org.apache.spark.Partitioner; |
import org.apache.spark.SparkConf; |
import org.apache.spark.api.java.JavaPairRDD; |
import org.apache.spark.api.java.JavaRDD; |
import org.apache.spark.api.java.JavaSparkContext; |
import org.apache.spark.api.java.function.FlatMapFunction; |
import org.apache.spark.api.java.function.Function2; |
import org.apache.spark.api.java.function.PairFunction; |
import scala.Tuple2; |
|
import java.util.ArrayList; |
import java.util.Arrays; |
import java.util.Iterator; |
import java.util.List; |
import java.util.Scanner; |
// Driver program
public class SparkWordCount1{ |
public static void main(String[] args) { |
SparkConf sparkConf = new SparkConf(). |
setMaster("local"). |
setAppName("wordcount"); |
|
sparkConf.set("spark.default.parallelism", "4"); //设置分区数 默认分区器是Hash分区器 |
JavaSparkContext ctx = new JavaSparkContext(sparkConf); |
ctx.setLogLevel("ERROR"); |
|
|
//final JavaRDD<String> linesRdd = ctx.textFile(args[0]); |
ArrayList<String> lines = new ArrayList<String>(); |
lines.add("A"); |
lines.add("D"); |
lines.add("C"); |
lines.add("F"); |
lines.add("E"); |
// lines.add("Hello Java Hi Ok"); |
// lines.add("Ok No House Hello"); |
// lines.add("Yes I Like Java"); |
// lines.add("No I Dislike Java"); |
|
|
JavaRDD<String> linesRdd = ctx.parallelize(lines,4); //干预分区数 |
|
|
System.out.println("linesRdd part num:"+ linesRdd.getNumPartitions()); |
System.out.println("linesRdd partitioner:"+ linesRdd.partitioner()); |
System.out.println("linesRdd:" +linesRdd.glom().collect()); |
|
|
JavaRDD<String> words = linesRdd.flatMap( |
(s) -> Arrays.asList(s.split(" ")).iterator()); |
System.out.println("words part num:" + words.getNumPartitions()); |
System.out.println("words partitioner:" + words.partitioner()); |
System.out.println("words:" + words.glom().collect()); |
|
JavaPairRDD<String, Integer> ones = words.mapToPair( |
s->new Tuple2<String, Integer>(s, 1)); |
|
//ones.repartition(5); //第一种人为干预分区数 , 优先级高于前面的 |
|
|
|
//第二种人为干预分区数 , 优先级高于前面的 |
ones = ones.partitionBy(new Partitioner() { |
|
@Override |
public int numPartitions() { |
return 3; |
} |
|
@Override |
public int getPartition(Object key) { |
// TODO Auto-generated method stub |
int hc = key.hashCode(); |
int index = hc % numPartitions(); |
|
return index; |
} |
}); |
|
System.out.println("ones part num:"+ ones.getNumPartitions()); |
System.out.println("ones partitioner:"+ ones.partitioner()); |
System.out.println("ones:" +ones.glom().collect()); |
|
|
|
|
|
JavaPairRDD<String, Integer> counts = ones.reduceByKey((x,y)->x+y); |
System.out.println("counts part num:" + counts.getNumPartitions()); |
System.out.println("counts partitioner:" + counts.partitioner()); |
System.out.println("counts:" + counts.glom().collect()); |
|
|
//List<Tuple2<String, Integer>> results = counts.collect(); |
//System.out.println(results.toString()); |
|
//Scanner sc =new Scanner(System.in); |
//sc.next(); |
|
|
|
//counts.foreach(System.out::println); |
//cs.close(); |
ctx.close(); |
} |
} |



