欢迎光临散文网 会员登陆 & 注册

【Java项目】高并发场景下,基于Redisson实现的分布式锁

2023-06-24 22:35 作者:美丽的程序人生  | 我要投稿

【Java项目】高并发场景下,基于Redisson实现的分布式锁

分布式锁应用场景

随着互联网应用的高速发展,在电商应用中高并发应用场景涉及很多,例如:

  • 秒杀:在大规模的秒杀场景中,需要保证商品数量、限制用户购买数量, 防止用户购买数量的超限、避免出现超卖情况;
  • 订单支付: 当用户下单付款时,需要对订单信息进行互斥操作以避免订单重复支付;
  • 提现操作:需要防止用户重复提现,避免造成财务损失。
  • 总结:分布式锁应用场景可以分为两类:
  • 1、共享资源的互斥访问:当多个节点需要对同一个共享资源进行操作时,需要确保同一时刻只有一个节点可以操作,此时就可以使用分布式锁;
  • 2、分布式任务调度:分布式系统往往需要对任务进行调度,确保任务在多个节点的协作下执行。而在并行的任务执行过程中,需要区分哪些任务已经被分配并且正在被执行,哪些任务没有被分配。利用分布式锁来保证任务的正确性、顺序性和稳定性。
  • 概括地说,就是对多线程下,对共享变量操作,线程间是变量不可见,导致出现并发问题,需要通过分布式锁来进行控制,今天就给大家通过案例,分享一下如何使用redisson实现分布式锁。

案例需求描述

库存中有200件商品,通过商品下单购买场景,使用分布式锁避免商品超卖问题。

Redisson环境准备

本地Redis环境安装

下载地址: https://github.com/tporadowski/redis/releases

1、windows下安装

默认端口:6379

下载连接 https://github.com/tporadowski/redis/releases

解压

双击redis-server.exe启动服务端

双击redis-cli.exe启动客户端连接服务端

在客户端输入 “ping”,出现“PONG”,即证明连接成功,部分配置可以在redis.conf文件修改;

Spring boot项目与redis集成

引入依赖

 <dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson-spring-boot-starter</artifactId>
  <version>3.22.0</version>
 </dependency>

创建redis连接池代码

 package com.zhc.config.redis;
 
 import org.redisson.Redisson;
 import org.redisson.api.RedissonClient;
 import org.redisson.client.codec.Codec;
 import org.redisson.codec.JsonJacksonCodec;
 import org.redisson.config.Config;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
 /**
  * redisson 连接池配置
  * @author zhouhengchao
  * @since 2023-06-19 20:29:00
  * @version 1.0
  */
 @Configuration
 public class RedisConfig {
 
  @Value("${spring.redis.host}")
  private String host;
 
  @Value("${spring.redis.port}")
  private String port;
 
  @Value("${spring.redis.database}")
  private Integer dataBase;
 
  @Bean(name = "redisTemplate")
  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
    GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = serializer();
    RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
    // key采用String的序列化方式
    redisTemplate.setKeySerializer(StringRedisSerializer.UTF_8);
    // value序列化方式采用jackson
    redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
    // hash的key也采用String的序列化方式
    redisTemplate.setHashKeySerializer(StringRedisSerializer.UTF_8);
    //hash的value序列化方式采用jackson
    redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
    redisTemplate.setConnectionFactory(redisConnectionFactory);
    return redisTemplate;
  }
 
  /**
   * 此方法不能用@Ben注解,避免替换Spring容器中的同类型对象
   */
  public GenericJackson2JsonRedisSerializer serializer() {
    return new GenericJackson2JsonRedisSerializer();
  }
 
  @Bean
  public RedissonClient redissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://" + host + ":" + port).setDatabase(dataBase);
    // 设置redisson序列化方式,否则打开查看数据可能乱码
    Codec codec = new JsonJacksonCodec();
    config.setCodec(codec);
    return Redisson.create(config);
  }
 }
redis的yaml文件配置
 spring:
  redis:
  host: localhost
  port: 6379
  database: 0

扣减库存方法

 /**
   * 从redis中获取库存,扣减库存数量
   */
  private void reduceStock(){
    // 从redis中获取商品库存
    RBucket<Integer> bucket = redissonClient.getBucket(REDIS_STOCK);
    int stock = bucket.get();
    if (stock > 0) {
      // 库存-1
      stock--;
      // 更新库存
      bucket.set(stock, 2, TimeUnit.DAYS);
      log.info("扣减成功,剩余库存:" + stock);
    } else {
      log.info("扣减失败,库存不足");
    }
  }

基于synchronized加锁控制

 @GetMapping("/test01")
  public void test01(){
    for (int i = 0; i < 6; i++) {
      synchronized (this) {
        new Thread(this::reduceStock).start();
      }
    }
  }
   

我们通过了Synchronized锁,成功解决了多个线程争抢导致的超卖问题,但是有个问题,假设后期公司为了保证服务可用性。

将单击的应用,升级称为了集群的模式,那么是否会有超卖问题呢?

通过nginx搭建负载均衡

下载Nginx: http://nginx.org/download/nginx-1.18.0.zip

nginx.conf完整配置

 worker_processes 1;
 events {
  worker_connections 1024;
 }
 http {
  include   mime.types;
  default_type application/octet-stream;
  sendfile   on;
  keepalive_timeout 65;
  upstream redislock{
  server localhost:8081 weight=1;
  server localhost:8082 weight=1;
 }
  server {
    listen   80;
    server_name localhost;
    location / {
      root html;
      index index.html index.htm;
    proxy_pass http://redislock;
    }
    error_page 500 502 503 504 /50x.html;
    location = /50x.html {
      root html;
    }
  }
 
 }
 

启动nginx,双击nginx.exe文件即可;

访问应用:http://localhost/test01

发现存在超卖问题。

使用redis分布式锁

 @GetMapping("/test02")
  public void test02(){
    // 分布式锁名称,关键是多个应用要共享这一个Redis的key
    String lockKey = "lockDeductStock";
    // setIfAbsent 如果不存在key就set值,并返回1
    //如果存在(不为空)不进行操作,并返回0,与redis命令setnx相似,setIfAbsent是java中的方法
    // 根据返回值为1就表示获取分布式锁成功,返回0就表示获取锁失败
    Boolean lockResult = redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey);
    // 加锁不成功,返回给前端错误码,前端给用户友好提示
    if (Boolean.FALSE.equals(lockResult)) {
      log.info("系统繁忙,请稍后再试!");
      return;
    }
    reduceStock();
    // 业务执行完成,删除这个锁
    redisTemplate.delete(lockKey);
  }

1、主要使用setIfAbsent方法:如果不包含key就set值,并返回1;

如果存在(不为空)不进行操作,并返回0;

2、很明显,比get和set要好。因为先判断get,再set的用法,有可能会重复set值,与setnx类似。

以上redis加锁可以解决并发问题,但是存在问题:

1、如果setIfAbsent加锁成功,但是到业务逻辑代码时,该服务挂掉了,就会导致另一个服务一直获取不到锁,一直在等待中;

2、可以使用 redisTemplate.opsForValue().setIfAbsent(lockKey, lockKey,30,TimeUnit.SECONDS),设置锁的key过期时间,在规定时间后key过期就可以再获取。

redis分布式锁优化

以上分布式锁还是存在问题,如果锁的key过期时间与程序执行时间差问题,例如

  • 如果锁key在程序执行结束前过期,就会导致删除key失败;
  • 同时另一个应用获取了锁,又会被其他应用删掉锁,导致锁一直失效,存在并发问题。
  • 可以通过引入UUId来解决锁被其他应用勿释放问题,如下代码:
 @GetMapping("/test03")
  public void test03(){
    // 分布式锁名称,关键是多个应用要共享这一个Redis的key
    String lockKey = "lockDeductStock";
    // 分布式锁的值
    String lockValue = UUID.randomUUID().toString().replaceAll("-",   "");
    // setIfAbsent 如果不存在key就set值,并返回1
    //如果存在(不为空)不进行操作,并返回0,与redis命令setnx相似,setIfAbsent是java中的方法
    // 根据返回值为1就表示获取分布式锁成功,返回0就表示获取锁失败
    Boolean lockResult =   redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 30, TimeUnit.SECONDS);
    // 加锁不成功,返回给前端错误码,前端给用户友好提示
    if (Boolean.FALSE.equals(lockResult)) {
      log.info("系统繁忙,请稍后再试!");
      return ;
    }
    reduceStock();
    // 判断是不是当前请求的UUID,如果是则可以正常释放锁。如果不是,则释放锁失败!
    if (lockValue.equals(redisTemplate.opsForValue().get(lockKey)))   {
      redisTemplate.delete(lockKey);
    }
  }

还存在锁超时问题:锁超时问题,写一个定时任务,分线程每隔十秒去查看一次主线程是否持有这把锁,如果这个锁存在,重新将这个锁的超时时间设置为30S,对锁延时,比较复杂。

使用redisson实现分布式锁

 @GetMapping("/test04")
  public void test04(){
    // 分布式锁名称,关键是多个应用要共享这一个Redis的key
    String lockKey = "lockDeductStock";
    // 获取锁对象
    RLock redissonLock = redissonClient.getLock(lockKey);
    try {
      redissonLock.lock();
 //     boolean result = redissonLock.tryLock();
      // 加锁不成功,返回给前端错误码,前端给用户友好提示
 //     if (!result) {
 //       log.info("系统繁忙,请稍后再试!");
 //       return;
 //     }
      reduceStock();
    }
    finally{
      if(redissonLock.isHeldByCurrentThread()){
        redissonLock.unlock();
      }
    }
  }

redisson分布式锁原理图:


关键方法介绍:

  • lock() 方法是阻塞获取锁的方式,如果当前锁被其他线程持有,则当前线程会一直阻塞等待获取锁,直到获取到锁或者发生超时或中断等情况才会结束等待;
  • tryLock() 方法是一种非阻塞获取锁的方式,在尝试获取锁时不会阻塞当前线程,而是立即返回获取锁的结果,如果获取成功则返回 true,否则返回 false.
  • 总结:
  • lock()方法获取到锁之后可以保证线程对共享资源的访问是互斥的,适用于需要确保共享资源只能被一个线程访问的场景。Redisson 的 lock() 方法支持可重入锁和公平锁等特性,可以更好地满足多线程并发访问的需求;
  • tryLock() 方法支持加锁时间限制、等待时间限制以及可重入等特性,可以更好地控制获取锁的过程和等待时间,避免程序出现长时间无法响应等问题。
  • 在实际应用中需要根据具体场景和业务需求来选择合适的方法,以确保程序的正确性和高效性。
  • 视频中的内容如果对您有所帮助,请给个三连加关注的支持,欢迎在评论区留言讨论,后续会进一步完善文档。


【Java项目】高并发场景下,基于Redisson实现的分布式锁的评论 (共 条)

分享到微博请遵守国家法律