代码编织梦想

一:RabbitMQ消息Ack确认机制

1.确认种类

RabbitMQ的消息确认有两种。

  • 消息发送确认:这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。

发送确认分为两步:
一是确认是否到达交换器,
二是确认是否到达队列。

  • 消费接收确认:这种是确认消费者是否成功消费了队列中的消息。

2.消息发送确认

2.1 声明队列及交换机

//声明队列
    @Bean(name = "topic-queue1")
    public Queue topicQueue1(){
        return new Queue("topic-queue1");
    }
    @Bean(name = "topic-queue2")
    public Queue topicQueue2(){
        return new Queue("topic-queue2");
    }
    @Bean(name = "topic-queue3")
    public Queue topicQueue3(){
        return new Queue("topic-queue3");
    }
    //声明交换机
    //通配符模式下的交换机
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange("topic-exchange");
    }

    @Bean
    public Binding bindQueue1ToTopicExchange(@Qualifier("topic-queue1")Queue queue,TopicExchange topicExchange){
        //* 代表一个词
        //# 代表零个或者多个词
        return BindingBuilder.bind(queue).to(topicExchange).with("ex.123.123");
    }
    @Bean
    public Binding bindQueue2ToTopicExchange(@Qualifier("topic-queue2")Queue queue,TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("ex.*");
    }
    @Bean
    public Binding bindQueue3ToTopicExchange(@Qualifier("topic-queue3")Queue queue,TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("ex.#");
    }

2.2 生产者配置application

	#消息发送到交换机的确认
    publisher-confirms: true
    #消息由交换机转发到队列的确认
    publisher-returns: true

2.3.生产者MyConfirmCallBack

通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。
通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发

@Component
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
{
    @Autowired
    RabbitTemplate rabbitTemplate;

            //初始化加载方法
            @PostConstruct
            public void rabbit(){
                rabbitTemplate.setConfirmCallback(this);
                rabbitTemplate.setReturnCallback(this);
            }


    //通过实现confirmCallback接口,确认消息是否投递到了交换机中
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        System.out.println("消息投递的结果:"+correlationData);
        System.out.println("消息投递是否成功"+b);
        System.out.println("失败的原因"+s);
    }
    //交换机转发到队列的确认
    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("消息主体:"+message);
        System.out.println("应答码:"+i);
        System.out.println("原因描述:"+s);
        System.out.println("交换机:"+s1);
        System.out.println("路由健:"+s2);
    }
}

3.生产者SendMessageController

@RestController
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("/sendMail/{message}")
    public String sendMail(@PathVariable("message")String message){
        rabbitTemplate.convertAndSend("topic-exchange","ex.123",message);
        return "发送";
    }
 }

3.消费者监听

//消息确认 /sendMail/{message}
    @RabbitListener(queues = "topic-queue3")
    @RabbitHandler
    public void  topicQueue3(String str, Channel channel, Message message){
        try{
            System.out.println("监听到了错误消息"+str);
            //1.当前消息的唯一标识。deliveryTag
            //2.消息的确认,false 代表确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        }catch (Exception ex){

        }
    }

在这里插入图片描述
在这里插入图片描述

二:消息延迟队列

1.什么是延迟队列

延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

  1. 订单超时关闭:在支付场景中,一般上订单在创建后30分钟或1小时内未支付的,会自动取消订单。
  2. 短信或者邮件通知:在一些注册或者下单业务时,需要在1分钟或者特定时间后进行短信或者邮件发送相关资料的。本身此类业务于主业务是无关联性的,一般上的做法是进行异步发送。
  3. 重试场景:比如消息通知,在第一次通知出现异常时,会在隔几分钟之后进行再次重试发送。

2.使用要求

RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过过期时间TTL和死信队列来模拟延迟队列。

2.1 过期时间TTL

RabbitMQ中可以对队列和消息分别设置TTL,TTL表明了一条消息可在队列中存活的最大时间。当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在TTL时间后死亡成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。

2.2 死信队列

设置了TTL的消息或队列最终会成为Dead Letter,当消息在一个队列中变成死信之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定此DLX的队列就是死信队列。

2.3 具体实现

在这里插入图片描述
延迟任务通过消息的TTL和Dead Letter Exchange来实现。我们需要建立2个队列,一个用于发送消息,一个用于消息过期后的转发目标队列

3.实例

3.1 rabbitConfig配置文件

//延迟队列
    //正常的交换机
    @Bean(name = "topic-Exchange")
    public TopicExchange topicExchange(){
        return new TopicExchange("topic-exchange");
    }
    //死信的交换机
    @Bean(name="dead-topic-Exchange")
    public TopicExchange DeadTopicExchange(){
        return new TopicExchange("dead-topic-exchange");
    }
    //正常的队列
    @Bean(name = "topic-queue")
    public Queue queue(){
        return new Queue("topic-queue");
    }
    //死信的队列
    @Bean(name = "dead-topic-queue")
    public Queue DeadQueue(){
        Map map = new HashMap<>();
        map.put("x-dead-letter-exchange","topic-exchange");
        map.put("x-dead-letter-routing-key","ex.123");
        //下面变量的含义:1.死信队列的名称,2.是否持久化,3.是否独享,排外,4.是否自动删除
        return new Queue("dead-topic-queue",true,false,false,map);
    }

    //绑定正常的交换机与队列
    @Bean
    public Binding bindQueueToExchange(@Qualifier("topic-queue")Queue queue,@Qualifier("topic-Exchange")TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("ex.*");
    }
    //绑定死信交换机与死信队列
    @Bean
    public Binding bindDeadQueueToDeadExchange(@Qualifier("dead-topic-queue")Queue queue,@Qualifier("dead-topic-Exchange")TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("any");
    }

3.2 生产者SendMessageController

@RequestMapping("/sendDeadMail/{message}")
    public String sendDeadMail(@PathVariable("message")String message){
        rabbitTemplate.convertAndSend("dead-topic-exchange", "any", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
               //设置30s过期,过期转发到指定路由 message.getMessageProperties().setExpiration("30000");
                return message;
            }
        });
        return "发送";
    }

3.3 消费者监听

//监听延迟消息
    @RabbitListener(queues = "topic-queue")
    public void  DeadTopicQueue(String str, Channel channel, Message message){

        try {
            System.out.println("监听到了延迟消息"+str);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

等死信队列过期后,转发到正常队列,监听器监听正常队列
30s过后,监听到消息

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接: https://blog.csdn.net/qq_45145809/article/details/109329795

零基础应该选择学习 C、C++、Java、python、web前端、C#、PHP、Linux选哪个编程语言好呢?-爱代码爱编程

众多的语言,到底哪一门才是适合我呢?   小白:大佬,大佬,编程语言也太多了,到底我应该选择哪一种呢? 大佬:首先呢,我们先对常见的编程语言,生动形象的总结一下。 编程界的法老C大叔,因年长、稳重被众人所熟知,“上古时期”好像就存在了,是诸多后兴语言的前辈 高级语言中,偏底层(厉害)的存在,常用于系统和硬件编程中。 跟随C大叔学习武

基于java的田径运动会报名系统-爱代码爱编程

设计运动会报名系统,采用JSP和mysql数据库开发技术,目的在于实现对运动员信息计算机化,网络化管理,达到在线信息查询、修改、更新及维护。完成对运动员信息录入、查询、统计和维护等功能设计。系统运行结果表明,程序稳定,操作简便、界面友好,具有较强的实用性。 田径运动会报名管理系统就是给学生进行网上报名,管理员管理报名信息的一种通用管理平台,从而方便管理人

实时推荐-2数据中文分词-爱代码爱编程

思路 效果 当用户点击某件商品,退出后发现下面都是相关商品 面临的问题 1.用户的随意性(今天点开了A,明天点开了B) 2.历史性(比如购面的是一件T恤,现在需要可能以后就不需要了) 3.实时性(当获得商品的速度应该是非常的快的,不能像离线运算计算结果需要好多秒,必须要求速度!) 思路 针对问题提供解决方案 1.计

《java程序设计基础》静态成员-爱代码爱编程

static称为静态修饰符,它可以修饰类中的成员。被static修饰的成员称为静态成员,也称为类成员,而不用static修饰的成员称为实例成员。1.实例成员、 在类定义中如果成员变量或成员方法没有用static修饰,则该成员为实例成员。对实例成员,我们并不陌生,因为在此之前编写的程序中,用到的都是实例成员。 必须先创建对象,再利用对象来调用方法,而无法不通

Java: trim()方法和strip()方法之间的区别-爱代码爱编程

Java在JDK11中的String类中引入strip方法,trim()和strip()功能类似,主要区别是: trim()可以去除字符串前后的半角空白字符strip()可以去除字符串前后的全角和半角空白字符半角和全角 半角是我们多数人在打字的时候使用的状态,如果我们不去刻意调整半全角,它会一直伴随着我们,因为半角状态下,人们已经习惯了这种打字模式,半

27岁学习java还有出路吗?-爱代码爱编程

三字经:苏老泉,二十七。始发愤,读书籍。 27岁想要转行Java行业是否有出路,对于这个问题没有绝对肯定的回答,主要还是看你自己到底是否想要从事这方面的岗位工作,是否想要学好,是否能拿出相应的执行力出来。27岁这个年龄并不大,我们尝试着把27转换成35,为什么有的人事业小成、家庭幸福,有的人却还在一无所有的起点上,一样通顺。努力,从来都不是一件与时间线性

iOS中的锁和线程同步-爱代码爱编程

线程同步和iOS中的锁 一、自旋锁(OSSpinLock) 一直占用cpu,相当于while循环已经不被推荐,因为可能会产生优先级反转;优先级反转 多任务的调度有许多算法,常见的有FIFO、优先级、时间片轮询、短任务优先;系统一般采取混合算法,因为不同进程特点不一样,单一的调度算法不能满足所有的场景;比如短任务算法需要在线程之间的切换消耗 和 任务

队列的顺序/链式 存储实现(Java版)-爱代码爱编程

队列的定义 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作, 而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。 进行插入操作的端称为入队,进行删除的操作成为出队。 队列的顺序存储实现 package algorithm.datastructure.queue; /* * 队列 * 队列

RabbitMQ的6种工作模式详解-爱代码爱编程

目录 RabbitMQ几种工作模式1.Work queues代码实例1.生产者1.application.yml2.RabbitMqConfig3.TestSend2.消费者1.配置相同,config相同2.RabbitMqListen监听获取消息3.运行2.Publish发布订阅模式代码实例1.生产者1.RabbitMqConfig3.TestS

双11的秒杀系统,是如何设计的?-爱代码爱编程

双11临近,又到了剁手时节。每年双11电商的秒杀玩法层出不穷,秒杀系统几乎成了所有互联网公司的“标配”。 秒杀系统中涉及到的很多技术点也是一二线大厂面试重点考察的点。秒杀系统也是我在面试中问得比较多的,时间充裕的同学可以看下B站阿里面试官关于秒杀系统的干货分享:淘宝秒杀系统怎么设计? 举个简单例子:双11,商家以4499的价格上架了某iphone,

证明ArrayList是线程不安全的-爱代码爱编程

import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueu

Stream流的这些操作,你得知道,对你工作有很大帮助-爱代码爱编程

Stream流 Stream(流)是一个来自数据源的元素队列并支持聚合操作: 元素是特定类型的对象,形成一个队列。 Java中的Stream并不会存储元素,而 是按需计算。数据源 流的来源。 可以是集合,数组等。聚合操作类似SQL语句一样的操作, 比如filter, map, reduce, find, match, sorted 等。Stream流操