重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

【SparkStreaming-创新互联

文章目录
  • 1.转换算子:
    • 案例需求:
  • sparkstreaming + kafka 整合 :
    • 版本选择:
    • 2.spark整合kafka api:
    • 查看kafka topic命令:
    • sparkstreaming里面: 开发模式:***
    • 3.提交offset信息
    • kafka消费语义:
    • 存储offset:

创新互联一直在为企业提供服务,多年的磨炼,使我们在创意设计,成都全网营销到技术研发拥有了开发经验。我们擅长倾听企业需求,挖掘用户对产品需求服务价值,为企业制作有用的创意设计体验。核心团队拥有超过十年以上行业经验,涵盖创意,策化,开发等专业领域,公司涉及领域有基础互联网服务托管服务器成都app开发、手机移动建站、网页设计、网络整合营销。1.转换算子:

transform

DStream 和 rdd之间数据进行交互的算子

流处理 数据源:
	一个数据来自于 mysql数据/hdfs上文本数据  【量小】  从表/维表 
	一个数据 来自于 kafka sss 读取形成 DStream数据 【量大】 主业务  =》 主表
案例需求:
弹幕 过滤的功能 /黑名单的功能 
离线:

弹幕: 主表
	不好看
	垃圾
	男主真帅
	女主真好看
	666
过滤的弹幕:维表 
	热巴真丑
	鸡儿真美
	王鹤棣退出娱乐圈

实时:

sparkstreaming + kafka 整合 :

kafka =》 sparkstreaming

版本选择:

spark 2.x : kafka版本: 0.8 0.10.0 or higher ok
spark 3.x =>kafka : 1.kafka版本: 0.10.0 or higher ok

spark 去kafka读取数据的方式:
1.kafka 0.8 reciver方式读取kafka数据 【效率低 、代码开发复杂】
2.kafka 0.10.0版本之后 direct stream的方式加载kafka数据 【效率高、代码开发简单】
kafka:
版本也有要求: 0.11.0 版本之后

交付语义: consumer producer
producer 默认就是精准一次
consumer 交付语义取决于 consumer 框架本身

交付语义: consumer

​ 至多一次 数据丢失问题
​ 至少一次 数据不会丢失,数据重复消费
​ 精准一次 数据不会丢失 数据也不会重复消费

spark 整合kafka 版本 0.10.0版本之后:
1.kafka 0.11.0之后 2.2.1 =>direct stream
2.sparkstreaming 默认消费kafka数据 交付语义:
至少一次

  1. spark消费kafka, DStream 【rdd 分区数】 =》 kafka topic 分区数 是一一对应的
    1:1 correspondence between Kafka partitions and Spark partitions,
2.spark整合kafka api:

​ 1.simple API =》 过时不用了

  1. new Kafka consumer API 整合 kafka 主流
    3.引入依赖: org.apache.spark spark-streaming-kafka-0-10_2.12 3.2.1

!!!不需要引入 kafka-clients 依赖

查看kafka topic命令:

kafka-topics.sh --list
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka

kafka-topics.sh --create
–zookeeper bigdata32:2181,bigdata33:2181,bigdata34:2181/kafka
–topic spark-kafka01 --partitions 3 --replication-factor 1

producer:
kafka-console-producer.sh
–broker-list bigdata33:9092,bigdata34:9092
–topic spark-kafka01

consumer:
kafka-console-consumer.sh
–bootstrap-server bigdata33:9092,bigdata34:9092
–topic spark-kafka
–from-beginning

val kafkaParams = Map[String, Object](
“bootstrap.servers” ->“bigdata33:9092,bigdata34:9092”,
“key.deserializer” ->classOf[StringDeserializer],
“value.deserializer” ->classOf[StringDeserializer],
“group.id” ->“dl2262_01”,
“auto.offset.reset” ->“latest”,
“enable.auto.commit” ->(false: java.lang.Boolean)
)

需求:
消费kafka数据 wc 将 结果写到 mysql里面

input
todo
output

kafka =>spark =>mysql 链路打通了

模拟:spark作业挂掉 =》 重启

“消费完kafka的数据 程序重启之后接着从上次消费的位置接着消费 ”

目前: code不能满足
1.目前代码 这两个参数 不能动
“auto.offset.reset” ->“earliest”
“enable.auto.commit” ->(false: java.lang.Boolean)

2.主要原因 : spark作业 消费kafka数据:
1.获取kafka offset =》 处理kafka数据 =》 “提交offset的操作” 没有
解决:
1.获取kafka offset // todo
2. 处理kafka数据
3.提交offset的操作 // todo

1.获取kafka offset // todo
1. kafka offset 信息
2.spark rdd分区数 和 kafka topic 的分区数 是不是 一对一

报错:
org.apache.spark.rdd.ShuffledRDD cannot be cast to org.apache.spark.streaming.kafka010.HasOffsetRanges

ShuffledRDD =》 HasOffsetRanges 说明 代码有问题

sparkstreaming里面: 开发模式:***

​ 1.获取kafka 流数据
​ 2. 流 Dstream =》 调用foreachRDD算子 进行输出:
​ 0.获取offset 信息
​ 1.做业务逻辑
​ 2.结果数据输出
​ 3.提交offset信息

offset解释:

01 batch:

rdd的分区数:3
topic partition fromOffset untilOffset
spark-kafka01 0 0 1
spark-kafka01 1 0 1
spark-kafka01 2 0 0

02 batch:
rdd的分区数:3
topic partition fromOffset untilOffset
spark-kafka01 0 1 1
spark-kafka01 1 1 1
spark-kafka01 2 0 0

此时 kafka 里面数据已经消费完了 fromOffset=untilOffset

3.提交offset信息

2.存储offset信息
spark流式处理 默认消费语义 : 至少一次
精准一次:
1.output + offset 同时完成
1.生产上Checkpoints不能用
2.Kafka itself =》至少一次
推荐使用 =》 简单 高效
90% 都可以解决 10% 精准一次
3.Your own data store: =》 开发大量代码 =》
mysql、redis、hbase、
至少一次
精准一次
mysql:
获取offset
todo
output
提交offset

spark作业挂了 =》 启动spark作业 :
1.从mysql里面获取offset
todo
output
提交offset

kafka消费语义:

​ 1.至多一次 【丢数据】
​ 2.至少一次 【不会丢数据 可能会重复消费数据】
​ 3.精准一次 【不丢、不重复消费】

offset信息提交 :
1.spark todo :
至少一次:
1 2 3 4
offset get
业务逻辑 output db
提交offset

精准一次:output + 提交offset 一起发生 =》 事务来实现
事务: 一次操作要么成功 要么失败

topic partition fromOffset untilOffset
spark-kafka01 0 3 3
spark-kafka01 2 2 2
spark-kafka01 1 2 2

存储offset:

​ kafka 本身:
​ offset 信息存储在哪?

kafka 某个topic下:
__consumer_offsets =》 spark作业 消费kafka的offset信息

topic offset 信息存储的地方

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


当前标题:【SparkStreaming-创新互联
当前地址:http://cqcxhl.com/article/dpgepc.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP