重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
1) C++ 获取消息数据
amqp_rpc_reply_t ret;
timeval tvTimeout;
tvTimeout.tv_sec = 1;
tvTimeout.tv_usec = 0;
ret = amqp_consume_message(conn, &envelope, &valTimeOut, 0);
成都创新互联公司是一家专注网站建设、网络营销策划、微信小程序开发、电子商务建设、网络推广、移动互联开发、研究、服务为一体的技术型公司。公司成立10年以来,已经为近千家生料搅拌车各业的企业公司提供互联网服务。现在,服务的近千家客户与我们一路同行,见证我们的成长;未来,我们一起分享成功的喜悦。
if (AMQP_RESPONSE_NORMAL == ret.reply_type)
{
std::string strAMQPMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
}
误区: std::string strAMQPMsg = char*)envelope.message.body.bytes 存在多余的数据
误区: 没有设置接收超时,而是直接传递NULL,导致函数进入死循环
2)发送消息的时候,返回错误信息:AMQP_STATUS_SOCKET_ERROR
AMQP_STATUS_SOCKET_ERROR = -0x0009, /**< A socket error occurred */
服务器在一定时间之内,收到客户端的消息,就会主动断开连接,因此客户端需要跟服务器Broker重新建立连接,如果不想断开连接,需要发送心跳
3)确认数据是否已经发送成功
关于消费者就不用代码来获取消息了,直接在RabbitMQ Management点击某个队列的名字,然后Get Message(s) 即可获取消息内容
4)指定消息的超时时间
某些实际的应用场景中会产生许多过期的消息时间,可以通过设置amqp_basic_properties_t的超时时间参数expiration来解决队列中的超时数据过多的问题
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_EXPIRATION_FLAG;
props.expiration = amqp_cstring_bytes("10000");//超时10秒
amqp_basic_publish(conn, channnel, exchange, queue, 0, 0, &props, message);
5)声明队列,返回错误信息:AMQP_RESPONSE_SERVER_EXCEPTION
原因:1.交换机是否创建成功 2.声明的队列是否已经创建过,并且已经存在的队列跟现在的队列的属性不一致,例如auto_delete自动删除属性,或者durable持久化属性
导致的问题:当返回该错误,说明跟broker的连接已经中断,必须重新建立连接,否则,继续调用其他函数接口会一直阻塞
解决: 通过web手动删除队列
6)只知道队列的情况下获取数据
实际上说明声明的交换机和队列都必须唯一
amqp_connection_state_t connState = amqp_new_connection();
amqp_socket_t *pSocket = amqp_tcp_socket_new(connState);
if (!pSocket) {
amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connState);
std::cout << "跟消息服务器创建连接失败" << std::endl;
return;
}
int nConnStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
if (AMQP_STATUS_OK != nConnStatus) {
amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connState);
return;
}
amqp_rpc_reply_t rpcReply = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
if (AMQP_RESPONSE_NORMAL != rpcReply.reply_type)
{
std::cout << "登陆消息服务器失败" << std::endl;
return;
}
amqp_channel_open(connState, 1);
amqp_basic_consume(connState, 1, amqp_cstring_bytes("passerby-000001"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_frame_t frame;
std::cout << "登陆消息服务器成功,开始接收数据" << std::endl;
while (1)
{
amqp_envelope_t envelope;
amqp_maybe_release_buffers(connState);
timeval tvTimeout;
tvTimeout.tv_sec = 10;
tvTimeout.tv_usec = 0;
amqp_rpc_reply_t ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type)
{
if (AMQP_STATUS_SOCKET_ERROR == ret.library_error)
{
std::cout << "跟消息服务器连接中断,清理资源,重连连接" << std::endl;
break;
}
if (AMQP_STATUS_TIMEOUT == ret.library_error)
{
std::cout << "等待消息服务器消息超时,继续等待" << std::endl;
continue;
}
std::cout << "跟消息服务器连接出现异常,清理资源,重连连接" << std::endl;
break;
}
else
{
std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
std::cout << "接收到的抓拍信息:" << strRecvMsg<< std::endl;
amqp_destroy_envelope(&envelope);
continue;
}
}
amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connState);
7)生产者不产生队列,消费者通过指定的交换机和routing-key,创建队列,然后将该队列绑定到交换机上
char const* pszExchange = "passerByExchange";
char const* pszRoutingKey = "passerby-000001";
amqp_connection_state_t connState = amqp_new_connection();
amqp_socket_t* pSocket = amqp_tcp_socket_new(connState);
if (!pSocket) {
amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connState);
return;
}
int nStatus = amqp_socket_open(pSocket, strIP.c_str(), nPort);
if (nStatus) {
amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connState);
return;
}
amqp_rpc_reply_t replyLogin = amqp_login(connState, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
if (AMQP_RESPONSE_NORMAL != replyLogin.reply_type)
{
std::cout << "登陆消息服务器失败" << std::endl;
return;
}
amqp_channel_open(connState, 1);
amqp_queue_declare_ok_t *r = amqp_queue_declare(
connState, 1, amqp_empty_bytes, 0, 0, 0, 1, amqp_empty_table);
amqp_bytes_t queueName = amqp_bytes_malloc_dup(r->queue);
if (queueName.bytes == NULL)
{
amqp_bytes_free(queueName);
amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connState);
return;
}
amqp_queue_bind(connState, 1, queueName, amqp_cstring_bytes(pszExchange),
amqp_cstring_bytes(pszRoutingKey), amqp_empty_table);
amqp_basic_consume(connState, 1, queueName, amqp_empty_bytes, 0, 1, 0,
amqp_empty_table);
amqp_frame_t frame;
while (1)
{
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
amqp_maybe_release_buffers(connState);
timeval tvTimeout;
tvTimeout.tv_sec = 10;
tvTimeout.tv_usec = 0;
ret = amqp_consume_message(connState, &envelope, &tvTimeout, 0);
if (AMQP_RESPONSE_NORMAL != ret.reply_type)
{
if (AMQP_STATUS_TIMEOUT == ret.library_error)
{
std::cout << "接收消息超时" << std::endl;
continue;
}
std::cout << "连接消息服务器异常,清理资源退出" << std::endl;
break;
}
else
{
std::string strRecvMsg((char*)envelope.message.body.bytes, envelope.message.body.len);
std::string strGBK = UTF8ToGBK(strRecvMsg.c_str());
amqp_destroy_envelope(&envelope);
}
}
amqp_channel_close(connState, 1, AMQP_REPLY_SUCCESS);
amqp_connection_close(connState, AMQP_REPLY_SUCCESS);
amqp_destroy_connection(connState);
8)amqp_basic_consume函数不能连续调用多次同时消费多个队列
代码如下:
amqp_basic_consume(connState, 1, amqp_cstring_bytes("alarm"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
amqp_basic_consume(connState, 1, amqp_cstring_bytes("capture"), amqp_empty_bytes, 0, 1, 0, amqp_empty_table);
只能执行第一句代码,第二句代码会一直阻塞
9)登陆MQ服务器,进行心跳交互代码
amqp_rpc_reply_t replyLogin = amqp_login(conn, "/", 0, 131072, 120, AMQP_SASL_METHOD_PLAIN, strUserName.c_str(), strPassword.c_str());
第五个参数,指定了跟服务器多少秒发送一次心跳,如果不发心跳,跟服务器在连接一段时间之后,断开,当然,也要考虑到长连接也可能在网络异常情况下断开