重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
你用的是IBM MQ还是Apache的? 一般通过JAVA的JMS可以取得。
成都创新互联专注于信宜企业网站建设,自适应网站建设,电子商务商城网站建设。信宜网站建设公司,为信宜等地区提供建站服务。全流程定制设计,专业设计,全程项目跟踪,成都创新互联专业和态度为您提供的服务
例如IBM MQ里有个MQQueue 对象
// 获取队列实例
MQQueue queue = qMgr.accessQueue("TEST_QUEUE", openOptions);
//获取当前队列最长消息的长度
queue.getMaximumMessageLength()
//获取当前队列最长深度
queue.getMaximumMessageLength()
等等功能都是提供的,具体你下载个WebSphere MQ API 找到MQQueue一看便知。
{
//前面是准备管理器和队列
MQQueueManager qMgr = new MQQueueManager(qManager);
int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
MQQueue queue = qMgr.accessQueue(qName, openOptions);
MQMessage rcvMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;
//读取五秒超时,这里目的是要有个读取阻塞,和Socket编程类似。
gmo.waitInterval = 5000;
queue.get(rcvMessage, gmo);
//后面就是操作消息的部分【略】
}catch(Exception e){{
//前面是准备管理器和队列
MQQueueManager qMgr = new MQQueueManager(qManager);
int openOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_OUTPUT | MQConstants.MQOO_INQUIRE;
MQQueue queue = qMgr.accessQueue(qName, openOptions);
MQMessage rcvMessage = new MQMessage();
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.options = gmo.options + MQConstants.MQGMO_WAIT + MQConstants.MQGMO_SYNCPOINT;
//读取五秒超时,这里目的是要有个读取阻塞,和Socket编程类似。
gmo.waitInterval = 5000;
queue.get(rcvMessage, gmo);
//后面就是操作消息的部分【略】
}catch(Exception e){
之前写了一个ActiveMQ发送消息的例子。现在记录一下java接收ActiveMQ消息的代码。都是本人工作中写过的。希望给大家一点帮助。代码如下:
Java代码
package com.syxp.dns.receive;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
publicclass ReceiveMessageFromMQ {
privatestatic String url = "tcp://localhost:61616";
privatestatic String user = "";
privatestatic String password = "";
privatestatic Logger logger = Logger.getLogger(ReceiveMessageFromMQ.class);
publicvoid receiveMessage(){
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
// 创建连接
Connection connection;
try {
connection = connectionFactory.createConnection();
connection.start();
// 创建Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目标,就创建主题也可以创建队列
Destination destination = session.createQueue("integratedalarm.subject");
// 创建消息消费者
MessageConsumer consumer = session;
// 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null
Message message = consumer.receive(1000);
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
logger.info("接收的消息:"+"\n"+text);
} else {
logger.info("接收的消息:"+"\n"+message);
}
consumer.close();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
publicstaticvoid main(String[] args) {
ReceiveMessageFromMQ receiveMessageFromMQ = new ReceiveMessageFromMQ();
receiveMessageFromMQ.receiveMessage();
}
}
上面有详细的注释,运行了上面的接收的方法之后,会打印出一条相应队列的未接收消息。在ActiveMQ的监视控制页面中,可以看到有一条消息已经被消费。
下面是RabbitMQ的消息确认机制:“为了确保消息不会丢失,RabbitMQ支持消息确认机制。客户端在接受到消息并处理完后,可以发送一个ack消息给RabbitMQ,告诉它该消息可以安全的删除了。假如客户端在发送ack之前意外死掉了,那么RabbitMQ会将消息投递到下一个consumer客户端。如果有多个consumer客户端,RabbitMQ在投递消息时是轮询的。RabbitMQ如何判断客户端死掉了?唯一根据是客户端连接是否断开。这里没有超时机制,也就是说客户端可以处理一个消息很长时间,只要没断开连接,RabbitMQ就一直等待ack消息。”我现在遇到的问题是这样的:我这边有几条线程去消息队列里取数据,但是会有异常数据导致线程挂掉,就是上边的“客户端在发送ack之前意外死掉了”,RabbitMQ会将消息投递到下一个consumer客户端,这样一条异常数据会把我的所有线程挂掉,我现在想实现这样的功能:如果有异常数据导致进程挂掉,那么我不让RabbitMQ将这条消息投递到下一个consumer客户端,而是放到另一个地方或者另外处理,请问该如何实现呢?
我用的方法是:
MQQueueManager qMgr = new MQQueueManager("BVMTEST");
System.out.println("queue manager is connected!");
int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;
/* 打开队列 */
com.ibm.mq.MQQueue queue = qMgr.accessQueue("test1", openOptions);
然后在调用queue.getCurrentDepth()的方法的时候居然报了异常:
MQJE001: 完成代码是 2,原因为 2038
如果我不在此处调用这个方法,而在后面进行
queue.put(outMsg, new MQPutMessageOptions());方法,居然可以成功放入测试信息.
给你一个有用的代码大全:
密码:exn4
ActiveMQ持久化消息的二种方式;
1、持久化为文件
这个装ActiveMQ时默认就是这种,只要设置消息为持久化就可以了。涉及到的配置和代码有:
persistenceAdapter
kahaDB directory="${activemq.base}/data/kahadb"/
/persistenceAdapter
producer.Send(request, MsgDeliveryMode.Persistent, level, TimeSpan.MinValue);
2、持久化为MySql
首先需要把MySql的驱动放到ActiveMQ的Lib目录下,我用的文件名字是:mysql-connector-java-5.0.4-bin.jar
接下来修改配置文件
persistenceAdapter
jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/
/persistenceAdapter
在配置文件中的broker节点外增加
bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"
property name="driverClassName" value="com.mysql.jdbc.Driver"/
property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/
property name="username" value="activemq"/
property name="password" value="activemq"/
property name="maxActive" value="200"/
property name="poolPreparedStatements" value="true"/
/bean
从配置中可以看出数据库的名称是activemq,需要手动在MySql中增加这个库。
然后重新启动消息队列,会发现多了3张表
1:activemq_acks
2:activemq_lock
3:activemq_msgs