前言
RabbitMQ已经提供了非常丰富的api,以及支持延时队列的实现(死信队列、延时插件等)。当然这也足够我们使用了;我的项目中对RabbitMQ的封装以及扩展我觉得非常的方便,接下来和大家分享一下;我个人觉得主要封装了消息的唯一Id
、延时发送消息处理参数的泛型化
、多线程异步发送消息的强大机制
;
自定义的MqHelper
@Slf4j
public class RabbitMqHelper {
private final RabbitTemplate rabbitTemplate;
private final MessagePostProcessor processor = new BasicIdMessageProcessor();
private final ThreadPoolTaskExecutor executor;
public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(10);
//配置最大线程数
executor.setMaxPoolSize(15);
//配置队列大小
executor.setQueueCapacity(99999);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("mq-async-send-handler");
// 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
}
/**
* 根据exchange和routingKey发送消息
*/
public <T> void send(String exchange, String routingKey, T t) {
log.debug("准备发送消息,exchange:{}, RoutingKey:{}, message:{}", exchange, routingKey,t);
// 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
String id = UUID.randomUUID().toString(true);
CorrelationData correlationData = new CorrelationData(id);
// 2.设置发送超时时间为500毫秒
rabbitTemplate.setReplyTimeout(500);
// 3.发送消息,同时设置消息id
rabbitTemplate.convertAndSend(exchange, routingKey, t, processor, correlationData);
}
/**
* 根据exchange和routingKey发送消息,并且可以设置延迟时间
*/
public <T> void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) {
// 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
String id = UUID.randomUUID().toString(true);
CorrelationData correlationData = new CorrelationData(id);
// 2.设置发送超时时间为500毫秒
rabbitTemplate.setReplyTimeout(500);
// 3.发送消息,同时设置消息id
rabbitTemplate.convertAndSend(exchange, routingKey, t, new DelayedMessageProcessor(delay), correlationData);
}
/**
* 根据exchange和routingKey 异步发送消息,并指定一个延迟时间
*
* @param exchange 交换机
* @param routingKey 路由KEY
* @param t 数据
* @param <T> 数据类型
*/
public <T> void sendAsyn(String exchange, String routingKey, T t, Long time) {
String requestId = MDC.get(REQUEST_ID_HEADER);
CompletableFuture.runAsync(()->{
try {
MDC.put(REQUEST_ID_HEADER, requestId);
if(time != null && time > 0){
Thread.sleep( time);
}
send(exchange, routingKey, t);
}catch (Exception e){
log.error("推送消息异常,t:{},",t,e);
}
}, executor);
}
/**
* 根据exchange和routingKey 异步发送消息
*
* @param exchange 交换机
* @param routingKey 路由KEY
* @param t 数据
* @param <T> 数据类型
*/
public <T> void sendAsyn(String exchange, String routingKey, T t){
sendAsyn(exchange, routingKey, t, null);
}
}
代码解析
让我们逐步解析RabbitMqHelper
类及其各个方法的作用。该类主要负责与RabbitMQ进行消息的发送,包括同步发送、延迟发送以及异步发送。
代码解析
类定义
@Slf4j
public class RabbitMqHelper {
private final RabbitTemplate rabbitTemplate;
private final MessagePostProcessor processor = new BasicIdMessageProcessor();
private final ThreadPoolTaskExecutor executor;
public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(10);
//配置最大线程数
executor.setMaxPoolSize(15);
//配置队列大小
executor.setQueueCapacity(99999);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("mq-async-send-handler");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
}
}
RabbitTemplate
:Spring提供的用于与RabbitMQ进行交互的核心类。它负责发送和接收消息。MessagePostProcessor
:processor
是一个消息处理器,这里使用了BasicIdMessageProcessor
,用于在发送消息时添加附加信息(如消息ID)。ThreadPoolTaskExecutor
:用于异步任务处理的线程池,允许在后台执行消息发送任务。
构造函数
public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
executor = new ThreadPoolTaskExecutor();
//配置核心线程数
executor.setCorePoolSize(10);
//配置最大线程数
executor.setMaxPoolSize(15);
//配置队列大小
executor.setQueueCapacity(99999);
//配置线程池中的线程的名称前缀
executor.setThreadNamePrefix("mq-async-send-handler");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//执行初始化
executor.initialize();
}
- 线程池配置:设置了线程池的核心线程数、最大线程数、队列容量及拒绝策略。
CallerRunsPolicy
策略会在线程池达到最大线程数时,将任务交给调用者线程执行。
同步发送消息
public <T> void send(String exchange, String routingKey, T t) {
log.debug("准备发送消息,exchange:{}, RoutingKey:{}, message:{}", exchange, routingKey, t);
String id = UUID.randomUUID().toString(true);
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.setReplyTimeout(500);
rabbitTemplate.convertAndSend(exchange, routingKey, t, processor, correlationData);
}
- 日志记录:记录发送消息的基本信息(交换机、路由键、消息内容)。
- 消息标识:生成唯一的消息ID,用于消息确认。
- 超时时间:设置发送消息的超时时间为500毫秒。
- 发送消息:使用
rabbitTemplate
发送消息,同时指定消息处理器processor
和消息ID。
延迟发送消息
public <T> void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) {
String id = UUID.randomUUID().toString(true);
CorrelationData correlationData = new CorrelationData(id);
rabbitTemplate.setReplyTimeout(500);
rabbitTemplate.convertAndSend(exchange, routingKey, t, new DelayedMessageProcessor(delay), correlationData);
}
- 延迟处理器:
DelayedMessageProcessor
是一个自定义处理器,用于设置消息的延迟时间。 - 其他逻辑:与
send
方法类似,包括生成唯一ID、设置超时时间和发送消息。
异步发送消息(带延迟)
public <T> void sendAsyn(String exchange, String routingKey, T t, Long time) {
String requestId = MDC.get(REQUEST_ID_HEADER);
CompletableFuture.runAsync(() -> {
try {
MDC.put(REQUEST_ID_HEADER, requestId);
if (time != null && time > 0) {
Thread.sleep(time);
}
send(exchange, routingKey, t);
} catch (Exception e) {
log.error("推送消息异常,t:{},", t, e);
}
}, executor);
}
- MDC:用于日志记录的MDC(Mapped Diagnostic Context),传递请求ID。
- 延迟:如果
time
参数非空且大于0,线程会休眠指定时间以实现延迟。 - 异步执行:使用
CompletableFuture.runAsync
在后台线程池中异步执行消息发送。
异步发送消息(不带延迟)
public <T> void sendAsyn(String exchange, String routingKey, T t) {
sendAsyn(exchange, routingKey, t, null);
}
- 重载方法:调用前述的异步发送方法,但不设置延迟时间。
总结
RabbitMqHelper
类:封装了与RabbitMQ交互的逻辑,包括同步、延迟和异步发送消息。send
方法:同步发送消息,并设置消息标识和超时时间。sendDelayMessage
方法:发送延迟消息,使用DelayedMessageProcessor
来处理延迟逻辑。sendAsyn
方法:异步发送消息,可以选择是否设置延迟。
每个方法中都配置了消息ID、超时时间以及可能的延迟处理,确保消息的正确发送和处理。线程池用于管理异步任务,提高系统的处理能力。
自定义的消息处理器
BasicIdMessageProcessor
public class BasicIdMessageProcessor implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
String requestId = MDC.get(REQUEST_ID_HEADER);
if (requestId == null) {
requestId = UUID.randomUUID().toString(true);
}
// 写入RequestID标示
message.getMessageProperties().setHeader(REQUEST_ID_HEADER, requestId);
return message;
}
}
DelayedMessageProcessor
public class DelayedMessageProcessor extends BasicIdMessageProcessor {
private final long delay;
public DelayedMessageProcessor(Duration delay) {
this.delay = delay.toMillis();
}
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 1.添加消息id
super.postProcessMessage(message);
// 2.添加延迟时间
message.getMessageProperties().setHeader("x-delay", delay);
return message;
}
}
RabbitMQ的配置类
@Configuration
@ConditionalOnClass(value = {MessageConverter.class, AmqpTemplate.class})
public class MqConfig implements EnvironmentAware{
private String defaultErrorRoutingKey;
private String defaultErrorQueue;
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
matchIfMissing = true)
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory,
ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);
factory.setAfterReceivePostProcessors(message -> {
Object header = message.getMessageProperties().getHeader(REQUEST_ID_HEADER);
if(header != null) {
MDC.put(REQUEST_ID_HEADER, header.toString());
}
return message;
});
return factory;
}
@Bean
public MessageConverter messageConverter(ObjectMapper mapper){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(mapper);
// 2.配置自动创建消息id,用于识别不同消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
/**
* <h1>消息处理失败的重试策略</h1>
* 本地重试失败后,消息投递到专门的失败交换机和失败消息队列:error.queue
*/
@Bean
@ConditionalOnClass(MessageRecoverer.class)
@ConditionalOnMissingBean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
// 消息处理失败后,发送到错误交换机:error.direct,RoutingKey默认是error.微服务名称
return new RepublishMessageRecoverer(
rabbitTemplate, ERROR_EXCHANGE, defaultErrorRoutingKey);
}
/**
* rabbitmq发送工具
*
*/
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(RabbitTemplate.class)
public RabbitMqHelper rabbitMqHelper(RabbitTemplate rabbitTemplate){
return new RabbitMqHelper(rabbitTemplate);
}
/**
* 专门接收处理失败的消息
*/
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange(ERROR_EXCHANGE);
}
@Bean
public Queue errorQueue(){
return new Queue(defaultErrorQueue, true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(defaultErrorRoutingKey);
}
@Override
public void setEnvironment(Environment environment) {
String appName = environment.getProperty("spring.application.name");
this.defaultErrorRoutingKey = ERROR_KEY_PREFIX + appName;
this.defaultErrorQueue = StringUtils.format(ERROR_QUEUE_TEMPLATE, appName);
}
}
MqConfig
类是一个Spring配置类,用于配置RabbitMQ的相关设置和组件。下面是对该类及其方法的详细解析:
类和接口实现
@Configuration
@ConditionalOnClass(value = {MessageConverter.class, AmqpTemplate.class})
public class MqConfig implements EnvironmentAware {
// ...
}
@Configuration
:指示这是一个Spring配置类。@ConditionalOnClass
:仅在类路径中存在MessageConverter
和AmqpTemplate
类时,才会加载这个配置类。implements EnvironmentAware
:该类实现了EnvironmentAware
接口,用于在配置类中访问Spring环境信息。
属性
private String defaultErrorRoutingKey;
private String defaultErrorQueue;
defaultErrorRoutingKey
:默认的错误路由键。defaultErrorQueue
:默认的错误队列名称。
Bean定义
rabbitListenerContainerFactory
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory,
ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);
factory.setAfterReceivePostProcessors(message -> {
Object header = message.getMessageProperties().getHeader(REQUEST_ID_HEADER);
if (header != null) {
MDC.put(REQUEST_ID_HEADER, header.toString());
}
return message;
});
return factory;
}
@Bean
:声明一个Spring Bean,名称为rabbitListenerContainerFactory
。@ConditionalOnProperty
:仅当配置属性spring.rabbitmq.listener.type
的值为simple
时,才会创建这个Bean,默认情况下也会创建。SimpleRabbitListenerContainerFactory
:用于创建SimpleMessageListenerContainer
的工厂。SimpleRabbitListenerContainerFactoryConfigurer
:用于配置工厂的默认配置。ConnectionFactory
:RabbitMQ连接工厂。ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>>
:用于定制SimpleMessageListenerContainer
的自定义配置。setAfterReceivePostProcessors
:配置消息处理后处理器,处理接收到的消息,将消息头中的请求ID放到MDC
中,以便日志记录。
messageConverter
@Bean
public MessageConverter messageConverter(ObjectMapper mapper) {
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(mapper);
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
@Bean
:声明一个Spring Bean,类型为MessageConverter
。Jackson2JsonMessageConverter
:使用Jackson库将消息内容转换为JSON格式。setCreateMessageIds(true)
:自动生成消息ID,以便在消息传递和处理过程中跟踪消息。
republishMessageRecoverer
@Bean
@ConditionalOnClass(MessageRecoverer.class)
@ConditionalOnMissingBean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
return new RepublishMessageRecoverer(
rabbitTemplate, ERROR_EXCHANGE, defaultErrorRoutingKey);
}
@Bean
:声明一个Spring Bean,类型为MessageRecoverer
。@ConditionalOnClass
:仅当类路径中存在MessageRecoverer
时才会创建这个Bean。@ConditionalOnMissingBean
:仅当没有其他类型为MessageRecoverer
的Bean时,才会创建这个Bean。RepublishMessageRecoverer
:用于消息处理失败时,将消息重新发布到错误交换机和队列。
rabbitMqHelper
@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(RabbitTemplate.class)
public RabbitMqHelper rabbitMqHelper(RabbitTemplate rabbitTemplate) {
return new RabbitMqHelper(rabbitTemplate);
}
@Bean
:声明一个Spring Bean,类型为RabbitMqHelper
。@ConditionalOnMissingBean
:仅当没有其他类型为RabbitMqHelper
的Bean时,才会创建这个Bean。@ConditionalOnClass
:仅当类路径中存在RabbitTemplate
时才会创建这个Bean。RabbitMqHelper
:封装了RabbitMQ消息发送的辅助工具类。
errorMessageExchange
@Bean
public DirectExchange errorMessageExchange() {
return new DirectExchange(ERROR_EXCHANGE);
}
@Bean
:声明一个Spring Bean,类型为DirectExchange
。DirectExchange
:用于定义一个直接交换机,用于将消息路由到指定的队列。
errorQueue
@Bean
public Queue errorQueue() {
return new Queue(defaultErrorQueue, true);
}
@Bean
:声明一个Spring Bean,类型为Queue
。Queue
:用于定义一个队列,true
表示队列是持久化的,即消息会在RabbitMQ重启后依然存在。
errorBinding
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(defaultErrorRoutingKey);
}
@Bean
:声明一个Spring Bean,类型为Binding
。Binding
:将队列与交换机绑定,并使用指定的路由键进行消息路由。
环境设置
@Override
public void setEnvironment(Environment environment) {
String appName = environment.getProperty("spring.application.name");
this.defaultErrorRoutingKey = ERROR_KEY_PREFIX + appName;
this.defaultErrorQueue = StringUtils.format(ERROR_QUEUE_TEMPLATE, appName);
}
setEnvironment
:从Environment
中获取应用名称,并根据应用名称设置默认的错误路由键和错误队列名称。
总结
MqConfig
类负责RabbitMQ的配置,包括消息转换器、消息处理失败的重试策略、发送工具和错误处理机制。它使用了Spring的条件化注解来确保在特定条件下创建Bean,并在EnvironmentAware
接口的setEnvironment
方法中根据应用名称动态设置错误队列和路由键。
评论( 0 )