重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
学习spark任何的技术之前,请正确理解spark,可以参考:正确理解spark
成都创新互联是一家集网站建设,昭通企业网站建设,昭通品牌网站建设,网站定制,昭通网站建设报价,网络营销,网络优化,昭通网站推广为一体的创新建站企业,帮助传统企业提升企业形象加强企业竞争力。可充分满足这一群体相比中小企业更为丰富、高端、多元的互联网需求。同时我们时刻保持专业、时尚、前沿,时刻以成就客户成长自我,坚持不断学习、思考、沉淀、净化自己,让我们为更多的企业打造出实用型网站。
RDD java api其实底层是调用了scala的api来实现的,所以我们有必要对java api是怎么样去调用scala api,我们先自己简单的实现一个scala版本和java版本的RDD和SparkContext
一、简单实现scala版本的RDD和SparkContext
class RDD[T](value: Seq[T]) { //RDD的map操作 def map[U](f: T => U): RDD[U] = { new RDD(value.map(f)) } def iterator[T] = value.iterator } class SparkContext { //创建一个RDD def createRDD(): RDD[Integer] = new RDD[Integer](Seq(1, 2, 3)) }
二、简单实现java版本的RDD和SparkContext
//这个时java中的一个接口 //我们可以将scala中的map需要的函数其实就是对应着java中的一个接口 package com.twq.javaapi.java7.function; public interface Functionextends Serializable { R call(T1 v1) throws Exception; } //这边实现的java版的RDD和SparkContext其实还是用scala代码实现,只不过这些scala代码可以被java代码调用了 import java.util.{Iterator => JIterator} import scala.collection.JavaConverters._ import com.twq.javaapi.java7.function.{Function => JFunction} //每一个JavaRDD都会含有一个scala的RDD,用于调用该RDD的api class JavaRDD[T](val rdd: RDD[T]) { def map[R](f: JFunction[T, R]): JavaRDD[R] = //这里是关键,调用scala RDD中的map方法 //我们将java的接口构造成scala RDD的map需要的函数函数 new JavaRDD(rdd.map(x => f.call(x))) //我们需要将scala的Iterator转成java版的Iterator def iterator: JIterator[T] = rdd.iterator.asJava } //每个JavaSparkContext含有一个scala版本的SparkContext class JavaSparkContext(sc: SparkContext) { def this() = this(new SparkContext()) //转调scala版本的SparkContext来实现JavaSparkContext的功能 def createRDD(): JavaRDD[Integer] = new JavaRDD[Integer](sc.createRDD()) }
三、写java代码调用rdd java api
package com.twq.javaapi.java7; import com.twq.javaapi.java7.function.Function; import com.twq.rdd.api.JavaRDD; import com.twq.rdd.api.JavaSparkContext; import java.util.Iterator; /** * Created by tangweiqun on 2017/9/16. */ public class SelfImplJavaRDDTest { public static void main(String[] args) { //初始化JavaSparkContext JavaSparkContext jsc = new JavaSparkContext(); //调用JavaSparkContext的api创建一个RDD JavaRDDfirstRDD = jsc.createRDD(); //对创建好的firstRDD应用JavaRDD中的map操作 JavaRDD strRDD = firstRDD.map(new Function () { @Override public String call(Integer v1) throws Exception { return v1 + "test"; } }); //将得到的RDD的结果打印,结果为 //1test //2test //3test Iterator result = strRDD.iterator(); while (result.hasNext()) { System.out.println(result.next()); } } }
以上就是RDD java api调用scala api的实现原理,虽然只举了map操作,但是其他的类似于flatMap操作的实现都是类似的
接下来可以详细了解RDD java的每一个api了
我们可以参考spark core RDD api来详细理解scala中的每一个api。。。
系统学习spark:
1、[老汤] Spark 2.x 之精讲Spark Core:https://edu.51cto.com/sd/88429
2、[老汤]Spark 2.x 之精讲Spark SQL专题:https://edu.51cto.com/sd/16f3d
3、[老汤]Scala内功修炼系列专题:https://edu.51cto.com/sd/8e85b
4、[老汤]Spark 2.x之精讲Spark Streamig:https://edu.51cto.com/sd/8c525
5、[老汤]Spark 2.x精讲套餐:https://edu.51cto.com/sd/ff9a4
6、从Scala到Spark 2.x专题:https://edu.51cto.com/sd/d72af