重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
MQTT设计了一套保证消息稳定传输的机制,包括消息应答、存储和重传。
创新互联专业为企业提供定襄网站建设、定襄做网站、定襄网站设计、定襄网站制作等企业网站建设、网页设计与制作、定襄企业网站模板建站服务,10余年定襄做网站经验,不只是建网站,更提供有价值的思路和整体网络服务。
为了保证消息被正确的接收
在这套机制下,提供了三种不同层次QoS(Quality of Service):
QoS 是消息的发送方(Sender)和接受方(Receiver)之间达成的一个协议:
::: warning
QoS是Sender和Receiver之间的协议,而不是Publisher和Subscriber之间的协议。
换句话说,Publisher发布了一条QoS1的消息,只能保证Broker能至少收到一次这个消息;
而对于Subscriber能否至少收到一次这个消息,还要取决于Subscriber在Subscribe的时候和Broker协商的QoS等级。
:::
QoS0等级下,Sender和Receiver之间一次消息的传递流程如下:
Sender向Receiver发送一个包含消息数据的PUBLISH包,然后不管结果如何,丢掉已发送的PUBLISH包,一条消息的发送完成。
QoS1要保证消息至少到达一次,所以有一个应答的机制。Sender和Receiver的一次消息的传递流程如下:
1.Sender向Receiver发送一个带有数据的PUBLISH包,并在本地保存这个PUBLISH包;
2.Receiver收到PUBLISH包以后,向Sender发送一个PUBACK数据包,PUBACK数据包没有消息体(Payload),在可变头中有一个包标识(Packet Identifier),和它收到的PUBLISH包中的Packet Identifier一致。
3.Sender收到PUBACK之后,根据PUBACK包中的Packet Identifier找到本地保存的PUBLISH包,然后丢弃掉,一次消息的发送完成。
但是消息传递流程中可能会出现问题:
相比QoS0和QoS1,QoS2不仅要确保Receiver能收到Sender发送的消息,还需要确保消息不重复。它的重传和应答机制就要复杂一些,同时开销也是最大的。QoS2下,一次消息的传递流程如下所示:
1.Sender发送QoS为2的PUBLISH数据包,数据包 Packet Identifier 为 P,并在本地保存该PUBLISH包;
2.Receiver收到PUBLISH数据包后,在本地保存PUBLISH包的Packet Identifier P,并回复Sender一个PUBREC数据包,PUBREC数据包可变头中的Packet Identifier为P,没有消息体(Payload);
3.当Sender收到PUBREC,它就可以安全的丢弃掉初始Packet Identifier为P的PUBLISH数据包。同时保存该PUBREC数据包,并回复Receiver一个PUBREL数据包,PUBREL数据包可变头中的Packet Identifier为P,没有消息体;
4.当Receiver收到PUBREL数据包,它可以丢掉保存的PUBLISH包的Packet Identifier P,并回复Sender一个可变头中 Packet Identifier 为 P,没有消息体(Payload)的PUBCOMP数据包;
5.当Sender收到PUBCOMP包,那么认为传输已完成,则丢掉对应的PUBREC数据包;
上面是一次完整无误的传输过程,然而传输过程中可能会出现以下情况:
针对上述的问题,较为详细的处理方法如下:
Receiver收到PUBREL数据包后,正式将消息递交给上层应用层,投递之后销毁Packet Identifier P,并发送PUBCOMP数据包,销毁之前的持久化消息。
之后不管接收到多少个PUBREL数据包,因为没有Packet Identifier P,直接回复PUBCOMP数据包即可。
在 MQTT 协议中,从 Broker 到 Subscriber 这段消息传递的实际 QoS 等于:Publisher 发布消息时指定的 QoS 等级和 Subscriber 在订阅时与 Broker 协商的 QoS 等级,这两个 QoS 等级中的最小那一个。
Actual Subscribe QoS = MIN(Publish QoS, Subscribe QoS)
如果 Client 想接收离线消息,必须使用持久化的会话(Clean Session = 0)连接到 Broker,这样 Broker 才会存储 Client 在离线期间没有确认接收的 QoS 大于 等于1 的消息。
在以下情况下你可以选择 QoS0:
在以下情况下你应该选择 QoS1:
在以下情况下你应该选择 QoS2:
最近遇到一个MQTT上传包体达到了5M,且上传速度较快,处理完毕后内存也没有下降的趋势,只有当服务器内存不够用的时候才会回收一小部分内存,导致内存飞涨,同一服务器上的其他服务根本无法提供稳定的服务。
这种情况基本可以确定为byte[]太大,直接分配到堆上面了,而堆上的大对象要2代GC才能够回收,但是二代GC又懒又慢,啥时候回收这部分内存完全看心情。
雪崩的时候没有一片雪花是无辜的
1. 解决数组租用的问题
既然心里已经大概有了解决的方案那这事情就很明朗了
一个固定数组的租用使用 ArrayPoolT 这个对象去进行数组的租用,但是这个玩意对于使用者有点不太友好,申请的长度并不是实际需要的长度,这对于一些需要使用固定长度进行计算的地方就很蛋疼了。所以需要对这个进行一个封装,将实际需要的长度存储起来作为这个数组的长度,超过的全部作废掉。
那这个类的基本设计就应该是下面这样的,有一个 Length 去决定从 ArrayPoolT 租出来的数组到底有多少是可用长度。
2.要对数组进行指针操作.NET给我们提供了 Span 的API,但是这个没办法在Class中使用,所以只能使用它的同胞兄弟 MemoryT ,这些玩意在网上到处都是介绍的,我在这里就不介绍了。
3.如果要减少流拷贝就需要将这个类加上一个 Clone 的函数,直接让其他人调用Clone函数,就不会出现流拷贝太多导致多次分配堆的问题。
那么综上所需,出来的封装类应该就是这样的
再针对这个搞点拓展函数
搞定 完事
原地址:
原文链接:
MQTT客户端软件MQTT.fx的使用详解
使用说明
mqtt.fx打开后的主页面如下:
点击齿轮进行连接设置
本地连接设置:
用户信息设置:
SSL安全证书设置:
网络代理设置:
遗嘱设置:
连接测试
1、启动mosquitto
地址,下一步配置使用
2、在主机中打开MQTT.FX软件
设置连接信息
IP为mosquitto所在的IP,端口号默认为1883。
点击进行连接
连接成功以后可以进行发布订阅。