[java] Code listing
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Scanner;
//Driver
public class SparkWordCount1 {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().
setMaster("local").
setAppName("wordcount");
sparkConf.set("spark.default.parallelism", "4"); //设置分区数 默认分区器是Hash分区器
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
ctx.setLogLevel("ERROR");
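// with master "local" everything runs in a single worker thread; the RDDs below are still split into multiple partitions, just processed sequentially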
//final JavaRDD<String> linesRdd = ctx.textFile(args[0]);
ArrayList<String> lines = new ArrayList<String>();
lines.add("A");
lines.add("D");
lines.add("C");
lines.add("F");
lines.add("E");
// lines.add("Hello Java Hi Ok");
// lines.add("Ok No House Hello");
// lines.add("Yes I Like Java");
// lines.add("No I Dislike Java");
JavaRDD<String> linesRdd = ctx.parallelize(lines,4); //干预分区数
System.out.println("linesRdd part num:"+ linesRdd.getNumPartitions());
System.out.println("linesRdd partitioner:"+ linesRdd.partitioner());
System.out.println("linesRdd:" +linesRdd.glom().collect());
JavaRDD<String> words = linesRdd.flatMap(
(s) -> Arrays.asList(s.split(" ")).iterator());
System.out.println("words part num:" + words.getNumPartitions());
System.out.println("words partitioner:" + words.partitioner());
System.out.println("words:" + words.glom().collect());
JavaPairRDD<String, Integer> ones = words.mapToPair(
s->new Tuple2<String, Integer>(s, 1));
//ones = ones.repartition(5); // first way to override the partition count; takes precedence over the settings above (repartition returns a new RDD, so the result must be reassigned)
// second way to override the partitioning; takes precedence over the settings above
ones = ones.partitionBy(new Partitioner() {
@Override
public int numPartitions() {
return 3;
}
@Override
public int getPartition(Object key) {
// guard against negative hashCodes: a plain modulo could yield a negative
// index, which Spark rejects as an invalid partition id
int mod = key.hashCode() % numPartitions();
return mod < 0 ? mod + numPartitions() : mod;
}
});
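// partitionBy shuffles the pairs so that equal keys land in the same partition;
// with 3 partitions and hashCode % 3, the layout is expected to be roughly
// partition 0: (E,1); partition 1: (C,1), (F,1); partition 2: (A,1), (D,1)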
System.out.println("ones part num:"+ ones.getNumPartitions());
System.out.println("ones partitioner:"+ ones.partitioner());
System.out.println("ones:" +ones.glom().collect());
JavaPairRDD<String, Integer> counts = ones.reduceByKey((x,y)->x+y);
System.out.println("counts part num:" + counts.getNumPartitions());
System.out.println("counts partitioner:" + counts.partitioner());
System.out.println("counts:" + counts.glom().collect());
//List<Tuple2<String, Integer>> results = counts.collect();
//System.out.println(results.toString());
//Scanner sc =new Scanner(System.in);
//sc.next();
//counts.foreach(System.out::println);
//sc.close();
ctx.close();
}
}
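For comparison, Spark ships a built-in org.apache.spark.HashPartitioner that applies the same non-negative hash-mod logic as the anonymous Partitioner above, so the hand-rolled class could be replaced with this minimal sketch:
import org.apache.spark.HashPartitioner;
// equivalent to the anonymous partitioner above, with the negative-hashCode guard built in
ones = ones.partitionBy(new HashPartitioner(3));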