重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

go语言实现的消息队列 go语言实现的消息队列是什么

go语言能做什么?

很多朋友可能知道Go语言的优势在哪,却不知道Go语言适合用于哪些地方。

创新互联公司-专业网站定制、快速模板网站建设、高性价比井陉矿网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式井陉矿网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖井陉矿地区。费用合理售后完善,十余年实体公司更值得信赖。

1、 Go语言作为服务器编程语言,很适合处理日志、数据打包、虚拟机处理、文件系统、分布式系统、数据库代理等;网络编程方面。Go语言广泛应用于Web应用、API应用、下载应用等;除此之外,Go语言还可用于内存数据库和云平台领域,目前国外很多云平台都是采用Go开发。

2、 其实Go语言主要用作服务器端开发。其定位是用来开发"大型软件"的,适合于很多程序员一起开发大型软件,并且开发周期长,支持云计算的网络服务。Go语言能够让程序员快速开发,并且在软件不断的增长过程中,它能让程序员更容易地进行维护和修改。它融合了传统编译型语言的高效性和脚本语言的易用性和富于表达性。

3、 Go语言成功案例。Nsq:Nsq是由Go语言开发的高性能、高可用消息队列系统,性能非常高,每天能处理数十亿条的消息;

4、 Docker:基于lxc的一个虚拟打包工具,能够实现PAAS平台的组建。

5、 Packer:用来生成不同平台的镜像文件,例如VM、vbox、AWS等,作者是vagrant的作者

6、 Skynet:分布式调度框架。

7、 Doozer:分布式同步工具,类似ZooKeeper。

8、 Heka:mazila开源的日志处理系统。

9、 Cbfs:couchbase开源的分布式文件系统。

10、 Tsuru:开源的PAAS平台,和SAE实现的功能一模一样。

11、 Groupcache:memcahe作者写的用于Google下载系统的缓存系统。

12、 God:类似redis的缓存系统,但是支持分布式和扩展性。

13、 Gor:网络流量抓包和重放工具。

以上的就是关于go语言能做什么的内容介绍了。

Go语言使用NSQ消息队列

重点提示:

这样我们就启动了一个 nsqd 的实例

编写一个消息生产者

nsq_single_product.go

编写一个消息消费者

nsq_single_consumer.go

添加第一个实例

添加第二个实例

消息生产者

nsq_cluster_product.go

消息消费者

nsq_cluster_consumer.go

golangchannel和mq的区别

golangchannel和mq的区别

我是一个着迷于产品和运营的技术人,乐于跨界的终身学习者。欢迎关注我哟~

每周五12点 按时送达~

我的第「218」篇原创敬上

大家好,我是Z哥。

最近在项目中遇到了一个使用 RabbitMQ 时的问题,这个问题我觉得还是有一定普适性的,和大家分享一下,避免大家后续在同一个问题上犯错。

消息队列(MQ)是在软件开发中很常用的中间件,如果一个程序需要协调另一个程序进行数据的“write”操作,并且不关心“write”的结果,则便会选择它。它是一个保存消息(数据)的容器,由它来确保消息一定被送达到目标程序。

打个比喻来说,消息队列就是一个邮差,它负责将信件(消息)从源头送往目的地,并且根据信件重要性的不同,提供当面签收确认或者直接投放两种服务。

RabbitMQ 就是一个典型的消息队列,以 AMQP 为标准。历史也比较悠久,大概是从 2007年研发出来的,用的编程语言Erlang也同样具有年代感。

需要简单介绍一下 Erlang 的特点,它对我们理解 RabbitMQ 有很大的帮助。

Erlang 是一种运行于“虚拟机”(类似 JVM)的解释性语言。是一个结构化,动态类型编程语言,内建并行计算支持。使用 Erlang 编写出的应用运行时通常由成千上万个轻量级“进程”(并非传统意义上的进程)组成,并通过消息传递相互通讯。进程间上下文切换对于 Erlang 来说仅仅 只是一两个环节,比起 C 程序的线程切换要高效得多得多了。

——整理于百度百科的资料

不管是什么 MQ 中间件,作为消息的生产方和消费方都需要和 MQ 的服务端建立连接进行通讯。

一般这个连接都会使用 TCP 协议,在 RabbitMQ 里也不例外。大多数 RabbitMQ 的 SDK 都会将连接封装为一个「Connection」对象。

还没完,大多数的 MQ 中间件还会在「Connection」的基础上增加一个「Channel」的概念,以通过复用的方式提高 TCP 连接的利用率,因为建立和销毁 TCP 连接是非常昂贵的开销。在 RabbitMQ 中的复用 TCP 连接方式是「Non-blocking I/O」的模式。

关于NIO,「Non-blocking I/O」的概念,有感兴趣的话可以跳转去看之前写的这篇文章。(用最通俗的话讲明白阻塞/非阻塞/异步/同步,到底啥区别?)

多说一句,任何方案都不是“银弹”。当每个 Channel 的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是 Channel 本身的流量很大时,这时候多个 Channel 复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些 Channel 均摊到这些 Connection 中,至于哪些 Channel 使用那个 Connection 以及Connection 与 Channel 之间的数量关系是多少,需要根据业务自身的实际情况进行调节。

Channel 在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。比如, channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

可能你要问了,Channel 是不是也能像 Connection 一样被复用?这是个好问题,也是我们这次遇到问题的关键点。

结论是:可以,但是需要自己保证客户端对 Channel 访问的线程安全问题,因为在 Channel 的另一端,在 RabbitMQ 的服务端,每个 Channel 由一个单独的“进程”所管理,如果由于多线程复用Channel 导致数据帧乱序了,RabbitMQ 的服务端会主动关闭整个 Connection 。

因此,我们这次犯的错误就是多线程复用了同一个 Channel 导致的问题。所以,如果你也用到 streadway/amqp 这个库的话,需要特别注意这点。

不过,不同语言的SDK内部实现不同,我们分别使用 Golang 的 AMQP 库 streadway/amqp,和 RabbitMQ 官方提供的 C# 版本的库分别模拟过同样的场景,前者出现问题,后者却没有问题。

受限于时间原因,没有具体去核实 C# 库的源码,主观猜测是 C# 库内部多做了一些对于单个 Channel 的线程安全处理。

最后,我整理了三点使用 streadway/amqp 库的最佳实践,你可以看看:

01

golang 中使用 streadway/amqp 时,需要保证每一个线程单独一个 Channel。

streadway/amqp 库中的获取一个 Channel 的方法「Connection.channel()」是线程安全的。但是内部有一个 defaultChannelMax 的参数对 Channel 的数量进行了限制,默认是 (2 10) - 1,2047。这个需要注意:

02

我们可以通过调用 amqp.DialConfig(url string, config Config) 来调整个限制。

但是,并不是你调整了多少就是多少,还需要和 RabbitMQ 服务端的配置进行 min() 函数的处理,最终为两者的最小值。

Tips:特别是用云厂商的 MQ 产品,因为阶梯收费的原因会对很多性能参数做限制,需要格外关注这点,比如某版本的阿里云 RabbitMQ 实例限制是单个 Connection 最多 64 个 Channel)

03

正如前面对 Erlang 的简单介绍,Erlang 是一个天然支持多“进程”设计的语言,所以在 RabbitMQ 的服务端设计中,每一个 Queue,每一个 Connection 都是单独的一个“进程”。因此如果你想尽可能地压榨 RabbitMQ 性能,可以通过建立更多的 Connection 或者创建更多的 Queue 来实现,当然需要注意到 Connection 的创建和销毁的性能开销问题。

推荐阅读:

减少联调、高效集成,试试这个工具

golang使用3周总结

也可以「关注」我,带你以技术思维看世界~

想更进一步和我一起玩耍,欢迎「搜索微信公号:跨界架构师」。

内容包括:架构设计丨分布式系统丨产品丨运营丨个人深度思考。

go语言循环队列的实现

队列的概念在 顺序队列 中,而使用循环队列的目的主要是规避假溢出造成的空间浪费,在使用循环队列处理假溢出时,主要有三种解决方案

本文提供后两种解决方案。

顺序队和循环队列是一种特殊的线性表,与顺序栈类似,都是使用一组地址连续的存储单元依次存放自队头到队尾的数据元素,同时附设队头(front)和队尾(rear)两个指针,但我们要明白一点,这个指针并不是指针变量,而是用来表示数组当中元素下标的位置。

本文使用切片来完成的循环队列,由于一开始使用三个参数的make关键字创建切片,在输出的结果中不包含nil值(看起来很舒服),而且在验证的过程中发现使用append()函数时切片内置的cap会发生变化,在消除了种种障碍后得到了一个四不像的循环队列,即设置的指针是顺序队列的指针,但实际上进行的操作是顺序队列的操作。最后是对make()函数和append()函数的一些使用体验和小结,队列的应用放在链队好了。

官方描述(片段)

即切片是一个抽象层,底层是对数组的引用。

当我们使用

构建出来的切片的每个位置的值都被赋为interface类型的初始值nil,但是nil值也是有大小的。

而使用

来进行初始化时,虽然生成的切片中不包含nil值,但是无法通过设置的指针变量来完成入队和出队的操作,只能使用append()函数来进行操作

在go语言中,切片是一片连续的内存空间加上长度与容量的标识,比数组更为常用。使用 append 关键字向切片中追加元素也是常见的切片操作

正是基于此,在使用go语言完成循环队列时,首先想到的就是使用make(type, len, cap)关键字方式完成切片初始化,然后使用append()函数来操作该切片,但这一方式出现了很多问题。在使用append()函数时,切片的cap可能会发生变化,用不好就会发生扩容或收缩。最终造成的结果是一个四不像的结果,入队和出队操作变得与指针变量无关,失去了作为循环队列的意义,用在顺序队列还算合适。

参考博客:

Go语言中的Nil

Golang之nil

Go 语言设计与实现

golang实现本地延迟队列

有个服务会大量使用延迟消息,进行事件处理。随着业务量不断上涨。在晚间、节假日等流量高峰期消息延迟消息队列限流会导致事件丢失,影响业务。与下游沟通后给上调到了最大限流值,问题依然存在,于是决定自己搞一套降级方案。

下游服务触发限流时,能降级部分流量到本地延迟队列,把业务损失降到最低。

本地延迟队列承接部分mq流量

流程如下:

1. 使用zset 存储延迟消息,其中:score为执行时间,value为消息体

2. 启动协程轮询zset,获取score最小的10条数据,协程执行间隔时间xs

如果最小分值小于等于当前时间戳,则发送消息

若最小分值大于当前时间戳,sleep等待执行

需要对key进行hash,打散到多个分片中,避免大key和热key问题,官方大key定义

因此,需保证每个key中value数量n5000,单个value大小不超过 10240/n kb

假设承接10w qps,如何处理?

10w qps延迟120s时,最开始消息队列会积累100000*120=12000000条消息

假如每条消息大小500b,需占用存储6000000kb = 6000Mb = 6GB

为避免大key问题,每个zset存放4000个元素,需要哈希到3000(3000是key的数量,可配置)个zset中。

整个集群假设500台实例,每个处理qps平均在200左右。

单实例消费能力计算:

遍历每个zset,针对每个zset起goroutine处理,此示例中需要 起3000个

但是每秒能处理成功的只有200个,其他都在空跑

综上:

将redis key分片数n和每次处理的消息数m进行动态配置,便于调整

当流量上涨时,调大分片数n和单实例单分片并发数m即可,假如消费间隔200ms,集群处理能力为n*m*5 qps

n = (qps * 120) / 4000

若qps=q,则计算公式如下

zadd = q

zRange = 500 * 5 * n / 500

zRemove = q

setNx = 500 * 5 * n

若10w qps,则

读qps = 15000 + 500*3000*5 =7515000,写 20w

pros

redis 读写性能好,可支持较大并发量,zrange可直接取出到达执行时间的消息

cons

redis 大key问题导致对数据量有一定的限制

分片数量扩缩容会漏消费,会导致事件丢失,业务有损

key分片数量过多时,redis读写压力较大

机器资源浪费,3000个协程,单实例同一秒只有200个针对处理,其他都在空跑

流程如下:

使用带缓冲的channel来实现延迟队列,channel中存放的数据为消息体(包括执行时间),channel能保证先进先出

从channel中取出数据后,判断是否到达执行时间

到达,同步发送mq

未到达,sleep 剩余执行时间,然后再次执行

从channel读出的数据如果未到达执行时间,无法再次放入channel中,需要协程sleep(执行时间-当前时间)

10w qps延迟120s时,最开始消息队列会积累100000*120=12000000条消息,假设每条消息大小500b,需要6G存储空间

channel 大小 = (qps*120)/ c , c=集群实例数,c=500 = channel大小为24000,占用12M内存

要处理10w qps,分摊到每个机器的处理速度为 100000/500 = 200,假设单协程处理10qps,开20个即可。

pros:

本地存储,相比redis,读写速度更快;协程数量少,开销低;资源利用率较方案一高

cons:

稳定性不如redis,实例故障可能导致数据丢失;worker池和channel扩缩容依赖服务重启,成本高速度慢

综上,我们以10w qps为例,对比两种方案在以下指标差异,选择方案二。

附上demo

Go语言能做什么?

Go 语言被设计成一门应用于搭载 Web 服务器,存储集群或类似用途的巨型中央服务器的系统编程语言。对于高性能分布式系统领域而言,Go 语言无疑比大多数其它语言有着更高的开发效率。学习Go语言,可以说是很简单的,入门快,想学习Go语言,可以到黑马程序员看看,有新出的教程。


网站名称:go语言实现的消息队列 go语言实现的消息队列是什么
URL链接:http://cqcxhl.com/article/docespi.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP