重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要介绍“DAG任务分解和Shuffle RDD怎么使用”,在日常操作中,相信很多人在DAG任务分解和Shuffle RDD怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”DAG任务分解和Shuffle RDD怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
站在用户的角度思考问题,与客户深入沟通,找到大柴旦网站设计与大柴旦网站推广的解决方案,凭借多年的经验,让设计与互联网技术结合,创造个性化、用户体验好的作品,建站类型包括:做网站、网站制作、企业官网、英文网站、手机端网站、网站推广、域名申请、虚拟空间、企业邮箱。业务覆盖大柴旦地区。
1、DagScheduler分析
DagScheduler功能主要是负责RDD的各个stage的分解和任务提交。Stage分解是从触发任务调度过程的finalStage开始倒推寻找父stage,如果父stage没有提交任务则循环提交缺失的父stage。每个stage有一个父RDD的概念,根据分区数的多少创建多个任务(Task)。
Task的调度实际是通过TaskSchedulerImp来完成的,TaskSchedulerImp里根据环境部署的不同又会使用不同的Backend,比如Yarn集群、独立集群等其Backend是不一样的,这里先有个概念,先不深究Backend。
这里先看看DagScheduler的核心逻辑把。里面首先要研究的一个方法:
def submitMissingTasks(stage: Stage, jobId: Int)
该方法就是提交stage执行,为什么叫这个名称呢?说明这里的stage是需先需要提交执行的,没有其他依赖的stage还未执行了。
submitMissingTasks方法会根据RDD的依赖关系创建两种task,ResultTask和ShuffleMapTask。
一步步来,只看关键代码,因为整体代码太多了不利于理解关键逻辑。
1.1 生成序列化的taskBinary
taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) }
taskBinaryBytes待会是要封装成对像分发到远端Executor上执行的,所以必须是可序列化的。
两者最主要区别就是:ShuffleMapStage的入参是依赖的shuffleDep;而ResultStage的入参是函数的定义func。
1.2 生成task
现在有了taskBinaryBytes,下一步就是生成Task了。
val tasks: Seq[Task[_]] = try { stage match { case stage: ShuffleMapStage => stage.pendingPartitions.clear() partitionsToCompute.map { id => val locs = taskIdToLocations(id) val part = partitions(id) stage.pendingPartitions += id new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } case stage: ResultStage => partitionsToCompute.map { id => val p: Int = stage.partitions(id) val part = partitions(p) val locs = taskIdToLocations(id) new ResultTask(stage.id, stage.latestInfo.attemptNumber, taskBinary, part, locs, id, properties, serializedTaskMetrics, Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier()) } } } catch { case NonFatal(e) => abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e)) runningStages -= stage return }
两种Task类型:ShuffleMapTask和ResultTask。这里要主要的是对Task而言,有多少分区(partition)就会生成多少个Task,Task是到分区维度的,而不是到RDD维度的,这个概念一定要明确。
1.3 提交Task
最后一步就是提交任务执行。这里就要用到taskScheduler了,当然了,这里的taskScheduler目前就是指TaskSchedulerImp。
taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
DagScheduler里还有一个方法这里可以提一下,就是:
submitWaitingChildStages(stage)
这个方法是提交等待当前stage执行的等待stage,这样DAG的整个调度过程就完整了。
2、Task执行
两种Task类型:ShuffleMapTask和ResultTask。
2.1 ResultTask
我们先看ResultTask的执行,它相对比较简单,核心方式是runTask,核心代码:
override def runTask(context: TaskContext): U = { val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) func(context, rdd.iterator(partition, context)) }
反序列化出来RDD和func,然后执行rdd的iterator方法获取数据集,并在此数据集上执行func函数,要注意实际上这是一次迭代过程而不是多次迭代过程。
2.2 ShuffleMapTask
ShuffleMapTask任务的执行相对复杂些。
核心方法还是runTask,核心代码:
override def runTask(context: TaskContext): MapStatus = { val ser = SparkEnv.get.closureSerializer.newInstance() val rddAndDep = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) val rdd = rddAndDep._1 val dep = rddAndDep._2 dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition) }
首先反序列化出RDD和依赖项ShuffleDependency。然后用ShuffleWriterProcessor写数据到RDD。
这里的dep其实没太大意义,主要就是来判断是否要进行合并使用的,不影响理解整个shuffle流程,所以我们可以先不要管dep:
dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
这里的rdd实际就是ShuffleMapTask所要生成的数据集。这句代码到底是什么意思呢? ShuffleWriterProcessor实际上是将数据集写到了BlockManager上去的,先看看ShuffleWriterProcessor的含义。
2.3 ShuffleWriterProcessor
ShuffleWriterProcessor的关键方法的定义先看一下。
def write(rdd: RDD[_],dep: ShuffleDependency[_, _, _], partitionId: Int, context: TaskContext,partition: Partition): MapStatus = { var writer: ShuffleWriter[Any, Any] = null val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any]( dep.shuffleHandle, partitionId, context, createMetricsReporter(context)) writer.write( rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get }
ShuffleManager实际上就是BlockManager,管理块空间的。
Write是Shuffle写入器,写到BlockManager去;rdd.iterator(partition, context)就是当前Shuffle类型的RDD定义的数据集,dep是rdd计算数据集时依赖的RDD(这里的dep没多大意思先不管)。
这段代码的作用就是将shuffle rdd数据集输出到BlockManager上,在读取RDD的数据时,如果该RDD是shuffle类型,则需要到BlockManager上去读取,这里就是这个作用。
2.4 Shuffle RDD的相关概念
Shuffle类的RDD是指这类RDD的compute方法是依赖于其他RDD的,这里的其他RDD可以是多个。执行shuffle的RDD的计算过程的时候,是将一到多个依赖RDD的迭代器的输出作为数据源迭代器,在此之上执行自己的操作。所以shuffle RDD的compute方法里一定会用到依赖RDD的iterator方法。
可以看看CoGroupedRDD的源码,就能很快的理解shuffle的含义。
到此,关于“DAG任务分解和Shuffle RDD怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!