重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
今天就跟大家聊聊有关如何进行kafka批量消费多消费者问题分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。
专注于为中小企业提供网站制作、网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业瓮安免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了近千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。
package com.llw.medical.bs.listener;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.kafka.annotation.TopicPartition;import org.springframework.stereotype.Component;import java.util.List;import java.util.Optional;@Componentpublic class KafakaListener {@KafkaListener(id = "1", topics = {"topic2"})public void listen(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record =" + record); System.out.println("----------------- message =" + message); } }@KafkaListener(id = "2", topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"1", "2", "3"}// partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) })public void listen2(ConsumerRecord, ?> record) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 1=" + record); System.out.println("------------------ message 1=" + message); } }//id = "4", //id="4" @KafkaListener( id= "4",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") )*//* },*/ containerFactory = "kafkaBatchListener6")public void listen3(List> records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 4=" + record);// System.out.println("------------------ message 4=" + message); } } } finally {// ack.acknowledge(); } }//id="5" @KafkaListener(id = "5",groupId = "1",topics="topic1", /*topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) },*/ containerFactory = "kafkaBatchListener6")public void listen2(List > records) {//, Acknowledgment ack try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 6=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } }//https://www.cnblogs.com/linjiqin/p/13171789.html @KafkaListener(id = "6",groupId = "1",topics="topic1",/* topicPartitions = {@TopicPartition(topic = "topic1", partitions = {"0"} // partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4") ) }, */containerFactory = "kafkaBatchListener6")public void listen4(List > records) {try {for (ConsumerRecord, ?> record : records) { Optional> kafkaMessage = Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); System.out.println("----------------- record 3=" + record);// System.out.println("------------------ message 6=" + message); } } } finally {// ack.acknowledge(); } } }
一个partition只能有一个消费者,如果多个消费者会是广播模式,每个消费者都会有一条数据,kafka是一个发布和订阅模式的主键,并不是队列模式,
spring boot整合时,如果使用topicPartitions 注解参数指定partition会有消息重复消费的问题,最好使用topics注解,并指定groupId。
看完上述内容,你们对如何进行kafka批量消费多消费者问题分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注创新互联行业资讯频道,感谢大家的支持。