重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

Spark的基础介绍和操作调优

本篇内容介绍了“Spark的基础介绍和操作调优”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

本篇内容介绍了“Spark的基础介绍和操作调优”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

Spark 基础介绍

网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、小程序开发、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了巴楚免费建站欢迎大家使用!

在讨论spark调优之前,先看看spark里的一些概念。

action

Action是得到非RDD结果的RDD操作。如Spark中有如下常见action操作: reduce, collect, count, first, take, takeSample, countByKey, saveAsTextFile

job

每个spark的action会被分解成一个job。

stage

一个job会被分成多组task,每组task称为一个stage。stage的划分界限为以下两种task之一:

shuffleMapTask - 所有的wide transformation之前,可以简单认为是shuffle之前

resultTask - 可以简单认为是take()之类的操作

partition

RDD 包含固定数目的 partition, 每个 partiton 包含若干的 record。narrow tansformation

(比如 map 和 filter)返回的 RDD,一个 partition 中的 record 只需要从父 RDD 对应的 partition 中的 record 计算得到。同样narrow transformation不会改变partition的个数。

task

被送到executor上执行的工作单元; 一个task只能做一个stage中的一个partition的数据。

操作调优

调整在 stage 边届时的 partition 个数经常可以很大程度上影响程序的执行效率;

associative reductive operation, 能使用reduceByKey时不使用groupByKey,因为grouByKey会把所有数据shuffle一遍,而reduceByKey只会Shuffle reduce的结果。

输入和输出结果不一样时,不使用reduceByKey,而使用aggregateByKey;aggregateByKey

: Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.

不要用flatMap-join-groupBy的模式,可以用cogroup;

当两个reduceByKey的结果join时,如果大家的partition都一样,则spark不会在join时做shuffle;

当一个内存能放得下的数据集join时,可以考虑broadcast而不使用join;

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)scala> broadcastVar.valueres0: Array[Int] = Array(1, 2, 3)资源调优

spark中的资源可以简单归结为CPU和内存,而以下的参数会影响内存和CPU的使用。

executor 越大并行性越好,越大每个executor所有的内存就越小;

core,越大并行性越好;

HDFS client 在大量并发线程是时性能问题。大概的估计是每个 executor 中最多5个并行的 task 就可以占满的写入带宽。

partition,如果比excutor*core小则很傻;越多每个partition占用的内存就越少;足够大以后对性能提升不再有用。

我naive的认为应该这样调整:

core = min(5,cpu核数);

executor = instance数 * cpu核数 / core

平均每instance的executor个数决定executor.memory,从而决定shuffle.memory和storage.memory;

估计总数据量,即最大的shuffle时的数据大小(spark driver运行记录中会有shuffle size);

用4的结果除以3得到partition数,如果很小,把partition设成和(executor*core)的若干倍.


分享标题:Spark的基础介绍和操作调优
本文网址:http://cqcxhl.com/article/cjpps.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP