代码编织梦想

        RabbitMQ中的路由模式(Direct模式)应该是在实际工作中运用的比较多的一种模式了,这个模式和发布与订阅模式的区别在于路由模式需要有一个routingKey,在配置上,交换机类型需要注入DirectExchange类型的交换机bean对象。在交换机和队列的绑定过程中,绑定关系需要在绑定一个路由key。由于在实际的工作中不大可能会用自动确认的模式,所以我们在整合路由模式的过程中,依然采用发送消息双确认机制和消费端手动确认的机制来保证消息的准确送达与消息防丢失。话不多少,进入正题。

1. 添加配置

        在配置文件中,配置rabbitmq的相关账号信息,开启消息发送回调机制,配置文件其实和发布订阅模式是一样的。配置详情如下:

server:
  port: 10001

spring:
  application:
    name: springboot-rabbitmq-s1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    # 发送者开启 return 确认机制
    publisher-returns: true
    # 发送者开启 confirm 确认机制
    publisher-confirm-type: correlated

2. 创建配置类

        创建配置类RabbitMQConfig,用于声明交换机、队列,建立队列和交换机的绑定关系,注入RabbitTemplate的bean对象。配置类详情如下:

package com.study.rabbitmq.config;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author alen
 * @DATE 2022/6/7 23:50
 */
@Slf4j
@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "direct-order-exchange";
    public static final String SMS_QUEUE = "sms-direct-queue";
    public static final String EMAIL_QUEUE = "email-direct-queue";
    public static final String WECHAT_QUEUE = "wechat-direct-queue";

    /**
     * 1.
     * 声明交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        /**
         * directExchange的参数说明:
         * 1. 交换机名称
         * 2. 是否持久化 true:持久化,交换机一直保留 false:不持久化,用完就删除
         * 3. 是否自动删除 false:不自动删除 true:自动删除
         */
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 2.
     * 声明队列
     * @return
     */
    @Bean
    public Queue smsQueue() {
        /**
         * Queue构造函数参数说明
         * 1. 队列名
         * 2. 是否持久化 true:持久化 false:不持久化
         */
        return new Queue(SMS_QUEUE, true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue(EMAIL_QUEUE, true);
    }

    @Bean
    public Queue wechatQueue() {
        return new Queue(WECHAT_QUEUE, true);
    }

    /**
     * 3.
     * 队列与交换机绑定
     */
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("email");
    }

    @Bean
    public Binding wechatBinding() {
        return BindingBuilder.bind(wechatQueue()).to(directExchange()).with("wechat");
    }

    /**
     * 将自定义的RabbitTemplate对象注入bean容器
     *
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启消息推送结果回调
        rabbitTemplate.setMandatory(true);
        //设置ConfirmCallback回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("==============ConfirmCallback start ===============");
                log.info("回调数据:{}", correlationData);
                log.info("确认结果:{}", ack);
                log.info("返回原因:{}", cause);
                log.info("==============ConfirmCallback end =================");
            }
        });
        //设置ReturnCallback回调
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("==============ReturnCallback start ===============");
                log.info("发送消息:{}", JSONUtil.toJsonStr(message));
                log.info("结果状态码:{}", replyCode);
                log.info("结果状态信息:{}", replyText);
                log.info("交换机:{}", exchange);
                log.info("路由key:{}", routingKey);
                log.info("==============ReturnCallback end =================");
            }
        });
        return rabbitTemplate;
    }
}

3. 消费者配置

        在消费者项目的配置文件中开启手动确认,配置详情如下:

server:
  port: 10002

spring:
  application:
    name: springboot-rabbitmq-s2
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    virtual-host: /
    username: admin
    password: admin
    listener:
      simple:
        # 表示消费者消费成功消息以后需要手工的进行签收(ack确认),默认为 auto
        acknowledge-mode: manual

4. 创建消费者

        分别创建三个消费者,DirectEmailConsumer、DirectSmsConsumer、DirectWechatConsumer来监听对应的队列,有消息后进行消费,三个消费者大同小异,分别如下

4.1 DirectEmailConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:54
 */
@Slf4j
@Service
@RabbitListener(queues = {"email-direct-queue"}) //监听队列
public class DirectEmailConsumer {

    //标记消费者逻辑执行方法
    @RabbitHandler
    public void emailMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("Email direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.2 DirectSmsConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author alen
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"sms-direct-queue"}) //监听队列
public class DirectSmsConsumer {

    @RabbitHandler
    public void smsMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("sms direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

4.3 DirectWechatConsumer

package com.study.rabbitmq.service.direct;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @Author chaoxian.wu
 * @DATE 2022/6/10 22:55
 */
@Slf4j
@Service
@RabbitListener(queues = {"wechat-direct-queue"}) //监听队列
public class DirectWechatConsumer {

    @RabbitHandler
    public void wechatlMessage(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("wechat direct --接收到消息:{}", msg);

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            if (message.getMessageProperties().getRedelivered()) {
                log.error("消息已重复处理失败,拒绝再次接收...");
                //basicReject: 拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似 false表示消息不再重新进入队列
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {
                log.error("消息即将再次返回队列处理...");
                // basicNack:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    }
}

以上就是全部的代码部分,接下来我们在进入测试,看看实际效果如何,先发布一个routingKey=sms的消息,查看是不是只有对应的一个队列中接收到消息,消息发送详情:

package com.study.rabbitmq;

import com.study.rabbitmq.entity.Order;
import com.study.rabbitmq.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
class SpringbootRabbitmqS1ApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        for (long i = 1; i < 2; i++) {
            //交换机名称
            String exchangeName = "direct-order-exchange";
            //路由key
            String routingKey = "sms";
            Order order = buildOrder(i);
            orderService.createOrder(order, routingKey, exchangeName);
        }
    }

    private Order buildOrder(long id) {
        Order order = new Order();
        order.setRequestId(id);
        order.setUserId(id);
        order.setOrderNo(UUID.randomUUID().toString());
        order.setAmount(10L);
        order.setGoodsNum(1);
        order.setTotalAmount(10L);
        return order;
    }
}

我们登录rabbitmq管理后台查看下,只有sms-direct-queue这个队列有一条消息,效果如下:

 我们启动消费者,看下是不是只有监听了sms-direct-queue这个队列的消费者有消费日志,效果如下:

再发一条routingKey=email的消息,消费的日志,效果图示如下

 到此其实已经springboot整合rabbitmq的路由模式结束了,这种模式在工作中还是比较常见的,我们演示的是单点的效果,实际工作中,不大可能会使用服务单点部署,现在都讲究服务的高可用,就得服务集群部署,又会涉及到消息重复消费的问题需要处理,我个人觉得,遇到重复消费问题,我第一时间想到的就是分布式锁,哈哈~。但是锁什么呢?肯定是消息中的具备唯一性的属性。来达到防止消息的重复消费。具体的时间,后续实现后再分享。

        整个过程中,其实还存在一个小问题没有验证,就是ReturnCallback回调机制没有触发,因为这个得发生在交换机将消息发送到队列的时候失败才会触发,那么我们就发送一个不存在的routingKey就可以触发了,我们发送一个routingKey=duanxin的消息,这个肯定不会发送成功,我们通过断点来看看效果,效果如下:

然后我们常见的就全部整合完成了,当然,开启了双确认机制,虽然我们可以检测到消息投送的结果,然后可以针对投送失败的结果进行预警。但是开启了这个操作,就必然会对消息的处理效率产生影响。所以还得根据实际业务场景而定是否需要使用这个确认机制。好了,到此就结束了,欢迎大佬讨论!!!

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/cssweb_sh/article/details/125229263

spring boot整合rabbitmq详细教程_梦里梦不出梦里梦的梦的博客-爱代码爱编程_springboot使用rabbitmq

1.首先我们简单了解一下消息中间件的应用场景   异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种1.串行的方式;2.并行的方式  (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必

spring boot入门系列之:十、spring boot整合rabbitmq_exklin的博客-爱代码爱编程_rabbitmqconstant.queue

spring-boot-rabbitmq 开发环境 开发工具:Intellij IDEA 2018.2.6 springboot: 2.0.6.RELEASE jdk:1.8.0_192 maven: 3.6.0

rabbitmq + Spring boot 整合-广播模式- 路由-(02)-爱代码爱编程

目录 rabbitmq-直接交换模式 rabbitmq-广播模式- 路由 rabbitmq-广播模式- 交换机 rabbitmq-转发模式 rabbitmq-延迟消费 在【队列上】 rabbitmq-延迟消费 在【消息上】 源码地址:https://gitee.com/caiwang/rabbitmq-project 回顾: rabbit

Spring Boot 整合RabbitMQ 配置文件方式-爱代码爱编程

1. 初始化配置 1.1 引入依赖 <!--spring整合RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp<

Spring Boot整合RabbitMq-爱代码爱编程

1.整合Spring boot 1.1添加依赖 Spring Boot为AMQP提供了自动化配置spring-boot-starter-amqp,因此首先创建Spring boot项目并添加依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.a

RabbitMQ路由模式direct-爱代码爱编程

1、目标 根据不同的路由key,将消息分发到不同的消息队列queue中,并被消费者消费 2、实现步骤 创建生产者工程:rabbitmq-direct-producer创建消费者工程:rabbitmq-direct-consumer引入spring-boot-rabbitmq的依赖进行消息的分发和测试5:查看和观察web控制台的状况3、创建生产者工程

Spring Boot整合RabbitMQ-RabbitMQ-爱代码爱编程

概述 消息中间件的应用场景主要有:异步处理、应用解耦、流量削峰等。 生产者发送消息通过不同类型的交互机发送到不同的消息队列中。 消费者只关心消息队列,与交换机无关,至于生产者如何发送的(直接发送到队列还是通过交换机的方式发送)毫不关心。 导入maven依赖 <?xml version="1.0" encoding="UTF-8"?> &l

Springboot整合RabbitMQ实现路由模型-爱代码爱编程

生产者代码 package com.example.newrabbitmq; import org.junit.jupiter.api.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org

Spring Boot集成RabbitMQ-爱代码爱编程

📢📢📢📣📣📣 哈喽!大家好,我是【一心同学】,一位上进心十足的【Java领域博主】!😜😜😜 ✨【一心同学】的写作风格:喜欢用【通俗易懂】的文笔去讲解每一个知识点,而不喜欢用【高大上】的官方陈述。 ✨【一心同学】博客的领域是【面向后端技术】的学习,未来会持续更新更多的【后端技术】以及【学习心得】。 ✨如果有对【后端技术】感兴

rocketmq:一次完整的通信流程是怎样的?_summer_west_fish的博客-爱代码爱编程

Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。 Producer 只能将消息发送到 Broker master; Consumer 则不一样,它同时和提

消息队列-功能、性能、运维对比_impl_sunny的博客-爱代码爱编程

一、功能 1.1 消费推拉模式 1.2 延迟队列 消息延迟投递,当消息产生送达消息队列时,有些业务场景并不希望消费者立刻收到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。 延迟队列一般分为两种,基于消息的延迟和基于队列的延迟: 基于消息的延迟:为每条消息设置不同的延迟时间,当队列有新消息进入的时候根据延迟时间排序,当然这样会

【rabbitmq、spring boot】spring boot整合rabbitmq-爱代码爱编程

一、环境准备 引入pom依赖 <!--rabbitmq--> <dependency> <groupId>org.springf

rabbitmq学习笔记_mq 如何配置竞争消费-爱代码爱编程

1. MQ 的基本概念 1.1 MQ概述         MQ全称 M essage Q ueue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。 小结 MQ,消息队列,存储消息的中间件 分布式系统通信两种方式:直接远程调用 和 借助第三方 完成间接通信  发送方称