代码编织梦想

前言

在日常的开发中我们常常会遇到需要在一个事情完成之后的一段时间后做另一件事情(例如:下单成功后半小时未付款取消订单、用户注册成功五分钟后提醒用户绑定邮箱等),这种业务场景的特点是开始时间不确定,因此传统的定时任务不适合处理此类业务。

RabbitMQ 延时消息说明

RabbitMQ使用死信队列实现消息延时消费的原理就是消息发送到一个没有消费者的队列里,给队列或消息设置过期时间(消息和队列都设置了过期时间,按时间短的为准),等消息过期后自动发送到另一个队列里,消费者消费第二个队列里的消息就实现了消息的延时消费,延时的时间就是消息的过期时间。

RabbitMQ延时队列示意图

  • 延时路由key: 普通的路由key。
  • 死信路由key: 普通的路由key。
  • 延时交换机: 一个普通的交换机,延时消息发送到此交换机。
  • 延时队列: 根据延时路由key绑定到延时交换机,设置此队列的消息过期时间TTL、设置死信交换机、设置死信路由key。
  • 死信交换机: 一个普通交换机,叫它死信交换机的原因是它会接收延时队列里过期的消息(延时消息)。
  • 死信队列: 一个普通的队列,叫它死信队列的原因是它根据死信路由key绑定到死信交换机,最终延时队列里过期的消息(延时消息)会到此队列。延时消息的消费者从此队列消费消息。

Spring Boot + RabbitMQ 实现消息的延时消费

1、导入依赖

注: 导入 web 依赖是为了方便测试。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.qixi</groupId>
    <artifactId>qixi-mq-delay</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>qixi-mq-delay</name>
    <description>MQ Delay project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

2、编写配置

server:
  port: 8080

spring:
  rabbitmq:
    host: 192.168.40.154
    port: 5672
    username: admin
    password: 123456
    virtual-host: /

3、定义交换机、队列、路由key

package com.qixi.mq.delay.common.constant;

/**
 * RabbitMQ常量
 *
 * @author ZhengNC
 * @date 2020/9/21 11:40
 */
public interface RabbitConstant {
    /**
     * 交换机
     */
    interface Exchanges{

        /**
         * 死信交换机
         */
        String deadExchange = "spring.boot.dead.exchange";

        /**
         * 延时交换机
         */
        String ttlExchange = "spring.boot.ttl.exchange";
    }

    /**
     * 队列
     */
    interface Queues{

        /**
         * 死信队列
         */
        String deadQueue = "spring.boot.dead.queue";

        /**
         * 延时队列
         */
        String ttlQueue = "spring.boot.ttl.queue";
    }

    /**
     * 路由key
     */
    interface RouterKey{

        /**
         * 死信路由key
         */
        String deadRouteKey = "dead.route.key";

        /**
         * 延时路由key
         */
        String ttlRouteKey = "ttl.route.key";
    }
}

4、配置 RabbitMQ 绑定关系

package com.qixi.mq.delay.config;

import com.qixi.mq.delay.common.constant.RabbitConstant;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * RabbitMQ配置
 *
 * @author ZhengNC
 * @date 2020/9/14 10:40
 */
@Configuration
public class RabbitConfig {

    /**
     * 死信队列
     *
     * @return
     */
    @Bean("deadQueue")
    public Queue deadQueue(){
        return new Queue(RabbitConstant.Queues.deadQueue);
    }

    /**
     * 延时队列
     *
     * @return
     */
    @Bean("ttlQueue")
    public Queue ttlQueue1(){
        Map<String, Object> args = new HashMap<>(16);
        // 指定消息到期后发送到的死信交换机
        args.put("x-dead-letter-exchange", RabbitConstant.Exchanges.deadExchange);
        // 指定消息到期后发送到的路由键
        args.put("x-dead-letter-routing-key", RabbitConstant.RouterKey.deadRouteKey);
        // 声明队列的过期时间(TTL)(单位:毫秒)
        args.put("x-message-ttl", 10000);
        return QueueBuilder
                .durable(RabbitConstant.Queues.ttlQueue)
                .withArguments(args)
                .build();
    }

    /**
     * 死信队列交换机
     *
     * @return
     */
    @Bean("deadExchange")
    public DirectExchange deadExchange(){
        return new DirectExchange(RabbitConstant.Exchanges.deadExchange);
    }

    /**
     * 延时队列交换机
     *
     * @return
     */
    @Bean("ttlExchange")
    public DirectExchange ttlExchange(){
        return new DirectExchange(RabbitConstant.Exchanges.ttlExchange);
    }

    /**
     * 绑定延时队列和延时交换机
     *
     * @param ttlQueue
     * @param ttlExchange
     * @return
     */
    @Bean
    public Binding ttlQueue_ttlExchange(
            @Qualifier("ttlQueue") Queue ttlQueue,
            @Qualifier("ttlExchange") DirectExchange ttlExchange){
        return BindingBuilder.bind(ttlQueue)
                .to(ttlExchange)
                .with(RabbitConstant.RouterKey.ttlRouteKey);
    }

    /**
     * 绑定死信队列和死信交换机
     *
     * @param deadQueue
     * @param deadExchange
     * @return
     */
    @Bean
    public Binding deadQueue_deadExchange(
            @Qualifier("deadQueue") Queue deadQueue,
            @Qualifier("deadExchange") DirectExchange deadExchange){
        return BindingBuilder.bind(deadQueue)
                .to(deadExchange)
                .with(RabbitConstant.RouterKey.deadRouteKey);
    }
}

5、延时消息生产者

package com.qixi.mq.delay.producer;

import com.qixi.mq.delay.common.constant.RabbitConstant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 延时消息生产者
 *
 * @author ZhengNC
 * @date 2020/9/21 14:15
 */
@Service
public class TTLProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送一条延时消息
     *
     * @param message
     */
    public void sendTTLMessage(String message){
        rabbitTemplate.convertAndSend(
                RabbitConstant.Exchanges.ttlExchange,
                RabbitConstant.RouterKey.ttlRouteKey,
                message);
    }
}

6、延时消息的消费者

package com.qixi.mq.delay.consumer;

import com.qixi.mq.delay.common.constant.RabbitConstant;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

/**
 * 延时消息的消费者
 *
 * @author ZhengNC
 * @date 2020/9/21 14:08
 */
@Component
public class TTLConsumer {

    /**
     * 消费延时消息
     *
     * @param message
     */
    @RabbitListener(queues = RabbitConstant.Queues.deadQueue)
    public void ttlConsumer(String message){
        System.out.println("消费了一条消息,消费时间:"
                + DateTimeFormatter.ofPattern("HH:mm:ss")
                .format(LocalTime.now()));
        System.out.println(message);
    }
}

7、编写接口测试发送消息

统一接口响应格式:

package com.qixi.mq.delay.common.dto;

import lombok.Getter;
import lombok.Setter;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 统一接口响应格式
 *
 * @author ZhengNC
 * @date 2020/9/15 14:24
 */
@Getter
@Setter
public class ResponseEntity<T> {

    private String code = "200";

    private String msg = "success";

    private String timestamp = DateTimeFormatter
            .ofPattern("yyyy-MM-dd HH:mm:ss")
            .format(LocalDateTime.now());

    private T data;

    public ResponseEntity(){}

    public ResponseEntity(T data) {
        this.data = data;
    }

    public ResponseEntity(String code, String msg, T data) {
        this.code = code;
        this.msg = msg;
        this.data = data;
    }

    public static ResponseEntity success(){
        return new ResponseEntity();
    }

    public static ResponseEntity fail(){
        return new ResponseEntity("500", "fail", null);
    }
}

http接口:

package com.qixi.mq.delay.controller;

import com.qixi.mq.delay.common.dto.ResponseEntity;
import com.qixi.mq.delay.producer.TTLProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

/**
 * @author ZhengNC
 * @date 2020/9/21 14:27
 */
@RestController
@RequestMapping("ttl")
public class TTLProducerController {

    @Autowired
    private TTLProducer producer;

    /**
     * 发送延时消息
     *
     * @return
     */
    @GetMapping("send")
    public ResponseEntity autoSend(){
        DateTimeFormatter timeFormatter = DateTimeFormatter.ofPattern("HH:mm:ss");
        StringBuilder message = new StringBuilder("这是一条延时消息,消息的发送时间为:");
        message.append(timeFormatter.format(LocalTime.now()));
        producer.sendTTLMessage(message.toString());
        return ResponseEntity.success();
    }
}

8、测试结果

消费了一条消息,消费时间:14:48:38
这是一条延时消息,消息的发送时间为:14:48:28

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接: https://blog.csdn.net/sinat_27245917/article/details/108711746

论程序的健壮性——就看Redis-爱代码爱编程

“众里寻他千百度,蓦然回首,那人却在,灯火阑珊处”。多年的IT生涯,一直希望自己写的程序能够有很强的健壮性,也一直希望能找到一个高可用的标杆程序去借鉴学习,不畏惧内存溢出、磁盘满了、断网、断电、机器重启等等情况。但意想不到的是,这个标杆程序竟然就是从一开始就在使用的分布式缓存——Redis。 Redis(Remote Dictionary S

ES分词-爱代码爱编程

## 什么是分词 ``` 把文本转换为一个个的单词,分词称之为analysis。es默认只对英文语句做分词,中文不支持,每个中文字都会被拆分为独立的个体。 ``` ## es内置分词器 ``` - standard:默认分词,单词会被拆分,大小会转换为小写。 - simple:按照非字母分词。大写转为小写。 - whitespace:按照空格分词。

Java3 方法-爱代码爱编程

方法 目录 何为方法方法的定义以调用方法重载命令行传参可变参数递归什么是方法? Java方法时语句的集合,在一起执行一个功能 方法时解决一类问题的步骤的有序 组合方法包含于类或对象中方法在程序中被创建,在其他地方被引用设计方法的原则:方法的本意是功能块,实现某个功能的语句快的集合。我么设计方法的时候,最好保持的方法的原子性,就是一个方法只完成一个功能

深入理解NIO底层原理--epoll多路复用-爱代码爱编程

在使用选择和轮询时,我们管理用户空间上的所有内容,并在每次调用时发送集合以等待。要添加另一个套接字,我们需要将其添加到集合中并再次调用select/poll。 epoll系统调用帮助我们创建和管理内核中的上下文。我们将任务分为3个步骤: 使用epoll_create在内核中创建上下文使用epoll_ctl在上下文中添加或删除文件描述符使用epoll_w

简易解析excel数据-爱代码爱编程

笔记存根: maven依赖: <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>3.9<

设计模式初探之单例模式-爱代码爱编程

设计模式初探之单例模式 单例模式概念: ​ 单例模式一种常见得设计模式,顾名思义就是只能有一个实例。 单例模式应用场景: ​ 线程池、缓存、日志对象以及各种配置的读取。 单例模式实现方法: 懒汉模式:能实现懒加载,但是在并发情况下使用synchronized 对性能有所影响。饿汉模式:不能懒加载,消耗较大,在并发情况下是线程安全的。Holder

想在生产搞事情?那试试这些 Redis 命令-爱代码爱编程

点关注,不迷路;持续更新Java相关技术及资讯!!! 内容源于群友投稿!感谢支持! 小z最近又双叒叕犯错了 事情是这样的,前一段时间小z公司生产交易偶发报错,一番排查下来最终原因是因为 Redis 命令执行超时。 可是令人不解的是,生产交易仅仅使用 Redis set 这个简单命令,这个命令讲道理是不可能会执行这么慢。 那到底是什么导致这

Spring Boot学习 RabbitMQ 消息队列(3) 延时队列的介绍和使用 保证消息可靠性的方法 解决 消息丢失 消息重复 消息积压-爱代码爱编程

Spring Boot学习 RabbitMQ 消息队列 基本概念 web端操作 SpringBoot整合 4.延时队列 实现定时任务 设置队列TTL的实现方式: 此次实现方式:使用同一个交换机的不同路由键实现 代码实现: 4.1 创建组件 package com.rwp.gulimail.order.config;

《面试心经》---Redis基础-爱代码爱编程

一叶障目,不见Offer 前言 Redis 在当今后端技术的世界有着举足轻重的地位。几乎所有的后端技术面试官都会围绕 Redis 的原理和使用对面试者进行全方位、无死角的盘问。来自Redis的盘问或许会迟到,但它绝不会缺席。看着一个个自信的小眼神被面试官折磨的黯淡无光,老衲实属不忍。在一个月黑风高的夜晚,终于下定决心,收集整理Redis面试资

Netty连接池FixedChannelPool连接未释放问题的排查总结-爱代码爱编程

1 前言 前几天我们又遇到了一个Netty报从连接池获取连接超时异常从而导致整个服务不可用的异常,报的具体异常信息是Exception accurred when acquire channel from channel pool:TimeoutException。当时自己看了这个异常信息,有种似曾相识的感觉,印象中自己第一次接触到该异常是不久前也遇到了

用数组模拟队列-爱代码爱编程

思路导图 代码块如下 public class ArrayQueueDemo { public static void main(String[] args) { //先新建一个ArrayQueue类 ArrayQueue queue = new ArrayQueue(3); Scanner s

进程间的通信-爱代码爱编程

@进程间通信: 1.之前学过socket,完成客户端与服务器的通信 2.通过队列完成进程之间的通信: 1 import multiprocessing 2 3 def send_msg(q): 4 """发送数据""" 5 data=[11,22,33,44] 6 for temp in data: