重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
使用同步方式来解决多个服务之间的通信
同步通信的方式方式会存在性能和稳定性的问题
使用异步的通信方式
异步的优势:可以让上游快速成功,明显提升系统吞吐量;即使有服务失败,也可以通过分布式事务解决方案来保证最终是成功的,也能保障业务执行之后的最终一致性。
消息队列解决具体的是什么问题--------通信问题
目前消息队列的中间件选型有很多种:
这些消息队列中间件有什么区别?
kafka是一个分布式、支持分区(partition)、多副本(replica),基于zookeeper协调的分布式消息系统,大特点是可以实时的处理大量数据
kafka使用场景kafka的安装
1.部署一台zookeeper服务器
2.安装jdk
3.下载kafka的安装包:https:kafka.apache.org/download
4.上传到kafka服务器上并解压:/usr/local/kafka
5. 进入conf目录内,修改server.properties
```powershell
#broker.id属性在kafka集群中必须要唯一
broker.id=0
#kafka部署的机器ip和提供服务的端口
listeners=PLAINTEXT://10.234.252.122:9092
#kafka消息存储文件
log.dirs=/usr/local/kafka
#kafka连接zookeeper的地址
zookeeper.connect=10.234.252.122:2181
```
```powershell
#进入到bin目录内,执行以下命令来启动kafka服务器(带着配置文件)
./kafka-server-start.sh -daemon ../config/server.properties
#校验kafka是否启动成功,进入到zk内查看是否有kafka的节点
ls /brokers/ids #查询出有broker的id则存在
```
创建topic
执行以下命令创建名为“test”的topic,这个topic只有一个partition,并且备份因子也设置为1:
#./kafka-topic.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看当前kafka内有那些topic
./kafka-topic.sh --zookeeper localhost:2182 --list
./kafka-console-producer.sh --broker-list 10.234.252.122:9092 --topic test
>hello
>world
>1111
>22222222
方式1:从最后一条消息的偏移量(offset)+1开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test
方式2:从开始消费
#./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --from-beginning
>hello
>world
>1111
>22222222
/usr/local/kafka/kafka-logs/主题-分区/000000.log
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --consumer-property group.id=testgroup
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --consumer-property group.id=testgroup1
./kafka-console.consumer.sh --bootstrap-server 10.234.252.122:9092 --topic test --consumer-property group.id=testgroup2
查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --list
查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 10.234.252.122:9092 --describe --group testgroup
注意:
主题topic
主题-topic在kafka中是一个逻辑的概念,kafka通过topic将消息进行分类,不同的topic会被订阅该topic的消费者消费。
但是有一个问题,如果说这个topic中的消息非常非常多,多到需要几T来存,因为消息是会被保存到log日志文件中的,为了解决这个文件过大的问题,kafka提出来分区的概念
partition分区
通过partition将一个topic中的消息分区来存储,这样的好处:
分区的作用:
分布式存储
可以并行写
**为一个主题创建多个分区**
./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --replication-factor 1 --topic test1
kafka中消息日志文件中保存的内容
00000.log:这个文件中保存的就是消息
__consumer_offsets-49:kafka内部自己创建了 __consumer_offsets主题,包含了50个分区,这个主题用来存放消费者消费某个主题的偏移量,因为每个消费者都会自己维护着消费的主题的偏移量,也就是说每个消费者会把消费的主题的偏移量自主的上报给kafka中的默认主题:__consumer_offsets。因此kafka为了提升这个主题的并发性,默认设置了50个分区
提交到那个分区:通过hash函数:hash(consumergroupid)%_consumer_offsets主题的分区数
提交到该主题中的内容是:key是consumergroupid+topic+分区号,value就是当前offsets的值
文件中保存的消息,默认保存7天,7天后消息会被删除
在创建主题时,除了指明主题的分区数以外,还指明了副本数
副本是为了为主题中的分区创建多个备份,多个副本在kafka集群的多个broker中,会有一个副本作为leader,其他是follower
./kafka-console-producer.sh --broker-list 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --topic my-replicated-topic
./kafka-console-consumer.sh --bootstrap-server 10.234.252.122:9092,10.234.252.209:9092,10.234.253.22:9092 --from-beginning --topic my-replicated-topic
关于分区消费组消费者的细节
如果生产者发送消息没有收到ack,生产者会阻塞,阻塞到3s的时间,如果还没有收到消息,会进行重试。重试的次数为3次
异步发送,生产者发完消息后就可以执行之后的业务,broker在收到消息后异步调用生产者提供的callback回调方法
在同步发送的前提下,生产者在获得集群返回的ack之前会一直阻塞,那么集群什么时候返回ACK呢?此时ACK有三个配置:
min.insync.replica=2
(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完成后,才会返回ACK给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但性能最差其他一些细节:
消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets
主题提交当前主题-分区消费的偏移量
提交的内容
消费者无论是自动提交还是手动提交,都需要把所属的消费组+消费的主题+消费的某个分区及消费的偏移量
这样的信息提交到集群的_consumer_offsets
主题里面
自动提交
消费者poll消息下来以后就会自动提交offset
自动提交会丢消息:因为如果消费者还没消费完poll下来的消息就会自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息,之前未被消费的消息就会丢失掉了
手动提交
需要把自动提交的配置改成false
手动提交又分成两种
- 手动同步提交
在消费完消息后调用同步提交的方法,当集群返回ACK前一直阻塞,返回ACK后表示提交成功,执行之后的逻辑
- 手动异步提交
在消息消费完后提交,不需要等到集群ACK,直接执行之后的逻辑,可以设置一个回调方法,供集群调用
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧