重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
Spark Streaming中的反压机制是Spark 1.5.0推出的新特性,可以根据处理效率动态调整摄入速率。
在余干等地区,都构建了全面的区域性战略布局,加强发展的系统性、市场前瞻性、产品创新能力,以专注、极致的服务理念,为客户提供成都网站设计、成都做网站 网站设计制作按需定制开发,公司网站建设,企业网站建设,品牌网站设计,营销型网站,外贸网站建设,余干网站建设费用合理。当批处理时间(Batch Processing Time)大于批次间隔(Batch Interval,即 BatchDuration)时,说明处理数据的速度小于数据摄入的速度,持续时间过长或源头数据暴增,容易造成数据在内存中堆积,最终导致Executor OOM或任务奔溃。
在这种情况下,若是基于Kafka Receiver的数据源,可以通过设置spark.streaming.receiver.maxRate来控制大输入速率;若是基于Direct的数据源(如Kafka Direct Stream),则可以通过设置spark.streaming.kafka.maxRatePerPartition来控制大输入速率。当然,在事先经过压测,且流量高峰不会超过预期的情况下,设置这些参数一般没什么问题。但大值,不代表是最优值,最好还能根据每个批次处理情况来动态预估下个批次最优速率。在Spark 1.5.0以上,就可通过背压机制来实现。开启反压机制,即设置spark.streaming.backpressure.enabled为true,Spark Streaming会自动根据处理能力来调整输入速率,从而在流量高峰时仍能保证大的吞吐和性能。
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
// 处理结束时间
processingEnd <- batchCompleted.batchInfo.processingEndTime
// 处理时间,即`processingEndTime` - `processingStartTime`
workDelay <- batchCompleted.batchInfo.processingDelay
// 在调度队列中的等待时间,即`processingStartTime` - `submissionTime`
waitDelay <- batchCompleted.batchInfo.schedulingDelay
// 当前批次处理的记录数
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
可以看到,接着又调用的是computeAndPublish
方法,如下:
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
// 根据处理时间、调度时间、当前Batch记录数,预估新速率
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
// 设置新速率
rateLimit.set(s.toLong)
// 发布新速率
publish(getLatestRate())
}
}
更深一层,具体调用的是rateEstimator.compute
方法来预估新速率,如下:
def compute(
time: Long,
elements: Long,
processingDelay: Long,
schedulingDelay: Long): Option[Double]
spark.streaming.backpressure.enabled
默认值false,是否启用反压机制。
spark.streaming.backpressure.initialRate
默认值无,初始大接收速率。只适用于Receiver Stream,不适用于Direct Stream。类型为整数,默认直接读取所有,在1开启的情况下,限制第一次批处理应该消费的数据,因为程序冷启动队列里面有大量积压,防止第一次全部读取,造成系统阻塞
spark.streaming.kafka.maxRatePerPartition
类型为整数,默认直接读取所有,限制每秒每个消费线程读取每个kafka分区大的数据量
注意: 只有 3 激活的时候,每次消费的大数据量,就是设置的数据量,如果不足这个数,就有多少读多少,如果超过这个数字,就读取这个数字的设置的值
只有 1+3 激活的时候,每次消费读取的数量大会等于3设置的值,最小是spark根据系统负载自动推断的值,消费的数据量会在这两个范围之内变化根据系统情况,但第一次启动会有多少读多少数据。此后按 1+3 设置规则运行
1+2+3 同时激活的时候,跟上一个消费情况基本一样,但第一次消费会得到限制,因为我们设置第一次消费的频率了。
spark.streaming.backpressure.rateEstimator
默认值pid,速率控制器,Spark 默认只支持此控制器,可自定义。
spark.streaming.backpressure.pid.proportional
默认值1.0,只能为非负值。当前速率与最后一批速率之间的差值对总控制信号贡献的权重。用默认值即可。
spark.streaming.backpressure.pid.integral
默认值0.2,只能为非负值。比例误差累积对总控制信号贡献的权重。用默认值即可。
spark.streaming.backpressure.pid.derived
默认值0.0,只能为非负值。比例误差变化对总控制信号贡献的权重。用默认值即可。
spark.streaming.backpressure.pid.minRate
默认值100,只能为正数,最小速率。
//启用反压机制
conf.set("spark.streaming.backpressure.enabled","true")
//最小摄入条数控制
conf.set("spark.streaming.backpressure.pid.minRate","1")
//大摄入条数控制
conf.set("spark.streaming.kafka.maxRatePerPartition","12")
//初始大接收速率控制
conf.set("spark.streaming.backpressure.initialRate","10")
要保证反压机制真正起作用前Spark 应用程序不会崩溃,需要控制每个批次大摄入速率。以Direct Stream为例,如Kafka Direct Stream,则可以通过spark.streaming.kafka.maxRatePerPartition参数来控制。此参数代表了 每秒每个分区大摄入的数据条数。假设BatchDuration为10秒,spark.streaming.kafka.maxRatePerPartition为12条,kafka topic 分区数为3个,则一个批(Batch)大读取的数据条数为360条(31210=360)。同时,需要注意,该参数也代表了整个应用生命周期中的大速率,即使是背压调整的大值也不会超过该参数。
另外有需要云服务器可以了解下创新互联scvps.cn,海内外云服务器15元起步,三天无理由+7*72小时售后在线,公司持有idc许可证,提供“云服务器、裸金属服务器、高防服务器、香港服务器、美国服务器、虚拟主机、免备案服务器”等云主机租用服务以及企业上云的综合解决方案,具有“安全稳定、简单易用、服务可用性高、性价比高”等特点与优势,专为企业上云打造定制,能够满足用户丰富、多元化的应用场景需求。