重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
这篇文章主要讲解了“Apache Pulsar启动了哪些服务”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Apache Pulsar启动了哪些服务”吧!
离石ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!
PulsarStandaloneStarter
在standalone模式下,主要启动了以下几个服务
PulsarService
PulsarAdmin
LocalBookeeperEnsemble
WorkerService
PulsarBrokerStarter.BrokerStarter
在普通模式下,启动了以下几个服务
PulsarService
BookieServer
AutoRecoveryMain
StatsProvider
WorkerService
简单说一些这几个服务
WorkerService: Pulsar function 相关,可以不启动
PulsarService: 主要的PulsarBroker相关
BookieServer: Bookeeper相关
AutoRecoveryMain: Bookeeper autorecovery相关
StatsProvider: Metric Exporter类似的功能
PulsarService.start
ProtocolHandlers
支持不同protocol处理(kafka协议等)
localZookeeperConnectionProvider
维护zk session 和zk连接
startZkCacheService
LocalZooKeeperCache => LocalZooKeeperCacheService
GlobalZooKeeperCache => ConfigurationCacheService
BookkeeperClientFactory
创建配置Bookkeeper 客户端
managedLedgerClientFactory
维护一个ManagedLedger的客户端,借用BookkeeperClient
BrokerService
这个是服务器的主要逻辑了,这个放在后面说
loadManager
收集集群机器负载,并根据负载情况均衡负载
startNamespaceService
NameSpaceService,管理放置的ResourceBundle,和LoadManager相关
schemaStorage
schemaRegistryService
上面2个都是和Schema相关的
defaultOffloader
LedgerOffloader,用来将Ledger(Bookkeeper)中的冷数据放到其他存储当中
WebService
webSocketService
http,websocket相关
LeaderElectionService
和LoadManager有关,如果是集中方式的话需要选出一个Leader定期根据集群情况进行均衡负载
transactionMetadataStoreService
事务相关
metricGenerator
metric相关
WorkerService
pulsar function 相关
public void start() throws Exception { // producer id 分布式生成器 this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName()); // 网络层配置 ServerBootstrap bootstrap = defaultServerBootstrap.clone(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false)); ... // 绑定端口 listenChannel = bootstrap.bind(addr).sync().channel(); ... // metric this.startStatsUpdater( serviceConfig.getStatsUpdateInitialDelayInSecs(), serviceConfig.getStatsUpdateFrequencyInSecs()); // 启动了一堆需要定期执行的任务 this.startInactivityMonitor(); // 启动3个schedule任务分别检测 // 1. 长时间无效的topic // 2. 长时间无效的producer(和message去重相关) // 3. 长时间无效的subscription this.startMessageExpiryMonitor(); this.startCompactionMonitor(); this.startMessagePublishBufferMonitor(); this.startConsumedLedgersMonitor(); this.startBacklogQuotaChecker(); this.updateBrokerPublisherThrottlingMaxRate(); this.startCheckReplicationPolicies(); // register listener to capture zk-latency ClientCnxnAspect.addListener(zkStatsListener); ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
顺着netty的初始化方式我们直接看ChannelInitializer,这里应该和Kafka类似进行处理请求的操作。
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); ch.pipeline().addLast("flowController", new FlowControlHandler()); ServerCnx cnx = new ServerCnx(pulsar); ch.pipeline().addLast("handler", cnx); connections.put(ch.remoteAddress(), cnx); }
这个类的作用可以对标KafkaApis,处理各种Api请求
这个类实际上是一个ChannelHandler
继承了PulsarHandler(主要负责一些连接的keepalive逻辑)
PulsarHandler继承了 PulsarDecoder ( 主要负责序列化,反序列化Api请求)
PulsarDecoder实际上是一个 ChannelInboundHandlerAdapter
而PulsarAPi实际上是通过Pulsar.proto 生成的,这里编写了各种Api的定义
感谢各位的阅读,以上就是“Apache Pulsar启动了哪些服务”的内容了,经过本文的学习后,相信大家对Apache Pulsar启动了哪些服务这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!