重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

怎么进行PulsarKafkaClient的简单分析

本篇文章给大家分享的是有关怎么进行Pulsar Kafka Client的简单分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

专注于为中小企业提供成都做网站、成都网站建设服务,电脑端+手机端+微信端的三站合一,更高效的管理,为中小企业渝中免费做网站提供优质的服务。我们立足成都,凝聚了一批互联网行业人才,有力地推动了数千家企业的稳健成长,帮助中小企业通过网站建设实现规模扩充和转变。

为了方便 Kafka 用户使用 Pulsar,Pulsar 对 Kafka Client 做了一些封装,让 Kafka 用户更方便的使用 Pulsar。

下面主要介绍 Kafka Client 如何将消息发送到 Pulsar, 并从 Pulsar 消费消息,以及如何使用 Pulsar Schema。    

⌨️引入依赖

  org.apache.pulsar  pulsar-client-kafka  {project.version}
依赖引入了 Kafka 的 0.10.2.1 版本的客户端,还有 Pulsar 对 Kafka Client 封装后的客户端。  

⌨️ 使用 Kafka Schema

>>>添加生产者代码

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
Producer producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++) {    producer.send(new ProducerRecord(topic, i, Integer.toString(i)));}
producer.close();
在上述配置中 topic 是指 Pulsar 中的 Topic,接着使用 Kafka 的配置方式来初始化各种配置,包括 Server 地址、key 的序列化与 value 的序列化类,然后构造一个 ProducerRecord 的类将其发送出去。  

>>> 添加消费者代码

String topic = "persistent://public/default/test";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
@SuppressWarnings("resource")Consumer consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(topic));
while (true) {    ConsumerRecords records = consumer.poll(100);    records.forEach(record -> {        log.info("Received record: {}", record);    });
   // Commit last offset    consumer.commitSync();}
有些配置同生产者代码的配置是类似的,例如 topic,Server 等。另外使用 Kafka 的 group.id 作为配置 Pulsar 中的订阅名称,关闭自动提交,在消费者端为 key 和 value 配置的是反序列化的类。然后同常规的消费者类似,开始消费消息。  

⌨️使用 Pulsar Schema

在上述情况中使用的是 Kafka 的 Schema 来进行序列化与反序列化,当然也支持使用 Pulsar 的 Schema 来进行此过程。下面使用 AVRO 进行简单的介绍。
首先定义 Schema 所需要使用的 pojo 类。  
@Data@ToString@EqualsAndHashCodepublic class Foo {    @Nullable    private String field1;    @Nullable    private String field2;    private int field3;}
@Data@ToString@EqualsAndHashCodepublic class Bar {    private boolean field1;}

>>> 生产者端代码

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");
props.put("key.serializer", IntegerSerializer.class.getName());props.put("value.serializer", StringSerializer.class.getName());
AvroSchema barSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Bar.class).build());AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);

Producer producer = new KafkaProducer<>(props, fooSchema, barSchema);
for (int i = 0; i < 10; i++) {    producer.send(new ProducerRecord(topic, i, foo, bar));    log.info("Message {} sent successfully", i);}
producer.close();
可以看到大部分配置同上面使用 Kafka Client 的配置是类似的,但是中间加入了一些 Pulsar 的 Schema,使用 Foo 作为 key,使用 Bar 类作为 value。  

>>> 消费者端代码

String topic = "persistent://public/default/test-avro";
Properties props = new Properties();props.put("bootstrap.servers", "pulsar://localhost:6650");props.put("group.id", "my-subscription-name");props.put("enable.auto.commit", "false");props.put("key.deserializer", IntegerDeserializer.class.getName());props.put("value.deserializer", StringDeserializer.class.getName());
AvroSchema barSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Bar.class).build());AvroSchema fooSchema = AvroSchema.of(SchemaDefinition.builder().withPojo(Foo.class).build());
Bar bar = new Bar();bar.setField1(true);
Foo foo = new Foo();foo.setField1("field1");foo.setField2("field2");foo.setField3(3);
@SuppressWarnings("resource")Consumer consumer = new PulsarKafkaConsumer<>(props, fooSchema, barSchema);consumer.subscribe(Arrays.asList(topic));
while (true) {    ConsumerRecords records = consumer.poll(100);    records.forEach(record -> {        log.info("Received record: {}", record);    });
   // Commit last offset    consumer.commitSync();}
消费者端同样是类似的配置,使用与生产者端相同的 Schema 进行数据的反序列化。      

以上就是怎么进行Pulsar Kafka Client的简单分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注创新互联行业资讯频道。


本文题目:怎么进行PulsarKafkaClient的简单分析
浏览地址:http://cqcxhl.com/article/iipsdg.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP