Skip to content

redis

[TOC]

一、引言


1.1 数据库压力过大

由于用户量增大,请求数量也随之增大,数据压力过大

1.2 数据不同步

多台服务器之间,数据不同步 ----- session 共享问题

1.3 传统锁失效

多台服务器之间的锁,已经不存在互斥性了。 ------ 分布式锁的问题

二、Redis介绍


2.1 NoSQL介绍

  • Redis就是一款NoSQL。 not only sql

  • NoSQL -> 非关系型数据库 -> Not Only SQL。

  • Key-Value:Redis。。。

  • 文档型:ElasticSearch,Solr,Mongodb。。。

  • 面向列:Hbase,Cassandra。。。

  • 图形化:Neo4j。。。

  • 除了关系型数据库都是非关系型数据库。

  • NoSQL只是一种概念,泛指非关系型数据库,和关系型数据库做一个区分。

2.2 Redis介绍

  • 有一位意大利人,在开发一款LLOOGG的统计页面,因为MySQL的性能不好,自己研发了一款非关系型数据库,并命名为Redis。Salvatore。

  • Redis(Remote Dictionary Server)即远程字典服务,Redis是由C语言去编写,Redis是一款基于Key-Value的NoSQL,而且Redis是基于内存存储数据的,Redis还提供了多种持久化机制,性能可以达到110000/s读取数据以及81000/s写入数据,Redis还提供了主从,哨兵以及集群的搭建方式,可以更方便的横向扩展以及垂直扩展。

  • 补充 为什么 redis 的 核心功能是单线程 性能还这么高?

Redis之父
1586747559955

三、Redis安装


3.1 安装Redis

Docker-Compose安装

yml
version: '3.1'
services:
  redis:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 6379:6379
      
注意: 这个镜像仓库 的 安装   默认 开启了 两种持久化方式     rdb     aof      ,不同的安装方式  持久化开启情况不一样

3.2 使用redis-cli连接Redis

/usr/local/bin docker-compose 安装的redis redis-cli 在这个路径下

进去Redis容器的内部

docker exec -it 容器id bash

在容器内部,使用redis-cli连接

链接效果
1586757208129

补充:redis 的相关命令文件 在 容器内的 /usr/local/bin 目录内部

3.3 使用图形化界面连接Redis

下载地址:https://github.com/lework/RedisDesktopManager-Windows/releases/download/2019.5/redis-desktop-manager-2019.5.zip

傻瓜式安装

RedisDesktopManager
1586757262085

四、Redis常用命令【重点


4.1 Redis存储数据的结构

常用的5种数据结构:

  • key-string:一个key对应一个值。
  • key-hash:一个key对应一个Map。
  • key-list:一个key对应一个列表。 有序的 可以重复的
  • key-set:一个key对应一个集合。 无序的 不可以重复的
  • key-zset:一个key对应一个有序的集合。 有序的 不可以重复的

另外三种数据结构:

  • HyperLogLog:计算近似值的。
  • GEO:地理位置。
  • BIT:一般存储的也是一个字符串,存储的是一个byte[]。
五种常用的存储数据结构图
1586759101828
  • key-string:最常用的,一般用于存储一个值。

  • key-hash:存储一个对象数据的。

  • key-list:使用list结构实现栈和队列结构。

  • key-set:交集,差集和并集的操作。

  • key-zset:排行榜,积分存储等操作。

4.2 string常用命令

string常用操作命令

sh
#1.  添加值
set key value
set name zs 
#2. 取值
get key
get name
#3. 批量操作
mset key value [key value...]
mset k1 v1 k2 v2 k3 v3
mget key [key...]
mget k1 k2 k3
#4. 自增命令(自增1)
incr key 
set age 18
incr age 
#5. 自减命令(自减1)
decr key
decr age
#6. 自增或自减指定数量
incrby key increment
incrby age 2
decrby key increment
 DECRBY age 2

#7. 设置值的同时,指定生存时间(每次向Redis中添加数据时,尽量都设置上生存时间)   记住
setex key second value
setex address 30 zz      设置  address  zz 键值 存活30秒

ttl key    查看key 剩余存活时间
#8. 设置值,如果当前key不存在的话(如果这个key存在,什么事都不做,如果这个key不存在,和set命令一样)   记住
setnx key value
setnx lock 1       如果redis 内没有  lock 这个键  就执行成功   有的话就执行失败 
#9. 在key对应的value后,追加内容
append key value
append name zs      name 这个键对应的值后追加  zs

#10. 查看value字符串的长度
strlen key

4.3 hash常用命令

hash常用命令

sh
#1. 存储数据
hset key field value

#2. 获取数据
hget key field

#3. 批量操作
hmset key field value [field value ...]
hmget key field [field ...]

#4. 自增(指定自增的值)
hincrby key field increment

#5. 设置值(如果key-field不存在,那么就正常添加,如果存在,什么事都不做)
hsetnx key field value

#6. 检查field是否存在
hexists key field 

#7. 删除key对应的field,可以删除多个
hdel key field [field ...]

#8. 获取当前hash结构中的全部field和value
hgetall key

#9. 获取当前hash结构中的全部field
hkeys key

#10. 获取当前hash结构中的全部value
hvals key

#11. 获取当前hash结构中field的数量
hlen key

hash应用场景

对象缓存

hmset user:1 name zs sex male

模拟购物车

用户id 为 key 商品id 为 field 商品数量 为 value

添加商品 hset cart:1001 10086 1

增加数量 HINCRBY cart:1001 10086 1

商品种类总数 HLEN cart:1001

删除商品 hdel cart:1001 10086

获取购物车所有商品: HGETALL cart:1001

4.4 list常用命令

list常用命令

sh
#1. 存储数据(从左侧插入数据,从右侧插入数据)
lpush key value [value ...]
rpush key value [value ...]

#2. 存储数据(如果key不存在,什么事都不做,如果key存在,但是     不是list结构,什么都不做)
lpushx key value
rpushx key value

#3. 修改数据(在存储数据时,指定好你的索引位置,覆盖之前索引位置的数据,index超出整个列表的长度,也会失败)
lset key index value

#4. 弹栈方式获取数据(左侧弹出数据,从右侧弹出数据)
lpop key
rpop key

#5. 获取指定索引范围的数据(start从0开始,stop输入-1,代表最后一个,-2代表倒数第二个)
lrange key start stop

#6. 获取指定索引位置的数据
lindex key index

#7. 获取整个列表的长度
llen key

#8. 删除列表中的数据(他是删除当前列表中的count个value值,count > 0从左侧向右侧删除,count < 0从右侧向左侧删除,count == 0删除列表中全部的value)
lrem key count value

#9. 保留列表中的数据(保留你指定索引范围内的数据,超过整个索引范围被移除掉)
ltrim key start stop

#10. 将一个列表中最后的一个数据,插入到另外一个列表的头部位置
rpoplpush list1 list2

list 模拟栈 statck = lpush + lpop

模拟队列 queue = lpush + rpop

小明关注了 多个a,b,c三个公众号 三个公众号依次发消息 小明的id 为1

lpush msg:a 1001 a公众号发id 为1001 的 消息

lpush msg:b 1002 b公众号发id 为1002 的 消息

lpush msg:c 1003 c公众号发id 为1003 的 消息

4.5 set常用命令

set常用命令

sh
#1. 存储数据
sadd key member [member ...]

#2. 获取数据(获取全部数据)
smembers key

#3. 随机获取一个数据(获取的同时,移除数据,count默认为1,代表弹出数据的数量)
spop key [count]

#4. 交集(取多个set集合交集)
sinter set1 set2 ...

#5. 并集(获取全部集合中的数据)
sunion set1 set2 ...

#6. 差集(获取多个集合中不一样的数据)
sdiff set1 set2 ...

# 7. 删除数据
srem key member [member ...]

# 8. 查看当前的set集合中是否包含这个值
sismember key member

应用场景 朋友圈点赞

sadd like:messageid:1 1 messageid:1 消息 被 id为1 的用户点赞

SCARD like:messageid:1 获取点赞数

sismember like:messageid:1 2 判断 id 为2 的 有没有给这个朋友圈信息点赞

抖音的关注模型

zs关注的人 sadd zslike ls ww zl

ls 关注的人 sadd lslike ww zl tq

ww关注的人 sadd wwlike zs ls zl tq

ww的粉丝 sadd wwfans zs ls

zs和ls 共同关注的人 SINTER zslike lslike

zs关注的人也关注了ww

SISMEMBER zslike ls

SISMEMBER lslike ww

或者

SINTER zslike wwfans

zs可能认识的人

SDIFF lslike zslike

4.6 zset的常用命令

zset常用命令

sh
#1. 添加数据(score必须是数值。member不允许重复的。)
zadd key score member [score member ...]

#2. 修改member的分数(如果member是存在于key中的,正常增加分数,如果memeber不存在,这个命令就相当于zadd)
zincrby key increment member

#3. 查看指定的member的分数
zscore key member

#4. 获取zset中数据的数量
zcard key

#5. 根据score的范围查询member数量
zcount key min max

#6. 删除zset中的成员
zrem key member [member...]

#7. 根据分数从小到大排序,获取指定范围内的数据(withscores如果添加这个参数,那么会返回member对应的分数)
zrange key start stop [withscores]

#8. 根据分数从大到小排序,获取指定范围内的数据(withscores如果添加这个参数,那么会返回member对应的分数)
zrevrange key start stop [withscores]

#9. 根据分数的返回去获取member(withscores代表同时返回score,添加limit,就和MySQL中一样,如果不希望等于min或者max的值被查询出来可以采用 ‘(分数’ 相当于 < 但是不等于的方式,最大值和最小值使用+inf和-inf来标识)
zrangebyscore key min max [withscores] [limit offset count]

#   ZRANGEBYSCORE zset1 10 18 withscores limit 0 2    
取score  10  18 之间的 value(包含边界值10  18 索引为 0 处开始取    取两个 


#   ZRANGEBYSCORE zset1 (10 (18 withscores limit 0 2 
取score  10  18 之间的 value(不包含边界值10  18 索引为 0 处开始取    取两个

#  ZRANGEBYscore zset1 -inf +inf withscores limit 1  3
 最小 score 最大score 的值 索引1  开始 ,取 3  


#10. 根据分数的返回去获取member(withscores代表同时返回score,添加limit,就和MySQL中一样)
#  返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
#  成员按分数值递减的次序排列
zrevrangebyscore key max min [withscores] [limit offset count]     ?????

4.7 key常用命令

key常用命令

sh
#1. 查看Redis中的全部的key(pattern:* ,xxx*,*xxx)
keys pattern

#2. 查看某一个key是否存在(1 - key存在,0 - key不存在)
exists key

#3. 删除key
del key [key ...]

#4. 设置key的生存时间,单位为秒,单位为毫秒,设置还能活多久
expire key second
pexpire key milliseconds

#5. 设置key的生存时间,单位为秒,单位为毫秒,设置能活到什么时间点    接收到的是unix 时间戳
expireat key timestamp
pexpireat key milliseconds

#6. 查看key的剩余生存时间,单位为秒,单位为毫秒(-2 - 当前key不存在,-1 - 当前key没有设置生存时间,具体剩余的生存时间)
ttl key
pttl key

#7. 移除key的生存时间(1 - 移除成功,0 - key不存在生存时间,key不存在)
persist key

#8. 选择操作的库
select 0~15

#9. 移动key到另外一个库中
move key db

4.8 库的常用命令

db常用命令

sh
#1. 清空当前所在的数据库
flushdb

#2. 清空全部数据库
flushall

#3. 查看当前数据库中有多少个key
dbsize

#4. 查看最后一次操作的时间   Lastsave 命令返回最近一次 Redis 成功将数据保存到磁盘上的时间,以 UNIX 时间戳格式表示。
lastsave        返回   integer 

#5. 实时监控Redis服务接收到的命令   在一个连接上  使用 monitor 可以监控 别的连接对redis 的操作  
monitor

五、Java连接Redis【重点


5.1 Jedis连接Redis

5.1.1 创建Maven工程

idea创建

5.1.2 导入需要的依赖
xml
<dependencies>
    <!--    1、 Jedis-->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.9.0</version>
    </dependency>
    <!--    2、 Junit测试-->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <!--    3、 Lombok-->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
</dependencies>
5.1.3 测试
java
public class Demo1 {

    @Test
    public void set(){
        //1. 连接Redis
        Jedis jedis = new Jedis("192.168.199.109",6379);
        //2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么
        jedis.set("name","李四");
        //3. 释放资源
        jedis.close();
    }

    @Test
    public void get(){
        //1. 连接Redis
        Jedis jedis = new Jedis("192.168.199.109",6379);
        //2. 操作Redis - 因为Redis的命令是什么,Jedis的方法就是什么
        String value = jedis.get("name");
        System.out.println(value);
        //3. 释放资源
        jedis.close();
    }
}

5.2 Jedis存储一个对象到Redis以byte[]的形式

5.2.1 准备一个User实体类
java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {

    private Integer id;

    private String name;

    private Date birthday;

}
5.2.2 导入spring-context依赖
xml
<!-- 4. 导入spring-context -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.18.RELEASE</version>
</dependency>
5.2.3 创建Demo测试类,编写内容
java
public class Demo2 {

    // 存储对象 - 以byte[]形式存储在Redis中
    @Test
    public void setByteArray(){
        //1. 连接Redis服务
        Jedis jedis = new Jedis("192.168.199.109",6379);
        //------------------------------------------------
        //2.1 准备key(String)-value(User)
        String key = "user";
        User value = new User(1,"张三",new Date());
        //2.2 将key和value转换为byte[]
        byte[] byteKey = SerializationUtils.serialize(key);
        byte[] byteValue = SerializationUtils.serialize(value);
        //2.3 将key和value存储到Redis
        jedis.set(byteKey,byteValue);
        //------------------------------------------------
        //3. 释放资源
        jedis.close();
    }

    // 获取对象 - 以byte[]形式在Redis中获取
    @Test
    public void getByteArray(){
        //1. 连接Redis服务
        Jedis jedis = new Jedis("192.168.199.109",6379);
        //------------------------------------------------
        //2.1 准备key
        String key = "user";
        //2.2 将key转换为byte[]
        byte[] byteKey = SerializationUtils.serialize(key);
        //2.3 jedis去Redis中获取value
        byte[] value = jedis.get(byteKey);
        //2.4 将value反序列化为User对象
        User user = (User) SerializationUtils.deserialize(value);
        //2.5 输出
        System.out.println("user:" + user);
        //------------------------------------------------
        //3. 释放资源
        jedis.close();
    }

}

5.3 Jedis存储一个对象到Redis以String的形式

5.3.1 导入依赖
xml
<!-- 导入fastJSON -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.47</version>
</dependency>
5.3.2 测试
java
public class Demo3 {

    // 存储对象 - 以String形式存储
    @Test
    public void setString(){
        //1. 连接Redis
        Jedis jedis = new Jedis("192.168.199.109",6379);
        //2.1 准备key(String)-value(User)
        String stringKey = "stringUser";
        User value = new User(2,"李四",new Date());
        //2.2 使用fastJSON将value转化为json字符串
        String stringValue = JSON.toJSONString(value);
        //2.3 存储到Redis中
        jedis.set(stringKey,stringValue);
        //3. 释放资源
        jedis.close();
    }


    // 获取对象 - 以String形式获取
    @Test
    public void getString(){
        //1. 连接Redis
        Jedis jedis = new Jedis("192.168.199.109",6379);

        //2.1 准备一个key
        String key = "stringUser";
        //2.2 去Redis中查询value
        String value = jedis.get(key);
        //2.3 将value反序列化为User
        User user = JSON.parseObject(value, User.class);
        //2.4 输出
        System.out.println("user:" + user);

        //3. 释放资源
        jedis.close();
    }
}

扩展 使用 jackson 来进行 java 对象和json串的转换

5.4 Jedis连接池的操作

使用连接池操作Redis,避免频繁创建和销毁链接对象消耗资源

java
@Test
public void pool2(){
    //1. 创建连接池配置信息
    GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    poolConfig.setMaxTotal(100);  // 连接池中最大的活跃数
    poolConfig.setMaxIdle(10);   // 最大空闲数
    poolConfig.setMinIdle(5);   // 最小空闲数
    poolConfig.setMaxWaitMillis(3000);  // 当连接池空了之后,多久没获取到Jedis对象,就超时

    //2. 创建连接池
    JedisPool pool = new JedisPool(poolConfig,"192.168.199.109",6379);
	
    //3. 通过连接池获取jedis对象
    Jedis jedis = pool.getResource();

    //4. 操作
    String value = jedis.get("stringUser");
    System.out.println("user:" + value);

    //5. 释放资源
    jedis.close();
}

5.5 Redis的管道操作

因为在操作Redis的时候,执行一个命令需要先发送请求到Redis服务器,这个过程需要经历网络的延迟,Redis还需要给客户端一个响应。

如果我需要一次性执行很多个命令,上述的方式效率很低,可以通过Redis的管道,先将命令放到客户端的一个Pipeline中,之后一次性的将全部命令都发送到Redis服务,Redis服务一次性的将全部的返回结果响应给客户端。

java
//  Redis管道的操作
@Test
public void pipeline(){
    //1. 创建连接池
    JedisPool pool = new JedisPool("192.168.199.109",6379);
    long l = System.currentTimeMillis();

    /*//2. 获取一个连接对象
    Jedis jedis = pool.getResource();

    //3. 执行incr - 100000次
    for (int i = 0; i < 100000; i++) {
        jedis.incr("pp");
    }

    //4. 释放资源
    jedis.close();*/

    //================================
    //2. 获取一个连接对象
    Jedis jedis = pool.getResource();
    //3. 创建管道
    Pipeline pipelined = jedis.pipelined();
    //3. 执行incr - 100000次放到管道中
    for (int i = 0; i < 100000; i++) {
        pipelined.incr("qq");
    }
    //4. 执行命令
    pipelined.syncAndReturnAll();
    //5. 释放资源
    jedis.close();

        System.out.println(System.currentTimeMillis() - l);
}

5.6 Redis的应用

5.6.1 准备开放平台项目

在资料资料中

5.6.2 准备docker-compose.yml文件
yml
# docker-compose.yml
version: "3.1"
services:
  nginx:
    image: daocloud.io/library/nginx:latest
    restart: always
    container_name: nginx
    ports:
      - 80:80
    volumes:
      - ./nginx_conf:/etc/nginx/conf.d
  tomcat1:
    image: daocloud.io/library/tomcat:8.5.15-jre8
    restart: always
    container_name: tomcat1
    ports:
      - 8081:8080
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - ./tomcat1_webapps:/usr/local/tomcat/webapps  
  tomcat2:
    image: daocloud.io/library/tomcat:8.5.15-jre8
    restart: always
    container_name: tomcat2
    ports:
      - 8082:8080
    environment:
      - TZ=Asia/Shanghai
    volumes:
      - ./tomcat2_webapps:/usr/local/tomcat/webapps    
  mysql:
    image: daocloud.io/library/mysql:5.7.24
    restart: always
    container_name: mysql
    ports:
      - 3306:3306
    environment:
      - MYSQL_ROOT_PASSWORD=root
      - TZ=Asia/Shanghai
    volumes:
      - ./mysql_data:/var/lib/mysql
    command:
      --lower_case_table_names=1   # 忽略表名的大小写
5.6.3 部署项目到Tomcat容器中

打成war包,复制到数据卷路径中即可

5.6.4 修改认证功能
java
//登录  Controller
@RequestMapping("/dologin")
@ResponseBody
public AjaxMessage login(String email, String password, HttpSession session,HttpServletResponse resp) {
    //1. 调用service执行认证,返回Redis的key
    String key = adminUserService.doLoginByRedis(email,password);
    //2. 判断返回的key是否为null
    if(StringUtils.isEmpty(key)) {
        //2.1 如果为null -> 认证失败
        return new AjaxMessage(false);
    }

    //3. 查询用户的权限信息,并且放到Redis中
    menuService.setUserMenuList(key);

    //4. 将key作为Cookie写回浏览器端
    Cookie cookie = new Cookie(AdminConstants.USER_COOKIE_KEY,key);
    cookie.setPath("/");
    cookie.setMaxAge(9999999);
    resp.addCookie(cookie);

    //5. 返回认证成功信息
    return new AjaxMessage(true);
}
java
//AdminUserService实现认证功能
@Override
public String doLoginByRedis(String email, String password) throws JsonProcessingException {
    //1. 判断用户名和密码是否正确
    AdminUser user = adminUserMapper.getUserByEmail(email);
    //2. 如果user==null , 登录失败,返回null
    if(user == null){
        return null;
    }
    //3. user != null 登录成功,声明一个UUID,作为存储到Redis中的key,返回key到Controller
    //3.1 声明key和value
    String key = UUID.randomUUID().toString();
    ObjectMapper mapper = new ObjectMapper();
    String value = mapper.writeValueAsString(user);
    //3.2 将key和value存储到Redis中
    Jedis jedis = pool.getResource();
    jedis.setex(AdminConstants.SESSION_USER + key,600,value);
    jedis.close();
    //4. 返回结果
    return key;
}
java
//MenuService将用户的权限信息存储到Redis中
@Override
public void setUserMenuList(String key) throws JsonProcessingException {
    //1. 获取用户的id
    Jedis jedis = pool.getResource();
    String value = jedis.get(AdminConstants.SESSION_USER + key);
    ObjectMapper mapper = new ObjectMapper();
    AdminUser adminUser = mapper.readValue(value, AdminUser.class);

    //2. 数据库查询用户权限信息
    List<Menu> menuList = menuMapper.getUserMenu(adminUser.getId());

    //3. 存储到Redis
    String menuKey = AdminConstants.USER_MENU + key;
    String menuValue = mapper.writeValueAsString(menuList);
    jedis.setex(menuKey,600,menuValue);
    jedis.close();
}
5.6.5 修改过滤器信息
java
//在filter中获取Jedis连接池的方式
private JedisPool pool;

public void init(FilterConfig filterConfig) throws ServletException {
    ServletContext servletContext = filterConfig.getServletContext();
    WebApplicationContext context = WebApplicationContextUtils.getWebApplicationContext(servletContext);
    pool = (JedisPool) context.getBean("jedisPool");
}
java
// 修改doFilter方法
// 获取认证信息
String key = null;
// 获取Cookie
Cookie[] cookies = request.getCookies();
if(cookies != null){
    for (Cookie cookie : cookies) {
        if(cookie.getName().equals(AdminConstants.USER_COOKIE_KEY)){
            key = cookie.getValue();
        }
    }
}
// 没有认证过,cookie中没有获取到指定的信息
if(key == null){
    response.sendRedirect(request.getContextPath() + redirectUrl);
    return;
}

// 从Session域中获取用户认证信息
// AdminUser user = (AdminUser) session.getAttribute(AdminConstants.SESSION_USER);
// 修改为从Redis中获取用户的信息
Jedis jedis = pool.getResource();
String value = jedis.get(AdminConstants.SESSION_USER + key);
if (value == null) {
    jedis.close();
    response.sendRedirect(request.getContextPath() + redirectUrl);
    return;
}

// ...
//2.2 获取权限信息
// 从Session中获取了用户的权限信息
String menuValue = jedis.get(AdminConstants.USER_MENU + key);

// 重置key的生存时间
jedis.expire(AdminConstants.SESSION_USER + key,600);
jedis.expire(AdminConstants.USER_MENU + key,600);
jedis.close();
ObjectMapper mapper = new ObjectMapper();
List<Menu> menus = mapper.readValue(menuValue, new TypeReference<List<Menu>>() {});
5.6.6 修改SystemController
java
@RequestMapping("/side")
@ResponseBody
public AjaxMessage getMenuTree(@CookieValue(value = AdminConstants.USER_COOKIE_KEY,required = false)String key, HttpSession session, HttpServletResponse response) throws JsonProcessingException {

    // 修改AdminUser user = (AdminUser) session.getAttribute(AdminConstants.SESSION_USER);
    Jedis jedis = pool.getResource();
    String value = jedis.get(AdminConstants.SESSION_USER + key);
    ObjectMapper mapper = new ObjectMapper();
    AdminUser user = mapper.readValue(value, AdminUser.class);
    if (user == null) {
        try {
            response.sendRedirect("/login.html");
        } catch (IOException e) {
            e.printStackTrace();
        }
        return new AjaxMessage(true, null, new ArrayList<>());
    }
    jedis.expire(AdminConstants.SESSION_USER + key,600);
    jedis.close();
    List<Menu> menus = menuService.getUserPermission(user.getId());
    return new AjaxMessage(true, null, menus);
}
5.6.7 重新部署

重新打成war包,复制到数据卷路径中即可

六、Redis其他配置及集群【重点


修改yml文件,以方便后期修改Redis配置信息

yml
version: '3.1'
services:
  redis:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 6379:6379
    volumes:
      - ./conf/redis.conf:/usr/local/redis/redis.conf
      - ./data:/data
    command: ["redis-server","/usr/local/redis/redis.conf"]

6.1 Redis的AUTH

方式一:通过修改Redis的配置文件,实现Redis的密码校验

shell
# redis.conf
requirepass 密码

三种客户端的连接方式

  • redis-cli:在输入正常命令之前,先输入auth 密码即可。

  • 图形化界面:在连接Redis的信息中添加上验证的密码。

  • Jedis客户端:

    • jedis.auth(password);
  • 使用JedisPool的方式

java
// 使用当前有参构造设置密码
public JedisPool(final GenericObjectPoolConfig poolConfig, final String host, int port,int timeout, final String password)

方式二:在不修改redis.conf文件的前提下,在第一次链接Redis时,输入命令:Config set requirepass 密码

后续向再次操作Redis时,需要先AUTH做一下校验。

6.2 Redis的事务

Redis的事务:一次事务操作,改成功的成功,该失败的失败。

先开启事务,执行一些列的命令,但是命令不会立即执行,会被放在一个队列中,如果你执行事务,那么这个队列中的命令全部执行,如果取消了事务,一个队列中的命令全部作废。

  • 开启事务:multi
  • 输入要执行的命令:被放入到一个队列中
  • 执行事务:exec
  • 取消事务:discard

Redis的事务向发挥功能,需要配置watch监听机制

在开启事务之前,先通过watch命令去监听一个或多个key,在开启事务之后,如果有其他客户端修改了我监听的key,事务会自动取消。

如果执行了事务,或者取消了事务,watch监听自动消除,一般不需要手动执行unwatch。

6.3 Redis持久化机制

6.3.1 RDB

RDB是Redis默认的持久化机制

  • RDB持久化文件,速度比较快,而且存储的是一个二进制的文件,传输起来很方便。

  • RDB持久化的时机:

    save 900 1:在900秒内,有1个key改变了,就执行RDB持久化。

    save 300 10:在300秒内,有10个key改变了,就执行RDB持久化。

    save 60 10000:在60秒内,有10000个key改变了,就执行RDB持久化。

  • RDB无法保证数据的绝对安全。

介绍

       在指定的时间间隔内生成内存中整个数据集的持久化快照。快照文件默认被存储在当前文件夹中,名称为dump.rdb,可以通过dir和dbfilename参数来修改默认值。

        Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文件替换件。  整个过程中,主进程是不进行任何的IO操作的,这就确保了极高的性能。

配置文件

shell
# redis是基于内存的数据库,可以通过设置该值定期写入磁盘。
# 注释掉“save”这一行配置项就可以让保存数据库功能失效
# 900秒(15分钟)内至少1个key值改变(则进行数据库保存--持久化) 
# 300秒(5分钟)内至少10个key值改变(则进行数据库保存--持久化) 
# 60秒(1分钟)内至少10000个key值改变(则进行数据库保存--持久化)
save 900 1
save 300 10
save 60 10000
 
#当RDB持久化出现错误后,是否依然进行继续进行工作,yes:不能进行工作,no:可以继续进行工作,可以通过info中的rdb_last_bgsave_status了解RDB持久化是否有错误
stop-writes-on-bgsave-error yes
 
#使用压缩rdb文件,rdb文件压缩使用LZF压缩算法,yes:压缩,但是需要一些cpu的消耗。no:不压缩,需要更多的磁盘空间
rdbcompression yes
 
#是否校验rdb文件。从rdb格式的第五个版本开始,在rdb文件的末尾会带上CRC64的校验和。这跟有利于文件的容错性,但是在保存rdb文件的时候,会有大概10%的性能损耗,所以如果你追求高性能,可以关闭该配置。
rdbchecksum yes
 
#rdb文件的名称
dbfilename dump.rdb
 
#数据目录,数据库的写入会在这个目录。rdb、aof文件也会写在这个目录
dir /data

Fork

      fork的作用相当于复制一个与当前进程一样的进程。但是是一个全新的进程,并作为原进程的子进程。

当触发快照时,redis会单独创建(fork) 一个子进程来进行持久化,会将数据先写入一个临时文件中,等到持久化结束后再用这个临时文件替换掉上次持久化的文件,整个操作过程中,主进程是不进行任何IO操作的,这样做就确保了redis的高性能,当需要恢复数据时,就读取dump.rdb文件,把这个文件读取到内存中, rdb持久化方式 性能比较高 但是缺点是 最后一次持久化之后的数据可能丢失

触发条件

  1. 通过配制文件中的save条件(可自己配置)

    shell
    save 900 1
    save 300 10
    save 60 10000
  2. 手动通过save和bgsave命令

  • save:save时只管保存,其他不管,全部阻塞
  • bgsave:redis会在后台异步的进行快照操作,同时还可以响应客户端请求。可以通过lastsave命令获取最后一次成功执行快照的事件
  1. 通过flushall命令,也会产生dump.rdb文件,但是里面是空的,无意义。
  2. 通过shutdown命令,安全退出,也会生成快照文件(和异常退出形成对比,比如:kill杀死进程的方式)

如何恢复

shell
appendonly no
dbfilename dump.rdb
dir /var/lib/redis  #可以自行指定

appendonly 设置成no,redis启动时会把/var/lib/redis 目录下的dump.rdb 中的数据恢复。dir 和dbfilename 都可以设置。我测试时appendonly 设置成yes 时候不会将dump.rdb文件中的数据恢复

优势

  1. 恢复数据的速度很快,适合大规模的数据恢复,而又对部分数据不敏感的情况
  2. dump.db文件是一个压缩的二进制文件,文件占用空间小

劣势

  1. 当出现异常退出时,会丢失最后一次快照后的数据
  2. 当fork的时候,内存的中的数据会被克隆一份,大致两倍的膨胀需要考虑。而且,当数据过大时,fork操作占用过多的系统资源,造成主服务器进程假死。

使用场景

  1. 数据备份
  2. 可容忍部分数据丢失
  3. 跨数据中心的容灾备份
6.3.2 AOF

AOF持久化机制默认是关闭的,Redis官方推荐同时开启RDB和AOF持久化,更安全,避免数据丢失。

  • AOF持久化的速度,相对RDB较慢的,存储的是一个文本文件,到了后期文件会比较大,传输困难。

  • AOF持久化时机。

    appendfsync always:每执行一个写操作,立即持久化到AOF文件中,性能比较低。 appendfsync everysec:每秒执行一次持久化。 appendfsync no:会根据你的操作系统不同,环境的不同,在一定时间内执行一次持久化。

  • AOF相对RDB更安全,推荐同时开启AOF和RDB。

介绍

以日志的形式来记录每个写操作,将Redis执行过的所有写指令记录下来(读操作不可记录),只许追加文件但不可以改写文件,redis启动之初会读取该文件重新构建数据。保存的是appendonly.aof文件

aof机制默认关闭,可以通过appendonly yes参数开启aof机制,通过appendfilename myaoffile.aof指定aof文件名称。

aof持久化的一些策略配置

shell
#aof持久化策略的配置
#no表示不执行fsync,由操作系统保证数据同步到磁盘,速度最快。
#always表示每次写入都执行fsync,以保证数据同步到磁盘。
#everysec表示每秒执行一次fsync,可能会导致丢失这1s数据。
appendfsync everysec

对于触发aof重写机制也可以通过配置文件来进行设置:

shell
# aof自动重写配置。当目前aof文件大小超过上一次重写的aof文件大小的百分之多少进行重写,即当aof文件增长到一定大小的时候Redis能够调用bgrewriteaof对日志文件进行重写。当前AOF文件大小是上次日志重写得到AOF文件大小的二倍(设置为100)时,自动启动新的日志重写过程。
auto-aof-rewrite-percentage 100
# 设置允许重写的最小aof文件大小,避免了达到约定百分比但尺寸仍然很小的情况还要重写
auto-aof-rewrite-min-size 64mb

当aop重写时会引发重写和持久化追加同时发生的问题,可以通过no-appendfsync-on-rewrite no进行配置

shell
# 在aof重写或者写入rdb文件的时候,会执行大量IO,此时对于everysec和always的aof模式来说,执行fsync会造成阻塞过长时间,no-appendfsync-on-rewrite字段设置为默认设置为no,是最安全的方式,不会丢失数据,但是要忍受阻塞的问题。如果对延迟要求很高的应用,这个字段可以设置为yes,,设置为yes表示rewrite期间对新写操作不fsync,暂时存在内存中,不会造成阻塞的问题(因为没有磁盘竞争),等rewrite完成后再写入,这个时候redis会丢失数据。Linux的默认fsync策略是30秒。可能丢失30秒数据。因此,如果应用系统无法忍受延迟,而可以容忍少量的数据丢失,则设置为yes。如果应用系统无法忍受数据丢失,则设置为no。
no-appendfsync-on-rewrite no

如何恢复

正常恢复

       将文件放到dir指定的文件夹下,当redis启动的时候会自动加载数据,注意:aof文件的优先级比dump大

异常恢复
  • 有些操作可以直接到appendonly.aof文件里去修改。

    eg:使用了flushall这个命令,此刻持久化文件中就会有这么一条命令记录,把它删掉就可以了

  • 写坏的文件可以通过 redis-check-aof --fix进行修复

优势

  1. 根据不同的策略,可以实现每秒,每一次修改操作的同步持久化,就算在最恶劣的情况下只会丢失不会超过两秒数据。 ?????

    ​ 为什么最坏的情况 丢失两秒数据? 和 刷新内存到硬盘的 方法 flushAppendOnlyFile 实现逻辑有关

    参考 (20条消息) Redis源码解析(8) AOF持久化_李兆龙的博客-CSDN博客

image-20210916151401560
  1. 当文件太大时,会触发重写机制,确保文件不会太大。
  2. 文件可以简单的读懂

劣势

  1. aof文件的大小太大,就算有重写机制,但重写所造成的阻塞问题是不可避免的
  2. aof文件恢复速度慢

总结

  1. 如果你只希望你的数据在服务器运行的时候存在,可以不使用任何的持久化方式

  2. 一般官方建议同时开启两种持久化方式。AOF进行数据的持久化,确保数据不会丢失太多,而RDB更适合用于备份数据库,留着一个做万一的手段。

  3. 性能建议:

    因为RDB文件只用做后备用途,建议只在slave上持久化RDB文件,而且只要在15分钟备份一次就够了,只保留900 1这条规则。

    如果Enalbe AOF,好处是在最恶劣情况下也只会丢失不超过两秒数据,启动脚本较简单只load自己的AOF文件就可以了。代价:1、带来了持续的IO;2、AOF rewrite的最后将rewrite过程中产生的新数据写到新文件造成的阻塞几乎是不可避免的。只要硬盘许可,应该尽量减少AOF rewrite的频率,AOF重写的基础大小默认值64M太小了,可以设到5G以上。默认超过原大小100%大小时重写可以改到适当的数值。

    如果不Enable AOF,仅靠Master-Slave Replication 实现高可用性也可以。能省掉一大笔IO也减少了rewrite时带来的系统波动。代价是如果Master/Slave同时宕掉,会丢失10几分钟的数据,启动脚本也要比较两个Master/Slave中的RDB文件,载入较新的那个。新浪微博就选用了这种架构。

6.3.3 注意事项

同时开启RDB和AOF的注意事项:

如果同时开启了AOF和RDB持久化,那么在Redis宕机重启之后,需要加载一个持久化文件,优先选择AOF文件。

如果先开启了RDB,再次开启AOF,如果RDB执行了持久化,那么RDB文件中的内容会被AOF覆盖掉。

6.4 Redis的主从架构

单机版 Redis存在读写瓶颈的问题 110000/s 的 读 81000/s 的写

主从架构
1586918773809

指定yml文件

yml
version: "3.1"
services:
  redis1:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis1
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7001:6379
    volumes:
      - ./conf/redis1.conf:/usr/local/redis/redis.conf
      - ./data:/data
    command: ["redis-server","/usr/local/redis/redis.conf"]
  redis2:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis2
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7002:6379
    volumes:
      - ./conf/redis2.conf:/usr/local/redis/redis.conf
      - ./data:/data
    links:
      - redis1:master
    command: ["redis-server","/usr/local/redis/redis.conf"]
  redis3:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis3
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7003:6379
    volumes:
      - ./conf/redis3.conf:/usr/local/redis/redis.conf
      - ./data:/data
    links:
      - redis1:master
    command: ["redis-server","/usr/local/redis/redis.conf"]
sh
# redis2和redis3从节点配置
replicaof master 6379

主从结构: 读写分离 性能扩展 容灾恢复,但是还是存在单点故障问题,就是主节点挂掉 ,redis 服务不可用了,剩下的从节点无法进行写操作,咱们希望的是 主节点挂掉,从节点能够上位。

6.5 哨兵

哨兵可以帮助我们解决主从架构中的单点故障问题

添加哨兵
1586922159978

修改了以下docker-compose.yml,为了可以在容器内部使用哨兵的配置

yml
version: "3.1"
services:
  redis1:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis1
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7001:6379
    volumes:
      - ./conf/redis1.conf:/usr/local/redis/redis.conf
      - ./conf/sentinel1.conf:/data/sentinel.conf        # 添加的内容
    command: ["redis-server","/usr/local/redis/redis.conf"]
  redis2:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis2
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7002:6379
    volumes:
      - ./conf/redis2.conf:/usr/local/redis/redis.conf
      - ./conf/sentinel2.conf:/data/sentinel.conf        # 添加的内容
    links:
      - redis1:master
    command: ["redis-server","/usr/local/redis/redis.conf"]
  redis3:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis3
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7003:6379
    volumes:
      - ./conf/redis3.conf:/usr/local/redis/redis.conf
      - ./conf/sentinel3.conf:/data/sentinel.conf        # 添加的内容 
    links:
      - redis1:master
    command: ["redis-server","/usr/local/redis/redis.conf"]

准备哨兵的配置文件,并且在容器内部手动启动哨兵即可

# 哨兵需要后台启动
daemonize yes
# 指定Master节点的ip和端口(主)
sentinel monitor master localhost 6379 2
# 指定Master节点的ip和端口(从)
sentinel monitor master master 6379 2
# 哨兵每隔多久监听一次redis架构
sentinel down-after-milliseconds master 10000




#注意  所有相关的端口号需要在防火墙开启比如   7001   7002 7003    26379(是哨兵通信端口也得开 在哨兵配置文件可以看到)

在Redis容器内部启动sentinel即可

sh
redis-sentinel sentinel.conf

注意 启动redis 服务后 ,日志可能会显示和主节点同步数据异常,连接不上主节点 这可能是网络问题 此时 从节点无法读取主节点新写入的数据 稍等一会儿 让三个节点之间连接正常再进行演示

image-20220823100742165

哨兵模式 主节点 挂掉之后 在从机中选出新的主节点,哪个从机会被选为新的主节点? 根据 配置文件的

replica-priority 100 这个配置 这个值越小 优先级越高

6.6 Redis的集群

前面的主从复制 解决了 redis 的 读写压力问题 ,分出了两个从节点 分担了 读取数据的压力

哨兵 解决了单点故障的问题 当有一个 master 节点 挂掉了 自动投票选举 出来一个 master 保证 主从架构正常执行 但是 这种架构还有一种问题 就是 存储的数据有限 ,也就是当数据量很大的时候 一个redis 存不下,上面的 一个 redis 主节点 无法存储这么多的数据 此时 就需要集群了

Redis集群在保证主从加哨兵的基本功能之外,还能够提升Redis存储数据的能力。

Redis集群架构图
1586932636778

注意 这里的从节点 跟上面的一主二从的 从节点不一样 ,这里的从节点 只负责备份主节点的数据 不负责分担主节点的读压力,而是一直看主节点干活 如果发现主节点挂掉了 马上顶替主节点干活

那么 如果某个节点上的一些数据 查询频率太高 也可以针对某一个节点 搭建 读写分离的那种 主从

主从 哨兵 和 集群 是三个概念

集群中的节点 超过半数挂掉 那么 整个集群服务就不可用了 默认情况下

如果某一个主节点挂掉 那么从节点顶上来做主节点 这个过程 是由其他节点选举

如果某一个主从节点都挂掉了 redis集群是否还能提供服务要看下面的配置

cluster-require-full-coverage yes 那么 整个集群都挂掉 默认

cluster-require-full-coverage no 该节点不可用 该节点上的插槽都不能使用 其他节点正常使用

准备yml文件

yml
# docker-compose.yml
version: "3.1"
services:
  redis1:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis1
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7001:7001
      - 17001:17001
    volumes:
      - ./conf/redis1.conf:/usr/local/redis/redis.conf
    command: ["redis-server","/usr/local/redis/redis.conf"]
  redis2:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis2
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7002:7002
      - 17002:17002
    volumes:
      - ./conf/redis2.conf:/usr/local/redis/redis.conf
    command: ["redis-server","/usr/local/redis/redis.conf"]  
  redis3:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis3
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7003:7003
      - 17003:17003
    volumes:
      - ./conf/redis3.conf:/usr/local/redis/redis.conf
    command: ["redis-server","/usr/local/redis/redis.conf"]  
  redis4:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis4
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7004:7004
      - 17004:17004
    volumes:
      - ./conf/redis4.conf:/usr/local/redis/redis.conf
    command: ["redis-server","/usr/local/redis/redis.conf"]  
  redis5:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis5
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7005:7005
      - 17005:17005
    volumes:
      - ./conf/redis5.conf:/usr/local/redis/redis.conf
    command: ["redis-server","/usr/local/redis/redis.conf"]  
  redis6:
    image: daocloud.io/library/redis:5.0.7
    restart: always
    container_name: redis6
    environment:
      - TZ=Asia/Shanghai
    ports:
      - 7006:7006
      - 17006:17006
    volumes:
      - ./conf/redis6.conf:/usr/local/redis/redis.conf
    command: ["redis-server","/usr/local/redis/redis.conf"]

配置文件

sh
# redis.conf
# 指定redis的端口号
port 7001
# 开启Redis集群
cluster-enabled yes
# 集群信息的文件
cluster-config-file nodes-7001.conf
# 集群的对外ip地址
cluster-announce-ip 192.168.199.109
# 集群的对外port
cluster-announce-port 7001
# 集群的总线端口
cluster-announce-bus-port 17001

启动了6个Redis的节点。 切记 开启所有相关的端口号 重启防火墙 重启docker服务

随便跳转到一个容器内部,使用redis-cli管理集群

sh
redis-cli --cluster create 192.168.200.101:7001 192.168.200.101:7002 192.168.200.101:7003 192.168.200.101:7004 192.168.200.101:7005 192.168.200.101:7006 --cluster-replicas 1

-- 注意  上面的指令 只需要 执行一次 即可 ,执行一次 集群环境就搭建好了



redis-cli 操作集群
redis-cli -c -h 192.168.200.101 -p 7001     -c 实现自动重定向

6.7 Java连接Redis集群

使用JedisCluster对象连接Redis集群

java
@Test
public void test(){
    // 创建Set<HostAndPort> nodes
    Set<HostAndPort> nodes = new HashSet<>();
    nodes.add(new HostAndPort("192.168.199.109",7001));
    nodes.add(new HostAndPort("192.168.199.109",7002));
    nodes.add(new HostAndPort("192.168.199.109",7003));
    nodes.add(new HostAndPort("192.168.199.109",7004));
    nodes.add(new HostAndPort("192.168.199.109",7005));
    nodes.add(new HostAndPort("192.168.199.109",7006));

    // 创建JedisCluster对象
    JedisCluster jedisCluster = new JedisCluster(nodes);

    // 操作
    String value = jedisCluster.get("b");
    System.out.println(value);
}

七、Redis常见问题【重点


7.1 key的生存时间到了,Redis会立即删除吗?

不会立即删除。

  • 定期删除:Redis每隔一段时间就去会去查看Redis设置了过期时间的key,会再100ms的间隔中默认查看3个key。

  • 惰性删除:如果当你去查询一个已经过了生存时间的key时,Redis会先查看当前key的生存时间,是否已经到了,直接删除当前key,并且给用户返回一个空值。

为什么单线程的 redis 性能这么高????

数据库和redis的数据一致性问题???

7.2 Redis的淘汰机制

在Redis内存已经满的时候,添加了一个新的数据,执行淘汰机制。

  • volatile-lru:在内存不足时,Redis会在设置过了生存时间的key中干掉一个最近最少使用的key。
  • allkeys-lru:在内存不足时,Redis会在全部的key中干掉一个最近最少使用的key。
  • volatile-lfu:在内存不足时,Redis会再设置过了生存时间的key中干掉一个最近最少频次使用的key。
  • allkeys-lfu:在内存不足时,Redis会再全部的key中干掉一个最近最少频次使用的key。
  • volatile-random:在内存不足时,Redis会再设置过了生存时间的key中随机干掉一个。
  • allkeys-random:在内存不足时,Redis会再全部的key中随机干掉一个。
  • volatile-ttl:在内存不足时,Redis会在设置过了生存时间的key中干掉一个剩余生存时间最少的key。
  • noeviction:(默认)在内存不足时,直接报错。

指定淘汰机制的方式:maxmemory-policy 具体策略,设置Redis的最大内存:maxmemory 字节大小

7.3 缓存的常问题

7.3.1 缓存穿透问题

缓存穿透

个人理解:指查询一个一定不存在的数据,由于缓存是不命中的,将去查询数据库,但是数据库也没有这个数据,更糟糕的是我们没有把这次查询的 null 结果 写入缓存,这将导致这个不存在的数据每次查询 都走数据库。 缓存也就没有意义了

风险: 利用不存在的数据 进行攻击 数据库瞬时压力增大 可能导致崩溃

解决:将null 结果放入 缓存 ,最好 加入一个 过期时间

缓存穿透
1586949401099
7.3.2 缓存击穿问题

缓存击穿

个人理解:对于某些热点数据 可能设置了过期时间 或者 在 第一次访问之前缓存中没有 在某一刻的高并发访问 都没有命中缓存,都查询了数据库 这称之为 缓存击穿

解决方案:加锁

​ 预先设置下热点数据

缓存击穿
1586949585287
7.3.3 缓存雪崩问题

缓存雪崩

个人理解:我们在设置缓存的 key 时 采用了相同的过期时间 导致缓存在某一时刻同时失效 请求全部转到数据库 数据库瞬时压力大 造成雪崩

解决方案:设置key 的时候 在原有的过期时间上 加上一个 随机值 比如 1-5 分钟的随机值 这样 每一个缓存key 的过期时间 重复率就会降低

缓存雪崩
1586949725602
7.3.4 缓存倾斜问题

缓存倾斜

缓存倾斜
1586949955591

sb整合redis

1.创建sb工程 添加依赖

shell
    <!--整合redis-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--阿里巴巴fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
        
        <!--整么mp 第一步    添加下面的依赖-->
        <!--mysql-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>

        <!--mybatis-plus-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.2</version>
        </dependency>

        <!--mybatis-plus代码生成器-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>3.3.2</version>
            <exclusions>
                <exclusion>
                    <groupId>com.baomidou</groupId>
                    <artifactId>mybatis-plus-extension</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--模板   代码生成器需要使用模板进行生成-->
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity</artifactId>
            <version>1.7</version>
        </dependency>

        <!--mybatis-plus 扩展插件  比如  分页插件依赖-->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-extension</artifactId>
            <version>3.4.2</version>
        </dependency>

2.编写配置文件

yml
server:
  port: 8080
spring:
  #redis相关配置
  redis:
    # 配置redis的主机地址,需要修改成自己的
    host: 127.0.0.1
    port: 6379
    password:
    timeout: 5000
    jedis:
      pool:
        # 连接池中的最大空闲连接,默认值也是8。
        max-idle: 500
        # 连接池中的最小空闲连接,默认值也是0。
        min-idle: 50
        # 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)
        max-active: 1000
        # 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出JedisConnectionException
        max-wait: 2000

  datasource:
    password: 123456
    username: root
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/ssm?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true

mybatis-plus:
  configuration:
      log-impl: org.apache.ibatis.logging.stdout.StdOutImpl        #  查看sql输出日志
  global-config:
    db-config:
      id-type: auto
  mapper-locations: classpath:mapper/*.xml

3.封装一个简单的工具类

java
package com.glls.sbdemo14.redis;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @date 2022/11/10
 * @desc  一个 简单 redis 工具类    感受一下  模板类API的相关用法
 */

@Component

public class RedisService {
    
    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;


    /**
     * set redis: string类型
     * @param key key
     * @param value value
     */
    public void setString(String key, String value){

        ValueOperations<String, String> valueOperations = stringRedisTemplate.opsForValue();

        valueOperations.set(key,value);

    }

    /**
     * get redis: string类型
     * @param key key
     * @return
     */
    public String getString(String key){
        return stringRedisTemplate.opsForValue().get(key);
    }



    /**
     * set redis: hash类型
     * @param key key
     * @param filedKey filedkey
     * @param value value
     */
    public void setHash(String key, String filedKey, String value){
        HashOperations<String, Object, Object> hashOperations = stringRedisTemplate.opsForHash();

        HashOperations hashOperations1 = redisTemplate.opsForHash();

        hashOperations.put(key,filedKey, value);


    }

    /**
     * get redis: hash类型
     * @param key key
     * @param filedkey filedkey
     * @return
     */
    public String getHash(String key, String filedkey){
        return (String) stringRedisTemplate.opsForHash().get(key, filedkey);
    }



    /**
     * set redis:list类型
     * @param key key
     * @param value value
     * @return
     */
    public long setList(String key, String value){
        ListOperations<String, String> listOperations = stringRedisTemplate.opsForList();

        return listOperations.leftPush(key, value);
    }

    /**
     * get redis:list类型
     * @param key key
     * @param start start
     * @param end end
     * @return
     */
    public List<String> getList(String key, long start, long end){
        return stringRedisTemplate.opsForList().range(key, start, end);
    }


}

4.redisTemplate 和 stringRedisTemplate

4.1两者关系是StringRedisTemplate继承RedisTemplate

1.从StringRedisTemplate源码即可看出,如下图所示:

image-20221110114823262

  2. 两者的数据是不共通的,也就是说StringRedisTemplate只能管理StringRedisTemplate里面的数据,RedisTemplate只能管理RedisTemplate中的数据
  3. 使用的序列化类不同。
     使用的序列化哪里不同?如下所示:
     (1)RedisTemplate使用的是JdkSerializationRedisSerializer 存入数据会将数据先序列化成字节组然后再存入Redis数据库。
     (2)StringRedisTemplate使用的是StringRedisSerializer。

使用注意事项:

(1)当你的Redis数据库里面本来存的是字符串数据或者是你要存取的数据就是字符串类型数据的时候,那么你就使用StringRedisTemplate即可; (2)但是如果你的数据是复杂的对象类型,而取出的时候又不想做任何数据转换,直接从Redis里面取出一个对象,那么使用RedisTemplate是更好的选择; (3)RedisTemplate中存取数据都是字节数组。当Redis存入的数据是可读形式而非字节数组时,使用RedisTemplate取值的时候会无法获取导出数据,获得的值为null。可以使用StringRedisTemplate试试;

4.2RedisTemplate定义了5种数据结构操作

redisTemplate.opsForValue();//操作字符串 redisTemplate.opsForHash();//操作hash redisTemplate.opsForList();//操作list redisTemplate.opsForSet();//操作set redisTemplate.opsForZSet();//操作有序set

4.3StringRedisTemplate常用操作

stringRedisTemplate.opsForValue().set("test", "100",60*10,TimeUnit.SECONDS);//向redis里存入数据和设置缓存时间

stringRedisTemplate.boundValueOps("test").increment(-1);//val做-1操作

stringRedisTemplate.opsForValue().get("test")//根据key获取缓存中的val

stringRedisTemplate.boundValueOps("test").increment(1);//val +1

stringRedisTemplate.getExpire("test")//根据key获取过期时间

stringRedisTemplate.getExpire("test",TimeUnit.SECONDS)//根据key获取过期时间并换算成指定单位

stringRedisTemplate.delete("test");//根据key删除缓存

stringRedisTemplate.hasKey("546545");//检查key是否存在,返回boolean值

stringRedisTemplate.opsForSet().add("red_123", "1","2","3");//向指定key中存放set集合

stringRedisTemplate.expire("red_123",1000 , TimeUnit.MILLISECONDS);//设置过期时间

stringRedisTemplate.opsForSet().isMember("red_123", "1")//根据key查看集合中是否存在指定数据

stringRedisTemplate.opsForSet().members("red_123");//根据key获取set集合

5.应用案例

1.redisTemplate的应用

java
@Override
    public List<User> findUsers2() {
        // 先从redis 中查找数据  如果redis  中有数据 就表示 缓存命中   则直接返回
        // redis 中的数据即可
        // 如果 redis 中 没有要查找的数据    那就表示缓存没有命中   则继续查询数据库
        // 把查询数据库的结果 放入redis 中  供下次查询使用

        //1,先从redis 获取数据
        List<User>  userList = (List<User>) redisTemplate.opsForValue().get("userList");

        if(userList!=null){
            //缓存命中
            System.out.println("缓存命中,不需要查询数据库");
            return userList;
        }else{
            // 缓存未命中
            System.out.println("缓存没有命中,需要查询数据库");
            List<User> users = userMapper.findUsers();

            //把数据库查询出来的数据 放入redis  供下次使用
            redisTemplate.opsForValue().set("userList",users);
            return users;
        }
    }

2.stringRedisTemplate 的应用

java
@Override
    public List<User> findUsers3() {

        String userList = stringRedisTemplate.opsForValue().get("userList");

        if(userList!=null){
            // 缓存命中
            System.out.println("缓存命中,不查询数据库");
            List<User> users = JSON.parseArray(userList, User.class);
            return users;
        }else{
            System.out.println("缓存没有命中 需要查询数据库");
            List<User> users = userMapper.findUsers();
            String jsonString = JSON.toJSONString(users);
            stringRedisTemplate.opsForValue().set("userList",jsonString);
            return users;
        }

    }

3.jemeter 对比 使用redis 和 不使用jedis 接口的吞吐量差别

​ 注意:jemeter 的 简单使用

​ 关闭mybatis/mp 的 缓存

4.讲解 缓存击穿、缓存穿透 、缓存雪崩、缓存倾斜相关概念 ,演示缓存击穿 和 解决方案

java
@Override
    public List<User> findUsers3() {

        String userList = stringRedisTemplate.opsForValue().get("userList");

         if(userList==null){

             synchronized (this){
                 System.out.println("进锁了");
                 userList = stringRedisTemplate.opsForValue().get("userList");

                 if(userList!=null){
                     System.out.println("缓存命中");
                     List<User> users = JSON.parseArray(userList, User.class);
                     return users;
                 }

                // 睡10秒
                 try {
                     TimeUnit.SECONDS.sleep(10);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }


                 System.out.println("缓存未命中");
                 List<User> users = userMapper.findUsers();
                 String jsonString = JSON.toJSONString(users);
                     stringRedisTemplate.opsForValue().set("userList",jsonString);
                     return users;

             }
         }else{
             System.out.println("缓存命中,不查询数据库");
                 List<User> users = JSON.parseArray(userList, User.class);
                 return users;
         }

    }

5.由单进程的缓存击穿 引入 分布式环境下 本地锁失效的问题 引入 setnx 指令

image-20221111092100556
java
@Override
    public List<User> findUsers4() {
        String userList = stringRedisTemplate.opsForValue().get("userList");

        if(userList!=null){
            // 缓存命中
            List<User> users = JSON.parseArray(userList, User.class);
            return users;
        }else{
            //缓存没命中   占坑
            // 占坑成功  返回 true   占坑失败  返回false
            Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");

            //占坑失败的人 进while 循环
            while (!lock){     // 类似自旋锁的效果  不是真正的自选

                userList = stringRedisTemplate.opsForValue().get("userList");
                if(userList!=null){
                    //从缓存取到了数据  返回
                    List<User> users = JSON.parseArray(userList, User.class);
                    return users;
                }
                // 没有进if 说明 占坑成功的那个人 还没有把数据放进去
                // 进入下次循环  继续尝试从缓存取数据
            }

            // 站着茅坑 在里面 待了10秒
            try {
                TimeUnit.SECONDS.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            System.out.println("先来的人 占坑成功  查询数据库");

                List<User> users = userMapper.findUsers();

                String jsonString = JSON.toJSONString(users);
                stringRedisTemplate.opsForValue().set("userList",jsonString);
                return users;
        }

    }

6.买手机案例 引出分布式锁的解决方案

6.1 synchronized本地锁
java
@RequestMapping("/buyPhone")
    public String buyPhone(Integer id){
        // 先查询手机的库存
        int stock = phoneService.getStockById(id);

        if(stock>0){
            synchronized (this){
                stock = phoneService.getStockById(id);
                if(stock>0){
                    try {
                        TimeUnit.SECONDS.sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    int result = phoneService.updateStockById(id);

                    if(result>0){
                        return "恭喜你,买到了";
                    }
                }
            }
        }
        return "对不起,已售罄";
    }
6.2 ReentrantLock 本地锁
java
ReentrantLock lock = new ReentrantLock();

    @RequestMapping("/buyPhone2")
    public String buyPhone2(Integer id){
        // 先查询手机的库存
        int stock = phoneService.getStockById(id);

        if(stock<=0){
            return "对不起,已售罄";
        }
        // 要是 把锁写在这里   每个线程都会有一把自己的锁  加了跟没加一样
        //ReentrantLock lock = new ReentrantLock();
        // 加锁
        lock.lock();
        try {
            stock = phoneService.getStockById(id);

            if(stock>0){
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                int result = phoneService.updateStockById(id);

                if(result>0){
                    return "恭喜你,买到了";
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();   // 解锁
        }
        return "对不起,已售罄";
    }
6.3 手动实现分布式锁
java
@Override
    public String buyPhone3(Integer id) {

        int stock = this.getStockById(id);
        if(stock<=0){
            return "商品已售罄";
        }

        //占坑 加锁    注意 这里的锁 用完了 一定要删  要不然 就成死锁了  所以 finally 有删除锁的动作
        //但是 如果 加锁成功  程序由于某些问题 没有执行到 删除锁的代码 还是会出现死锁  怎么办?
        //Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("mylock", "phonelock");
        // 优化的第一个动作  给锁设置过期时间   这样 的话  就算没有执行删除锁的动作 ,这个锁到期了 也会自动删除

        // 这里会出现的问题
        // 锁的过期时间小于业务的执行时间  会导致 线程安全问题 你正在里面干活呢 别人就闯进来了
        // 刚加上锁 还没来得及给锁设置过期时间  程序挂掉了 还是会出现死锁问题
        //stringRedisTemplate.expire("mylock",10,TimeUnit.SECONDS);  //给这锁设置10秒的过期时间


        // 优化的第二个动作  加锁和 给锁设置过期时间 需要具备原子性
       // Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("mylock","phonelock",3,TimeUnit.SECONDS);

        // 优化的第三个动作  防误删  比如 锁的过期时间是3秒 业务执行时间是5秒 就会出现第一个线程 删除的是第二个线程的锁
        // 所以在删除锁之前判断是不是自己的锁   那就需要 加锁的时候   给每个线程的锁 一个特殊的标记,这里每个线程的
        // 锁的 key 是 一样的,可以把value 设置为锁的标识 这里把value 设计为 uuid

        String lockKey= "mylock";
        String lockValue = UUID.randomUUID().toString();
        Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,3,TimeUnit.SECONDS);

        try {
            if(lock){
                // 第一个人 占坑成功
                stock = this.getStockById(id);

                if(stock>0){

                    int result = this.updateStockById(id);
                    return "恭喜你买到了";
                }else{
                    return "对不起,已售罄";
                }
            }else{
                // 后续的人在else 中 进行购买   仍然 要现获取锁 再进行买的动作  ,但是 注意 这里要等待别人把
                // 锁释放了  才能继续获取
                //但是 注意 不能直接占锁  因为 这个锁 别人可能还没有释放
                //stringRedisTemplate.opsForValue().setIfAbsent("mylock","phonelock");

                while (true){
                    // 第一个线程占锁成功  后续线程进入while 循环 尝试获取锁 执行购买操作
                    //lock = stringRedisTemplate.opsForValue().setIfAbsent("mylock","phonelock");

                    lock = stringRedisTemplate.opsForValue().setIfAbsent(lockKey,lockValue,3,TimeUnit.SECONDS);
                    if(lock){
                        // 占锁成功
                        stock = this.getStockById(id);
                        if(stock>0){
                            int result = this.updateStockById(id);
                            return "恭喜你买到了";
                        }else{
                            return "对不起,已售罄";
                        }
                    }
                }


            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 一定要有 释放锁的操作
            //stringRedisTemplate.delete("mylock");  // 解锁
            //删除的时候 要判断是不是自己的锁
            if(lockValue.equals(stringRedisTemplate.opsForValue().get(lockKey))){
                //如果 第一个线程 进入if  准备删除锁的时候 停了,第二个线程 占了锁 , 然后第一个线程
                //继续往下走 就又删掉了别人的锁  出现这个问题的 原因    删除操作  不具备原子性
                stringRedisTemplate.delete(lockKey);
            }
            // 删除操作也得具备原子性   这里 得使用lua 脚本   lua脚本的执行 具备原子性
            // 定义 lua 脚本
            String script="if redis.call('get' ,KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
            // 定义 脚本对象
            DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
            //封装上面的脚本文本片段
            redisScript.setScriptText(script);
            //设置返回值类型
            redisScript.setResultType(Long.class);
            // Arrays.asList(lockKey)  对应上面的  KEYS[1]
            // lockValue 对应上面的  ARGV[1]
            //执行 lua 脚本
            stringRedisTemplate.execute(redisScript, Arrays.asList(lockKey),lockValue);

        }


        return null;
    }

image-20221111093557675

else 中 自旋 尝试获取锁

image-20221111111620185

分布式锁的特征

1.互斥性 这里利用 setnx 在 任意时刻 只能一个线程 占坑成功

2.不能发生死锁 即使某个线程没有执行解锁操作 也要保证后续线程可以获取锁, 方案 就是 给锁设置过期时间

3.不能误删其他线程的锁 加锁 和 解锁必须是同一个线程 把 value 设置成 uuid 删的时候 判断下

4.加锁 和 解锁 必须具备原子性 这里使用了 lua 脚本

5.还需要考虑 锁的过期时间 和 业务的执行时间长短的问题 理论上 锁的过期时间 要大于 业务的执行时间 有一种看门狗机制

6.还需要考虑 锁放到了 redis ,redis 要是挂了怎么办?

image-20221111112901441

红锁 大家了解下 其工作原理,面试的时候 有同学遇到过。 10分钟 就能理解 就是一种 方案 思路 花几分钟了解下

上面 就是咱们手写的基于 redis 的 分布式锁 , 算是比较完善的 , 真正的 基于 redis 的分布式锁解决方案 redisson 底层 也是上面的实现流程。

当前 市面主流的分布式锁 解决方案

1.基于 redis 的解决方案 redisson 性能最好

2.基于 zookeeper 的 解决方案 可靠性最好

3.基于数据库的解决方案

6.4 基于redisson的分布式锁解决方案
java
添加依赖
    
  <!--以后使用redisson 作为所有分布式锁 分布式对象等功能框架-->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.12.0</version>
        </dependency>
    
配置类
@Configuration
public class MyRedissonConfig {


    /**
     * 所有对Redisson 的使用 都是通过RedissonClient对象
     * */
    @Bean(destroyMethod = "shutdown")        //指定销毁对象的方法  服务停止时 调用这个方法 进行销毁
    public RedissonClient redisson(){
        //1.创建配置    单redis节点模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        //2.根据配置 创建RedissonClient 实例
        return Redisson.create(config);
    }
}


@Autowired
    private RedissonClient redissonClient;    // redissonClient 可以得到 分布式锁对象


@Override
    public String buyPhone4(Integer id) {
        int stock = this.getStockById(id);
        if(stock<=0){
            return "商品已售罄";
        }
        //RLock 是 juc 的 Lock 接口的子接口  用法和 juc 先的锁一样
        RLock lock = redissonClient.getLock("redisson-lock");
        //lock.lock();   //1 底层还是占坑   这有一只看门狗 watch dog 看着你的业务执行时间  这种方式 锁的时间永远大于业务的执行时间

        lock.lock(10,TimeUnit.SECONDS);  //2 设置一个 10 秒的锁    企业中真正使用的是这种
        try {
            stock = this.getStockById(id);
            if(stock>0){
                TimeUnit.SECONDS.sleep(5);
                int i = this.updateStockById(id);
                return "恭喜你,买到了";
            }else{
                return "对不起,已售罄";
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 解锁了
            lock.unlock();
        }

        return "对不起,已售罄";
    }

注意 redisson 提供的锁 底层 还是 占坑思想 而且 这个坑默认30秒 也就是 setnx 命令 如果一个key不存在就设置进去 并设定默认30秒有效期,如果30秒内 业务逻辑没走完 自动延长坑的时间,如果占坑线程由于某种原因 没有释放锁 比如 服务中断 ,redisson 会自动 释放锁。 避免死锁问题

测试运行 发现这个锁 能满足需求

公平锁 Fair Lock RLock fairLock = redisson.getFairLock("anyLock"); 了解

基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)反射式(Reactive)RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么 后面的线程会等待至少25秒。

简单理解 就是 排着队抢锁 排队靠前的先拿到锁 默认 是非公平锁 就是所有线程 不排队 谁都有可能先抢到

读写锁 ReadWriteLock RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");

java
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

业务要读的时候 就加 读锁

业务要修改的时候 就加 写锁

A在修改数据 B要读取数据 B要等待A把锁释放了 才能读取数据,如果都是并发读数据 互不影响 ,并发写 会竞争锁 ,只要写锁存在 读锁就得等待

shell
/**
     * 读写锁的好处   能够保证 一定读到最新数据
     *  写锁 是一个排他锁 (互斥锁)
     *  读锁 是一个共享锁  加了跟没加一样
     *  只要写锁存在  就必须等待
     *  读+   相当于无锁  并发读   不会等待 会在redis 中记录当前的所有线程的读锁
     *  写+读     等待写释放
     *  写+写    阻塞模式
     *  读+写    正在读的时候   写也需要等待
     * */


    @GetMapping("/write")
    @ResponseBody
    public String writeValue(){
        RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

        String s="";

        RLock rLock = lock.writeLock();
        try {
            // 改数据  加写锁    读数据 读锁
            rLock.lock();
            s = UUID.randomUUID().toString();
            TimeUnit.SECONDS.sleep(15);
            redisTemplate.opsForValue().set("writeValue",s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            rLock.unlock();
        }

        return s;
    }


    @GetMapping("/read")
    @ResponseBody
    public String readValue(){
        RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");

        RLock rLock = lock.readLock();
        String value="";
        try{
            rLock.lock();  // 读锁
            value = redisTemplate.opsForValue().get("writeValue");

        }catch (Exception e){
            e.printStackTrace();
        }finally {
            rLock.unlock();
        }


        return value;
    }

**闭锁 ** CountDownLatch

基于Redisson的Redisson分布式闭锁(CountDownLatch)Java对象RCountDownLatch采用了与java.util.concurrent.CountDownLatch相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他线程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
java
    /**
     * 放假 锁学校大门
     * 
     * 5个班全部走完  我们可以锁大门
     * */

@GetMapping("/lockDoor")
    @ResponseBody
    public String  lockDoor() throws InterruptedException {
        RCountDownLatch door = redisson.getCountDownLatch("door");

        door.trySetCount(5);   // 5个班

        door.await();  //  等待闭锁都完成

        return "放假了";
    }

    @GetMapping("/gogogo/{id}")
	@ResponseBody
    public String gogogo(@PathVariable("id") Long id){

        RCountDownLatch door = redisson.getCountDownLatch("door");

        door.countDown();   // 计数减一

        return id+"班的人都走了";
    }

信号量 Semaphore

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)反射式(Reactive)RxJava2标准的接口

shell
/**
     * 车位停车
     * 3车位

     *  利用信号量的特性   可以进行   限流操作
     *  比如 系统只能供10000 个线程访问  ,就可以分配10000 信号量
     *
     * */
    @GetMapping("/park")
    @ResponseBody
    public String park() throws InterruptedException {
        //先在 redis设置键值:   set park 3
        RSemaphore park = redisson.getSemaphore("park");

       // park.acquire();  // 获取一个信号 获取一个值 占一个车位    可以执行三次  车位占完  就没法继续往下执行了   需要释放

        boolean b = park.tryAcquire();   // 尝试 获取信号量   获取到返回 true  获取不到  返回false
		// 限流操作
		if(b){
			// 车位够  执行业务
		}else{
		   // 车位不够  执行限流 
		   
		}
        return "ok" + b;

    }

    @GetMapping("/go")
    @ResponseBody
    public String go(){
        RSemaphore park = redisson.getSemaphore("park");

        park.release();   // 释放一个车位

        return "ok";
    }

总结:分布式锁使用 redisson解决