Using Spark partitioners (only partially explained; for reference only)

2018-11-01 Author: 杰杰杰

[java] Code

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Scanner;

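// Driver program
public class SparkWordCount1 {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
                .setMaster("local")
                .setAppName("wordcount");

        // Default partition count for operations that are not given one explicitly.
        // When the input RDD has no partitioner of its own, shuffle operations such as
        // reduceByKey use a HashPartitioner by default.
        sparkConf.set("spark.default.parallelism", "4");
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        ctx.setLogLevel("ERROR");
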
        //final JavaRDD<String> linesRdd = ctx.textFile(args[0]);
        ArrayList<String> lines = new ArrayList<String>();
        lines.add("A");
        lines.add("D");
        lines.add("C");
        lines.add("F");
        lines.add("E");
       // lines.add("Hello Java Hi Ok");
       // lines.add("Ok No House Hello");
       // lines.add("Yes I Like Java");
        // lines.add("No I Dislike Java");

        // Explicit partition count; takes precedence over spark.default.parallelism
        JavaRDD<String> linesRdd = ctx.parallelize(lines, 4);

        System.out.println("linesRdd part num:"+ linesRdd.getNumPartitions());
        System.out.println("linesRdd partitioner:"+ linesRdd.partitioner());
        System.out.println("linesRdd:" + linesRdd.glom().collect());

        JavaRDD<String> words = linesRdd.flatMap(s -> Arrays.asList(s.split(" ")).iterator());
        System.out.println("words part num:" + words.getNumPartitions());
        System.out.println("words partitioner:" + words.partitioner());
        System.out.println("words:" + words.glom().collect());
        
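        JavaPairRDD<String, Integer> ones = words.mapToPair(
                s -> new Tuple2<String, Integer>(s, 1));

        // Option 1 for manually setting the partition count; overrides the earlier settings.
        // repartition returns a new RDD, so its result has to be reassigned:
        //ones = ones.repartition(5);

        // Option 2 for manually setting the partition count; also overrides the earlier settings.
        // A custom Partitioner decides both the number of partitions and which key goes where.
        ones = ones.partitionBy(new Partitioner() {

            @Override
            public int numPartitions() {
                return 3;
            }

            @Override
            public int getPartition(Object key) {
                // Must return a value in [0, numPartitions()). Unlike %, Math.floorMod stays
                // non-negative for negative hash codes; null keys go to partition 0.
                if (key == null) {
                    return 0;
                }
                return Math.floorMod(key.hashCode(), numPartitions());
            }
        });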
        
        System.out.println("ones part num:"+ ones.getNumPartitions());
        System.out.println("ones partitioner:"+ ones.partitioner());
        System.out.println("ones:" + ones.glom().collect());

        JavaPairRDD<String, Integer> counts =  ones.reduceByKey((x,y)->x+y);   
        System.out.println("counts part num:" + counts.getNumPartitions());
        System.out.println("counts partitioner:" + counts.partitioner());
        System.out.println("counts:" + counts.glom().collect());

        // Alternative ways to print the final result:
        //List<Tuple2<String, Integer>> results = counts.collect();
        //System.out.println(results.toString());
        //counts.foreach(System.out::println);

        // Optional: pause the driver until some input is entered
        //Scanner sc = new Scanner(System.in);
        //sc.next();

        ctx.close();
    }  
} 
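How `parallelize(lines, 4)` spreads the five letters across four partitions (what `linesRdd.glom().collect()` prints) can be reproduced without Spark. The JDK-only sketch below assumes the slicing rule of Spark's `ParallelCollectionRDD`, where slice i covers the indices [i*n/numSlices, (i+1)*n/numSlices); the class `SliceDemo` and the method `slice` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SliceDemo {

    // Splits `data` into `numSlices` contiguous chunks. Assumption: same start/end
    // formula as Spark's ParallelCollectionRDD, start = i*n/numSlices, end = (i+1)*n/numSlices.
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be positive");
        }
        long n = data.size();
        List<List<T>> slices = new ArrayList<>();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((i * n) / numSlices);
            int end = (int) (((i + 1) * n) / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("A", "D", "C", "F", "E");
        // Mirrors ctx.parallelize(lines, 4) followed by glom().collect()
        System.out.println(slice(lines, 4));
    }
}
```

Under that assumption the letters split as [[A], [D], [C], [F, E]]; when there are fewer elements than slices, some slices are simply empty.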

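A hash-modulo partitioner like the anonymous `Partitioner` in the listing has to return an index in [0, numPartitions). In Java, `%` keeps the sign of the dividend, so `hashCode() % numPartitions()` goes negative for any key with a negative hash code, while `Math.floorMod` does not. The JDK-only sketch below contrasts the two and groups the sample letters into 3 partitions the way such a partitioner would; `PartitionIndexDemo`, `naiveIndex`, `safeIndex`, and `assign` are hypothetical names, and sending `null` keys to partition 0 is an assumption modeled on Spark's `HashPartitioner`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionIndexDemo {

    // Raw % operator: result has the sign of hashCode(), so it can be negative.
    static int naiveIndex(Object key, int numPartitions) {
        return key.hashCode() % numPartitions;
    }

    // Non-negative variant: Math.floorMod always returns a value in [0, numPartitions).
    // Assumption: null keys go to partition 0, as Spark's HashPartitioner does.
    static int safeIndex(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Groups keys by the partition index safeIndex assigns them (sorted by index).
    static Map<Integer, List<Object>> assign(List<?> keys, int numPartitions) {
        Map<Integer, List<Object>> byPartition = new TreeMap<>();
        for (Object key : keys) {
            byPartition.computeIfAbsent(safeIndex(key, numPartitions), k -> new ArrayList<>()).add(key);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("A", "D", "C", "F", "E");
        System.out.println(assign(words, 3));
        // An Integer's hash code is its value, so -7 shows the negative-index problem.
        Integer negativeKey = -7;
        System.out.println(naiveIndex(negativeKey, 3) + " vs " + safeIndex(negativeKey, 3));
    }
}
```

With 3 partitions the letters group as {0=[E], 1=[C, F], 2=[A, D]} (a one-character string's hash code is its char code, 65 to 70 here), and the key -7 maps to -1 with `%` but to 2 with `Math.floorMod`. Inside Spark, the order of keys within a partition after the shuffle may differ from this list order.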
