代码编织梦想

文章目录

第 01 章 开篇

 

1. 用法

  • 分布式锁
  • 限流
  • 布隆过滤器
  • 延时队列
  • Geo (地理位置的存储)


2. 简介

Redis 是我们在互联网应用中使用最广泛的一个NoSQL数据库,基于 C 开发的键值对存储数据库,
Redis 这个名字是 Remote Dictionary Service 字母缩写。
很多人想到 Redis,就想到缓存。但实际上Redis 除了缓存之外,还有许多更加丰富的使用场景。比如
分布式锁,限流。

  • 支持数据持久化
  • 支持多种不同的数据结构类型之间的映射
  • 支持主从模式的数据备份
  • 自带了发布订阅系统
  • 定时器、计数器


3. 安装


方式一:直接编译安装

因为 redis 是基于 c 语言 开发的,要提前准备好 gcc 环境

 yum install gcc-c++

接下来下载并安装 redis
在这里插入图片描述

wget https://download.redis.io/releases/redis-6.2.2.tar.gz # 下载命令

下载完成之后进行解压,进入目录进行编译

make  # 编译
make install  # 安装

运行,看到如下页面代表 redis 启动成功了!

redis-service redis.config

在这里插入图片描述

安装目录:/usr/local/bin
查看默认的安装目录:

  • redis-benchmark 性能测试工具
  • redis-check-aof 修复有问题的aof 文件
  • redis-check-rdb 修复有问题的 dump.rdb 文件
  • redis-sentinel 集群启动
  • redis-server 启动服务命令
  • redis-cli 客户端,操作入口

方式二:通过 docker 安装

需要提前准备好 docker ,启动 docker 运行安装即可

docker run --name javaboy-redis -d -p 6379:6379 redis --requirepass 123

docker 上启动 redis ,可以从宿主机上获取连接,前提是宿主机上要存在 redis-cli 工具,(此链接相当远程连接!)

redis-cli -a 123

如果宿主机上没有安装 Redis,那么也可以进入到 Docker 容器种去操作 Redis:

docker exec -it javaboy-redis redis-cli -a 123

方式三:直接安装

  1. CentOS 安装:
yum install redis
  1. Ubuntu 安装:
apt-get install redis
  1. Mac 安装:
brew install redis

方式四:在线体验
reids 官网:https://redis.io/
在这里插入图片描述

选择在线体验

在这里插入图片描述

在这里插入图片描述

第 02 章 基本数据类型

 

1. 后台启动以及基本操作

首先修改 redis-config 配置文件

是否以守护进程的方式启动 redis
在这里插入图片描述
配置完成后,保存退出,再次通过 redis-server redis.conf 命令启动 redis,此时,就是在后台启
动了。
在这里插入图片描述
注意: 存储中文的时候,使用 get 命令获取 value 是,会出现中文乱码的问题,这个时候我们只要每次启动 redis-cli 命令时后面添加 -raw 即可。命令如下: redis-cli -raw
在这里插入图片描述




                                                    数据库的基本操作:

  • select
    切换数据库,例:select 1

  • flushall
    清除所有数据库里的信息。

  • flushdb
    清除当前数据库里的信息。

  • ·shutdown
    停止运行 redis

  • type
    根据 key 查询当前数据的类型。

  • exists
    查询该键值对是否存在。

  • keys n*
    查询以 “n” 开头的 key

  • keys n?
    查询以 "n"开头的只有一位的 key


2. 基本数据类型—String

Stringredis 里边最最简单的一种数据结构。在 redis 中,所以的 key 都是字符串,但是,不同的
key 对应的 value 则具备不同的数据结构,我们所说的五种不同的数据类型,主要是指 value 的数据类
型不同。

redis 中的字符串是动态字符串,内部是可以修改的,像 Java 中的 StringBuffer,它采用分配冗余空间
的方式来减少内存的频繁分配。在 redis 内部结构中,一般实际分配的内存会大于需要的内存,当字符
串小于 1M 的时候,扩容都是在现有的空间基础上加倍,扩容每次扩 1M 空间,最大 512M




                                                    string 类型的常用命令

  • set
    给一个 key 赋值。

  • get
    根据已知的 key 获取 value 的值。

  • append
    使用该命令的时候,如果 key 已经存在则在其后面直接追加,没有则创建。

  • decr
    可以实现对 value 实现 -1 的操作(前提是 value 是一个数字,否则会出现错误),如果 value 不存在,则会给一个默认的值 0 ,在默认的值的基础上进行减一的操作。

  • decrby
    使用方式和 decr 的方式相同,但是他可以设置步长,第二个参数就是步长。

  • getrange
    可以返回 key 对应 value 的子串,这里有点类似 java 中的 subString 方法,这个命令第二个参数和第三个参数就是截取字符串的起始位置和终止位置。其中 -1 表示最后一个字符串,-2 表示倒数第二个。

  • getset
    获取某个 key 并且更新

  • incr
    给某个 keyvalue 自增

  • incrby
    给某个 keyvalue 自增,但是可以设置步长。

  • incrbyfloat
    incrby 的命令用法相似,但是步长可以设置浮点数。

  • mget 和 mset
    批量获取和存储

  • setex
    设置 key 的值同时设置过期时间。

  • ttl
    查看 key 的有效期

  • psetex
    setex 命令相似,只不过这里设置的是毫秒值。

  • setnx
    默认情况下 set 命令会覆盖已经存在了的值,但是 setnx 则不会。

  • setrange
    覆盖一个已经存在 keyvalue

  • msetnx
    批量设置如果值不存在则赋值,这里只要一个 key 有值的话,批量赋值都会失败。

  • strlen
    查看一个字符串的长度。


3. 基本数据类型—String(BIT)

redis 字符串都是以二进制的方式进行存储的。例如 set k1 aa 对应的ASCII 码是9797 转为
二进制是 01100001BIT 相关的命令就是对二进制进行操作的

  • getbit
    可以返回 key 对应的 valueoffsetvalue 的值。

  • setbit

  • 修改 key 对应 valueoffset 处的值。

  • bitcount
    统计二进制中 1 的个数。这里边的偏移量指的不是二进制位置而是字符串的位置。


4. 基本数据类型—List

                                                    list 类型的常用命令

  • lpush
    表示将 value 的值,从左到右依次插入表头位置。

  • lrange
    返回列表 key 中指定区间内的元素。

  • rpush
    这个功能和 lpush 功能基本相似,只是插入数据的顺序是从右到左。

  • rpop
    出栈,移除并返回列表的尾元素。

  • lpop
    移除并返回头元素

  • lindex
    返回列表中,下标为 index 的元素

  • ltrim
    对列表进行修剪,让列表只保存在指定区间的值,不在指定区间的则舍去。

  • blpop
    一个阻塞式的弹出,相当于 lpop 指令

  • rpoplpush
    k1 的右边吐出一个值,插入到 k2 的左边列表。

  • llen
    获得列表的长度

  • linsert
    将一个新的 value 值,插入到原有的 value 之前或之后。

  • lrem
    从左边删除 nvalue (从左到右)

  • lset
    将列表中 key 下标为 index 的值替换为 value


5. 基本数据类型—Set

set集合和 list 集合的区别就是,set 里面的元素不可重复。


                                                    set 类型的常用命令

  • sadd
    添加一个元素到 key 中。

  • smembers
    获取一个 key 下面的所有元素。

  • srem
    key 集合中移除指定的元素。

  • sismember
    返回一个成员是否在集合中。(1 是有,0 是否)

  • scard
    返回集合的数量,里面有多少个元素。

  • srandmember
    随机返回集合中的元素。

  • spop
    随机返回并且出栈。

  • smove
    把一个元素从一个集合转移到另一个集合中去。

  • sdiff
    返回两个集合的差集 ( 例如:以 k2 为准,将k1k2 中的集合移除。)

  • sinter
    返回两个集合的交集(也就是公共的部分)。

  • sdiffstore
    这个指令的用法类似于 sdiff,唯一不同的是,计算出来的结果会保存到一个新的集合中。

  • sinterstore

  • 和上面的指令类似,将交集保存在一个新的集合中。

  • sunion
    求两个集合之间的并集。

  • sunionstore
    求连个集合之间的并集,并生成新的集合保存下来。


6. 基本数据类型—Hash

hash 类型的存储是键值对的存储,以 key-value 的形式存储的。

                                                    hash类型的常用命令

  • hset
    添加值,在 hash 结构中, key是一个字符串,value 是一个键值对

  • hget
    获取值。

  • hmset
    批量设置。

  • hmget
    批量获取值。

  • hdel
    删除一个指定的字段。

  • hsetnx
    如果默认情况下,keyfield 两个相同,会覆盖掉已有的 valuehsetnx 则不会覆盖

  • hvals
    获取所有的 value

  • hkeys
    获取所有的 key

  • hgetall
    同时获取所有的 keyvalue

  • hexists
    返回 hash 中的 field 是否存在。(1 表示存在,0 表示不存在!)

  • hincrby
    给指定的 value 自增。

  • hincrbyfloat
    给指定的 value 自增浮点数。

  • hlen
    返回一个 key 中,某个 field 中字符串的长度。


7. 基本数据类型—ZSet

有序的集合,类似于 Set,它里面有一个分数score可以进行检索。


                                                    zset类型的常用命令

  • zadd
    将指定的元素添加到有序的集合中。

  • zscore
    返回 member 中的 score 值。

  • zrange
    返回集合里边的一组元素。后面加上 withscores 指令,则连同 score 一起返回。

  • zrevrange
    返回一组元素,但是是倒序。

  • zcard
    可以返回元素的个数。

  • zcount
    返回 score 在某一个区间内的元素。(如果想是闭区间的话前面就加一个( 号)

  • zrangebyscore
    按照 score 的范围,返回元素。

  • zrank
    返回元素的排名。(从小到大的排名)

  • zrevrank
    返回元素的排名。(从大到小的排名)

  • zincrby
    score 自增。

  • zinterstore
    给两个集合计算交集,并存储到新的集合中。

  • zrem
    弹出一个元素。

  • zlexcount
    计算有序集合中的成员数量。(- 代表最小值,+ 代表最大值,查询 member 需要 [ 号)。

ZRANGEBYLEX k1 - +  # 查询最大值最小值的区间
ZRANGEBYLEX k1 [v1 [v3 # 查询 member
  • zrangebylex
    上面的指令升级,返回指定区间的成员。

8. 和 Key 相关的操作

  • del
    删除一个 key-value

  • dump
    序列化给定的 key

  • exists
    判断 key 是否存在。

  • expire
    给一个 key 设置有效时间。如果 key 在过期之前重新被,set 了,则过期时间会失效。

  • persist
    移除一个 key 的过期时间。

  • pttl
    ttl 一样,只不过这返回的是毫秒。


9. 补充

  1. 四种数据类型(list/set/hash/zset),如果在第一次使用的时候不存在,就会自动创建一个。
  2. 四种数据类型(list/set/hash/zset),如果里面没有元素了,那么会立即删除容器,释放内存。

10. Redis 中配置文件的含义

配置大小单位,开头定义了一些基本的度量单位,只支持 bytes,不支持 bit
大小写不敏感

在这里插入图片描述

默认情况 bind=127.0.0.1 只能接受本机的访问请求,不写的情况下,无限制接受任何 ip 地址的访问
生产环境肯定要写你应用服务器的地址;服务器是需要远程访问的,所以需要将其注释掉
如果开启了protected-mode,那么在没有设定 bind ip且没有设密码的情况下,Redis 只允许接受本机的响应。

在这里插入图片描述

protected-mode 将本机访问保护模式设置no
在这里插入图片描述

tcp-backlog :

设置 tcpbacklogbacklog 其实是一个连接队列,backlog 队列总和=未完成三次握手队列 + 已经完成三次握手队列。

在这里插入图片描述
timeout:

一个空闲的客户端维持多少秒会关闭,0 表示关闭该功能。即永不关闭。

在这里插入图片描述
tcp-keepalive:

对访问客户端的一种心跳检测,每个n 秒检测一次。
单位为秒,如果设置为 0,则不会进行 Keepalive 检测,建议设置成 60
在这里插入图片描述

daemonize:

是否为后台进程,设置为 yes 守护进程,后台启动

在这里插入图片描述

pidfile:

存放 pid 文件的位置,每个实例会产生一个不同的 pid 文件(此文件是保存 redis 进程号的文件)
在这里插入图片描述

loglevel:

指定日志记录级别,Redis 总共支持四个级别:debugverbosenoticewarning,默认为 notice
四个级别根据使用阶段来选择,生产环境选择 notice 或者 warning

在这里插入图片描述

logfile:

日志文件名称
在这里插入图片描述

databases:

设定库的数量 默认16 ,默认数据库为 0,可以使用 SELECT <dbid>命令在连接上指定数据库 id

在这里插入图片描述

设置密码:

在这里插入图片描述

访问密码的查看、设置和取消
在命令中设置密码,只是临时的。重启redis服务器,密码就还原了。
永久设置,需要再配置文件中进行设置。

在这里插入图片描述

maxclients:

设置 redis 同时可以与多少个客户端进行连接,默认情况下为 10000 个客户端。

如果达到了此限制,redis 则会拒绝新的连接请求,并且向这些连接请求方发出 “max number of clients reached” 以作回应。

在这里插入图片描述

maxmemory:

  • 建议必须设置,否则,将内存占满,造成服务器宕机。

  • 设置 redis 可以使用的内存量。一旦到达内存使用上限,redis 将会试图移除内部数据,移除规则可以通过maxmemory-policy 来指定。

  • 如果 redis 无法根据移除规则来移除内存中的数据,或者设置了“不允许移除”,那么 redis 则会针对那些需要申请内存的指令返回错误信息,比如 SETLPUSH

  • 但是对于无内存申请的指令,仍然会正常响应,比如 GET 等。如果你的 redis 是主 redis(说明你的 redis 有从 redis ),那么在设置内存使用上限时,需要在系统中留出一些内存空间给同步队列缓存,只有在你设置的是“不移除”的情况下,才不用考虑这个因素。

在这里插入图片描述

maxmemory-policy:

  • volatile-lru: 使用 LRU 算法移除 key,只对设置了过期时间的键;(最近最少使用)
  • allkeys-lru: 在所有集合 key 中,使用 LRU 算法移除 key
  • volatile-random:在过期集合中移除随机的 key,只对设置了过期时间的键
  • allkeys-random:在所有集合 key 中,移除随机的 key
  • volatile-ttl: 移除那些 TTL 值最小的 key,即那些最近要过期的 key
  • noeviction: 不进行移除。针对写操作,只是返回错误信息。

在这里插入图片描述
maxmemory-samples:

  • 设置样本数量,LRU 算法和最小 TTL 算法都并非是精确的算法,而是估算值,所以你可以设置样本的大小,redis默认会检查这么多个 key 并选择其中 LRU 的那个。
  • 一般设置 37 的数字,数值越小样本越不准确,但性能消耗越小。

在这里插入图片描述

在这里插入图片描述

第 03 章 Java 客户端

 

1. 开启远程连接

redis 默认是不支持远程连接的,需要手动开启。修改 redis-config

第一个地方是注释掉绑定。

在这里插入图片描述

第二个是开启保护模式
在这里插入图片描述

第三个是开启密码校验

在这里插入图片描述
重新启动 redis


2. Jedis 连接 Redis

创建一个空的 maven 项目,项目创建成功之后,添加 jdeis 依赖。

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.2.0</version>
    <type>jar</type>
    <scope>compile</scope>
</dependency>

创建一个测试类,测试链接 redis

package orj.javaboy.jedis;

import redis.clients.jedis.Jedis;

/**
 * @author: yueLQ
 * @date: 2021-04-25 17:20
 */
public class MyJedis {

    public static void main(String[] args) {

        // 1. 创建一个 jedis,使用默认端口的话可以写也可以不写
        Jedis jedis = new Jedis("101.200.140.74");

        // 2. 密码认证
        jedis.auth("123");

        // 3. 测试链接是否成功
        String ping = jedis.ping();
        // 4. 返回 pong 表示连接成功。
        System.out.println("ping = " + ping);

    }

}

出现如下结果代表成功

在这里插入图片描述

对于 jedis 而言,一旦连接上 redis 的服务端,剩下的操作都极其容易了。在 jedis 中方法的 apiredis 的指令一致,所以见名知意。


3. Jedis 连接池优化

在实际应用中,我们一般都是通过连接池获取。由于 jedis 对象不是线程安全的,所以当我们使用 jedsi 对象的时候,从连接池中获取 jedis ,使用完成之后再还给连接池。

创建 jedis 连接池测试。

public class JedisPoolTest {

    public static void main(String[] args) {
        // 1. 构造一个 jedis 连接池。
        JedisPool jedisPool = new JedisPool("101.200.140.74", 6379);
        // 2. 从连接池中获取一个 jedis 连接
        Jedis resource = jedisPool.getResource();
        // 3. jedis 操作
        String ping = resource.ping();
        System.out.println("ping = " + ping);
        // 4. 归还连接
        resource.close();
    }

}

如上代码看似没有问题,但是执行到第三步的时候 (第三步相当于我们的业务代码),出现异常的时候,会导致第四步关闭连接池无法执行。所以我们要对代码进行改造,第四步进行执行。

package orj.javaboy.jedis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * @author: yueLQ
 * @date: 2021-04-25 17:47
 */
public class JedisPoolTest {

    public static void main(String[] args) {
        Jedis resource=null;
        try {
        // 1. 构造一个 jedis 连接池。
        JedisPool jedisPool = new JedisPool("101.200.140.74", 6379);
            // 2. 从连接池中获取一个 jedis 连接
             resource = jedisPool.getResource();
            // 3. jedis 操作
            String ping = resource.ping();
            System.out.println("ping = " + ping);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (resource != null) {
                // 4. 归还连接
                resource.close();
            }
        }
    }
}

通过 finally 确保 jedis一定会关闭。

利用 jdek1.7 中的 try-with-resource 特性,进行改造。

package orj.javaboy.jedis;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * @author: yueLQ
 * @date: 2021-04-25 17:47
 */
public class JedisPoolTest {

    public static void main(String[] args) {
        // 1. 构造一个 jedis 连接池。
        JedisPool jedisPool = new JedisPool("101.200.140.74", 6379);
        // 2. 从连接池中获取一个 jedis 连接
        try (Jedis resource = jedisPool.getResource()) {
            resource.auth("123");
            // 3. jedis 操作
            String ping = resource.ping();
            System.out.println("ping = " + ping);
        }
    }
}

这段代码的作用和上面的结果是一致的。但是这段代码无法实现强约束,我们可以进行进一步的改进。
代码如下:
创建一个接口 CallWithJedis

package orj.javaboy.jedis;

import redis.clients.jedis.Jedis;

/**
 * @author: yueLQ
 * @date: 2021-04-25 19:37
 */
public interface CallWithJedis {

    void callJedis(Jedis jedis);

}

创建 Redis

package orj.javaboy.jedis;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * @author: yueLQ
 * @date: 2021-04-25 19:39
 */
public class Redis {

    private JedisPool pool;

    public Redis() {
        // 给 redis 进行配置
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();

        // 连接池最大空闲数
        config.setMaxIdle(300);
        // 最大连接数
        config.setMaxTotal(300);
        // 连接的最大等待时间,单位是毫秒,如果是 -1 等待时间没有限制
        config.setMaxWaitMillis(3000);
        // 空闲时检查有效性。
        config.setTestOnBorrow(true);
        /**
         *   1. redis 地址
         *   2. 端口
         *   3. 连接超时时间,毫秒
         *   4. 密码
         */
        //config.setMaxIdle();
        pool = new JedisPool(config, "101.200.140.74",6379,10000,"123");
    }

    public void execute(CallWithJedis callWithJedis) {
        try (final Jedis resource = pool.getResource()) {
          //  resource.auth("123");
            callWithJedis.callJedis(resource);
        }
    }
}

测试:

public class JedisPoolTest {

    public static void main(String[] args) {
//        // 1. 构造一个 jedis 连接池。
//        JedisPool jedisPool = new JedisPool("101.200.140.74", 6379);
//        // 2. 从连接池中获取一个 jedis 连接
//        try (Jedis resource = jedisPool.getResource()) {
//            resource.auth("123");
//            // 3. jedis 操作
//            String ping = resource.ping();
//            System.out.println("ping = " + ping);
//        }
        final Redis redis = new Redis();
        redis.execute(jedis -> {
            System.out.println("jedis.ping() = " + jedis.ping());
        });
    }
}

测试结果:
在这里插入图片描述


4. Lettuce 简单介绍

jedis 相比较,lettuce 使用较少,但是封装更好。

lettucejedis 的比较:

  • jedis 在实现的过程中是直接连接 redis 的,在过个线程之间共享一个 jedis 实例,这个是线程不安全的,如果想在多线程场景下使用 jedis,就得使用连接池。这样每一个线程都自己的 jedis 实例。但是会消耗比较多的物力资源,比较浪费。

  • lettuce 目前是基于 netty NIO 框架来构建,所以克服了 jedis 线程不安全的问题。支持同步,异步,以及响应式调用,多个线程城可以共享一个连接实例。不用担心多线程带来的并发问题。

使用 lettuce,首先创建一个 maven 项目,添加 lettuce 依赖。

<dependency>
    <groupId>io.lettuce</groupId>
    <artifactId>lettuce-core</artifactId>
    <version>5.2.2.RELEASE</version>
</dependency>

创建 LettuceTest 测试类:

package org.javaboy;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

/**
 * @author: yueLQ
 * @date: 2021-04-25 20:21
 */
public class LettuceTest {

    public static void main(String[] args) {
        // 1. 创建连接
        // 协议+密码+ip+端口号
        RedisClient redisClient = RedisClient.create("redis://123@101.200.140.74:6379");
        // 2. 获取连接
        StatefulRedisConnection<String, String> connect = redisClient.connect();
        // 3. 获取一个同步调用的对象
        RedisCommands<String, String> sync = connect.sync();
        sync.set("name", "javaboy");
        String name = sync.get("name");
        System.out.println("name = " + name);
    }
}

测试结果:
在这里插入图片描述

注意:这里密码的传递方式,是直接写在连接地址上的!

在这里插入图片描述

第 04 章 Redis 基本应用

 

1. Redis 发布和订阅

redis 发布和订阅是一种消息的通信模式:发送者 pub 发送消息,订阅者 sub 接受消息,redis 客户端可以订阅任意数量的频道。

在这里插入图片描述

实现:
打开一个客户端订阅 channel
在这里插入图片描述
打开另一个客户端,给 channel 发布消息 hello
在这里插入图片描述

打开第一个客户端这个时候我们发现收到了信息
在这里插入图片描述


2. Redis 做分布式锁(一)

分布式锁是 redis 比较常见的使用场景。

问题场景:

例如一个简单的用户操作,一个线程去修改用户的状态,首先从数据库中读出用户的状态,然后再内存中进行修改该,修改完成之后,在存储回去。在单线程中,上述过程没有问题。在多线程中,有读取,修改,存储,不是一个原子性的操作,所以在多线程中,这样会出现数据紊乱的问题。

基于上述问题,我们可以使用分布式锁进行约束限制程序的执行。

分布式锁实现的思路很简单,就是进来一个线程先占位,然后别的线程进行操作的时候,发现位置已经有人占位了,就会放弃或者稍后再试。

redis 中,占位一般使用 setnx 指令,先进来的线程先占位,线程操作执行完成之后,在调用 del 指令释放位置。

代码实现:(将我 jedis 上面连个约束代码类复制过来)

package org.javaboy.distributed_lock;

/**
 * @author: yueLQ
 * @date: 2021-04-25 20:50
 */
public class TestLock {

    public static void main(String[] args) {
        final Redis redis = new Redis();

        redis.execute(jedis -> {

            final Long setnx = jedis.setnx("k2", "v1");
            // 如果 setnx 等于 1 表示没有人占位,执行下面的代码
            if (setnx == 1) {
                jedis.set("name", "javaboy");
                final String name = jedis.get("name");
                System.out.println("name = " + name);
                // 删除 k2 释放资源。
                jedis.del("k2");
            } else {
              // 有人占位,停止/暂缓 操作
            }
        });
    }
}

如上代码还是存在问题,如果业务代码出现异常,或者服务器停止( del指令之前的代码出现问题),这样会导致 del 指令没有被执行或者是调用。后面的请求全部阻塞在这里,锁永远得不到释放。

要解决这个问题,我们可以给锁添加一个过期时间,确保锁在一定时间之后,能够得到释放。

package org.javaboy.distributed_lock;

/**
 * @author: yueLQ
 * @date: 2021-04-25 20:50
 */
public class TestLock {

    public static void main(String[] args) {
        final Redis redis = new Redis();

        redis.execute(jedis -> {

            final Long setnx = jedis.setnx("k2", "v1");
            // 如果 setnx 等于 1 表示没有人占位,执行下面的代码
            if (setnx == 1) {
                // 这里给 k2 设置过去时间,防止程序在运行的时候抛出异常的时候导致锁无法释放从而导致程序阻塞。
                 jedis.expire("k2",10);
                jedis.set("name", "javaboy");
                final String name = jedis.get("name");
                System.out.println("name = " + name);
                // 删除 k2 释放资源。
                jedis.del("k2");
            } else {
              // 有人占位,停止/暂缓 操作
            }
        });
    }
}

如上改造之后会出现一个问题,就是获取锁和过期时间之间,如果服务器突然挂掉。这个时候锁被占用,无法被释放,也会造成死锁,因为获取锁和过期时间是两个操作,不具有唯一的原子性。

为了解决如上问题,从 redis 2.8 开始。setnxexpire 两个指令可以通过一个命令来执行了。我们可以对上述代码在进行改进。

public class TestLock {

    public static void main(String[] args) {
        final Redis redis = new Redis();

        redis.execute(jedis -> {
            final String set = jedis.set("k5", "javaboy", new SetParams().nx().ex(10));
            // 如果 setnx 等于 1 表示没有人占位,执行下面的代码
            if (set!=null&&"OK".equals(set)) {
                // 这里给 k2 设置过去时间,防止程序在运行的时候抛出异常的时候导致锁无法释放从而导致程序阻塞。
                 jedis.expire("k2",10);
                jedis.set("name", "javaboy");
                final String name = jedis.get("name");
                System.out.println("name = " + name);
                // 删除 k2 释放资源。
                jedis.del("k2");
            } else {
              // 有人占位,停止/暂缓 操作
            }
        });
    }
}


3. Redis 做分布式锁(二)

为了防止业务代码在执行的时候抛出异常,我们给每一个锁添加了一个超时时间,超时之后锁会被自动释放。但是这也会带来一个新的问题,如果要执行的业务非常耗时可能会出现紊乱,例如:

第一个线程首先获取到锁,开始执行业务代码,但是业务代码比较耗时,执行了 8s 。这样会在第一个线程的任务还未执行成功,锁就会被释放了。此时第二个线程进来,第二个线程获取到锁,开始执行,在第二个线程刚执行 3s,同时第一个线程也执行完了。此时第一个线程会释放锁(但注意他释放的是第二个线程的锁),释放之后导致三个线程进来。

对于如上问题,我们可以从两个角度入手。

  1. 尽量避免在获取锁之后,执行耗时操作。
  2. 我们可以那,在这个锁上面做文章,将锁的 value 设置为一个随机的字符串,那么每次释放锁的时候都去比较随机的字符串,是否一致,如果一致再去释放,否则不去释放。

对于第二种角度,由于释放锁的时候,要去查看锁的 value,第二步比较 value 的值是否正确,第三步是释放资源。有三个步骤,但是三个步骤不具备原子性。那为了解决这个问题,我们需要引入 Lua 脚本。


Lua 脚本的优势:

  1. 使用方便, redis 中内置了对 Lua 脚本的支持。
  2. Lua 脚本可以在 redis 服务端执行多个redis 命令。
  3. 由于网络在很大程度上会影响到 redis 的性能,而是用 Lua 脚本可以让多个指令一次执行,有效解决网络给 redis 带来的性能问题。

redis 中,使用 Lua 脚本,大致上有两个思路:
4. 提前在 redis 服务端写好脚本,然后在 java 客户端去调用脚本(推荐)。
5. 可以直接在 java 端直接去写这个 Lua 脚本,这好之后,需要执行的话,每次将脚本发送到 redis 上进行执行。


具体实行步骤,(我们这里选择的是思路一):

首先在 redis 服务端创建 Lua 脚本,内容如下


# redis.call 方法来调用redis 指令,第一个参数是 redis 的指令,
# 第二个参数是 key,key 是一个传入的参数,key 可以是传入的很多个
# ARGV 是我们在调用 lua 脚本的时候我们可能传入两类参数,一个是 key 另一个就是其他参数。
# 如果获取的 key 和 ARGV 相等的时候就释放资源。
# 不相等的话就返回一个 0
if redis.call("get",KEYS[1])==ARGV[1] then
   return redis.call("del",KEYS[1])

else
   return 0
end

接下来,我们可以给 Lua 脚本求一个 SHA1 ,命令如下:

cat releasewherevalueequal.lua  | redis-cli -a 123 script load --pipe

在这里插入图片描述

上面这个命令相当于给他加载一个缓存。script load 这个命令会在 redis 服务器中缓存我们的 Lua 脚本,并返回脚本的 SHA1 的校验和。在这个 java 端调用的时候,传入 SHA1 校验和作为参数,这样 redis 服务端就知道执行那个脚本了。

接下来我们在 java 端调用脚本。代码如下:

/**
 * @author: yueLQ
 * @date: 2021-04-26 10:49
 */
public class LuaTest {

    public static void main(String[] args) {
        final Redis redis = new Redis();
        for (int i = 0; i < 10; i++) {
            redis.execute(jedis -> {
                // 1. 生成随机字符串
                final String value = UUID.randomUUID().toString();
                // 2. 获取锁
                final String set = jedis.set("k1", value, new SetParams().nx().ex(5));
                // 3. 判断 set 是否为 null 或者是否为 ok
                if (set != null && "OK".equals(set)) {
                    // 4. 具体的业务操作
                    jedis.set("site", "www.gtxxq007.top");
                    final String site = jedis.get("site");
                    System.out.println("site = " + site);
                    // 5. 调用 Lua 脚本,释放锁
                    // 参数一是 SHA1 标识,参数二是 key 的集合,参数三是其他参数的集合
                    jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", Arrays.asList("k1"), Arrays.asList(value));
                } else {
                    System.out.println("没有拿到锁!");
                }
            });
        }
    }
}

4. Redis 做消息队列

我们平时说到消息队列,一般指的是 activeMQrocketMQrabbitMQ 以及大数据中的 kafka。 这些是我们计较常见的消息中间件,也是非常专业的中间件。作为专业的中间件,他这里边提供了很多的功能。但是,当我们需要使用消息中间件的时候,并非每次都需要非常专业消息中间件,假如我们只有一个消息队列,只有一个消费者,那就没有必要使用上面这些专业的消息中间件,这种情况下我们可以直接使用 redis 做消息队列。redis 的消息队列不是特别的专业,他没有很多高级特性,适合于简单的应用场景。如果对于消息的可靠性有着极高的追求,那么不适合使用 redis 做消息队列。


4.1 消息队列

redis 做消息队列,使用它里面的 list 数据结构就可以实现,我们可以使用 rpush/lpush 两个指令来实现入队,然后使用 lpop/rpop 实现出队。

在客户端(例如 java 端),我们会维护一个死循环来不停的从队列中读取消息并处理。如果队列中有消息,则直接获取到。如果队列中没有消息,就会陷入死循环,直到下一次有消息进入。这种死循环会造成大量的资源浪费,这个时候我们可以是之前 blpop/brpop(阻塞式弹出)



4.2 延时消息队列

延迟队列我们可以通过 zset 来实现,因为 zset 中有 score,我们可以把时间作为 score,将 value 存储到 redis 中,然后通过轮询的方式,不断的读取消息出来。

首先,如果消息是一个字符串,直接发送即可,如果是一个对象,则需要对对象进行序列化。这里我们使用 json 实现序列化和反序列化。要在项目中添加 json 依赖。

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.3</version>
</dependency>

接下来构造一个消息对象:

package org.javaboy.distributed_lock;

/**
 * @author: yueLQ
 * @date: 2021-04-26 14:52
 */
public class JavaboyMessage {
    private String id;
    private Object data;


    @Override
    public String toString() {
        return "JavaboyMessage{" +
                "id='" + id + '\'' +
                ", data=" + data +
                '}';
    }

    public String getId() {
        return id;
    }
    
    public void setId(String id) {
        this.id = id;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

构造消息发送和消息消费的队列:

package org.javaboy.distributed_lock;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.Set;
import java.util.UUID;

/**
 * @author: yueLQ
 * @date: 2021-04-26 15:03
 */
public class DelayMessageQueue {
    private Jedis jedis;
    private String queue;


    public DelayMessageQueue(Jedis jedis, String queue) {
        this.jedis = jedis;
        this.queue = queue;
    }

    /**
     * 消息入队,data 是要发送的消息
     *
     * @param data
     */
    public void queue(Object data) {
        JavaboyMessage javaboyMessage = new JavaboyMessage();
        javaboyMessage.setId(UUID.randomUUID().toString());
        javaboyMessage.setData(data);
        // 序列化
        try {
            String value = new ObjectMapper().writeValueAsString(javaboyMessage);
            System.out.println("msg publish: " + new Date());
            // 消息发送,score 延迟五秒
            jedis.zadd(queue, System.currentTimeMillis() + 5000, value);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }

    /**
     * 消息消费
     */
    public void loop() {
        while (!Thread.interrupted()) {
            // 读取 score 在 0 到 当前时间时间戳之间的时间消息
            Set<String> zset = jedis.zrangeByScore(queue, 0, System.currentTimeMillis(), 0, 1);
            System.err.println("zset = " + zset);
            if (zset.isEmpty()) {
                // 如果消息是空的,则休息 500 毫秒然后继续。
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    break;
                }
                continue;
            }
            // 如果读取到了消息,则直接加载
            String next = zset.iterator().next();
            if (jedis.zrem(queue, next) > 0) {
                // 抢到了,接下来处理业务
                try {
                    JavaboyMessage javaboyMessage = new ObjectMapper().readValue(next, JavaboyMessage.class);
                    System.out.println("receive msg = "+new Date() + javaboyMessage);
                } catch (JsonProcessingException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

测试:

package org.javaboy.distributed_lock;

import redis.clients.jedis.Jedis;

/**
 * @author: yueLQ
 * @date: 2021-04-27 11:14
 */
public class DelayMessageTest {

    public static void main(String[] args) {
         Redis redis = new Redis();

         redis.execute(jedis->{
            // 构造一个消息队列
             DelayMessageQueue queue = new DelayMessageQueue(jedis,"javaboy-delay-queue");

             // 构建消息生产者
              Thread produce = new Thread(){
                  @Override
                  public void run() {
                      for (int i = 0; i < 5; i++) {
                          queue.queue("www.javaboy.org》》》》》"+i);
                      }
                  }
              };
              
              // 构造一个消费者
             Thread consumer = new Thread(){
                 @Override
                 public void run() {
                     queue.loop();
                 }
             };
             // 启动
             produce.start();
             consumer.start();
             // 休息七秒停止程序
             try {
                 Thread.sleep(7000);
                 // 线程停止
                 consumer.interrupt();
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
         });
    }
}

5. Redis 操作位图

BIT 操作:
用户一年的签到记录,如果使用 string 类型来存储,那我们需要 365key-value,操作起来比较麻烦。通过位图可以有效的简化操作。

每天的记录占一个位,365 天就是 365 个位大概就是 46 个字节,这样可以有效的节省了存储空间。如果有一天想要统计用户一共签到多少天,统计 1 的个数即可。

对于位图的操作,可以直接的操作字符串(get/set),也可以直接的操作位(getbit/setbit)。


基本操作

零存整取:
例如存储一个 Java 字符串:

字符ASCII二进制
J7401001010
a9701100001
v11801110110

在这里插入图片描述

整存零取:
存储一个整个的字符串进去,但是通过位操作获取一个字符串。


统计
例如签到记录:
01111000111
1 表示签到,0 表示没签到,统计总的签到天数。
可以使用 bitcount 命令
在这里插入图片描述
bitcount 中,可以统计起始位置,但是注意,这个起始位置是指字符串的起始位置,而非 bit 的起始位置。

除了 bitcount 命令之外,还有一个 bitposbitpos 可以用来统计指定范围内出现的第一个 10 的位置,这个命令中的起始位置和结束位置都是字符索引,而不是 bit


bit 批处理
redis 3.2 之后,新加的功能叫做 bitfiled,可以对 bit 进行批量操作。

例如:

BITFIELD name get u4 0 # 表示获取 name 中的位, 从 0 开始,获取 4 个位,返回一个无符号的数字。

  • u 表示无符号的数字
  • i 表示有符号的数字,第一个符号就表示符号位,1 代表为负数。

bitfiled 一次也可以执行多个操作。
get 操作:
在这里插入图片描述
set 操作:
在这里插入图片描述
把无符号的 98b) 转成八位二进制,代替第八位开始代替掉八位的值。

incrby 操作:
对指定范围进行自增操作,自增操作可能出现溢出,既可能是向上溢出,也可能是乡下溢出。redis 中对溢出的处理方式是折返。8 位无符号的的数是 255 加一溢出变为 0,,八位有符号数127,加一变为 -128

也可以修改默认的溢出策略,可以改为 fail,表示执行失败

 BITFIELD name overflow fail  incrby u2 6 1

sat 表示停留在最大/最小值阶段

BITFIELD name overflow sat  incrby u2 6 1

6. Redis 中的 HyperLogLog

一般我们评估一个网站的访问量,有几个主要的参数:

  • pv : page view, 网页的浏览量。
  • uv: user view, 访问的用户。

一般来说,pv或者uv的统计,可以自己来做,也可以借助一些第三方的工具,比如 cnzz,友盟等。


如果自己实现,pv 比较简单,可以直接通过 redis 计数器就能实现。但是 uv 就不一样,uv 涉及到另一个问题,就是去重。

我们首先需要在前端给每一个用户生成唯一id,无论是登录用户还是未登录的用户,都要有一个唯一 id。这个 id 伴随着请求一起到达后端,在后端我们通过 set 集合中 sadd 命令来进行存储这个 id,最后通过 scard 统计集合的大小,进而得出 uv 数据。

如果是千万级别的 uv,需要i的存储空间就非常惊人。而且,像 uv 统计这样的数据,也不需要非常的精确。800wuv803wuv,其实差别不大。


HyperLogLog:
redis 中提供的hyperLogLog 就是专门用来解决这个问题的,hyperLogLog 提供了一套不怎么精确但是能够用的去重方案,会有误差。官方给的误差是数据是0.81%,这个精确度统计 uv 就足够了。

HyperLogLog 主要提供了两个命令:pfasddpfcount
pfadd:用来添加记录,类似于 sadd 指令 ,添加过程中,重复的记录会自动去重。
pfcount:则用来统计数据。

数据量较少的时候我们看不出来误差,在 java 中,我们多添加几个元素:

package org.javaboy.distributed_lock;

/**
 * @author: yueLQ
 * @date: 2021-04-28 22:42
 */
public class HyperLogLog {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis -> {
            for (int i = 0; i < 1000; i++) {
                jedis.pfadd("uv", "u" + i, "u" + (i + 1));
            }
            long uv = jedis.pfcount("uv");
            // 理论值是 1001
            System.out.println("uv = " + uv);
        });
    }
}

这里的理论值应该是 1001,但是实际输出结果是 994,但是是在可接受范围之内的。
除了上面两种命令之外,还有一个 pfmerge,合并多个统计结果。在合并的同时,会自动去除多个集合中重复的元素。
在这里插入图片描述


7. 布隆过滤器

我们使用 HyperLogLog 用来估计一个数,有偏差但是也足够的使用,HyperLogLog 主要提供两个方法:

  • pfadd
  • pfcount

但是 hyperLogLog 没有判断是否包含的方法,例如 pfexists,pfcontains 等。上述一类的方法是没有的,但是我们却存在这样的业务。

例如我们刷今日头条,推送的内容有相似的,但是没有重复的。这个就涉及到如何在推送的过程中去重。

解决方案很多,例如将用户的浏览历史记录下来,每次推送时去比较该条消息是否已经给用户使用了,但是这种方式效率极低不推荐。


7.1 Bloom Filter 介绍

bloom filte 专门用来解决我们上述所说的去重问题。使用 bloom filter 不会像缓存一样浪费空间。但是也有一个小问题就是不太精确。

bloom filter 相当于是一个不太精确的 set 集合。我们可以用 contains 方法去判断某一个对象是否存在。但是需要注意,这个判断不是非常精确。一般来说,通过 contains 判断某个值不存在,那就一定不存在,但是判断某个值存在的话,则他可能不存在。以今日头条为例:将设我们将用户的浏览记录用 b 表示,a 表示 用户没有浏览的新闻,现在要给用户推送消息,先去 b 里面判断这条消息是否已经推送过了,如果判断结果没有推送过(b 里面没有这条记录),那就一定没有推送过。如果判断结果说有推送过(b 里面也有可能没有这条消息),这个时候该条消息就不会推送给用户,导致用户错过该条消息。当然这是概率较低的,存在这个问题。

 

7.2 Bloom Filter 原理

每一个布隆过滤器,在 redis 中对应了一个大型的位数组以及几个不同的 hash 函数。
所谓的 add 操作是这样的:
首先根据几个不同的 hash 函数给元素进行 hash 算一个整数的索引值,拿到这个索引值之后,对位数组进行的长度进行取模运算,得到一个位置,每一个 hash 函数都会得到一个位置,将位数组中对应的位置设置为 1,这样就完成了添加操作。

在这里插入图片描述

当判断元素是否存在的时候,依然要先对元素进行 hash 运算,将运算结果和位数组取模。然后去对应的位置查看是否有相应的数据,如果有,表示元素可能存在(因为有这个数据的地方也可能其他元素存储进来的),如果没有则表示元素一定不存在。

Bloom Filter 中,误判的概率和位数组的大小有很大的关系,位数组越大,误判的概率就越小当然占用的存储空间更大。反之位数组越小,误判的概率更高,所占用的存储空间就更小。

 

7.3 bloom filter 安装

官网:https://oss.redislabs.com/redisbloom/Quick_Start/

方式一: docker 安装

docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest

方式二:自己编译安装。

# 克隆下载
git clone https://github.com/RedisBloom/RedisBloom.git 
(如果上面的编译失败可以尝试下面的路径:wget https://github.com/RedisBloom/RedisBloom/archive/v2.0.3.tar.gz)
# 进入目录
cd Redisbloom
# 编译
make
# 运行,运行之前要把以前开启的 redis 关闭了
/path/to/redis-server --loadmodule ./redisbloom.so

在这里插入图片描述
如图所示,启动成功了,但是我们发现占用了前台,我们需要在后台启动,加上 redis-conf 即可:

redis-server redis.conf --loadmodule ./RedisBloom-2.0.3/redisbloom.so

在这里插入图片描述

安装完成之后执行 BF.add 命令,测试是否安装成功。
每次启动时候都输入 --loadmodule ./RedisBloom-2.0.3/redisbloom.so 比较麻烦。我们可以将要加载的模块可以再 redis.conf 中提前配置好。
在这里插入图片描述
上面配置完成之后,再次启动 redis 的时候直接添加配置文件即可。

 

7.4 Bloom Filter 的使用

主要提供两个命令,添加和判断是否存在:

  • bf.add: 添加
  • bf.madd: 批量添加
  • bf.exists: 判断是否存在
  • bf.mexists: 批量判断是否存在

使用 jedis 操作布隆过滤器:
首先添加 bloom filter 的依赖,注意这里面 bloom filter 的依赖是基于 jedis 3.0 以上的版本,低于则不兼容。

  <dependency>
     <groupId>com.redislabs</groupId>
     <artifactId>jrebloom</artifactId>
     <version>1.2.0</version>
  </dependency>

测试代码如下:

package org.javaboy.distributed_lock;

import io.rebloom.client.Client;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ShardedJedisPool;

/**
 * @author: yueLQ
 * @date: 2021-04-29 16:56
 */
public class BloomFilter {
    public static void main(String[] args) {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(300);
        poolConfig.setMaxTotal(300);
        poolConfig.setMaxWaitMillis(3000);
        poolConfig.setTestOnBorrow(true);
        JedisPool pool = new JedisPool(poolConfig, "101.200.140.74", 6379, 3000, "123");
        Client client = new Client(pool);
        for (int i = 0; i < 100000; i++) {
            // 存入数据
            client.add("name", "javaboy-" + i);
        }
        // 检查数据是否存在
        boolean name = client.exists("name", "javaboy-999999");
        System.out.println("name = " + name);
    }
}

默认情况下,我们使用 bloom filter 过滤器他的错误率是 0.01,默认的元素大小是 100,但是这两个参数也是可以配置的。
我们可以调用 bf.reserve 进行配置。这里设置的 key 一定要不存在,否则会设置失败。

 BF.RESERVE k1 0.00001 1000000

第一个参数是 key,第二个参数是错误率,错误率越低,占用的空间越大,第三个参数是预计存储的数量,当实际存储的数量超出预计数量的时候,错误率会上升。

 

7.5 典型的应用场景

前面所说的新闻推送过滤算是一个应用场景。

解决 redis 穿透或者又叫做缓存击穿问题 (前提场景是数据量较大时使用)。假设我们有一亿条数据,现在我们查询用户要去数据库去查,效率低而且数据库压力大,所以我们会把请求会在 redis 中处理(活跃的用户存储在这里),redis 中没有的用户数据再去数据库中查询。

现在可能存在一个恶意请求,这个请求可能存在了很多不存在的用户,这个时候 redis 无法拦截下来请求,所以请求会跑到数据库里。这些恶意请求会击穿我们的缓存,甚至数据库进而引起雪崩效应。为了解决如上问题我们就可以使用布隆过滤器。


将一亿条用户数据存在 redis 中不是很现实,但是可以存在布隆过滤器中,请求来了首先判断数据是否存在,如果存在,再去数据库中查询,否则就不去数据库中查询。



8. Redis 限流

8.1 Pipeline 介绍

Pipeline (管道) 本质上是由客户端提供的 一种操作。 Pipeline 通过调整指令列表的读写顺序,可以大幅度的节省 IO 时间调高效率。


8.2 简单的限流操作

package org.javaboy.distributed_lock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

/**
 * @author: yueLQ
 * @date: 2021-04-30 9:51
 * <p>
 * 在这里我们定义一个方法,去验证某些方法是否被允许
 */
public class RateLimiter {

    private Jedis jedis;

    public RateLimiter(Jedis jedis) {
        this.jedis = jedis;
    }

    /**
     * 限流方法
     *
     * @param user     操作的用户,相当于是限流的对象
     * @param action   具体的操作
     * @param period   时间窗,限流的周期
     * @param maxCount 限流的次数
     * @return
     */
    public boolean isAllowed(String user, String action, int period, int maxCount) {
        // 1. 数据我们使用 zset 保存,首先我们生成 key
        String key = user + "-" + action;
        // 2. 获取当前时间戳
        long timeMillis = System.currentTimeMillis();
        // 3. 建立管道
        Pipeline pipelined = jedis.pipelined();
        // 4. 开启任务的执行
        pipelined.multi();
        // 5. 将当前的操作先存储先来
        pipelined.zadd(key, timeMillis, String.valueOf(timeMillis));
        // 6. 移除时间窗之外的数据
        pipelined.zremrangeByScore(key, 0, timeMillis - period * 1000);
        // 7. 统计剩余的 key
        Response<Long> zcard = pipelined.zcard(key);
        // 8. 将当前的 key 设置一个过期时间,过期时间就是时间窗
        pipelined.expire(key, period + 1);
        // 9.执行管道
        pipelined.exec();
        // 10.关闭管道
        pipelined.close();
        // 11. 返回比较时间窗内的操作数
        return zcard.get()<=maxCount;
    }
// 但是这个限流有一个缺陷,当数据量很大的时候,十秒之内限制十万次,五十万次的访问量,存储的数据量较大,不是很适合

    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis1 -> {
            RateLimiter rateLimiter = new RateLimiter(jedis1);
            for (int i = 0; i < 20; i++) {
            boolean allowed = rateLimiter.isAllowed("coding", "public", 3, 3);
            System.out.println("allowed = " + allowed);
            }
        });
    }
}


8.3 深入限流操作

redis 4.0 开始提供了一个 redis-cell 模块,该模块使用了漏斗算法,提供了一个非常好用的限流指令。

漏斗算法就像他的名字一样,是一个漏斗,请求从这个漏斗大口进,从小口出进入到系统当中,这样无论是多大的访问量,最终进入到系统中的请求都是固定的。一般的实现方式可以使用队列,队列可以给他设置一个容量,这样的话访问量可以大批的塞入队列,队列满了之后丢弃之后的访问量。

 

使用漏斗算法,首先我们要安装 redis-cell 模块:

https://github.com/brandur/redis-cell
安装步骤:
在这里插入图片描述

# 下载
wget https://github.com/brandur/redis-cell/releases/download/v0.2.4/redis-cell-v0.2.4-x86_64-unknown-linux-gnu.tar.gz

# 解压
tar -zxvf redis-cell-v0.3.0-x86_64-unknown-linux-gnu.tar.gz

# 解压之后我们发现直接解压到当前目录下了,我们直接创建一个新的目录进行存储。
mkdir redis-cell

#  移动文件
mv libredis_cell.* redis-cell

接下来修改 redis.conf 文件,加载新的模块。
在这里插入图片描述
然后重新启动 redis,启动成功之后如果出现如下命令,说明 redis-cell 就安装成功了。
在这里插入图片描述
CL.THROTTLE 该命令一共有五个参数:

  1. 第一个参数是 key 限流的操作
  2. 参数二,漏斗的容量
  3. 参数三,时间窗内可以操作的次数
  4. 参数四,时间窗
  5. 参数五,每次漏出的数量

在这里插入图片描述
执行完成之后,返回值也有五个:

  1. 参数一,0 表示允许, 1 表示拒绝。
  2. 参数二,是漏斗的容量。
  3. 参数三,是漏斗的剩余空间
  4. 如果拒绝了,多长时间后,可以重试
  5. 多长时间后,漏斗完全可以空出来

8.4 lettuce 扩展

首先定义一个命令接口:


 package org.javaboy;

import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.annotation.Command;

import java.util.List;

/**
 * @author: yueLQ
 * @date: 2021-05-01 13:56
 */
public interface RedisCommandInterface extends Commands {

    /**
     * 定义一个方法,这个方法就是一个命令的接口。
     * @param key
     * @param init 漏斗的初始容量
     * @param count 操作次数
     * @param period 操作时间
     * @param quota  每次露出的个数
     * @return
     */
    @Command("CL.THROTTLE ?0 ?1 ?2 ?3 ?4")
    List<Object> throttle(String key,Long init,Long count,Long period,Long quota);


}

定义好扩展命令,接下来直接调用即可。

package org.javaboy;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.dynamic.RedisCommandFactory;

import java.util.List;

/**
 * @author: yueLQ
 * @date: 2021-05-01 14:09
 */
public class RedisCellTest {

    public static void main(String[] args) {
        RedisClient redisClient = RedisClient.create("redis://123@101.200.140.74:6379");
        StatefulRedisConnection<String, String> connect = redisClient.connect();
        RedisCommandFactory redisCommandFactory = new RedisCommandFactory(connect);
        RedisCommandInterface commands = redisCommandFactory.getCommands(RedisCommandInterface.class);
        List<Object> throttle = commands.throttle("javaboy-public", 10L, 10L, 60L, 1L);
        System.out.println("throttle = " + throttle);
    }
}

测试结果:
在这里插入图片描述

布隆过滤器:

package org.javaboy;

import io.lettuce.core.dynamic.Commands;
import io.lettuce.core.dynamic.annotation.Command;

import java.util.List;

/**
 * @author: yueLQ
 * @date: 2021-05-01 13:56
 */
public interface RedisCommandInterface extends Commands {

    /**
     * 定义一个方法,这个方法就是一个命令的接口。
     * @param key
     * @param init 漏斗的初始容量
     * @param count 操作次数
     * @param period 操作时间
     * @param quota  每次露出的个数
     * @return
     */
    @Command("CL.THROTTLE ?0 ?1 ?2 ?3 ?4")
    List<Object> throttle(String key,Long init,Long count,Long period,Long quota);

    /**
     *  布隆过滤器添加指令
     * @param key
     * @param data 添加的数据
     * @return
     */
    @Command("BF.ADD ?0 ?1")
    Long bfAdd(String key,Object data);

    /**
     *  布隆过滤器,判断该值是否存在。
     * @param key
     * @param data 添加的数据
     * @return
     */
    @Command("BF.Exists ?0 ?1")
    Long bfExists(String key,Object data);


}

测试:

package org.javaboy;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.dynamic.RedisCommandFactory;
import jdk.internal.org.objectweb.asm.tree.analysis.SourceValue;

/**
 * @author: yueLQ
 * @date: 2021-05-01 14:20
 * <p>
 * 布隆过滤器测试
 */
public class BloomFilterTest {

    public static void main(String[] args) {
        RedisClient redisClient = RedisClient.create("redis://123@101.200.140.74:6379");
        StatefulRedisConnection<String, String> connect = redisClient.connect();
        RedisCommandFactory redisCommandFactory = new RedisCommandFactory(connect);
        RedisCommandInterface commands = redisCommandFactory.getCommands(RedisCommandInterface.class);
        //添加
//        for (int i = 0; i < 100; i++) {
//            Long javaboy = commands.bfAdd("javaboy", i);
//            System.out.println("javaboy = " + javaboy);
//        }
        // 查询
        Long exists = commands.bfExists("javaboy", 10);
        System.out.println("exists:"+(exists==1L?"true":"false"));
    }
}



9. GeoHash算法介绍

redis3.2 开始提供了 GEO 模块,该模块使用的算法是 Geo Hash 算法。

简介
核心思想: Geo Hash 是一种地址编码的方法,使用这种方式,能够将二维的空间经纬度数据编码称一个一维字符串。

地球上的经纬度的划分:
已经过格林尼治天文台旧址的经线为 0 度经线,向东就是东经,向西就是西经。如果我们将西经定为负,那么经度的范围就是 [-180,+180]
纬度北纬 90 度到南纬 90 度,如果我们将南纬定义为负,则纬度的范围就是 [-90,+90]
接下来,以本初子午线和赤道为界,我们可以将地球上的点分配到一个二维的坐标中,具体如下:
在这里插入图片描述
Geo Hash 算法就是基于这种思想,划分的次数越多,区域越多,每个面积就越小,精确度就会更高。

Geo Hash 具体算法:
以北京天安门为例(39.9053908600,116.3980007200)
1.纬度范围在 (-90,90) 之间,中间值为 0,对于 39.9053908600(0,90) 之间,因此得到的值为 1
2. (0,90)的中间值为 4539.9053908600(0,45) 之间,因此得到一个 0
3. (0,45) 的中间值为 22.539.9053908600(22.5,45)之间,因此得到一个 1
4. 以此类推…
这样,我们得到的纬度的二进制为 101
按照如上步骤,我们可以得出经度的二进制是110
接下来将经纬度合并(经度占偶数位,纬度占奇数位,下标从 0 开始)
111001


按照 base32 (0-9,b-z去掉a i l o),对合并后的数据进行编码,编码的时候,现将二进制数据转换为十进制,然后进行编码。
将编码得到的字符串,可以拿到 geohahs.org 网站上解析。

在这里插入图片描述

Geo Hash 有哪些特点:

  1. 用一个字符串表示经纬度。
  2. Geo Hash 表示一个区域,而不是一个点。
  3. 编码格式规律,例如一个地址编码之后格式是 123,另一个地址编码之后的格式是 123456,从字符串上就可以看出来,123456 处于 123 之中。

redis 中使用

添加地址:

GEOADD city 116.3980007200 39.9053908600 beijing
GEOADD city 114.0592002900 22.5536230800 shenzhen

查看两地之间的距离:

GEOADD city 114.0592002900 22.5536230800 shenzhen

获取元素的位置:

GEOPOS city beijing

在这里插入图片描述

获取 hash值:

 GEOHASH city beijing

在这里插入图片描述
通过 hash 查看定位。

查看附近的人:

# 以北京为中心方圆两百千米以内的城市找出来三个按照远近顺序排列在,这个命令不会排出北京
 GEORADIUSBYMEMBER city beijing 200 km count 3 asc

在这里插入图片描述
当然也可以当前自己的经纬度查询

 GEORADIUS city 116.3980007200 39.9053908600 2000 km withdist withhash withcoord count 4 desc

在这里插入图片描述


10. Redis 之 Scan

scan实际上是 keys 的升级版。
可以用 keys 来查询 key,在查询的过程中,可以使用通配符。 keys 用着还算方便,但是没有分页功能。另外它的内部使用的遍历算法,效率比较低( redis 是单线程程序),特别是数据量较大的时候。

为了解决如上问题,从 redis 2.8 开始,引入了 scan
scan 具备了 keys 的功能,但是不会阻塞线程,而且可以控制每次返回的结果数。

 

10.1 基本用法

准备一万条测试数据,代码如下:

package org.javaboy.distributed_lock;

/**
 * @author: yueLQ
 * @date: 2021-05-01 17:42
 */
public class ScanTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis -> {
            for (int i = 0; i < 10000; i++) {
                jedis.set("k"+i,"v"+i);
            }
        });
    }
}

scan 命令一共提供了三个命令:

  • 参数一,cursor游标
  • 参数二, key
  • 参数三,limit

cursor实际上指的是一维数组索引的位置,limit 是遍历一维数组的个数(一维数组的槽位数),所以每次返回数据的大小是不确定的。

# 游标从 0 开始,匹配 k8* 共一千个数据
scan 0 match k8* count 1000

在这里插入图片描述
 

10.2 基本原理

scan 的遍历顺序。
假设目前有三条数据:
在这里插入图片描述

在遍历过程中,发现游标的顺序是 0 2 1 3,从十进制来看好像没有规律,但是从转为二进制,则是有规律的:

00->10->01->11

这种规律就是高位进 1,从传统的二进制加法,是从右往左加,这里是从左往右加。

实际上 redis 中,它的具体计算流程给是这样:

  1. 将要就算的数字反转
  2. 给反转后的数字加 1
  3. 再反转

那么为什么不是按照 0 1 2 3 4 ....这样的顺序遍历那?主要考虑到两个问题:

  1. 字典扩容
  2. 字典缩容

在这里插入图片描述

假设我们要访问 110 时,发生了扩容,此时 scan 就会从 0110 开始遍历,之前已经被遍历过得元素,就不会被重复遍历了。

假如,我们将要访问 110时,发生了缩容,此时 scan 就会从 10 开始遍历,这个时候,也会遍历到010,但是010 之前的不会再被遍历了。所以,在发生缩容的时候,可能会返回重复的数据。


10.3 其他命令

scan 是一系列指令,除了遍历所有的 key 之外,也可以遍历某种类型的 key, 对应的命令有:

  • zscan
    遍历 zset

  • hscan
    遍历 hash

  • sscan
    遍历 set

在这里插入图片描述

第 05 章 深入理解 Redis

 

1. 阻塞 IO 与非阻塞 IO

javajdk 1.4 中,引入了 NIO,但是也有很多人在使用阻塞 IO,那么这两种 IO有什么区别那:

在阻塞的模式下,如果你从数据流中读取不到指定大小的数据量,IO 就会阻塞。比如已知会有 10 个字节,会发送过来,但是我们目前只收到了 4 个,还剩下 6 个,此时就会发生阻塞。如果是在非阻塞模式下,虽然此时只收到 4 个字节,但是读到 4 个字节就会立刻返回,不会傻傻等着,等到另外 6 个字节来的时候,再继续读取。

所以阻塞 IO 性能低于非阻塞式 IO
如果一个 Web 服务器,使用阻塞式 IO 处理请求,那么每一个请求都需要开启一个新的线程,但是如果使用非阻塞式 IO,基本上一个小小的线程池就足够使用了,因为不会发生阻塞,每个线程都能高效利用。


2. Redis 线程模型

首先一点,redis 是单线程的。单线程是如何解决高并发问题的那?
实际上,能够处理高并发的单线程应用不仅仅是 redis,除了 redis 之外,还有 node.jsnginx 等等也是单线程应用。

redis 虽然是单线程,但是运行的很快,主要有下面几个方:

  1. redis 中的所有 数据都是基于内存的,所有的计算也都是内存级别的计算。
  2. redis 是单线程的,所有一些时间复杂度较高的指令,可能会导致 redis 卡顿,例如 keys
  3. redis 在处理并发客户端连接时,使用了非阻塞是式 IO。(使用非阻塞是IO 时,会有一个问题,就是线程如何知道剩下来的数据来了?这里就涉及到了一个新的概念叫做 多路复用,其本质上就是一个事件轮训 API
  4. redis 会给每一个客户端指令通过队列来排队进行顺序处理。
  5. redis 做出响应时,也会有一个响应的队列。

3. Redis 通信协议


3.1 简介

redis通信使用了文本协议,文本协议比较费流量,但是 redis 作者认为数据库的瓶颈不在于网络流量,而在于内部逻辑,所以采用了这个比较费流量的文本协议。

这个文本协议叫做 redis serialization protocol,简称 resp

redis 协议将传输数据结构分为 5 种最小单元,单元结束的时候,加上回车换行符 \r\n

  1. 单行字符串以 + 开始,例如:+javaboy.org\r\n
  2. 多行字符串以 $ 开始,后面加上字符串的长度,例如 $11\r\njavaboy.org\r\n
  3. 整数以 : 开始,例如: :1024\r\n
  4. 错误消息以 - 开始
  5. 数组以 * 开始,后面的加上数组的长度

需要注意的是,如果是客户端连接服务端,只能使用第 5 种。


3.2 实战

准备工作:
做两件事情:
为了方便客户端连接 redis,我们关闭 redis 的保护模式(在 redis.conf 配置文件中)

protected-mode no

同时关闭密码

requirepass xxxxx

配置完成之后重启 redis

示例代码:

package org.javaboy;

import java.io.IOException;
import java.net.Socket;

/**
 * @author: yueLQ
 * @date: 2021-05-01 22:56
 */
public class JavaboyRedisClient {

    private Socket socket;

    public JavaboyRedisClient() {
        try {
            socket=new Socket("101.200.140.74",6379);
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("redis 连接失败!");
        }
    }

    /**
     *  执行 redis 中的 set 命令,[set,key,value]
     * @param key
     * @param value
     * @return
     */
    public String set(String key,String value) throws IOException {
        StringBuilder builder = new StringBuilder();
        builder.append("*3")
                .append("\r\n")
                .append("$")
                .append("set".length())
                .append("\r\n")
                .append("set")
                .append("\r\n")
                .append("$")
                .append(key.getBytes().length)
                .append("\r\n")
                .append(key)
                .append("\r\n")
                .append("$")
                .append(value.getBytes().length)
                .append("\r\n")
                .append(value)
                .append("\r\n");
        System.out.println("builder = " + builder.toString());
        socket.getOutputStream().write(builder.toString().getBytes());
        byte[] buf =new byte[1024];
        socket.getInputStream().read(buf);
        return new String(buf);
    }

    /**
     * 在 redis 中的 get 命令 [get,key]
     * @param key
     * @return
     */
    public String get(String key) throws IOException {
        StringBuffer buffer = new StringBuffer();
        buffer.append("*2")
                .append("\r\n")
                .append("$")
                .append("get".length())
                .append("\r\n")
                .append("get")
                .append("\r\n")
                .append("$")
                .append(key.getBytes().length)
                .append("\r\n")
                .append(key)
                .append("\r\n");
        System.out.println("buffer = " + buffer.toString());
        socket.getOutputStream().write(buffer.toString().getBytes());
        byte[] buf=new byte[1024];
        socket.getInputStream().read(buf);
        return new String(buf);
    }

    public static void main(String[] args) throws IOException {
        JavaboyRedisClient javaboyRedisClient = new JavaboyRedisClient();
        // 添加数据
//        String set = javaboyRedisClient.set("k1", "v1");
//        System.out.println("set = " + set);
        // 获取数据
        String k1 = javaboyRedisClient.get("k1");
        System.out.println("k1 = " + k1);
    }
}

4. redis 数据持久化方式

redis 是一个缓存工具,也叫做 nosql 数据库,既然是数据库,那么必然支持数据的持久化操作。在 redis 中,数据库的持久化方式有两种:

  1. rdb 快照的方式
  2. AOF 日志

4.1 rdb 快照

redis 使用操作系统的多进程机制来实现快照持久化: redis 在持久化时,会调用 glibc 函数 fork 一个子进程,然后将快照持久化操作完全交给子进程去处理。而父进程则继续处理客户端请求。在这个过程中,子进程能够看到的内存中的数据在子进程产生的一瞬间就固定下来了。再也不会改变,这就是为什么 redis 持久化叫做快照。

具体配置

默认情况下,快照的持久化方式就是开启的。默认情况下会产生一个 dump.rdb 文件,这个文件就是备份下来的文件。当 redis 启动的时候,会默认加载该文件,从该文件中回复数据。

具体配置在 redis.conf 中:
在这里插入图片描述
快照频率:
第一个表示 900 秒内如果有一个键被修改,则进行快照。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

生成快照文件在哪个位置进行配置,默认情况下表示在当前文件夹下
在这里插入图片描述

备份流程:

  1. redis 运行中,我们可以向 redis 发送 save 命令来创建快照。但是需要注意 save 是一个阻塞命令,redis 在收到 save 命令开始处理备份操作之后,在处理完成之前,将不再处理其他的请求,其他命令将会被挂起,所以 save 使用的不多。

  2. 所以我们一般可以使用 bgsavebgsavefork 一个子进程去处理备份的事情不影响父进程处理客户端请求。

  3. 我们定义的备份规则,如果有规则满足,也会自动触发 bgsave

  4. 另外,当我们执行 shutdown 命令的时候,也会触发 save 命令,备份工作完成之后,redis 才会关闭。

  5. redis 搭建主从复制时,在从机连上主机之后,会自动发送一条 sync 同步命令,主机收到命令之后,首先执行 bgsave 快照,对数据进行快照,然后将快照的数据发送给从机,进行同步。

4.2 AOF

与快照持久化不同 aof 持久化是将被执行的命令追加到文件的末尾。在恢复时,只需要把记录下来的命令从头到尾执行一遍即可。

默认情况下, aof 是没有开启的。我们需要手动开启。

开启 aof 设置

在这里插入图片描述

aof 的文件名
在这里插入图片描述
备份的时机,默认的时每秒备份一次
在这里插入图片描述
表示 aof 文件在压缩时,是否还继续同步操作:
在这里插入图片描述

第一个配置表示,表示当前目前 aof 文件大小超过上一次重写时的 aof 文件大小的百分之多少的时候,再次进行重写。
第二个配置之前没有重写过,则启动时的 aof 大小为依据,同时要求 aof 文件至少要大于 64 m

在这里插入图片描述
同时避免快照备份方式的影响,记得将快照备份关闭:

在这里插入图片描述
BGREWRITEAOF 手动保存命令。


5. Redis 事务

正常来说,一个可以商用的数据库往往都有比较完善的事务支持,redis 当然也不例外。相对于关系型数据库中的事务模型, redis 中的事务要简单很多。因为简单,所以 redis 中的事务模型不太严格,所以我们不能像使用关系型数据库中的事务那样来使用 redis

在关系型数据库中,和事务相关的三个命令分别是:

  • begin
  • commit
  • rollback

redis 中也有对应的指令:

  • multi 开启
  • exec 执行
  • discard 回滚

 

原子性

注意 redis的事务,并不能算作原子性。他仅仅具备隔离性,也就是说当前的事务可以不被其他事务打断。

由于每一次事务操作涉及到的指令还是比较多的,为了提高执行效率,我们在使用客户端的时候,可以通过pipeline 来优化这个指令的执行。

redis 中还有一个 watch 指令, watch 可以用来监控一个 key,通过这种监控,我们可以监控 redis 事务我们可以确保在 exec 之前,watch 的键没被修改过。


代码实现

package org.javaboy.distributed_lock;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

import java.util.List;

/**
 * @author: yueLQ
 * @date: 2021-05-03 8:28
 */
public class TransactionTest {

    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis -> {
            TransactionTest transactionTest = new TransactionTest();
            transactionTest.saveMoney(jedis,"javaboy",1000);
        });
    }

    public Integer saveMoney(Jedis jedis,String userId,Integer money){
        while (true){
            String user = jedis.get(userId);
            if (user==null){
                jedis.set(userId,"0");
            }
            String watch = jedis.watch(userId);
           Integer value =Integer.parseInt(jedis.get(userId))+money;
           // 开启事务
            Transaction multi = jedis.multi();
            multi.set(userId,String.valueOf(value));
            List<Object> exec = multi.exec();
            if (exec!=null){
                break;
            }
        }
        return Integer.parseInt(jedis.get(userId));
    }
}



6. Redis 主从同步

CAP:

在分布式环境下,CAP 原理是很基本的东西,所有分布式存储系统,都只能在 CAP 中选择两项实现:

  • cconsistent 一致性
  • aavailability 可用性
  • p:partition tolerance 分布式容忍性

在一个分布式系统中,这三个只能满足其中两项,在一个分布式中, p 是必须要实现的,ca 只能选择其中一项。大部分情况下,大多数网站架构选择了 ap

redis 中,实际上就是保证了最终一致。

redis 中,当搭建主从服务之后,如果主从机之间的断开了了连接,redis 是依然可以操作的,相当于它满足可用性,但是此时主从两个节点中的数据是有差异的(此时不满足数据的一致性)。但是 redis 保证最终一致,也就是当网络恢复的时候,从机会追赶主机,尽量保证最终一致性。


6.1 基本环境搭建

主从复制在一定程度上可以扩展 redis 的性能,redis 主从复制和关系型数据库的主从复制有些类似,从机能够精确的复制主机上的内容。实现主从复制之后,一方面能够实现数据的读写分离,降低 master 的压力,另一方面也能实现数据的备份。

配置放置
假设我们有三台虚拟机,地址如下:

101.200.140.74:6379  # 主机
106.52.168.110:6379 #从机
1.117.75.163:6379 # 从机

修改上述三台 redis 配置文件:
关闭保护模式:

在这里插入图片描述
如果开启了 AOF 模式,关闭 AOF

在这里插入图片描述

log 日志文件的位置:
在这里插入图片描述

启动三台 redis 实例
在主机中存入数据 set k1 v1

我们在两台从机中去关联主机,连接的指令是:

SLAVEOF 101.200.140.74 6379

我们可以使用如下命令去查看三台机器的信息

info replication

在这里插入图片描述
上面的是主机信息,下图是从机的信息
在这里插入图片描述

如上所示即主从复制安装成功。

有的时候我们会给主机设置密码,而这个时候我们发现从机虽然有信息,但是状态 down 没有连接上:
在这里插入图片描述
我们需要在 redis.conf 中给从机添加主机的验证:
在这里插入图片描述
重新启动从机,查看信息
在这里插入图片描述

我们发现只要从机服务停止,再次启动从机的时候他就会成为 master 节点,而我们需要重新的去使用节点连接的 SLAVEOF 命令,我们发现很麻烦。我们可以再配置文件中直接将 ip+端口 配置好,当每次服务停止再次开启服务的时候,从机即可自动去的连接主机。
在这里插入图片描述

解决完如上问题之后,我们的主从复制就算搭建好了。
此时我们在从机就可以获取到刚才在主机中存储的数据了。证明主从复制搭建成功。


注意:

  1. 如果主机已经运行了一段时间,并且存储了一些数据,此时从机连接上来,那么从机将会把主机上的所有数据进行备份,而不是从连接的那个时间点开始备份。
  2. 配置了主从复制之后,主机上可读可写,但是从机只能读取不能写入。但是我们可以修改从机的 redis.conf 文件中 replica-read-only的值让从机也可以执行写入的操作。

在这里插入图片描述
在这里插入图片描述

  1. 在整个主从复制的结构中,如果主机不幸的挂掉了,重启之后,他依然是主机。主从复制操作也能够继续进行。

 

6.2 复制流程

每一个 master 都有一个 replacation ID。这是一个较大的伪随机字符串,标记了一个给定的数据集。每个 master 也有一个偏移量,master 将自己产生的复制流发送个 slave 时,发送多少个字节的数据,自身的偏移量就会增加多少,目的是当有新的操作修改自己的数据集时,他可以以此更新 slave 的状态。复制偏移量即使再没有一个 slave 连接到 master 的时候,也会自增,所以基本上每一对给定的 replication ID offset 都会标记一个 master 数据集的特定版本。当 slave 连接到 master时,他们使用 PSYNC 命令来发送他们记录的旧的 master replication ID 和他们至今为止处理的偏移量。通过这种方式,master 能够仅发送 slave 所需要的增量部分。但是如果 master 的缓冲区中没有足够的命令积压缓冲记录,或者如果 slave 引用了不在知道的历史记录(replication ID),则会转而进行一个全量的重新同步。这种情况下,slave会得到一个完整的数据集副本,从头开始。

简单来说,是如下的几个步骤:

  1. slave 启动成功之后,连接到 master 后会发送一个 sync 命令。
  2. master 接收到命令之后,启动后台存盘进程。同时收集所有接收到用于修改数据集命令。
  3. 在后台进程执行完毕之后, master 将传送整个数据文件到 slave,来完成一次完全的同步。
  4. 全量复制:而 slave 服务在接受到数据库文件数据之后,将其存盘并加载到内存中。
  5. 增量复制: master 继续将新的所有收集到的修改命令依次传入给 slave完成同步。
  6. 但是只要重新连接 master,一次完全同步(全被复制),将被自动执行。

6.3接力赛(薪火相传)

上面我们搭建的主从复制都是下图这样的:
在这里插入图片描述
实际上,一主二仆的主从复制也可搭建如下这种结构:
在这里插入图片描述

搭建这种结构也很简单我只之需要将上文中的 1.117.75.163:6379 修改为 106.52.168.110:6379 他的从机即可。执行如下命令:

SLAVEOF 106.52.168.110 6379

6.3 Jedis 操作哨兵模式(反客为主)

在一台服务器上的根目录下新建 myredis 文件夹,(为了节省资源我们在一台服务器上搭建主从复制)。

复制 redis.conf 文件到新建的文件夹中,接着将redis.conf 文件复制三份,如下:
在这里插入图片描述

文件内容如下:
redis6379.conf:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

查看命令和上面的相同。配置完成之后 slave 命令去关联 master

当我们的主机宕机的时候我们可以使用如下指令 SLAVEOF no one 重新推举主机:

在这里插入图片描述

但是如上有个问题就是,如果 redis 服务停止了的话,重新启动的后,如果 master 节点再次出现问题的话。我们都要手动进行配置,所以很麻烦,接下来我们来实现自动化的配置。

myredis 文件夹中创建 sentinel.conf 文件,写入如下内容:

sentinel monitor mymaster 81.68.158.166 6381 1

启动哨兵:

redis-sentinel sentinel.conf

在这里插入图片描述
当我们关掉主机之后,会在从机之中选举出来新的主机。

复制延时

由于所有的写操作都是先在 Master上操作,然后同步更新到 Slave 上,所以从 Master 同步到 Slave 机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave 机器数量的增加也会使这个问题更加严重。

新选举的 master 节点规则如下:

  • 选择优先级靠前的,优先级在 redis.conf 中默认:replica-priority 100,值越小优先级越高。
    在这里插入图片描述

  • 选择偏移量最大的,偏移量是指获取原数据最全的。

  • 选择 runid 最小的从服务,每个 redis 实例的启动都会生成一个 40 位的 runid


6.4 Spring Boot 操作哨兵模式

创建项目,添加依赖:
在这里插入图片描述

修改 application.yml 文件

spring:
 redis:
   timeout: 5000
   sentinel:
     master: mymaster
     nodes: 81.68.158.166:26379

创建测试:

@SpringBootTest
class SentinelApplicationTests {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Test
    void contextLoads() {
        while (true) {
            try {
                String k1 = stringRedisTemplate.opsForValue().get("k1");
                System.out.println("k1 = " + k1);
            } catch (Exception e) {

            } finally {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}


7. Redis 集群搭建

redis 集群架构图:
在这里插入图片描述

redis 集群运行原理如下:

  1. 所有的redis 节点彼此互联 (PING-PONG 机制),内部使用二进制协议化传输速度和带宽。
  2. 节点的 fail 是通过集群中超过半数节点失效是才生效。
  3. 客户端与 redis 节点直连,不需要中间 proxy 层,客户端不需要连接集群所有节点,连接集群中任何一个可用节点即可。
  4. Redis-cluster 把所有的物理节点映射到 【0-16383】 slot上,cluster负责维护 node < - > slot < - > valueredis 集群中内置了 16384 个哈希槽,当我们需要在 redis 集群中放置一个 key-value时, redis 先对 key 使用 crc16 算法算出一个结果,然后把结果对 16384 去余数。这样每一个 key 都会对应一个编号在 0-16383 之间的的哈希槽,redsi 会根据节点数量大致均等的将哈希槽隐射到不同的节点上。

如何投票

投票的过程中集群的所有 master 参与,如果半数以上的 master 节点与 master 节点通信超过 cluster-node-timeout 设置的时间,认为当前 master 节点挂掉。

如何判断当前节点不可用

  1. 如果集群中的任意 master挂掉,且当前 master 没有 slave。集群进入 fail 状态,也可以理解为集群的 slot映射 【0-16383】 不完整的时候进入 fail 状态。
  2. 如果集群中超过半数以上的 master 挂掉,无论是否有 slave,集群进入 fail状态,当前集群不可用时,所有对集群的操作都不可用,收到 (err) CLUSTERDOWN The cluster is down 的错误。

ruby 环境
redis 5. 以上的版本不需要我们去搭建 ruby 环境了。
redis 集群管理工具 redis-trib.rb 依赖 ruby 环境,首先要安装 ruby 环境:

yum install ruby
yum install rubygems

如果安装失败可以参考如下文章:https://blog.csdn.net/fengye_yulu/article/details/77628094

7.1 集群搭建

创建文件夹 redis-cluster ,解压并安装 redis,修改 reids.conf文件。

  1. 去掉 bind 和保护模式

  2. 开启集群模式
    在这里插入图片描述

  3. 修改文件节点名称
    在这里插入图片描述

  4. 开启连接超时时间。
    在这里插入图片描述

  5. 修改端口
    在这里插入图片描述

完成以上配置之后,将 7001 文件夹复制 5 份。
在这里插入图片描述

修改其他五个文件夹里的 redis.conf 文件。
在这里插入图片描述
然后启动 redis 实例,启动结果如下:
在这里插入图片描述

启动完成之后,连接客户端,输入如下命令:

redis-cli --cluster create 81.68.158.166:7001 81.68.158.166:7002 81.68.158.166:7003 81.68.158.166:7004 81.68.158.166:7005 81.68.158.166:7006

创建结果如下(由于我的虚拟机内存较小,所以我们只是用了三个 redis 实例)
在这里插入图片描述
正如上图所示,我们的 redis 集群就正式搭建成功了,我们可以进入任意一个 redis 客户端中查看信息。

redis-cli -p 7001 -c #  连接客户端
cluster info  # 查看节点信息

在这里插入图片描述

7.2 主从 + 密码以及动态节点的添加

7.3 Jedis操作

package org.javaboy.distributed_lock;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.JedisPoolConfig;

import java.util.HashSet;

/**
 * @author: yueLQ
 * @date: 2021-05-17 21:08
 */
public class RedisCluster {
    public static void main(String[] args) {
        HashSet<HostAndPort> nodes = new HashSet<>();
        nodes.add(new HostAndPort("81.68.158.166",7001));
        nodes.add(new HostAndPort("81.68.158.166",7002));
        nodes.add(new HostAndPort("81.68.158.166",7003));
        JedisPoolConfig config = new JedisPoolConfig();
        // 连接池最大空闲数
        config.setMaxIdle(300);
        // 最大连接数
        config.setMaxTotal(300);
        // 连接的最大等待时间,单位是毫秒,如果是 -1 等待时间没有限制
        config.setMaxWaitMillis(3000);
        // 空闲时检查有效性。
        config.setTestOnBorrow(true);
        JedisCluster cluster = new JedisCluster(nodes,15000,15000,config);

        String set = cluster.set("k1", "v1");
        System.out.println("set = " + set);
        String k1 = cluster.get("k1");
        System.out.println("k1 = " + k1);
        cluster.close();
    }
}

8. Redis Stream

redis 5.0开始,推出了 Stream 功能。在 stream 中,有一个消息链表,所有加入链表中的消息都会被串起来。每条消息都有唯一的 ID,还有对应的消息内容,所谓消息内容,就是键值对。

一个 stream 上可以有很多个消费者,每一个消费者都有一个游标,这个游标根据消息的消费情况在链表上移动。多个消费者之间相互独立互不影响。


8.1 基本操作

  • xadd
    添加消息,* 号代表示随机生成表示,添加完成之后返回生成的 id

在这里插入图片描述

  • xdel
    删除消息

  • xrange
    获取消息列表,- 表示最小值,+ 表示最大值
    在这里插入图片描述

  • del
    删除 stream

  • xlen
    长度


8.2 消息消费

xread 消费
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

消费组的创建

在这里插入图片描述

XREADGROUP group c1 c block 0  count 1 streams javaboy >

group c1:上面生成消费组的名称
c:消费者的名称
block 0:阻塞读取,0 表示永远阻塞
count 1:只读取一条数据
> 符号:从游标的后面开始读取


9. Info 命令

在这里插入图片描述
在这里插入图片描述


10. Redis 策略

过期策略

redis 中所有的 key 都可以设置过期时间,所有被设置了过期时间的 key都会存放在一个独立的字典中,删除这些 key 有两种不同的策略:

  1. 定时任务定期删除,redis默认每秒进行 10 次过期扫描,每次从字典中虽然拿出来 20key,删除这 20key 中已经过期的 key,如果删除的 key的比例超过 1/4,就再拿出来 20 个进行删除,以此类推。
  2. 当客户端访问的时候,在去查看 key 是否过期,如果已经过期,则删除。

需要注意的是,如果是主从架构的话,从机是不会进行 key 的过期扫描。主机的 key 过期之后,会自动的同步到主机上去。

 

LRU,最近最少使用策略

因为 redis 是基于内存的,当 redis 所使用的内存超过物理内存的限制的时候,内存中的数据会和磁盘产生频繁的交换,这种交换行为会让 redis 的性能急剧下降,所以在实际开发的过程中,我们不允许 redis 出现交换(swap)的行为。

redis 实际内存超过系统可用内存之后,redis 提供了几种策略:

  1. noeviction:默认策略,此时写操作将停止,删除和读取可以进行。
  2. volatile-lru:淘汰设置了过期时间的 key,最近最少使用的 key 会被淘汰,如果一个 key没有设置过期时间,则不会被淘汰。
  3. volatile-ttl:淘汰设置了过期时间的 key,根据 keyttl值,ttl 越小,越优先淘汰,如果一个key 没有设置过期时间,则不会淘汰。
  4. volatile-random:淘汰设置了过期时间的 key,随意设置了过期时间的 key,如果一个 key没有设置过期时间,则不会被淘汰。
  5. allkeys-lru:淘汰所有的 key,最近最少使用的 key 会被淘汰。
  6. allkeys-random:淘汰所有的 key 随机淘汰。

 

11. lazy free

redis 是一个单线程程序,如果直接删除一个很大的 key,可能造成卡顿。

所谓的懒惰删除也就是异步删除,key 不在主进程中删除。

unlink(redis 4.0之后生效)
在这里插入图片描述
异步清除数据库
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述


12. 加密通信

准备两台虚拟机,81.68.158.166101.200.140.74
在这里插入图片描述

spiped 下载地址:https://github.com/Tarsnap/spiped/archive/1.6.1.tar.gz

101.200.140.74 下安装:

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

make # 执行编译工作
make install # 执行安装

生成随机的秘钥文件,当前目录文件下执行如下命令:

dd if=/dev/urandom bs=32 count=1 of=redis.key

启动一个 spiped 进程

spiped -d -s '[192.168.91.128]:6479' -t  '[192.168.91.128]:6379' -k redis.key

spiped6479 端口上运行,这里的 ip 地址写的是内网 ip-d 进行解密。


81.68.158.166 下,安装 spiped,将 101.200.140.74 下生成 reids.key 文件复制过来。
执行如下命令:

scp redis.key 81.68.158.166:/redis-cluster/redis-6.2.3/

在这里插入图片描述
执行如下命令:

spiped -e -s '[192.168.91.129]:6379'  -t '[192.168.91.128]:6479'  -k /redis.key

我监听自己的 6379 监听到之后加密,转发大 128 上的 6479 端口上去。-e 是进行加密。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/m0_46159525/article/details/115912555

redis安装及简单使用-爱代码爱编程

redis安装 在官网下载redis安装包: http://redis.io/download 我下载的是 redis-3.0.6 版本 http://download.redis.io/releases/redis

redis简单使用及用json字符串的方式解决对象存储问题-爱代码爱编程

http://lumingfeng.xyz        想在windows下使用熟悉一下redis,首先在它的github上下载压缩包:https://github.com/dmajkic/redis 下载到本地之后解压

redis 简单使用_kenight_的博客-爱代码爱编程

官网: https://redis.io 安装使用: $ wget http://download.redis.io/releases/redis-4.0.2.tar.gz $ tar xzf redis-4.0.2.tar.gz $ cd redis-4.0.2 $ make 如果提示 The program 'make' can

一看就懂的redis简单操作及简介_青衫仗剑的博客-爱代码爱编程

redis简介 RedisClient提供api实现对redis的操作 redis是一个开源(BSD许可)的高性能的非关系型数据库,它可以作为数据库,缓存和消息中间件,它支持多种类型的数据结构,如(Strings)字符串,(Hashes)散列,(Lists)列表,(Sets)集合,(Sorted Sets)有序集合(它是一个接口,里面只

redis的简单使用_insisting2018的博客-爱代码爱编程

这里结合SSM框架使用,Mac系统 首先需要安装Redis 安装包:Redis安装包解压安装包,放在usr/local下,改名redis进入终端:cd /usr/local/redismakesudo make inst

redis的简单使用实例_gp_宣泄笔记的博客-爱代码爱编程

一、实验环境: win10 + redis3.2 + php7二、php-redis / redis /redis;三、redis常用的五种数据类型,不做详细说明四、php + mysql + redis 简单应用数据库名称:redis 数据表:redis_user模拟 php 操作Mysql + redis 的 CURD 操作 1、config.php配

redis简单使用_仑小杰的博客-爱代码爱编程_redis简单使用

1. 安装 安装包下载 安装包下载好以后,直接解压就可以使用,无需安装。 这是解压后的目录结构,其中redis-cli是客户端,redis-server是服务端。 双击启动redis服务端 双击启动redi

redis简单使用教程_小龍神的博客-爱代码爱编程_redis使用

一、Redis简介 什么是Redis?全称:REmote DIctionary Server,是可支持网络、可基于内存亦可持久化的日志型、Key-Value高性能数据库,并提供多种语言的API,它通常被称为数据结构服务器,

redis 简单用法例子_技术刘,的博客-爱代码爱编程

0.为梦想加油,奔跑吧! 1.安装Redis 2.在项目中引入依赖包 <dependency> <groupId>redis.clients</groupId> <artifa

redis简单使用-爱代码爱编程

介绍 Redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。它支持字符串、哈希表、列表、集合、有序集合,位图,hyperloglogs等数据类型。内置复制、Lua脚本、LRU收回、事务以及不同级别磁盘持久化功能,同时通过Redis Sentinel提供高可用,通过Redis Cluster提供自动分区

redis 的简单使用-爱代码爱编程

文章目录 Redis 与 各个数据库之间的区别Redis 数据结构简介string(字符串)尝试一下list(列表)尝试一下set(集合)尝试一下hash(散列(无序))尝试一下zset(有序集合)尝试一下 Redis 与 各个数据库之间的区别 名称类型数据存储选项查询类型附加功能redis使用内存存储(in-memory)的非关系型数据库字

简单总结redis使用流程-爱代码爱编程

以下仅是我本人对redis的使用流程: 1、使用docker拉取redis镜像(零配置). 2、启动redis,记得暴露端口,默认6379. 3、本地可视化工具——Redis.DesktopManger连接redis. 4、连接成功后可以直接在console操作redis命令. 5、关于Springboot项目的redis使用,maven引入依赖

redis的简单使用-爱代码爱编程

1. Redis的简介 1.1NoSQL非关系型数据库概述 什么是NOSQL Not Only SQL:不仅仅是SQL,指的就是非关系型数据库。以前学习过MySQL,它是关系型数据库。 非关系型数据库是关系型数据库有益的补充,不能代替关系型数据库。 非关系型数据库严格上不是一种数据库,应该是一种数据结构化存储方法的集合,可以是文档或者键值对等。

Redis简单使用(入门级)-爱代码爱编程

文章目录 1. Redis2.Redis常用数据类型3. Redis命令操作3.1 string类型3.2 hash类型3.3 list类型3.4 set类型3.5 sortedset类型3.6 通用命令4. Redis持久化配置解释4.1 修改配置文件`redis.windows.conf`RDB配置AOF模式配置5. Jedis操作redis5