重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
如何进行Kafka学习,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
黄岛ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:13518219792(备注:SSL证书合作)期待与您的合作!
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并消费消息
注意: 1.消息被消费以后,queue中不再有存储,所以消息消费者不可能消费到已经被消费的消息 2.Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费 |
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费 |
支持的协议多,非常重量级消息队列,对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持 |
号称最快的消息队列系统,尤其针对大吞吐量的需求场景,擅长高级/复杂的队列,但是技术也复杂,并且只提供非持久性队列 |
Apache下的一个子项,类似ZeroMQ,能够以代理人和点对点的技术实现队列 |
一个key-value的NoSql数据库,但也支持MQ功能,数据量较小,性能优于RabbitMQ,数据超过10K就慢的无法忍受 |
Kafka是分布式发布-订阅消息系统。它最初由Linkedln公司开发,使用Scala语言编写,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的(分区处理),多订阅者,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据 |
1.同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50M),每秒处理55万消息(110M)
2.可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失
3.分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式。无需停机即可扩展机器
4.消息被处理的状态是在consumer端维护,而不是由server端维护
5.支持online(上线)和offline(下线)的场景 |
重要说明: 1.在Kafka的体系中不存在单读的Conmuser,它会存在一个Conmuser Group,Conmuser Group里面会有多个Conmuser
2.可以把Consumer Group看成一个虚拟的Consumer,它消费的是一个具体的Topic的数据,但具体执行是由Consumer Group中的Consumer去执行的,Consumer是一个逻辑上的概念,是不存在的,而存在的是Consumer Group当中的Consumer, 一个Consumer Group对应的是Topic,Consumer Group中的Consumer对应的是Topic中的partition
3.一个消费者组里面的多个消费者对应的是什么呢? Topic组里面不同Partition的数据,一个Partition里面的数据交给一个Consumer来处理,另一个Partition里面的数据交给另一个Consumer来处理,当然它们必须是同一个Consumer Group里面的Consumer,这就达到了并行的消费(每一个Consumer对应的是一个Partition里面的数据)
4.Kafka为什么会有Partition的概念? 带来的好处就是处理的速度更快,不同的Conmuser去消费不同Partition的消息,数据的消费就变成了并行的 |
特指消息的生产者 |
特指消息的消费者 |
消费者组,可以并行消费Topic中Partition的消息 |
缓存代理,Kafka集群中的一台或多台服务器统称为broker
1.message在broker中通过log追加的方式进行持久化存储。并进行分区(patitions)
2.为了减少磁盘写入的次数,broker会将消息展示buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数
3.Broker没有副本机制,一旦broker宕机,该broker的消息将不可用(但是消息是有副本的,可以把消息的副本同步到其它的broker中)
4.Broker不保存订阅者的状态,由订阅者自己保存
5.无状态导致消息的删除成为难题(可能删除的消息正在被订阅)Kafka采用基于时间的SLA(服务水平保证),消息保存一定的时间(通常为7天)后会被删除
6.消息订阅者可以rewind back到任意位置重新进行消费,当订阅者故障时,可以选择最小的offset(id)进行重新读取消费消息 |
特指Kafka处理的消息源(feeds of messages)的不同分类 |
1.Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)
2.Kafka的Partitions分区的目的 2.1 kafka基于文件存储,通过分区,可以将日志内容分线到多个server上,来避免文件尺寸达到单击磁盘的上线,每个partition都会被当前server(kafka实例)保存
2.2 可以将一个topic切分任意多个partitions来提高消息保存/消费的效率
2.3 越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力 |
1.消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息
2.Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic有可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的)每个partition存储一部分Message
3.partition中的每条Message包含了一下三个属性 属性名称 数据类型 offset long MessageSize int32 data mssage的具体内容 |
消息和数据生产者,向kafka的一个topic发布消息的过程叫做producers
1.producer将消息发布到指定的topic中,同时producer也能决定将此消息归属于哪个partition。比如基于“round-robin”方式或者通过其他的一些算法等
2.异步发送:批量发送可以很有效的提高发送效率。kafka producer的异步发送模式允许进行批量发送,先将消息缓存在内存中,然后一次请求批量发送出去 |
1.消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers
2.在Kafka中,我们可以认为一个group是一个“订阅者”,一个Topic中的每个partition只会被一个“订阅者”中的一个consumer消费,不过一个consumer可以消费多个partition中的消息(消费者数据小于Partition的数量时)
3.注意:kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息 |
1.发现线性的访问磁盘,很多时候比随机的内存访问快的多
2.传统的使用内存作为磁盘的缓存
3.Kafka直接将数据写入到日志文件
|
1.写操作:通过将数据追加到文件中实现
2.读操作:读的时候从文件读就好了 |
1.读操作不会阻塞写稻作和其它操作,数据大小不对性能产生影响
2.没有容量限制(相对于内存来说)的硬盘空间建立消息系统
3.线性访问磁盘,速度快,可以保存任意一段时间 |
1.一个Tipic可以认为是一个类消息,每个topic将被分成多个partition,每个partition在存储层面是append log文件。任何发布到此partition的消息都会被直接追加到log文件的尾部,每条消息在文件中的位置称为offset(偏移量),partition是以文件的形式存储在文件系统中
2.Logs文件根据broker中的配置要求,保留一定时间后删除来释放磁盘空间(默认是7天)
说明:Partition是Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。Partition中每条消息都会被分配一个有序的id(offset) |
为数据文件建立索引:稀疏存储,每隔一定字节的数据建立一条索引。下图是一个partition的索引示意图
注意: 1. 现在对1、3、6、8 建立了索引,如果要查找7,则会先查找到8然后,再找到8后的一个索引6,然后两个索引之间做二分法,找到7的位置
2. 日志文件也会进行segement(分割),分而治之 |
注意: 1.当生产者将消息发送到Kafka后,就会去立刻通知ZooKeeper,zookeeper中会watch到相关的动作,当watch到相关的数据变化后,会通知消费者去消费消息
2.消费者是主动去Pull(拉)kafka中的消息,这样可以降低Broker的压力,因为Broker中的消息是无状态的,Broker也不知道哪个消息是可以消费的
3.当消费者消费了一条消息后,也必须要去通知ZooKeeper。zookeeper会记录下消费的数据,这样但系统出现问题后就可以还原,可以知道哪些消息已经被消费了 |
说明: 1.Name Server集群指的是Zookeeper集群 |
1.Kafka的通讯协议主要说的是,consumer去拉数据使用的通讯协议
2.Kafka的Producer、Broker和Consumer采用的是一套自行设计基于TCP层的协议,根据业务需求定制,而非实现一套类似于Protocol Buffer的通讯协议
3.基本数据类型 3.1定长数据类型:int8,int16,int32和int64,对应到Java中就是byte,short,int和long 3.2变长数据类型:bytes和string。变长的数据类型由两部分组成,分别是一个有符号整数N(标识内容的长度)和N个字节的内容。其中N为-1标识内容为null。Bytes的长度由int32标识,string的长度由int16表示 3.3数组:数组由两个部分组成,分别是一个有int32类型的数字标识的数组长度N和N个元素 |
1.Kafka通讯的基本单位是Request/Response
2.基本结构: RequestOrResponse ---> MessageSize(RequestMessage | ResponseMessage) 名称类型描述ApiKeyInt16标识这次请求的API编号ApiVersionInt16标识请求的API版本,有了版本后就可以做到向后兼容CorrelationIdInt32由客户端指定的一个数字唯一标识这次请求的id,服务器端在处理请求后也会把同样的CorrelationId写到Response中,这样客户端就能把某个请求和响应对应起来了ClientIdstring客户端指定的用来描述客户端的字符串,会被用来记录日志和监控,它唯一标识一个客户端Request-Request的具体内容
3.通讯过程: 3.1客户端打开与服务端的Socket 3.2往Socket写入一个int32的数字(数字标识这次发送的Request有多少字节) 3.3服务器端先读出一个int32的整数从而获取这次Request的大小 3.4然后读取对应字节数的数据从而得到Request的具体内容 3.5服务器端处理了请求之后也用同样的发送发誓来发送响应
4.RequestMessage结构 4.1RequestMessage ---> ApiKey ApiVersion CorrelationId ClientId Request 名称类型描述MessageSizeint32标识RequestMessage或者ResponseMessage的长度RequestMessageResponseMessage--标识Request或者Response的内容
5.ResponseMessage 5.1ResponseMessage ---> CorrelationId Response 名称类型描述CorrelationIdint32对应Request的CorrelationIdResponse--对应Request的Response,不同的Request的Response的字段是不一样的
Kafka采用是经典的Reactor(同步IO)模式,也就是1个Acceptor响应客户端的连接请求,N个Processor来读取数据,这种模式可以构建出高性能的服务器
6.Message:Producer生产的消息,键-值对 6.1Message --- > Crc MagicByte Attributes Key Value 名称类型描述CRCInt32标识这条消息(不包括CRC字段本身)的校验码MagicByteInt8标识消息格式的版本,用来做向后兼容,目前值为0AttributesInt8标识这条消息的元数据,目前最低两位用来标识压缩格式Keybytes标识这条消息的Key,可以为nullValuebytes标识这条消息的Value。Kafka支持消息嵌套,也就是把一条消息作为Value放到另外一个消息里面
说明: CRC是一种消息检验方式,在Consumer拿到数据以后,CRC会获取MessageSize和MessageData的大小做比较,如果不一致则,那么这个操作的数据Consumer就不接收了,如果一直则才做处理。防止消息在传输过程中损坏,丢失的一种校验方式
7.MessageSet:用来组合多条Message,它在每条Message的基础上加上offset和MessageSize 7.1MessageSet --> [offset MessageSize Message] 名称类型描述OffsetInt64它用来作为log中的序列号,Producer在生产消息的时候还不知道具体的值是什么,可以随便填个数字进去MessageSizeInt32标识这条Message的大小Message-标识这条Message的具体内容,其格式见上一小结
8.Request/Response和Message/messageSet的关系 8.1 Request/Response是通讯层的结构,和网络的7层模型对比的话,它类似于TCP 8.2 Message/MessageSet定义的是业务层的结构,类似于网络7层模型中的HTTP层。Message/MessageSet只是Request/Response的payload中的一种数据结构 备注:Kafka的通讯协议中不包含Schema,格式也比较简单,这样设计的好处是协议自身的Overhead小,再加上把多条Message放到一期做压缩,提高压缩比率,从而在网络上传输的数据量会少一些 |
1.at most once:最多一次,这个和JMS中“非持久化”消息类似,发送一次,无论成败,将不会重发 消费者fetch(得到)消息,然后保存offset,然后处理消息; 当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理,那么伺候“未处理”的消息将不能被fetch到,这就是“at most once”
2.at least once:消息至少发送一次,如果消息未能接收成功,可能会重发,知道接收成功 消费者fetch消息,然后处理消息,然后保存offset,如果消息处理成功之后,但是在保存offset阶段zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是“at least once”,原因offset没有即使的提交给zookeeper,zookeeper恢复正常还是之前offset状态。 注:通常情况下“at least once”是我们的首选(相比at most once而言,重复接收数据总比丢失数据要好)
3.exactly once:消息只会发送一次 Kafka中并没有严格的去实现(基于2阶段提交,事务),我们认为这种策略在kafka中是没有必要的 |
1.下载并上传kafka到服务器
2.解压缩并移动到/usr/local目录下
3.启动服务 3.1启动zookeeper服务 # ./zookeeper-server-start.sh ../config/zookeeper.properties > /dev/null 2>&1 &
3.2启动kafka服务 # ./kafka-server-start.sh ../config/server.properties > /dev/null 2>&1 &
3.3创建topic: ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
3.4查看主题 ./kafka-topics.sh --list --zookeeper localhost:2181
3.5查看主题详情 ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
3.6删除主题 ./kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper 192.168.31.220:2181 |
./kafka-console-producer.sh --broker-list localhost:9092 --topic test1 |
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test1 --from-beginning |
生产者参数查看:./kafka-console-producer.sh 消费者参数查看:./kafka-console-consumer.sh |
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。