代码编织梦想

背景

前段时间有个小项目需要使用延迟任务,谈到延迟任务,我脑子第一时间一闪而过的就是使用消息队列来做,比如RabbitMQ的死信队列又或者RocketMQ的延迟队列,但是奈何这是一个小项目,并没有引入MQ,我也不太想因为一个延迟任务就引入MQ,增加系统复杂度,所以这个方案直接就被pass了。

虽然基于MQ这个方式走不通了,但是这个项目中使用到Redis,所以我就想是否能够使用Redis来代替MQ实现延迟队列的功能,于是我就查了一下有没有现成可用的方案,别说,还真给我查到了两种方案,并且我还仔细研究对比了这两个方案,发现要想很好的实现延迟队列,并不简单。

监听过期key

基于监听过期key的方式来实现延迟队列是我查到的第一个方案,为了弄懂这个方案实现的细节,我还特地去扒了扒官网,还真有所收获

1、Redis发布订阅模式

一谈到发布订阅模式,其实一想到的就是MQ,只不过Redis也实现了一套,并且跟MQ贼像,如图:
在这里插入图片描述图中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。

生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。

2、keyspace notifications

在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。当消费者监听这些channel时,就可以感知到Redis中数据的变化。

这个功能Redis官方称为keyspace notifications,字面意思就是键空间通知。

这些默认的channel被分为两类:

  • 以__keyspace@:为前缀,后面跟的是key的名称,表示监听跟这个key有关的事件。举个例子,现在有个消费者监听了__keyspace@0:sanyou这个channel,sanyou就是Redis中的一个普通key,那么当sanyou这个key被删除或者发生了其它事件,那么消费者就会收到sanyou这个key删除或者其它事件的消息
  • 以__keyevent@:为前缀,后面跟的是消息事件类型,表示监听某个事件同样举个例子,现在有个消费者监听了__keyevent@0:expired这个channel,代表了监听key的过期事件。那么当某个Redis的key过期了(expired),那么消费者就能收到这个key过期的消息。如果把expired换成del,那么监听的就是删除事件。具体支持哪些事件,可从官网查。

上述db是指具体的数据库,Redis不是默认分为16个库么,序号从0-15,所以db就是0到15的数字,示例中的0就是指0对应的数据库。
在这里插入图片描述

3、延迟队列实现原理

通过对上面的两个概念了解之后,应该就对监听过期key的实现原理一目了然了,其实就是当这个key过期之后,Redis会发布一个key过期的事件到__keyevent@__:expired这个channel,只要我们的服务监听这个channel,那么就能知道过期的Key,从而就算实现了延迟队列功能。

所以这种方式实现延迟队列就只需要两步:

  • 发送延迟任务,key是延迟消息本身,过期时间就是延迟时间
  • 监听__keyevent@__:expired这个channel,处理延迟任务

4、demo

好了,基本概念和核心原理都说完了之后,又到了show me the code环节。

好巧不巧,Spring已经实现了监听__keyevent@__:expired这个channel这个功能,__keyevent@__:expired中的*代表通配符的意思,监听所有的数据库。

所以demo写起来就很简单了,只需3步即可

引入pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

配置类

@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
    }

}

KeyExpirationEventMessageListener实现了对__keyevent@*__:expiredchannel的监听
在这里插入图片描述
当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件
在这里插入图片描述
所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key,也就是延迟消息。

对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener

@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {

    @Override
    public void onApplicationEvent(RedisKeyExpiredEvent event) {
        byte[] body = event.getSource();
        System.out.println("获取到延迟消息:" + new String(body));
    }

}

整个工程目录也简单
在这里插入图片描述
代码写好,启动应用

之后我直接通过Redis命令设置消息,就没通过代码发送消息了,消息的key为sanyou,值为task,值不重要,过期时间为5s

set sanyou task 

expire sanyou 5

如果上面都理论都正确,不出意外的话,5s后MyRedisKeyExpiredEventListener应该可以监听到sanyou这个key过期的消息,也就相当于拿到了延迟任务,控制台会打印出获取到延迟消息:sanyou。

于是我满怀希望,静静地等待了5s。。

5、4、3、2、1,时间一到,我查看控制台,但是控制台并没有按照预期打印出上面那句话。

为什么会没打印出?难道是代码写错了?正当我准备检查代码的时候,官网的一段话道出了真实原因。
在这里插入图片描述
我给大家翻译一下上面这段话讲的内容。

上面这段话主要讨论的是key过期事件的时效性问题,首先提到了Redis过期key的两种清除策略,就是面试八股文常背的两种:

  • 惰性清除。当这个key过期之后,访问时,这个Key才会被清除
  • 定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除

再后面那段话是核心,意思是说,key的过期事件发布时机并不是当这个key的过期时间到了之后就发布,而是这个key在Redis中被清理之后,也就是真正被删除之后才会发布。

到这我终于明白了,上面的例子中即使我设置了5s的过期时间,但是当5s过去之后,只要两种清除策略都不满足,没人访问sanyou这个key,后台的定时清理的任务也没扫描到sanyou这个key,那么就不会发布key过期的事件,自然而然也就监听不到了。

至于后台的定时清理的任务什么时候能扫到,这个没有固定时间,可能一到过期时间就被扫到,也可能等一定时间才会被扫到,这就可能会造成了客户端从发布到监听到的消息时间差会大于等于过期时间,从而造成一定时间消息的延迟,这就着实有点坑了。。

5、坑

除了上面测试demo的时候遇到的坑之外,在我深入研究之后,还发现了一些更离谱的坑。

丢消息太频繁

Redis的丢消息跟MQ不一样,因为MQ都会有消息的持久化机制,可能只有当机器宕机了,才会丢点消息,但是Redis丢消息就很离谱,比如说你的服务在重启的时候就消息会丢消息。

Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。

所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了。

消息消费只有广播模式

Redis的发布订阅模式消息消费只有广播模式一种。

所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息。
在这里插入图片描述
如图,生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息。

所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。

接收到所有key的某个事件

这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题。

当消费者监听了以__keyevent@__:开头的消息,那么会导致所有的key发生了事件都会被通知给消费者。

举个例子,某个消费者监听了__keyevent@*__:expired这个channel,那么只要key过期了,不管这个key是张三还会李四,消费者都能收到。

所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务。

所以,综上能够得出一个非常重要的结论,那就是监听Redis过期Key这种方式实现延迟队列,不稳定,坑贼多

那有没有比较靠谱的延迟队列的实现方案呢?这就不得不提到我研究的第二种方案了。

Redisson实现延迟队列

Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能。

先来个demo,后面再来说说这种实现的原理。

1、demo

引入pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency>

封装了一个RedissonDelayQueue类

@Component
@Slf4j
public class RedissonDelayQueue {

    private RedissonClient redissonClient;

    private RDelayedQueue<String> delayQueue;
    private RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        initDelayQueue();
        startDelayQueueConsumer();
    }

    private void initDelayQueue() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress("redis://localhost:6379");
        redissonClient = Redisson.create(config);

        blockingQueue = redissonClient.getBlockingQueue("SANYOU");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
    }

    private void startDelayQueueConsumer() {
        new Thread(() -> {
            while (true) {
                try {
                    String task = blockingQueue.take();
                    log.info("接收到延迟任务:{}", task);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "SANYOU-Consumer").start();
    }

    public void offerTask(String task, long seconds) {
        log.info("添加延迟任务:{} 延迟时间:{}s", task, seconds);
        delayQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

}

这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓。

当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。

添加任务的时候是通过RDelayedQueue的offer方法添加的。

controller类,通过接口添加任务,延迟时间为5s

@RestController
public class RedissonDelayQueueController {

    @Resource
    private RedissonDelayQueue redissonDelayQueue;

    @GetMapping("/add")
    public void addTask(@RequestParam("task") String task) {
        redissonDelayQueue.offerTask(task, 5);
    }

}

启动项目,在浏览器输入如下连接,添加任务

http://localhost:8080/add?task=sanyou

静静等待5s,成功获取到任务。
在这里插入图片描述

2、实现原理

如下图就是上面demo中,一个延迟队列会在Redis内部使用到的channel和数据类型
在这里插入图片描述
SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀。

  • redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
  • redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。。
  • SANYOU,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
  • redisson_delay_queue_channel:SANYOU,是一个channel,用来通知客户端开启一个延迟任务

有了这些概念之后,再来看看整体的运行原理图
在这里插入图片描述

  • 生产者在提交任务的时候将任务放到redisson_delay_queue_timeout:SANYOU中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳

  • 客户端会有一个延迟任务,为了区分,后面我都说是客户端延迟任务。这个延迟任务会向Redis Server发送一段lua脚本,Redis执行lua脚本中的命令,并且是原子性的
    在这里插入图片描述
    这段lua脚本主要干了两件事:

  • 将到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标队列

  • 获取到redisson_delay_queue_timeout:SANYOU中目前最早到过期时间的延迟任务的到期时间戳,然后发布到redisson_delay_queue_channel:SANYOU这个channel中
    当客户端监听到redisson_delay_queue_channel:SANYOU这个channel的消息时,会再次提交一个客户端延迟任务,延迟时间就是消息(最早到过期时间的延迟任务的到期时间戳)- 当前时间戳,这个时间其实也就是redisson_delay_queue_channel:SANYOU中最早到过期时间的任务还剩余的延迟时间。

此处可以等待10s,好好想想。。

这样,一旦时间来到了上面说的最早到过期时间任务的到期时间戳,redisson_delay_queue_timeout:SANYOU中上面说的最早到过期时间的任务已经到期了,客户端的延迟任务也同时到期,于是开始执行lua脚本操作,及时将到了延迟时间的任务放到目标队列中。然后再次发布剩余的延迟任务中最早到期的任务到期时间戳到channe中,如此循环往复,一直运行下去,保证redisson_delay_queue_timeout:SANYOU中到期的数据能及时放到目标队列中。

所以,上述说了一大堆的主要的作用就是保证到了延迟时间的任务能够及时被放到目标队列。

这里再补充两个特殊情况,图中没有画出:

第一个就是如果redisson_delay_queue_timeout:SANYOU是新添加的任务(队列之前有或者没有任务)是队列中最早需要被执行的,也会发布消息到channel,之后就按时上面说的流程走了。

添加任务代码如下,也是通过lua脚本来的
在这里插入图片描述
第二种特殊情况就是项目启动的时候会执行一次客户端延迟任务。项目在重启时,由于没有客户端延迟任务的执行,可能会出现redisson_delay_queue_timeout:SANYOU队列中有到期但是没有被放到目标队列的可能,重启就执行一次就是为了保证到期的数据能被及时放到目标队列中。

3、与第一种方案比较

现在来比较一下第一种方案和Redisson的这种方案,看看有没有第一种方案的那些坑。

第一个任务延迟的问题,Redisson方案理论上是没有延迟的,但是当消息数量增加,消费者消费缓慢这个情况下可能会导致延迟任务消费的延迟。

第二个丢消息的问题,Redisson方案很大程度上减轻了丢消息的可能性,因为所有的任务都是存在list和sorted set两种数据类型中,Redis有持久化机制,就算Redis宕机了,也就可能会丢一点点数据。

第三个广播消费任务的问题,这个是不会出现的,因为每个客户端都是从同一个目标队列中获取任务的。

第四个问题是Redis内部channel发布事件的问题,跟这种方案不沾边,就更不可能存在了。

所以,通过上面的对比可以看出,Redisson这种实现方案就显得更加的靠谱了。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/z1ztai/article/details/129669032

使用redis实现延时任务_殷十娘的博客-爱代码爱编程

前提 最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。 候选方案对比 下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。     如果应用的数据量不高,实时性要求比较低,选用调度框架和MySQL进行短间隔轮询这个方案是最优的方案

如何实现延迟队列-爱代码爱编程

延迟队列的需求各位应该在日常开发的场景中经常碰到。比如: 用户登录之后5分钟给用户做分类推送; 用户多少天未登录给用户做召回推送; 定期检查用户当前退款账单是否被商家处理等等场景。 一般这种场景和定时任务还是有很大的区别,定时任务是你知道任务多久该跑一次或者什么时候只跑一次,这个时间是确定的。延迟队列是当某个事件发生的时候需要延迟多久触发配套事件,

面试官:延迟队列有哪些实现方案?说说你的看法-爱代码爱编程

推荐阅读: 阿里二面凉经:设计模式+缓存+Spring+虚拟机+MySQL+中间件+并发等难题,全部迎刃而解阿里巴巴字节跳动那些大厂必问的HTTP该怎么学?我建议你看看这篇文章!蚂蚁、字节、PDD社招面经Java岗(分布式+线程安全+MySQL+CAS)延迟队列的需求各位应该在日常开发的场景中经常碰到。比如: 用户登录之后5分钟给用户做分类推送; 用

延迟队列有哪些实现方案?说说你的看法-爱代码爱编程

Java架构学习交流 2020-02-04 15:59:18 推荐阅读:我凭借这份pdf拿下了蚂蚁金服、字节跳动、小米等大厂的offer 延迟队列的需求各位应该在日常开发的场景中经常碰到。比如: 用户登录之后5分钟给用户做分类推送; 用户多少天未登录给用户做召回推送; 定期检查用户当前退款账单是否被商家处理等等场景。 一般这种场景和定

Redis两种常见持久化以及两种不常见持久化-爱代码爱编程

Redis提供了RDB持久化和AOF持久化   RDB机制的优势和略施   RDB持久化是指在指定的时间间隔内将内存中的数据集快照写入磁盘。   也是默认的持久化方式,这种方式是就是将内存中数据以快照的方式写入到二进制文件中,默认的文件名为dump.rdb。 可以通过配置设置自动做快照持久化的方式。我们可以配置redis在n秒内如果超过m个key

redis 简单使用-爱代码爱编程

文章目录 第 01 章 开篇1. 用法2. 简介3. 安装第 02 章 基本数据类型1. 后台启动以及基本操作2. 基本数据类型---`String`3. 基本数据类型---`String(BIT)`4. 基本数据类型---`List`5. 基本数据类型---`Set`6. 基本数据类型---`Hash`7. 基本数据类型---`ZSet`8. 和

Redis、Kafka 和 Pulsar 消息队列对比-爱代码爱编程

导语 | 市面上有非常多的消息中间件,rabbitMQ、kafka、rocketMQ、pulsar、 redis等等,多得令人眼花缭乱。它们到底有什么异同,你应该选哪个?本文尝试通过技术演进的方式,以redis、kafka和 pulsar为例,逐步深入,讲讲它们架构和原理,帮助你更好地理解和学习消息队列。文章作者:刘德恩,腾讯IEG研发工程师。 一

一天吃透redis-爱代码爱编程

一天时间把redis吃透! https://mp.weixin.qq.com/s/aOiadiWG2nNaZowmoDQPMQ -----建议参考博客 1.小伙子为什么选择用redis 因为传统的关系型数据库已经不适用所有的场景,比如双十一的秒杀,或者是APP的流量访问高峰;这些都会把数据打穿,所有我们引入了消息中间件—redis。2.讲讲redis

redis集群方案应该怎么做?都有哪些方案?_卡卡的java架构笔记的博客-爱代码爱编程

使用codis方案:目前用的多的集群方案,基本和twemproxy一致的效果,但它支持在节点数量改变情况下,旧节点数据可恢复到新hash节点。 Redis cluster3.0自带的集群:特点在于他的分布式算法不是一致性hash,而是hash槽的概念,以及自身支持节点设置从节点。 在业务代码层实现:起几个毫无关联的Redis实例,在代码层对key进行h

redis-cluster集群、redis持久化、redis作mysql的缓存服务器、配置gearman实现redis和mysql数据同步_lpf_linux的博客-爱代码爱编程

一 Cluster集群 Cluster 集群简介  Redis 的哨兵模式基本已经可以实现高可用,读写分离 ,但是在这种模式下每台 Redis 服务器都存储相同的数据,很浪费内存,所以在redis3.0上加入了 Cluster 集群模式,实现了 Redis 的分布式存储,也就是说每台 Redis 节点上存储不同的内容。 其结构特点: 1、所

redis(三) 持久化 rdb+aof-爱代码爱编程

哈喽,大家好,我是有勇气的牛排(全网同名)🐮🐮🐮 有问题的小伙伴欢迎在文末评论,点赞、收藏是对我最大的支持!!!。 文章目录 1 RDB持久化1.1 官方介绍:1.2 配置文件1.2.1 Redis 6.0.16

浅谈redis及应用场景-爱代码爱编程

Redis是一种开源的数据存储系统,它被广泛用于高性能、高可扩展性、低延迟的数据存储和缓存。Redis支持多种数据结构,包括字符串、哈希表、列表、集合、有序集合等,提供了丰富的操作命令,使得开发人员可以方便地进行数据存储和检

一文教你彻底打败redis bigkey和hotkey问题-爱代码爱编程

前言 bigkey和hotkey是Redis生产中两个比较常见的问题,本文从它们的概念、危害、发现、解决的角度,来分析一下这两个问题。 bigkey 概念 通俗易懂的讲,Big Key就是某个key对应的value很

5.docker入门到精通—安装redis集群(理论)-爱代码爱编程

**面试题:**1-2 亿条数据需要缓存,请问如何设计这个存储案例: **回答:**单机单台 100%不可能,肯定是分布式存储,用 redis 如何落地?(一般业界有 3种 解决方案) 方案一、哈希取余分区 2亿条记录就是 2 亿个 k,v,我们单机不行必须要分 布式多机,假设有 3 台机器构成一

6.docker入门到精通—配置3主3从redis集群-爱代码爱编程

【前提】:docker安装了redis:6.0.8镜像 3主3从redis 集群配置 关闭防火墙+启动 docker 后台服务 systemctl start docker 新建 6 个 docker 容器 redis 实例 docker run -d --name redis-node-1

redis(七):持久化:aof和rdb-爱代码爱编程

前言 上一篇介绍了 Redis 实现消息队列的三种方式。这节开始介绍 Redis 的持久化问题。 我们都知道 Redis 是基于内存的数据库,而内存又是易失性的,一旦遇到断电或异常重启等问题时,内存中的数据就会丢失。所以