第⼀部分:延迟消息的实现原理和知识点
使⽤RabbitMQ来实现延迟任务必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现上述需求。
消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每⼀个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取⼩的。所以⼀个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不⼀样(不同的队列设置)。这⾥单讲单个消息的TTL,因为它才是实现延迟任务的关键。
可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是⼀样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:
当上⾯的消息扔到队列中后,过了3分钟,如果没有被消费,它就死了。不会被消费者消费到。这个消息后⾯的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去。单靠死信还不能实现延迟任务,还要靠Dead Letter Exchange。
Dead Letter Exchanges
Exchage的概念在这⾥就不在赘述。⼀个消息在满⾜如下条件下,会进死信路由,记住这⾥是路由⽽不是队列,⼀个路由可以对应很多队列。1. ⼀个消息被Consumer拒收了,并且reject⽅法的参数⾥requeue是false。也就是说不会被再次放在队列⾥,被其他消费者使⽤。2. 上⾯的消息的TTL到了,消息过期了。
3. 队列的长度满了。排在前⾯的消息会被丢弃或者扔到死信路由上。
Dead Letter Exchange其实就是⼀种普通的exchange,和创建其他exchange没有两样。只是在某⼀个设置Dead Letter Exchange的队列中有消息过期了,会⾃动触发消息的转发,发送到Dead Letter Exchange中去。
实现延迟队列
延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建⽴2个队列,⼀个⽤于发送消息,⼀个⽤于消息过期后的转发⽬标队列。
⽣产者输出消息到Queue1,并且这个消息是设置有有效时间的,⽐如3分钟。消息会在Queue1中等待3分钟,如果没有消费者收掉的话,它就是被转发到Queue2,Queue2有消费者,收到,处理延迟任务。完成延迟任务的实现。
第⼆部分:具体实现例⼦
1、新建⽴消息队列配置⽂件rabbitmq.properties
1 #rabbitmq消息队列的属性配置⽂件properties 2 rabbitmq.study.host=192.168.56.101 3 rabbitmq.study.username=duanml
4 rabbitmq.study.password=1qaz@WSX 5 rabbitmq.study.port=5672
6 rabbitmq.study.vhost=studymq 7
8 #Mail 消息队列的相关变量值 9 mail.exchange=mailExchange
10 mail.exchange.key=mail_queue_key11 12
13 #Phone 消息队列的相关变量值14 phone.topic.key=phone.one
15 phone.topic.key.more=phone.one.more16
17 #delay 延迟消息队列的相关变量值
18 delay.directQueue.key=TradePayNotify_delay_2m19 delay.directMessage.key=TradePayNotify_delay_3m
2、新建⽴配置⽂件,申明延迟队列相关的配置信息如:spring-rabbigmq-dlx.xml
1
2  7     14 15     16      17      19      20      22              24      26      27      29      31      33     34      38     39 40      41      47      48      52      58      61              63              67      73      76              77              81      82      84              87       
3、新建⽴延迟队列测试Controller
1 package org.seckill.web; 2
3 import org.seckill.dto.SeckillResult; 4 import org.seckill.entity.Seckill;
5 import org.seckill.utils.rabbitmq.Impl.MQProducerImpl; 6 import org.seckill.utils.rabbitmq.MQProducer; 7 import org.slf4j.Logger;
8 import org.slf4j.LoggerFactory;
9 import org.springframework.amqp.core.Message;
10 import org.springframework.beans.factory.annotation.Autowired; 11 import org.springframework.beans.factory.annotation.Value; 12 import org.springframework.stereotype.Controller;
13 import org.springframework.web.bind.annotation.RequestMapping; 14 import org.springframework.web.bind.annotation.ResponseBody; 15
16 import java.util.Date; 17 18 /**
19 *
Title: org.seckill.web
20 *
Company:东软集团(neusoft)
21 *Copyright:Copyright(c)2018
22 * User: 段美林23 * Date: 2018/5/30 17:33 24 * Description: 消息队列测试 25 */
26 @Controller
27 @RequestMapping(\"/rabbitmq\") 28 public class RabbitmqController { 29
30 private final Logger logger = LoggerFactory.getLogger(this.getClass()); 31 40
41 @Value(\"${delay.directQueue.key}\") 42 private String delay_directQueue_key; 43
44 @Value(\"${delay.directMessage.key}\") 45 private String delay_directMessage_key; 46 52
53 @Autowired
54 private MQProducerImpl delayMQProducerImpl;111 112 /**
113 * @Description: 消息队列114 * @Author:
115 * @CreateTime:116 */
117 @ResponseBody
118 @RequestMapping(\"/sendDelayQueue\")
119     public SeckillResult 123             Seckill seckill = new Seckill(); 124        //第⼀种情况,给队列设置消息ttl,详情见配置⽂件125             for (int i = 0; i < 2; i++) { 126                 seckill.setSeckillId(1922339387 + i);127                 seckill.setName(\"delay_queue_ttl_\" + i); 128                 String msgId = delayMQProducerImpl.getMsgId(); 129                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId); 130                 delayMQProducerImpl.sendDataToRabbitMQ(delay_directQueue_key, message);131             } 132         //第⼆种情况,给消息设置ttl133             for (int i = 0; i < 2; i++) { 134                 seckill.setSeckillId(1922339287 + i); 135                 seckill.setName(\"delay_message_ttl_\" + i); 136                 String msgId = delayMQProducerImpl.getMsgId(); 137                 Message message = delayMQProducerImpl.messageBuil(seckill,msgId);138                 if (message != null) { 139                     //给消息设置过期时间ttl,为3分钟 140                     message.getMessageProperties().setExpiration(\"180000\"); 141                     delayMQProducerImpl.sendDataToRabbitMQ(delay_directMessage_key, message);142                 }143             } 144             result = new SeckillResult 146             logger.error(e.getMessage(), e);147         } 148         return result;149     } 150 151 } 4、编写延迟消息确认类和监听类: NotifyConfirmCallBackListener.java 1 package org.seckill.rabbitmqListener.notify; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; 6 import org.springframework.amqp.rabbit.support.CorrelationData; 7  8 /** 9  * Title: org.seckill.rabbitmqListener.notify Company:东软集团(neusoft) Copyright:Copyright(c)2018 13  * Date: 2018/6/3 0:27 14  * Description: 延迟任务测试--->消息确认回调类15  */ 16 public class NotifyConfirmCallBackListener implements ConfirmCallback {17 18     private final Logger logger = LoggerFactory.getLogger(this.getClass());19 20     /** 21      * Confirmation callback.22      * 23      * @param correlationData correlation data for the callback.24      * @param ack             true for ack, false for nack 25      * @param cause           An optional cause, for nack, when available, otherwise null.26      */ 27     public void confirm(CorrelationData correlationData, boolean ack, String cause) { 28         logger.info(\"延迟测试---确认消息完成-------->confirm--:correlationData:\" + correlationData.getId() + \29     }30 } NotifyConsumerListener.java 1 package org.seckill.rabbitmqListener.notify; 2 3 import com.alibaba.fastjson.JSONObject; 4 import com.rabbitmq.client.Channel; 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.core.Message; 8 import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; 9 10 /** 11  * Title: org.seckill.rabbitmqListener.notify Company:东软集团(neusoft) Copyright:Copyright(c)2018 15  * Date: 2018/6/3 0:27 16  * Description: 订单通知队列监听服务17  * 实现延迟任务的功能18  */ 19 public class NotifyConsumerListener implements ChannelAwareMessageListener {20 21 22     private final Logger logger = LoggerFactory.getLogger(this.getClass());23 24     /** 25      * Callback for processing a received Rabbit message. 26      * Implementors are supposed to process the given Message,27      * typically sending reply messages through the given Session.28      * 29      * @param message the received AMQP message (never  33     public void onMessage(Message message, Channel channel) throws Exception {34         try { 35             //将字节流对象转换成Java对象 36 //            Seckill seckill=(Seckill) new ObjectInputStream(new ByteArrayInputStream(message.getBody())).readObject();37 38             String returnStr = new String(message.getBody(),\"UTF-8\");39             JSONObject jsStr = JSONObject.parseObject(returnStr);40 41             logger.info(\"延迟测试--消费开始:名称为--===>\" + jsStr.getString(\"name\") + \"----->返回消息:\" + returnStr + \"||||消息的Properties:--》\" + message.getMessageProperties());42 43             //TODO 进⾏相关业务操作44 45             //成功处理业务,那么返回消息确认机制,这个消息成功处理OK 46             channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);47 48         } catch (Exception e) { 49             if (message.getMessageProperties().getRedelivered()) { 50                 //消息已经进⾏过⼀次轮询操作,还是失败,将拒绝再次接收本消息51                 logger.info(\"消息已重复处理失败,拒绝再次接收...\"); 52                 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息53 54                 //TODO 进⾏相关业务操作55 56             } else { 57                 //消息第⼀次接收处理失败后,将再此回到队列中进⾏  再⼀次轮询操作58                 logger.info(\"消息即将再次返回队列处理...\"); 59                 //处理失败,那么返回消息确认机制,这个消息没有成功处理,返回到队列中 60                 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);61             }62         }63     } } NotifyFailedCallBackListener.java 1 package org.seckill.rabbitmqListener.notify; 2 3 import org.slf4j.Logger; 4 import org.slf4j.LoggerFactory; 5 import org.springframework.amqp.core.Message; 6 import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; 7  8 /** 9  * Title: org.seckill.rabbitmqListener.notify Company:东软集团(neusoft) Copyright:Copyright(c)2018 13  * Date: 2018/6/3 0:28 14  * Description: 延迟任务测试----> 消息发送失败回调类15  */ 16 public class NotifyFailedCallBackListener implements ReturnCallback {17 18     private final Logger logger = LoggerFactory.getLogger(this.getClass());19 20     /** 21      * Returned message callback.22      * 23      * @param message    the returned message.24      * @param replyCode  the reply code.25      * @param replyText  the reply text.26      * @param exchange   the exchange.27      * @param routingKey the routing key.28      */ 29     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {30         logger.info(\"延迟测试------------->return--message:\" +31                 new String(message.getBody()) + 32                 \33                 \34     }35 } 5、编写消息队列的操作类和接⼝: MQProducer.java 1 package org.seckill.utils.rabbitmq;  2 3 import org.springframework.amqp.core.Message; 4 import org.springframework.amqp.core.MessagePostProcessor;  5 import org.springframework.amqp.rabbit.support.CorrelationData;  6   7 /** 8  * Title: org.seckill.utils.rabbitmq Company:东软集团(neusoft) Copyright:Copyright(c)2018 12  * Date: 2018/5/30 11:49 13  * Description: No Description 14  */ 15 public interface MQProducer { 16 17     /** 18      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 19      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 20      * @param message 21      */ 22     void sendDataToRabbitMQ(java.lang.Object message); 23 24     /** 25      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 26      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 27      * @param message 28      * @param messagePostProcessor 29      */ 30     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor); 31 32     /** 33      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 34      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 35      * @param message 36      * @param messagePostProcessor 37      * @param correlationData 38      */ 39     void sendDataToRabbitMQ(java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData); 40 41     /** 42      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 43      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 44      * @param routingKey 45      * @param message 46      */ 47     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message); 48 49     /** 50      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 51      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 52      * @param routingKey 53      * @param message 54      * @param correlationData 55      */ 56     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData); 57 58     /** 59      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 60      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 61      * @param routingKey 62      * @param message 63      * @param messagePostProcessor       */ 65     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor); 66 67     /** 68      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key. 69      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 70      * @param routingKey 71      * @param message 72      * @param messagePostProcessor 73      * @param correlationData 74      */ 75     void sendDataToRabbitMQ(java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData); 76 77     /** 78      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 79      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 80      * @param exchange 81      * @param routingKey 82      * @param message 83      */ 84     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message); 85 86     /** 87      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 88      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。       * @param exchange 90      * @param routingKey 91      * @param message 92      * @param correlationData 93      */ 94     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, CorrelationData correlationData); 95 96     /** 97      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 98      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 99      * @param exchange100      * @param routingKey 101      * @param message 102      * @param messagePostProcessor103      */ 104     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor);105 106     /** 107      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.108      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。109      * @param exchange110      * @param routingKey111      * @param message 112      * @param messagePostProcessor113      * @param correlationData114      */ 115     void sendDataToRabbitMQ(java.lang.String exchange, java.lang.String routingKey, java.lang.Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData);116 117     Message messageBuil(Object handleObject, String msgId);118 119     String getMsgId();120 } MQProducerImpl.java 1 package org.seckill.utils.rabbitmq.Impl;  2 3 import com.alibaba.fastjson.JSONObject;  4 import org.seckill.utils.rabbitmq.MQProducer;  5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 import org.springframework.amqp.AmqpException;  8 import org.springframework.amqp.core.Message; 9 import org.springframework.amqp.core.MessageBuilder; 10 import org.springframework.amqp.core.MessagePostProcessor; 11 import org.springframework.amqp.core.MessageProperties; 12 import org.springframework.amqp.rabbit.core.RabbitTemplate; 13 import org.springframework.amqp.rabbit.support.CorrelationData; 14 import org.springframework.stereotype.Component; 15 16 import java.io.UnsupportedEncodingException; 17 import java.util.UUID; 18  19 /** 20  * Title: org.seckill.utils.rabbitmq.Impl Company:东软集团(neusoft) Copyright:Copyright(c)2018 24  * Date: 2018/6/2 22:54 25  * Description: 消息⽣产者操作主体类 26  */ 27 @Component 28 public class MQProducerImpl implements MQProducer{ 29 30     private static final Logger logger = LoggerFactory.getLogger(MQProducerImpl.class); 31 32     private RabbitTemplate rabbitTemplate; 33 34     /** 35      * Sets the rabbitTemplate. 36      * 37      * You can use getRabbitTemplate() to get the value of rabbitTemplate 39      * @param rabbitTemplate rabbitTemplate 40      */ 41     public void setRabbitTemplate(RabbitTemplate rabbitTemplate) { 42         this.rabbitTemplate = rabbitTemplate; 43     } 44 45     /** 46      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 47      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 48      * 49      * @param message 50      */ 51     public void sendDataToRabbitMQ(Object message) { 52         try { 53             if (message instanceof Message){ 54                 Message messageSend = (Message) message; 55                 String msgId = messageSend.getMessageProperties().getCorrelationId(); 56                 CorrelationData correlationData = new CorrelationData(msgId); 57                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,correlationData); 58             }else { 59                 rabbitTemplate.convertAndSend(message); 60             } 61         } catch (AmqpException e) { 62             logger.error(e.getMessage(), e); 63         }      } 65 66     /** 67      * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 68      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 69      * 70      * @param message 71      * @param messagePostProcessor 72      */ 73     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor) { 74         try { 75             if (message instanceof Message){ 76                 Message messageSend = (Message) message; 77                 String msgId = messageSend.getMessageProperties().getCorrelationId(); 78                 CorrelationData correlationData = new CorrelationData(msgId); 79                 rabbitTemplate.convertAndSend(rabbitTemplate.getRoutingKey(),message,messagePostProcessor,correlationData); 80             }else { 81                 rabbitTemplate.convertAndSend(message, messagePostProcessor); 82             } 83         } catch (AmqpException e) { 84             logger.error(e.getMessage(), e); 85         } 86     } 87 88     /**       * Convert a Java object to an Amqp Message and send it to a default exchange with a default routing key. 90      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。 91      * 92      * @param message 93      * @param messagePostProcessor 94      * @param correlationData 95      */ 96     public void sendDataToRabbitMQ(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) { 97         try { 98             rabbitTemplate.convertAndSend(message, messagePostProcessor, correlationData); 99         } catch (AmqpException e) { 100             logger.error(e.getMessage(), e); 101         }102     }103 104     /** 105      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.106      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。107      * 108      * @param routingKey109      * @param message110      */ 111     public void sendDataToRabbitMQ(String routingKey, Object message) {112         try { 113             if (message instanceof Message){ 114                 Message messageSend = (Message) message; 115                 String msgId = messageSend.getMessageProperties().getCorrelationId();116                 CorrelationData correlationData = new CorrelationData(msgId); 117                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);118             }else { 119                 rabbitTemplate.convertAndSend(routingKey, message);120             } 121         } catch (AmqpException e) { 122             logger.error(e.getMessage(), e);123         }124     }125 126     /** 127      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.128      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。129      * 130      * @param routingKey131      * @param message 132      * @param correlationData133      */ 134     public void sendDataToRabbitMQ(String routingKey, Object message, CorrelationData correlationData) {135         try { 136             rabbitTemplate.convertAndSend(routingKey, message, correlationData);137         } catch (AmqpException e) { 138             logger.error(e.getMessage(), e);139         }140     }141 142     /** 143      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.144      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。145      * 146      * @param routingKey147      * @param message 148      * @param messagePostProcessor149      */ 150     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor) {151         try { 152             if (message instanceof Message){ 153                 Message messageSend = (Message) message; 154                 String msgId = messageSend.getMessageProperties().getCorrelationId();155                 CorrelationData correlationData = new CorrelationData(msgId); 156                 rabbitTemplate.convertAndSend(routingKey,message,messagePostProcessor,correlationData);157             }else { 158                 rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor);159             } 160         } catch (AmqpException e) { 161             logger.error(e.getMessage(), e);162         }163     }1 165     /** 166      * Convert a Java object to an Amqp Message and send it to a default exchange with a specific routing key.167      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。168      * 169      * @param routingKey170      * @param message 171      * @param messagePostProcessor172      * @param correlationData173      */ 174     public void sendDataToRabbitMQ(String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {175         try { 176             rabbitTemplate.convertAndSend(routingKey, message, messagePostProcessor, correlationData);177         } catch (AmqpException e) { 178             logger.error(e.getMessage(), e);179         }180     }181 182     /** 183      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.184      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。185      * 186      * @param exchange187      * @param routingKey188      * @param message1      */ 190     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message) {191         try { 192             if (message instanceof Message){ 193                 Message messageSend = (Message) message; 194                 String msgId = messageSend.getMessageProperties().getCorrelationId();195                 CorrelationData correlationData = new CorrelationData(msgId); 196                 rabbitTemplate.convertAndSend(routingKey,message,correlationData);197             }else { 198                 rabbitTemplate.convertAndSend(exchange, routingKey, message);199             } 200         } catch (AmqpException e) { 201             logger.error(e.getMessage(), e);202         }203     }204 205     /** 206      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.207      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。208      * 209      * @param exchange210      * @param routingKey211      * @param message 212      * @param correlationData213      */ 214     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, CorrelationData correlationData) {215         try { 216             rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);217         } catch (AmqpException e) { 218             logger.error(e.getMessage(), e);219         }220     }221 222     /** 223      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key. 224      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。225      * 226      * @param exchange227      * @param routingKey228      * @param message 229      * @param messagePostProcessor230      */ 231     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) {232         try { 233             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor);234         } catch (AmqpException e) { 235             logger.error(e.getMessage(), e);236         }237     }238 239     /** 240      * Convert a Java object to an Amqp Message and send it to a specific exchange with a specific routing key.241      * 由于配置了JSON转换,这⾥是将Java对象转换成JSON字符串的形式。242      * 243      * @param exchange244      * @param routingKey245      * @param message 246      * @param messagePostProcessor247      * @param correlationData248      */ 249     public void sendDataToRabbitMQ(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData) {250         try { 251             rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor, correlationData);252         } catch (AmqpException e) { 253             logger.error(e.getMessage(), e);254         }255     }256 257     /** 258      * 构建Message对象,进⾏消息发送259      * @param handleObject260      * @param msgId261      * @return262      */ 263     public Message messageBuil(Object handleObject, String msgId) {2         try { 265             //先转成JSON 266             String objectJSON = JSONObject.toJSONString(handleObject);267             //再构建Message对象 268             Message messageBuil = MessageBuilder.withBody(objectJSON.getBytes(\"UTF-8\")).setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)269                     .setCorrelationId(msgId).build();270             return messageBuil; 271         } catch (UnsupportedEncodingException e) { 272             logger.error(\"构建Message出错:\" + e.getMessage(),e);273             return null;274         }275     }276 277     /** 278      * ⽣成唯⼀的消息操作id279      * @return280      */ 281     public String getMsgId() { 282         return UUID.randomUUID().toString();283     }284 285 } ⾄此就完成了延迟消息队列的所有代码实现,              null)30      * @param channel the underlying Rabbit Channel (never null)31      * @throws Exception Any.32      */
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- 91gzw.com 版权所有 湘ICP备2023023988号-2
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务