小蔡学Java

项目中封装的MQ

2024-01-20 23:42 703 0 消息中间件 RabbitMQ消息中间件消息可靠性消费消息消费幂等性

前言

RabbitMQ已经提供了非常丰富的api,以及支持延时队列的实现(死信队列、延时插件等)。当然这也足够我们使用了;我的项目中对RabbitMQ的封装以及扩展我觉得非常的方便,接下来和大家分享一下;我个人觉得主要封装了消息的唯一Id延时发送消息处理参数的泛型化多线程异步发送消息的强大机制;

自定义的MqHelper

@Slf4j
public class RabbitMqHelper {

    private final RabbitTemplate rabbitTemplate;
    private final MessagePostProcessor processor = new BasicIdMessageProcessor();
    private final ThreadPoolTaskExecutor executor;

    public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(10);
        //配置最大线程数
        executor.setMaxPoolSize(15);
        //配置队列大小
        executor.setQueueCapacity(99999);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("mq-async-send-handler");

        // 设置拒绝策略:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
    }

    /**
     * 根据exchange和routingKey发送消息
     */
    public <T> void send(String exchange, String routingKey, T t) {
        log.debug("准备发送消息,exchange:{}, RoutingKey:{}, message:{}", exchange, routingKey,t);
        // 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
        String id = UUID.randomUUID().toString(true);
        CorrelationData correlationData = new CorrelationData(id);
        // 2.设置发送超时时间为500毫秒
        rabbitTemplate.setReplyTimeout(500);
        // 3.发送消息,同时设置消息id
        rabbitTemplate.convertAndSend(exchange, routingKey, t, processor, correlationData);
    }

    /**
     * 根据exchange和routingKey发送消息,并且可以设置延迟时间
     */
    public <T> void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) {
        // 1.设置消息标示,用于消息确认,消息发送失败直接抛出异常,交给调用者处理
        String id = UUID.randomUUID().toString(true);
        CorrelationData correlationData = new CorrelationData(id);
        // 2.设置发送超时时间为500毫秒
        rabbitTemplate.setReplyTimeout(500);
        // 3.发送消息,同时设置消息id
        rabbitTemplate.convertAndSend(exchange, routingKey, t, new DelayedMessageProcessor(delay), correlationData);
    }


    /**
     * 根据exchange和routingKey 异步发送消息,并指定一个延迟时间
     *
     * @param exchange 交换机
     * @param routingKey 路由KEY
     * @param t 数据
     * @param <T> 数据类型
     */
    public <T> void sendAsyn(String exchange, String routingKey, T t, Long time) {
        String requestId = MDC.get(REQUEST_ID_HEADER);
        CompletableFuture.runAsync(()->{
            try {
                MDC.put(REQUEST_ID_HEADER, requestId);
                if(time != null && time > 0){
                    Thread.sleep( time);
                }
                send(exchange, routingKey, t);
            }catch (Exception e){
                log.error("推送消息异常,t:{},",t,e);
            }
        }, executor);
    }


    /**
     * 根据exchange和routingKey 异步发送消息
     *
     * @param exchange 交换机
     * @param routingKey 路由KEY
     * @param t 数据
     * @param <T> 数据类型
     */
    public <T> void sendAsyn(String exchange, String routingKey, T t){
        sendAsyn(exchange, routingKey, t, null);
    }

}

代码解析

让我们逐步解析RabbitMqHelper类及其各个方法的作用。该类主要负责与RabbitMQ进行消息的发送,包括同步发送、延迟发送以及异步发送。

代码解析

类定义

@Slf4j
public class RabbitMqHelper {
    private final RabbitTemplate rabbitTemplate;
    private final MessagePostProcessor processor = new BasicIdMessageProcessor();
    private final ThreadPoolTaskExecutor executor;
    
    public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        executor = new ThreadPoolTaskExecutor();
        //配置核心线程数
        executor.setCorePoolSize(10);
        //配置最大线程数
        executor.setMaxPoolSize(15);
        //配置队列大小
        executor.setQueueCapacity(99999);
        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix("mq-async-send-handler");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
    }
}
  • RabbitTemplate:Spring提供的用于与RabbitMQ进行交互的核心类。它负责发送和接收消息。
  • MessagePostProcessorprocessor是一个消息处理器,这里使用了BasicIdMessageProcessor,用于在发送消息时添加附加信息(如消息ID)。
  • ThreadPoolTaskExecutor:用于异步任务处理的线程池,允许在后台执行消息发送任务。

构造函数

public RabbitMqHelper(RabbitTemplate rabbitTemplate) {
    this.rabbitTemplate = rabbitTemplate;
    executor = new ThreadPoolTaskExecutor();
    //配置核心线程数
    executor.setCorePoolSize(10);
    //配置最大线程数
    executor.setMaxPoolSize(15);
    //配置队列大小
    executor.setQueueCapacity(99999);
    //配置线程池中的线程的名称前缀
    executor.setThreadNamePrefix("mq-async-send-handler");
    // 设置拒绝策略
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //执行初始化
    executor.initialize();
}
  • 线程池配置:设置了线程池的核心线程数、最大线程数、队列容量及拒绝策略。CallerRunsPolicy策略会在线程池达到最大线程数时,将任务交给调用者线程执行。

同步发送消息

public <T> void send(String exchange, String routingKey, T t) {
    log.debug("准备发送消息,exchange:{}, RoutingKey:{}, message:{}", exchange, routingKey, t);
    String id = UUID.randomUUID().toString(true);
    CorrelationData correlationData = new CorrelationData(id);
    rabbitTemplate.setReplyTimeout(500);
    rabbitTemplate.convertAndSend(exchange, routingKey, t, processor, correlationData);
}
  • 日志记录:记录发送消息的基本信息(交换机、路由键、消息内容)。
  • 消息标识:生成唯一的消息ID,用于消息确认。
  • 超时时间:设置发送消息的超时时间为500毫秒。
  • 发送消息:使用rabbitTemplate发送消息,同时指定消息处理器processor和消息ID。

延迟发送消息

public <T> void sendDelayMessage(String exchange, String routingKey, T t, Duration delay) {
    String id = UUID.randomUUID().toString(true);
    CorrelationData correlationData = new CorrelationData(id);
    rabbitTemplate.setReplyTimeout(500);
    rabbitTemplate.convertAndSend(exchange, routingKey, t, new DelayedMessageProcessor(delay), correlationData);
}
  • 延迟处理器DelayedMessageProcessor是一个自定义处理器,用于设置消息的延迟时间。
  • 其他逻辑:与send方法类似,包括生成唯一ID、设置超时时间和发送消息。

异步发送消息(带延迟)

public <T> void sendAsyn(String exchange, String routingKey, T t, Long time) {
    String requestId = MDC.get(REQUEST_ID_HEADER);
    CompletableFuture.runAsync(() -> {
        try {
            MDC.put(REQUEST_ID_HEADER, requestId);
            if (time != null && time > 0) {
                Thread.sleep(time);
            }
            send(exchange, routingKey, t);
        } catch (Exception e) {
            log.error("推送消息异常,t:{},", t, e);
        }
    }, executor);
}
  • MDC:用于日志记录的MDC(Mapped Diagnostic Context),传递请求ID。
  • 延迟:如果time参数非空且大于0,线程会休眠指定时间以实现延迟。
  • 异步执行:使用CompletableFuture.runAsync在后台线程池中异步执行消息发送。

异步发送消息(不带延迟)

public <T> void sendAsyn(String exchange, String routingKey, T t) {
    sendAsyn(exchange, routingKey, t, null);
}
  • 重载方法:调用前述的异步发送方法,但不设置延迟时间。

总结

  • RabbitMqHelper:封装了与RabbitMQ交互的逻辑,包括同步、延迟和异步发送消息。
  • send方法:同步发送消息,并设置消息标识和超时时间。
  • sendDelayMessage方法:发送延迟消息,使用DelayedMessageProcessor来处理延迟逻辑。
  • sendAsyn方法:异步发送消息,可以选择是否设置延迟。

每个方法中都配置了消息ID、超时时间以及可能的延迟处理,确保消息的正确发送和处理。线程池用于管理异步任务,提高系统的处理能力。

自定义的消息处理器

BasicIdMessageProcessor

public class BasicIdMessageProcessor implements MessagePostProcessor {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        String requestId = MDC.get(REQUEST_ID_HEADER);
        if (requestId == null) {
            requestId = UUID.randomUUID().toString(true);
        }
        // 写入RequestID标示
        message.getMessageProperties().setHeader(REQUEST_ID_HEADER, requestId);
        return message;
    }
}

DelayedMessageProcessor

public class DelayedMessageProcessor extends BasicIdMessageProcessor {

    private final long delay;

    public DelayedMessageProcessor(Duration delay) {
        this.delay = delay.toMillis();
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        // 1.添加消息id
        super.postProcessMessage(message);
        // 2.添加延迟时间
        message.getMessageProperties().setHeader("x-delay", delay);
        return message;
    }
}

RabbitMQ的配置类

@Configuration
@ConditionalOnClass(value = {MessageConverter.class, AmqpTemplate.class})
public class MqConfig implements EnvironmentAware{

    private String defaultErrorRoutingKey;
    private String defaultErrorQueue;

    @Bean(name = "rabbitListenerContainerFactory")
    @ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple",
            matchIfMissing = true)
    SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
            SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory,
            ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);
        factory.setAfterReceivePostProcessors(message -> {
            Object header = message.getMessageProperties().getHeader(REQUEST_ID_HEADER);
            if(header != null) {
                MDC.put(REQUEST_ID_HEADER, header.toString());
            }
            return message;
        });
        return factory;
    }

    @Bean
    public MessageConverter messageConverter(ObjectMapper mapper){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(mapper);
        // 2.配置自动创建消息id,用于识别不同消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }

    /**
     * <h1>消息处理失败的重试策略</h1>
     * 本地重试失败后,消息投递到专门的失败交换机和失败消息队列:error.queue
     */
    @Bean
    @ConditionalOnClass(MessageRecoverer.class)
    @ConditionalOnMissingBean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        // 消息处理失败后,发送到错误交换机:error.direct,RoutingKey默认是error.微服务名称
        return new RepublishMessageRecoverer(
                rabbitTemplate, ERROR_EXCHANGE, defaultErrorRoutingKey);
    }

    /**
     * rabbitmq发送工具
     *
     */
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnClass(RabbitTemplate.class)
    public RabbitMqHelper rabbitMqHelper(RabbitTemplate rabbitTemplate){
        return new RabbitMqHelper(rabbitTemplate);
    }

    /**
     * 专门接收处理失败的消息
     */
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange(ERROR_EXCHANGE);
    }

    @Bean
    public Queue errorQueue(){
        return new Queue(defaultErrorQueue, true);
    }

    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(defaultErrorRoutingKey);
    }

    @Override
    public void setEnvironment(Environment environment) {
        String appName = environment.getProperty("spring.application.name");
        this.defaultErrorRoutingKey = ERROR_KEY_PREFIX + appName;
        this.defaultErrorQueue = StringUtils.format(ERROR_QUEUE_TEMPLATE, appName);
    }
}

MqConfig类是一个Spring配置类,用于配置RabbitMQ的相关设置和组件。下面是对该类及其方法的详细解析:

类和接口实现

@Configuration
@ConditionalOnClass(value = {MessageConverter.class, AmqpTemplate.class})
public class MqConfig implements EnvironmentAware {
    // ...
}
  • @Configuration:指示这是一个Spring配置类。
  • @ConditionalOnClass:仅在类路径中存在MessageConverterAmqpTemplate类时,才会加载这个配置类。
  • implements EnvironmentAware:该类实现了EnvironmentAware接口,用于在配置类中访问Spring环境信息。

属性

private String defaultErrorRoutingKey;
private String defaultErrorQueue;
  • defaultErrorRoutingKey:默认的错误路由键。
  • defaultErrorQueue:默认的错误队列名称。

Bean定义

rabbitListenerContainerFactory

@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
        SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory,
        ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);
    factory.setAfterReceivePostProcessors(message -> {
        Object header = message.getMessageProperties().getHeader(REQUEST_ID_HEADER);
        if (header != null) {
            MDC.put(REQUEST_ID_HEADER, header.toString());
        }
        return message;
    });
    return factory;
}
  • @Bean:声明一个Spring Bean,名称为rabbitListenerContainerFactory
  • @ConditionalOnProperty:仅当配置属性spring.rabbitmq.listener.type的值为simple时,才会创建这个Bean,默认情况下也会创建。
  • SimpleRabbitListenerContainerFactory:用于创建SimpleMessageListenerContainer的工厂。
  • SimpleRabbitListenerContainerFactoryConfigurer:用于配置工厂的默认配置。
  • ConnectionFactory:RabbitMQ连接工厂。
  • ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>>:用于定制SimpleMessageListenerContainer的自定义配置。
  • setAfterReceivePostProcessors:配置消息处理后处理器,处理接收到的消息,将消息头中的请求ID放到MDC中,以便日志记录。

messageConverter

@Bean
public MessageConverter messageConverter(ObjectMapper mapper) {
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(mapper);
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}
  • @Bean:声明一个Spring Bean,类型为MessageConverter
  • Jackson2JsonMessageConverter:使用Jackson库将消息内容转换为JSON格式。
  • setCreateMessageIds(true):自动生成消息ID,以便在消息传递和处理过程中跟踪消息。

republishMessageRecoverer

@Bean
@ConditionalOnClass(MessageRecoverer.class)
@ConditionalOnMissingBean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
    return new RepublishMessageRecoverer(
            rabbitTemplate, ERROR_EXCHANGE, defaultErrorRoutingKey);
}
  • @Bean:声明一个Spring Bean,类型为MessageRecoverer
  • @ConditionalOnClass:仅当类路径中存在MessageRecoverer时才会创建这个Bean。
  • @ConditionalOnMissingBean:仅当没有其他类型为MessageRecoverer的Bean时,才会创建这个Bean。
  • RepublishMessageRecoverer:用于消息处理失败时,将消息重新发布到错误交换机和队列。

rabbitMqHelper

@Bean
@ConditionalOnMissingBean
@ConditionalOnClass(RabbitTemplate.class)
public RabbitMqHelper rabbitMqHelper(RabbitTemplate rabbitTemplate) {
    return new RabbitMqHelper(rabbitTemplate);
}
  • @Bean:声明一个Spring Bean,类型为RabbitMqHelper
  • @ConditionalOnMissingBean:仅当没有其他类型为RabbitMqHelper的Bean时,才会创建这个Bean。
  • @ConditionalOnClass:仅当类路径中存在RabbitTemplate时才会创建这个Bean。
  • RabbitMqHelper:封装了RabbitMQ消息发送的辅助工具类。

errorMessageExchange

@Bean
public DirectExchange errorMessageExchange() {
    return new DirectExchange(ERROR_EXCHANGE);
}
  • @Bean:声明一个Spring Bean,类型为DirectExchange
  • DirectExchange:用于定义一个直接交换机,用于将消息路由到指定的队列。

errorQueue

@Bean
public Queue errorQueue() {
    return new Queue(defaultErrorQueue, true);
}
  • @Bean:声明一个Spring Bean,类型为Queue
  • Queue:用于定义一个队列,true表示队列是持久化的,即消息会在RabbitMQ重启后依然存在。

errorBinding

@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(defaultErrorRoutingKey);
}
  • @Bean:声明一个Spring Bean,类型为Binding
  • Binding:将队列与交换机绑定,并使用指定的路由键进行消息路由。

环境设置

@Override
public void setEnvironment(Environment environment) {
    String appName = environment.getProperty("spring.application.name");
    this.defaultErrorRoutingKey = ERROR_KEY_PREFIX + appName;
    this.defaultErrorQueue = StringUtils.format(ERROR_QUEUE_TEMPLATE, appName);
}
  • setEnvironment:从Environment中获取应用名称,并根据应用名称设置默认的错误路由键和错误队列名称。

总结

MqConfig类负责RabbitMQ的配置,包括消息转换器、消息处理失败的重试策略、发送工具和错误处理机制。它使用了Spring的条件化注解来确保在特定条件下创建Bean,并在EnvironmentAware接口的setEnvironment方法中根据应用名称动态设置错误队列和路由键。

评论( 0 )

  • 博主 Mr Cai
  • 坐标 河南 信阳
  • 标签 Java、SpringBoot、消息中间件、Web、Code爱好者

文章目录