重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
学习spark任何技术之前,请正确理解spark,可以参考:正确理解spark
10年积累的成都网站建设、做网站经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站制作后付款的网站建设流程,更有老边免费网站建设让你可以放心的选择与我们合作。
我们在 http://7639240.blog.51cto.com/7629240/1966131 中已经知道了,一个scala函数其实就是java中的一个接口,对于java8 lambda而言,也是一样,一个lambda表达式就是java中的一个接口。接下来我们先看看spark中最简单的wordcount这个例子,分别用java8的非lambda以及lambda来实现:
一、非lambda实现的java spark wordcount程序:
public class WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); //JavaPairRDDinputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt", // TextInputFormat.class, LongWritable.class, Text.class); JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt"); JavaRDD wordsRDD = inputRDD.flatMap(new FlatMapFunction () { @Override public Iterator call(String s) throws Exception { return Arrays.asList(s.split(" ")).iterator(); } }); JavaPairRDD keyValueWordsRDD = wordsRDD.mapToPair(new PairFunction () { @Override public Tuple2 call(String s) throws Exception { return new Tuple2 (s, 1); } }); JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey(new HashPartitioner(2), new Function2 () { @Override public Integer call(Integer a, Integer b) throws Exception { return a + b; } }); //如果输出文件存在的话需要删除掉 File outputFile = new File("/Users/tangweiqun/wordcount"); if (outputFile.exists()) { File[] files = outputFile.listFiles(); for(File file: files) { file.delete(); } outputFile.delete(); } wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount"); System.out.println(wordCountRDD.collect()); } }
二、java8 lambda实现的wordcount代码
public class WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("appName").setMaster("local"); JavaSparkContext sc = new JavaSparkContext(conf); //JavaPairRDDinputRDD = sc.hadoopFile("hdfs://master:9999/user/word.txt", // TextInputFormat.class, LongWritable.class, Text.class); JavaRDD inputRDD = sc.textFile("file:///Users/tangweiqun/test.txt"); JavaRDD wordsRDD = inputRDD.flatMap(input -> Arrays.asList(input.split(" ")).iterator()); JavaPairRDD keyValueWordsRDD = wordsRDD.mapToPair(word -> new Tuple2 (word, 1)); JavaPairRDD wordCountRDD = keyValueWordsRDD.reduceByKey((a, b) -> a + b); //如果输出文件存在的话需要删除掉 File outputFile = new File("/Users/tangweiqun/wordcount"); if (outputFile.exists()) { File[] files = outputFile.listFiles(); for(File file: files) { file.delete(); } outputFile.delete(); } wordCountRDD.saveAsTextFile("file:///Users/tangweiqun/wordcount"); System.out.println(wordCountRDD.collect()); } }
从上面可以看出,lambda的实现更加简洁,也可以看出一个lambda函数表达式就是一个java接口。
我们在http://7639240.blog.51cto.com/7629240/1966958提到的combineByKey,如下的代码:
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2); //当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数 Function > createCombiner = new Function >() { @Override public Tuple2 call(Integer value) throws Exception { return new Tuple2<>(value, 1); } }; //当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数 Function2 , Integer, Tuple2 > mergeValue = new Function2 , Integer, Tuple2 >() { @Override public Tuple2 call(Tuple2 acc, Integer value) throws Exception { return new Tuple2<>(acc._1() + value, acc._2() + 1); } }; //当需要对不同分区的数据进行聚合的时候应用这个函数 Function2 , Tuple2 , Tuple2 > mergeCombiners = new Function2 , Tuple2 , Tuple2 >() { @Override public Tuple2 call(Tuple2 acc1, Tuple2 acc2) throws Exception { return new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()); } }; JavaPairRDD > combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners); //结果:[(coffee,(12,3)), (panda,(3,1))] System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
可以写成如下的lambda实现的combineByKey:
JavaPairRDDjavaPairRDD = sc.parallelizePairs(Arrays.asList(new Tuple2("coffee", 1), new Tuple2("coffee", 2), new Tuple2("panda", 3), new Tuple2("coffee", 9)), 2); //当在一个分区中遇到新的key的时候,对这个key对应的value应用这个函数 Function > createCombiner = value -> new Tuple2<>(value, 1); //当在一个分区中遇到已经应用过上面createCombiner函数的key的时候,对这个key对应的value应用这个函数 Function2 , Integer, Tuple2 > mergeValue = (acc, value) ->new Tuple2<>(acc._1() + value, acc._2() + 1); //当需要对不同分区的数据进行聚合的时候应用这个函数 Function2 , Tuple2 , Tuple2 > mergeCombiners = (acc1, acc2) -> new Tuple2<>(acc1._1() + acc2._1(), acc1._2() + acc2._2()); JavaPairRDD > combineByKeyRDD = javaPairRDD.combineByKey(createCombiner, mergeValue, mergeCombiners); //结果:[(coffee,(12,3)), (panda,(3,1))] System.out.println("combineByKeyRDD = " + combineByKeyRDD.collect());
如果想深入的系统的理解spark RDD api可以参考: spark core RDD api原理详解