重庆分公司,新征程启航

为企业提供网站建设、域名注册、服务器等服务

Java搭建RabbitMq消息中间件过程是怎么样的

这期内容当中小编将会给大家带来有关Java搭建RabbitMq消息中间件过程是怎么样的,文章内容丰富且以专业的角度为大家分析和叙述,阅读完这篇文章希望大家可以有所收获。

网站建设哪家好,找创新互联!专注于网页设计、网站建设、微信开发、小程序制作、集团企业网站建设等服务项目。为回馈新老客户创新互联还提供了咸丰免费建站欢迎大家使用!

前言

当系统中出现“生产“和“消费“的速度或稳定性等因素不一致的时候,就需要消息队列。

名词

exchange: 交换机  routingkey: 路由key  queue:队列

控制台端口:15672

exchange和queue是需要绑定在一起的,然后消息发送到exchange再由exchange通过routingkey发送到对应的队列中。

使用场景

1.技能订单3分钟自动取消,改变状态

2.直播开始前15分钟提醒

3.直播状态自动结束

流程

生产者发送消息 —> order_pre_exchange交换机 —> order_per_ttl_delay_queue队列

—> 时间到期 —> order_delay_exchange交换机 —> order_delay_process_queue队列 —> 消费者

第一步:在pom文件中添加

  org.springframework.boot   spring-boot-starter-amqp

第二步:在application.properties文件中添加

spring.rabbitmq.host=172.xx.xx.xxxspring.rabbitmq.port=5672spring.rabbitmq.username=rabbitspring.rabbitmq.password=123456spring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000spring.rabbitmq.publisher-confirms=truespring.rabbitmq.publisher-returns=truespring.rabbitmq.template.mandatory=true

第三步:配置 OrderQueueConfig

package com.tuohang.platform.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;/** * rabbitMQ的队列设置(生产者发送的消息,永远是先进入exchange,再通过路由,转发到队列) *  *  * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Configurationpublic class OrderQueueConfig {  /**   * 订单缓冲交换机名称   */  public final static String ORDER_PRE_EXCHANGE_NAME = "order_pre_exchange";  /**   * 发送到该队列的message会在一段时间后过期进入到order_delay_process_queue 【队列里所有的message都有统一的失效时间】   */  public final static String ORDER_PRE_TTL_DELAY_QUEUE_NAME = "order_pre_ttl_delay_queue";  /**   * 订单的交换机DLX 名字   */  final static String ORDER_DELAY_EXCHANGE_NAME = "order_delay_exchange";  /**   * 订单message时间过期后进入的队列,也就是订单实际的消费队列   */  public final static String ORDER_DELAY_PROCESS_QUEUE_NAME = "order_delay_process_queue";  /**   * 订单在缓冲队列过期时间(毫秒)30分钟   */  public final static int ORDER_QUEUE_EXPIRATION = 1800000;  /**   * 订单缓冲交换机   *    * @return   */  @Bean  public DirectExchange preOrderExange() {    return new DirectExchange(ORDER_PRE_EXCHANGE_NAME);  }  /**   * 创建order_per_ttl_delay_queue队列,订单消息经过缓冲交换机,会进入该队列   *    * @return   */  @Bean  public Queue delayQueuePerOrderTTLQueue() {    return QueueBuilder.durable(ORDER_PRE_TTL_DELAY_QUEUE_NAME)        .withArgument("x-dead-letter-exchange", ORDER_DELAY_EXCHANGE_NAME) // DLX        .withArgument("x-dead-letter-routing-key", ORDER_DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key        .withArgument("x-message-ttl", ORDER_QUEUE_EXPIRATION) // 设置订单队列的过期时间        .build();  }  /**   * 将order_pre_exchange绑定到order_pre_ttl_delay_queue队列   *   * @param delayQueuePerOrderTTLQueue   * @param preOrderExange   * @return   */  @Bean  public Binding queueOrderTTLBinding(Queue delayQueuePerOrderTTLQueue, DirectExchange preOrderExange) {    return BindingBuilder.bind(delayQueuePerOrderTTLQueue).to(preOrderExange).with(ORDER_PRE_TTL_DELAY_QUEUE_NAME);  }  /**   * 创建订单的DLX exchange   *   * @return   */  @Bean  public DirectExchange delayOrderExchange() {    return new DirectExchange(ORDER_DELAY_EXCHANGE_NAME);  }  /**   * 创建order_delay_process_queue队列,也就是订单实际消费队列   *   * @return   */  @Bean  public Queue delayProcessOrderQueue() {    return QueueBuilder.durable(ORDER_DELAY_PROCESS_QUEUE_NAME).build();  }  /**   * 将DLX绑定到实际消费队列   *   * @param delayProcessOrderQueue   * @param delayExchange   * @return   */  @Bean  public Binding dlxOrderBinding(Queue delayProcessOrderQueue, DirectExchange delayOrderExchange) {    return BindingBuilder.bind(delayProcessOrderQueue).to(delayOrderExchange).with(ORDER_DELAY_PROCESS_QUEUE_NAME);  }  /**   * 监听订单实际消费者队列order_delay_process_queue   *    * @param connectionFactory   * @param processReceiver   * @return   */  @Bean  public SimpleMessageListenerContainer orderProcessContainer(ConnectionFactory connectionFactory,      OrderProcessReceiver processReceiver) {    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();    container.setConnectionFactory(connectionFactory);    container.setQueueNames(ORDER_DELAY_PROCESS_QUEUE_NAME); // 监听order_delay_process_queue    container.setMessageListener(new MessageListenerAdapter(processReceiver));    return container;  }}

消费者 OrderProcessReceiver :

package com.tuohang.platform.config;import java.util.Objects;import org.apache.tools.ant.types.resources.selectors.Date;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;import org.springframework.stereotype.Component;import com.rabbitmq.client.Channel;/** * 订单延迟处理消费者 *  *  * @author Administrator * @version 1.0 * @Date 2018年9月18日 */@Componentpublic class OrderProcessReceiver implements ChannelAwareMessageListener {  private static Logger logger = LoggerFactory.getLogger(OrderProcessReceiver.class);  String msg = "The failed message will auto retry after a certain delay";  @Override  public void onMessage(Message message, Channel channel) throws Exception {    try {      processMessage(message);    } catch (Exception e) {      // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做      channel.basicPublish(OrderQueueConfig.ORDER_PRE_EXCHANGE_NAME, OrderQueueConfig.ORDER_PRE_TTL_DELAY_QUEUE_NAME, null,          msg.getBytes());    }  }    /**   * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)   *   * @param message   * @throws Exception   */  public void processMessage(Message message) throws Exception {    String realMessage = new String(message.getBody());    logger.info("Received <" + realMessage + ">");    // 取消订单    if(!Objects.equals(realMessage, msg)) {//      SpringKit.getBean(ITestService.class).resetSexById(Long.valueOf(realMessage));      System.out.println("测试111111-----------"+new Date());      System.out.println(message);    }  }}

或者

/** * 测试 rabbit 消费者 *  *  * @author Administrator * @version 1.0 * @Date 2018年9月25日 */@Component@RabbitListener(queues = TestQueueConfig.TEST_DELAY_PROCESS_QUEUE_NAME)public class TestProcessReceiver {  private static Logger logger = LoggerFactory.getLogger(TestProcessReceiver.class);  String msg = "The failed message will auto retry after a certain delay";  @RabbitHandler  public void onMessage(Message message, Channel channel) throws Exception {    try {      processMessage(message);      //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);    } catch (Exception e) {      // 如果发生了异常,则将该消息重定向到缓冲队列,会在一定延迟之后自动重做      channel.basicPublish(TestQueueConfig.TEST_PRE_EXCHANGE_NAME, TestQueueConfig.TEST_PRE_TTL_DELAY_QUEUE_NAME, null,          msg.getBytes());    }  }    /**   * 处理订单消息,如果订单未支付,取消订单(如果当消息内容为FAIL_MESSAGE的话,则需要抛出异常)   *   * @param message   * @throws Exception   */  public void processMessage(Message message) throws Exception {    String realMessage = new String(message.getBody());    logger.info("Received < " + realMessage + " >");    // 取消订单    if(!Objects.equals(realMessage, msg)) {      System.out.println("测试111111-----------"+new Date());    }else {      System.out.println("rabbit else...");    }  }}

生产者

/**   * 测试rabbitmq   *    * @return   */  @RequestMapping(value = "/testrab")  public String testraa() {    GenericResult gr = null;    try {      String name = "test_pre_ttl_delay_queue";  long expiration = 10000;//10s 过期时间      rabbitTemplate.convertAndSend(name,String.valueOf(123456)); // 在单个消息上设置过期时间 //rabbitTemplate.convertAndSend(name,(Object)String.valueOf(123456), new ExpirationMessagePostProcessor(expiration));    } catch (ServiceException e) {      e.printStackTrace();      gr = new GenericResult(StateCode.ERROR, languageMap.get("network_error"), e.getMessage());    }        return getWrite(gr);  }

上述就是小编为大家分享的Java搭建RabbitMq消息中间件过程是怎么样的了,如果刚好有类似的疑惑,不妨参照上述分析进行理解。如果想知道更多相关知识,欢迎关注创新互联行业资讯频道。


标题名称:Java搭建RabbitMq消息中间件过程是怎么样的
当前地址:http://cqcxhl.com/article/ijsohe.html

其他资讯

在线咨询
服务热线
服务热线:028-86922220
TOP