重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
1. 介绍
创新互联2013年开创至今,是专业互联网技术服务公司,拥有项目网站设计、成都网站设计网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元石峰做网站,已为上家服务,为石峰各地企业和个人服务,联系电话:18980820575
最近在研究一些消息中间件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。
官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各种语言的众多第三方客户端功能库。
1.1 Features
1). Distributed
NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。
2). Scalable易于扩展
NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。
3). Ops Friendly
NSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。
4.Integrated高度集成
官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。
1.2 组件
1.3 拓扑结构
NSQ推荐通过他们相应的nsqd实例使用协同定位发布者,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取。更重要的是,发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。
NSQ
首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令,在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中。
事件topic会复制这些消息并且在每一个连接topic的channel上进行排队,在我们的案例中,有三个channel,它们其中之一作为档案channel。消费者会获取这些消息并且上传到S3。
nsqd
每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点。
nsqlookupd
2. Internals
2.1 消息传递担保
1)客户表示已经准备好接收消息
2)NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout)
3)客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息
这确保了消息丢失唯一可能的情况是不正常结束 nsqd 进程。在这种情况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失。
如何防止消息丢失是最重要的,即使是这个意外情况可以得到缓解。一种解决方案是构成冗余 nsqd对(在不同的主机上)接收消息的相同部分的副本。因为你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游造成影响,并使得系统能够承受任何单一节点故障而不会丢失信息。
2.2 简化配置和管理
单个 nsqd 实例被设计成可以同时处理多个数据流。流被称为“话题”和话题有 1 个或多个“通道”。每个通道都接收到一个话题中所有消息的拷贝。在实践中,一个通道映射到下行服务消费一个话题。
在更底的层面,每个 nsqd 有一个与 nsqlookupd 的长期 TCP 连接,定期推动其状态。这个数据被 nsqlookupd 用于给消费者通知 nsqd 地址。对于消费者来说,一个暴露的 HTTP /lookup 接口用于轮询。为话题引入一个新的消费者,只需启动一个配置了 nsqlookup 实例地址的 NSQ 客户端。无需为添加任何新的消费者或生产者更改配置,大大降低了开销和复杂性。
2.3 消除单点故障
NSQ被设计以分布的方式被使用。nsqd 客户端(通过 TCP )连接到指定话题的所有生产者实例。没有中间人,没有消息代理,也没有单点故障。
这种拓扑结构消除单链,聚合,反馈。相反,你的消费者直接访问所有生产者。从技术上讲,哪个客户端连接到哪个 NSQ 不重要,只要有足够的消费者连接到所有生产者,以满足大量的消息,保证所有东西最终将被处理。对于 nsqlookupd,高可用性是通过运行多个实例来实现。他们不直接相互通信和数据被认为是最终一致。消费者轮询所有的配置的 nsqlookupd 实例和合并 response。失败的,无法访问的,或以其他方式故障的节点不会让系统陷于停顿。
2.4 效率
对于数据的协议,通过推送数据到客户端最大限度地提高性能和吞吐量的,而不是等待客户端拉数据。这个概念,称之为 RDY 状态,基本上是客户端流量控制的一种形式。
efficiency
2.5 心跳和超时
组合应用级别的心跳和 RDY 状态,避免头阻塞现象,也可能使心跳无用(即,如果消费者是在后面的处理消息流的接收缓冲区中,操作系统将被填满,堵心跳)为了保证进度,所有的网络 IO 时间上限势必与配置的心跳间隔相关联。这意味着,你可以从字面上拔掉之间的网络连接 nsqd 和消费者,它会检测并正确处理错误。当检测到一个致命错误,客户端连接被强制关闭。在传输中的消息会超时而重新排队等待传递到另一个消费者。最后,错误会被记录并累计到各种内部指标。
2.6 分布式
因为NSQ没有在守护程序之间共享信息,所以它从一开始就是为了分布式操作而生。个别的机器可以随便宕机随便启动而不会影响到系统的其余部分,消息发布者可以在本地发布,即使面对网络分区。
这种“分布式优先”的设计理念意味着NSQ基本上可以永远不断地扩展,需要更高的吞吐量?那就添加更多的nsqd吧。唯一的共享状态就是保存在lookup节点上,甚至它们不需要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,唯一关键的地方就是消费者可以通过lookup节点获取所有完整的节点集。清晰的故障事件——NSQ在组件内建立了一套明确关于可能导致故障的的故障权衡机制,这对消息传递和恢复都有意义。虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操作使故障情况非常明显。
2.7 no replication
不像其他的队列组件,NSQ并没有提供任何形式的复制和集群,也正是这点让它能够如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证。我们可以通过降低文件同步的时间来部分避免,只需通过一个标志配置,通过EBS支持我们的队列。但是这样仍然存在一个消息被发布后马上死亡,丢失了有效的写入的情况。
2.8 没有严格的顺序
虽然Kafka由一个有序的日志构成,但NSQ不是。消息可以在任何时间以任何顺序进入队列。在我们使用的案例中,这通常没有关系,因为所有的数据都被加上了时间戳,但它并不适合需要严格顺序的情况。
2.9 无数据重复删除功能
NSQ对于超时系统,它使用了心跳检测机制去测试消费者是否存活还是死亡。很多原因会导致我们的consumer无法完成心跳检测,所以在consumer中必须有一个单独的步骤确保幂等性。
3. 实践安装过程
本文将nsq集群具体的安装过程略去,大家可以自行参考官网,比较简单。这部分介绍下笔者实验的拓扑,以及nsqadmin的相关信息。
3.1 拓扑结构
topology
实验采用3台NSQD服务,2台LOOKUPD服务。
采用官方推荐的拓扑,消息发布的服务和NSQD在一台主机。一共5台机器。
NSQ基本没有配置文件,配置通过命令行指定参数。
主要命令如下:
LOOKUPD命令
NSQD命令
工具类,消费后存储到本地文件。
发布一条消息
3.2 nsqadmin
对Streams的详细信息进行查看,包括NSQD节点,具体的channel,队列中的消息数,连接数等信息。
nsqadmin
channel
列出所有的NSQD节点:
nodes
消息的统计:
msgs
lookup主机的列表:
hosts
4. 总结
NSQ基本核心就是简单性,是一个简单的队列,这意味着它很容易进行故障推理和很容易发现bug。消费者可以自行处理故障事件而不会影响系统剩下的其余部分。
事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量。越来越多的consumer需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。
结合我们的业务系统来看,对于我们所需要传输的发票消息,相对比较敏感,无法容忍某个nsqd宕机,或者磁盘无法使用的情况,该节点堆积的消息无法找回。这是我们没有选择该消息中间件的主要原因。简单性和可靠性似乎并不能完全满足。相比Kafka,ops肩负起更多负责的运营。另一方面,它拥有一个可复制的、有序的日志可以提供给我们更好的服务。但对于其他适合NSQ的consumer,它为我们服务的相当好,我们期待着继续巩固它的坚实的基础。
切换到新语言始终是一大步,尤其是当您的团队成员只有一个时有该语言的先前经验。现在,Stream 的主要编程语言从 Python 切换到了 Go。这篇文章将解释stream决定放弃 Python 并转向 Go 的一些原因。
Go 非常快。性能类似于 Java 或 C++。对于用例,Go 通常比 Python 快 40 倍。
对于许多应用程序来说,编程语言只是应用程序和数据库之间的粘合剂。语言本身的性能通常并不重要。然而,Stream 是一个API 提供商,为 700 家公司和超过 5 亿最终用户提供提要和聊天平台。多年来,我们一直在优化 Cassandra、PostgreSQL、Redis 等,但最终,您会达到所使用语言的极限。Python 是一门很棒的语言,但对于序列化/反序列化、排名和聚合等用例,它的性能相当缓慢。我们经常遇到性能问题,Cassandra 需要 1 毫秒来检索数据,而 Python 会花费接下来的 10 毫秒将其转换为对象。
看看我如何开始 Go 教程中的一小段 Go 代码。(这是一个很棒的教程,也是学习 Go 的一个很好的起点。)
如果您是 Go 新手,那么在阅读那个小代码片段时不会有太多让您感到惊讶的事情。它展示了多个赋值、数据结构、指针、格式和一个内置的 HTTP 库。当我第一次开始编程时,我一直喜欢使用 Python 更高级的功能。Python 允许您在编写代码时获得相当的创意。例如,您可以:
这些功能玩起来很有趣,但是,正如大多数程序员会同意的那样,在阅读别人的作品时,它们通常会使代码更难理解。Go 迫使你坚持基础。这使得阅读任何人的代码并立即了解发生了什么变得非常容易。 注意:当然,它实际上有多“容易”取决于您的用例。如果你想创建一个基本的 CRUD API,我仍然推荐 Django + DRF或 Rails。
作为一门语言,Go 试图让事情变得简单。它没有引入许多新概念。重点是创建一种非常快速且易于使用的简单语言。它唯一具有创新性的领域是 goroutine 和通道。(100% 正确CSP的概念始于 1977 年,所以这项创新更多是对旧思想的一种新方法。)Goroutines 是 Go 的轻量级线程方法,通道是 goroutines 之间通信的首选方式。Goroutines 的创建非常便宜,并且只需要几 KB 的额外内存。因为 Goroutine 非常轻量,所以有可能同时运行数百甚至数千个。您可以使用通道在 goroutine 之间进行通信。Go 运行时处理所有复杂性。goroutines 和基于通道的并发方法使得使用所有可用的 CPU 内核和处理并发 IO 变得非常容易——所有这些都不会使开发复杂化。与 Python/Java 相比,在 goroutine 上运行函数需要最少的样板代码。您只需在函数调用前加上关键字“go”:
Go 的并发方法很容易使用。与 Node 相比,这是一种有趣的方法,开发人员必须密切关注异步代码的处理方式。Go 中并发的另一个重要方面是竞争检测器。这样可以很容易地确定异步代码中是否存在任何竞争条件。
我们目前用 Go 编写的最大的微服务编译需要 4 秒。与以编译速度慢而闻名的 Java 和 C++ 等语言相比,Go 的快速编译时间是一项重大的生产力胜利。我喜欢在程序编译的时候摸鱼,但在我还记得代码应该做什么的同时完成事情会更好。
首先,让我们从显而易见的开始:与 C++ 和 Java 等旧语言相比,Go 开发人员的数量并不多。根据StackOverflow的数据, 38% 的开发人员知道 Java, 19.3% 的人知道 C++,只有 4.6% 的人知道 Go。GitHub 数据显示了类似的趋势:Go 比 Erlang、Scala 和 Elixir 等语言使用更广泛,但不如 Java 和 C++ 流行。幸运的是,Go 是一种非常简单易学的语言。它提供了您需要的基本功能,仅此而已。它引入的新概念是“延迟”声明和内置的并发管理与“goroutines”和通道。(对于纯粹主义者来说:Go 并不是第一种实现这些概念的语言,只是第一种使它们流行起来的语言。)任何加入团队的 Python、Elixir、C++、Scala 或 Java 开发人员都可以在一个月内在 Go 上发挥作用,因为它的简单性。与许多其他语言相比,我们发现组建 Go 开发人员团队更容易。如果您在博尔德和阿姆斯特丹等竞争激烈的生态系统中招聘人员,这是一项重要的优势。
对于我们这样规模的团队(约 20 人)来说,生态系统很重要。如果您必须重新发明每一个小功能,您根本无法为您的客户创造价值。Go 对我们使用的工具有很好的支持。实体库已经可用于 Redis、RabbitMQ、PostgreSQL、模板解析、任务调度、表达式解析和 RocksDB。与 Rust 或 Elixir 等其他较新的语言相比,Go 的生态系统是一个重大胜利。它当然不如 Java、Python 或 Node 之类的语言好,但它很可靠,而且对于许多基本需求,你会发现已经有高质量的包可用。
Gofmt 是一个很棒的命令行实用程序,内置在 Go 编译器中,用于格式化代码。就功能而言,它与 Python 的 autopep8 非常相似。我们大多数人并不真正喜欢争论制表符与空格。格式的一致性很重要,但实际的格式标准并不那么重要。Gofmt 通过使用一种正式的方式来格式化您的代码来避免所有这些讨论。
Go 对协议缓冲区和 gRPC 具有一流的支持。这两个工具非常适合构建需要通过 RPC 通信的微服务。您只需要编写一个清单,在其中定义可以进行的 RPC 调用以及它们采用的参数。然后从这个清单中自动生成服务器和客户端代码。生成的代码既快速又具有非常小的网络占用空间并且易于使用。从同一个清单中,您甚至可以为许多不同的语言生成客户端代码,例如 C++、Java、Python 和 Ruby。因此,内部流量不再有模棱两可的 REST 端点,您每次都必须编写几乎相同的客户端和服务器代码。.
Go 没有像 Rails 用于 Ruby、Django 用于 Python 或 Laravel 用于 PHP 那样的单一主导框架。这是 Go 社区内激烈争论的话题,因为许多人主张你不应该一开始就使用框架。我完全同意这对于某些用例是正确的。但是,如果有人想构建一个简单的 CRUD API,他们将更容易使用 Django/DJRF、Rails Laravel 或Phoenix。对于 Stream 的用例,我们更喜欢不使用框架。然而,对于许多希望提供简单 CRUD API 的新项目来说,缺乏主导框架将是一个严重的劣势。
Go 通过简单地从函数返回错误并期望调用代码来处理错误(或将其返回到调用堆栈)来处理错误。虽然这种方法有效,但很容易失去问题的范围,以确保您可以向用户提供有意义的错误。错误包通过允许您向错误添加上下文和堆栈跟踪来解决此问题。另一个问题是很容易忘记处理错误。像 errcheck 和 megacheck 这样的静态分析工具可以方便地避免犯这些错误。虽然这些变通办法效果很好,但感觉不太对劲。您希望该语言支持正确的错误处理。
Go 的包管理绝不是完美的。默认情况下,它无法指定特定版本的依赖项,也无法创建可重现的构建。Python、Node 和 Ruby 都有更好的包管理系统。但是,使用正确的工具,Go 的包管理工作得很好。您可以使用Dep来管理您的依赖项,以允许指定和固定版本。除此之外,我们还贡献了一个名为的开源工具VirtualGo,它可以更轻松地处理用 Go 编写的多个项目。
我们进行的一个有趣的实验是在 Python 中使用我们的排名提要功能并在 Go 中重写它。看看这个排名方法的例子:
Python 和 Go 代码都需要执行以下操作来支持这种排名方法:
开发 Python 版本的排名代码大约花了 3 天时间。这包括编写代码、单元测试和文档。接下来,我们花了大约 2 周的时间优化代码。其中一项优化是将分数表达式 (simple_gauss(time)*popularity) 转换为抽象语法树. 我们还实现了缓存逻辑,可以在未来的特定时间预先计算分数。相比之下,开发此代码的 Go 版本大约需要 4 天时间。性能不需要任何进一步的优化。因此,虽然 Python 的最初开发速度更快,但基于 Go 的版本最终需要我们团队的工作量大大减少。另外一个好处是,Go 代码的执行速度比我们高度优化的 Python 代码快大约 40 倍。现在,这只是我们通过切换到 Go 体验到的性能提升的一个示例。
与 Python 相比,我们系统的其他一些组件在 Go 中构建所需的时间要多得多。作为一个总体趋势,我们看到 开发 Go 代码需要更多的努力。但是,我们花更少的时间 优化 代码以提高性能。
我们评估的另一种语言是Elixir.。Elixir 建立在 Erlang 虚拟机之上。这是一种迷人的语言,我们之所以考虑它,是因为我们的一名团队成员在 Erlang 方面拥有丰富的经验。对于我们的用例,我们注意到 Go 的原始性能要好得多。Go 和 Elixir 都可以很好地服务数千个并发请求。但是,如果您查看单个请求的性能,Go 对于我们的用例来说要快得多。我们选择 Go 而不是 Elixir 的另一个原因是生态系统。对于我们需要的组件,Go 有更成熟的库,而在许多情况下,Elixir 库还没有准备好用于生产环境。培训/寻找开发人员使用 Elixir 也更加困难。这些原因使天平向 Go 倾斜。Elixir 的 Phoenix 框架看起来很棒,绝对值得一看。
Go 是一种非常高性能的语言,对并发有很好的支持。它几乎与 C++ 和 Java 等语言一样快。虽然与 Python 或 Ruby 相比,使用 Go 构建东西确实需要更多时间,但您将节省大量用于优化代码的时间。我们在Stream有一个小型开发团队,为超过 5 亿最终用户提供动力和聊天。Go 结合了 强大的生态系统 、新开发人员的 轻松入门、快速的性能 、对并发的 可靠支持和高效的编程环境 ,使其成为一个不错的选择。Stream 仍然在我们的仪表板、站点和机器学习中利用 Python 来提供个性化的订阅源. 我们不会很快与 Python 说再见,但今后所有性能密集型代码都将使用 Go 编写。我们新的聊天 API也完全用 Go 编写。
协程,又称微线程,纤程。英文名 Coroutine 。Python对协程的支持是通过 generator 实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用 next()函数 获取由 yield 语句返回的下一个值。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。yield其实是终端当前的函数,返回给调用方。python3中使用yield来实现range,节省内存,提高性能,懒加载的模式。
asyncio是Python 3.4 版本引入的 标准库 ,直接内置了对异步IO的支持。
从Python 3.5 开始引入了新的语法 async 和 await ,用来简化yield的语法:
import asyncio
import threading
async def compute(x, y):
print("Compute %s + %s ..." % (x, y))
print(threading.current_thread().name)
await asyncio.sleep(x + y)
return x + y
async def print_sum(x, y):
result = await compute(x, y)
print("%s + %s = %s" % (x, y, result))
print(threading.current_thread().name)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
tasks = [print_sum(1, 2), print_sum(3, 4)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
线程是内核进行抢占式的调度的,这样就确保了每个线程都有执行的机会。而 coroutine 运行在同一个线程中,由语言的运行时中的 EventLoop(事件循环) 来进行调度。和大多数语言一样,在 Python 中,协程的调度是非抢占式的,也就是说一个协程必须主动让出执行机会,其他协程才有机会运行。
让出执行的关键字就是 await。也就是说一个协程如果阻塞了,持续不让出 CPU,那么整个线程就卡住了,没有任何并发。
PS: 作为服务端,event loop最核心的就是IO多路复用技术,所有来自客户端的请求都由IO多路复用函数来处理;作为客户端,event loop的核心在于利用Future对象延迟执行,并使用send函数激发协程,挂起,等待服务端处理完成返回后再调用CallBack函数继续下面的流程
Go语言的协程是 语言本身特性 ,erlang和golang都是采用了CSP(Communicating Sequential Processes)模式(Python中的协程是eventloop模型),但是erlang是基于进程的消息通信,go是基于goroutine和channel的通信。
Python和Go都引入了消息调度系统模型,来避免锁的影响和进程/线程开销大的问题。
协程从本质上来说是一种用户态的线程,不需要系统来执行抢占式调度,而是在语言层面实现线程的调度 。因为协程 不再使用共享内存/数据 ,而是使用 通信 来共享内存/锁,因为在一个超级大系统里具有无数的锁,共享变量等等会使得整个系统变得无比的臃肿,而通过消息机制来交流,可以使得每个并发的单元都成为一个独立的个体,拥有自己的变量,单元之间变量并不共享,对于单元的输入输出只有消息。开发者只需要关心在一个并发单元的输入与输出的影响,而不需要再考虑类似于修改共享内存/数据对其它程序的影响。
一、Kafka简述
1. 为什么需要用到消息队列
异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;
解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。
缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。
2.为什么选择kafka呢?
这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文
kafka的优点:
1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群
kafka的缺点:
1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高
3. Golang 操作kafka
3.1. kafka的环境
网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件
3.2. 第三方库
github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组
3.3. 消费者
单个消费者
funcconsumer(){varwg sync.WaitGroup consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{ fmt.Println("Failed to start consumer: %s", err)return} partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{ fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList { pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{ fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return} wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) }deferpc.AsyncClose() wg.Done() }(pc) } wg.Wait()}funcmain(){ consumer()}
消费组
funcconsumerCluster(){ groupID :="group-1"config := cluster.NewConfig() config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{ glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){ errors := c.Errors() noti := c.Notifications()for{select{caseerr := -errors: glog.Errorln(err)case-noti: } } }(c)formsg :=rangec.Messages() { fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value)) c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}
3.4. 生产者
同步生产者
packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){ config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{} msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!") client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{ fmt.Println("producer close err, ", err)return}deferclient.Close() pid, offset, err := client.SendMessage(msg)iferr !=nil{ fmt.Println("send message failed, ", err)return} fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}
异步生产者
funcasyncProducer(){ config := sarama.NewConfig() config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){ errors := p.Errors() success := p.Successes()for{select{caseerr := -errors:iferr !=nil{ glog.Errorln(err) }case-success: } } }(p)for{ v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000)) fmt.Fprintln(os.Stdout, v) msg := sarama.ProducerMessage{ Topic: topics, Value: sarama.ByteEncoder(v), } p.Input() - msg time.Sleep(time.Second *1) }}funcmain(){goasyncProducer()select{ }}
3.5. 结果展示-
同步生产打印:
分区ID:0,offset:90
消费打印:
Partition:0,Offset:90,key:,value:Hello World!
异步生产打印:
async:7272async:7616async:998
消费打印:
Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998