重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
一、消息确认机制
成都创新互联公司成立于2013年,先为彰武等服务建站,彰武等地企业,进行企业商务咨询服务。为彰武企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。
rabbitmq在发送消息后立即从内存中删除消息,因此如果消费者处理消息耗时较长,在处理过程中消费者被kill,则处理中的消息、以及其他发往该消费者的消息都将丢失。
为了保证消息不丢失,rabbitmq支持消息确认机制,消费者可以发送ack告诉rabbitm指定消息已经收到并处理,因此rabbitmq可以删除该消息。
如果消费者死掉(channel关闭、connection关闭、或者TCP connection丢失),导致rabbitmq没有收到ack,rabbitmq将把消息重入队列。
不存在消息超时,这意味着处理一个消息非常长的时间也是ok的。
消息确认机制默认是开启的,通过在channel.basic_consume中设置no_ack=True关闭。
注意消费者在处理消息后,不要忘记调用channel.basic_ack进行消息确认,否则rabbitmq将不断消耗内存把消息重入队列。
二、队列/消息持久化
为了防止rabbitmq服务终止导致队列和消息丢失,需要将队列和消息标记为持久化的:
确保rabbitmq永远不丢失队列,需要将队列 声明为持久化的:
channel.queue_declare(queue='task_queue', durable=True)
将消息声明为持久化的:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
注意:尽管已经很健壮了,但是仍然无法完全保证消息不会丢失,例如rabbitmq接收消息但是还没有保存到硬盘的情况。
三、exchange
简单的说,exchange的一端接收消息,另一端把消息放进队列。
在rabbitmq中生产者不会将请求直接发送给消费者,生产值只会把消息发给exchange,exchange收到消息后需要知道怎么做:添加到特定队列、添加到多个队列、还是丢弃。
exchange的类型包括direct,topic,headers,fanout
四、绑定
exchange和queue之间的联系被称为绑定(binding),可以简单的看:队列对于特定exchange上的消息感兴趣
channel.queue_bind(exchange='logs', queue=result.method.queue)
此时'logs' exchange将添加消息到指定queue
绑定可以使用一个额外的routing_key参数,例如:
channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
对于fanout类型的exchange来说,routing_key参数是被忽略的
五、topic exchange
发往topic exchange的消息不能携带任意的routing_key,必须是以点隔开的一串字符,最大255个字节
binding key也必须是相同的形式,注意binding key有两个重要的特殊情况:
* 可以替代一个单词
#可以替代零个或多个单词
例如,如果binding key是*.orange.*,则可以匹配所有
如果binding key是lazy.#,则类似于带有lazy.orange.male.rabbit的key的消息可以匹配。
topic exchange非常强大,通过匹配routing_key可以表现的像存在多个exchange
六、RPC
为了接收响应,客户端需要在发送请求时附加发送回调队列地址:
result = channel.queue_declare(exclusive=True) callback_queue = result.method.queue channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = callback_queue, ), body=request) # ... and some code to read a response message from the callback_queue ...