【Redis 分布式锁】

06-21 1633阅读


  • 什么是分布式锁
  • 实现分布式锁注意的点
  • Redis 分布式锁原理
  • Redis 实现分布式锁注意的点及解决方案
  • Redisson 实现分布式锁源码解析
  • Redis 红锁实现原理

    一、什么是分布式锁

    不同的进程需要以互斥的方式来访问共享资源,这里实现互斥就是分布式锁。

    简单来说就是:同一时间只有一个客户端对共享资源操作。举个实际例子,抢购茅台,如果不加锁就会发生超卖的事故。

    【Redis 分布式锁】

    分布式锁实现方式有很多中, mysql,zookeeper,redis,本文主要讲 redis 的实现方式。

    二、实现分布式锁需要注意的点

    1. 互斥性:在任何时刻,只有一个客户端获得锁。
    2. 无死锁:任何时候都能获取锁,即使客户端崩溃或者或被分区。
    3. 正确性:解铃还须系铃人,客户端A加的锁只能由客户端A解锁,其他客户端不能解锁。
    4. 容错:只要大部分Redis节点处于运行状态,客户端就能够获取和释放锁。

    三、Redis 分布式锁原理

    Redis 加锁主要是使用set (redis.io/commands/se…) 命令操作:

    SET key value [EX seconds|PX milliseconds|KEEPTTL][NX|XX] [GET]

    • EX – 设置指定的过期时间,以秒为单位。

    • PX – 设置指定的过期时间,以毫秒为单位。

    • NX – 仅当该键不存在的时才会设置该键。

    • XX – 仅当该键存在时才会设置该键。

      加锁命令: SET lock_key lock_value PX 10000 NX

      只有当 lock_key 不存在时才会设置 lock_key 和 lock_value,超时时间 10000 毫秒,设置成功返回 OK:

      【Redis 分布式锁】

      当 lock_key 存在时返回 nil:

      【Redis 分布式锁】

      Redis 释放锁使用命令: DEL key(redis.io/commands/de…)

      解锁命令:DEL lock_key。

      Redis 在 2.6.12 之后的版本才加入 [EX seconds|PX milliseconds. [NX|XX. 这些参数

      【Redis 分布式锁】

      在此之前使用SETNX (redis.io/commands/se…) SETNX 是 “SET if Not eXists” 的缩写。

      SETNX 返回 1 说明设置成功, 返回 0 说明设置失败。

      SETNX 和 EXPIRE 操作之间不是原子性的,如果 SETNX 执行成功之后, 没有执行 EXPIRE 命令,就可能会发生死锁。

      【Redis 分布式锁】

      Redis 官网声明 SETNX 在将来的版本中可能会被弃用,因为 SETNX实现的功能set都能实现。

      【Redis 分布式锁】

      四、Redis 实现分布式锁注意的点及解决方案

      1. 防死锁

        设置锁和设置锁的超时时间要保持原子性,这点很容易做到 使用 SET lock_key lock_value PX 10000 NX 命令即可, 不要使用 SETNX lock_key lock_value,EXPIRE lock_key 10 这些命令,因为他们之间不是原子性的,有发生死锁的风险。

      2. 合理设置锁超时时间

        锁的超时时间要大于程序执行的时间,否则多个客户端可能同时获取锁。充分预估使用锁的业务代码执行时间,该时间不宜过长也不宜过短,过短,可能使锁发生错误;过长,客户端异常时可能会影响执行效率。

      3. 释放锁要及时

        客户端使用完共享资源之后要及时的释放锁,即使在程序发生异常,Java 中一般都是在 finally 里释放锁。

      4. 只能释放自己加的锁

        在释放锁的时要确保这个锁是自己的,不能将其他锁释放掉,这样可能导致多个客户端同时获取锁。可以通过判断 lock_value 的值是否相等来判断是否是自己加的锁,lock_value 的值可以使用 UUID 或者任意确定唯一的值。

      5. 释放锁要保证原子性

        客户端在释放锁时分两个步骤,一要比较锁的值是否相等,二要删除锁(DEL key),这两个步骤要保证原子性,否则的话可能导致将其他锁释放掉,画个图解释下:

      【Redis 分布式锁】

      1. 客户端A 设置 lock_order 锁成功,锁值为123uD,超时间为10000ms。
      2. 客户端A 业务代码执行完成,释放锁前需要获取 lock_order 锁的值。
      3. 客户端A 判断锁值是否是123uD,执行缓慢。
      4. 客户端A 的锁超时时间已到,Redis 自动移除了锁。
      5. 此时客户端B 设置锁,lock_order 锁不存在,所以加锁成功。
      6. 客户端A 判断锁值相等,执行del 释放锁,此时客户端A 释放的锁是客户端B 的而不是自己的,锁出现错误。

      这也好解决,Redis 提供了EVAL(redis.io/commands/ev…) 命令去解析 Lua 脚本,可以发一段 Lua 脚本给 Redis 执行:

      luaif redis.call("get",KEYS[1]) == ARGV[1] -- 判断锁的值是否相等。 KEYS[1], ARGV[1],是指传入的参数,以上面为例,KEYS[1] 指的是 lock_order,ARGV[1] 指的是 123uD, 
      then
          return redis.call("del",KEYS[1])    -- 删除这个 key,返回删除 key 的个数
      else
          return 0                            -- 锁值不相等返回 0
      end
      

      这样就可以保证原子执行了。

      五、基于Set命令实现 Redis 分布式锁

      基于 Redisson 客户端实现 Redis 分布式锁:

      /**
       * 加锁利用 set(key, value, "PX", "NX") 函数实现
       * 解锁利用 Lua 脚本实现
       * 

      * Created by jie.li on 2021/1/4 7:50 下午 */ @Component public class RedisLock1 { @Resource private RedissonClient redissonClient; /** * 尝试加锁 * * @param name lock name * @param value lock value * @return true 加锁成功, false 加锁失败 */ public boolean tryLock(String name, String value) { RBucket bucket = redissonClient.getBucket(name); // 执行的是 set(key, value, "PX", "NX") 命令 return bucket.trySet(value, 10000, TimeUnit.MILLISECONDS); } /** * 解锁 * * @param name lock name * @param value lock value */ public void unLock(String name, String value) { redissonClient.getScript().eval(RScript.Mode.READ_WRITE, DEL_LOCK_SCRIPT, RScript.ReturnType.INTEGER, Collections.singletonList(name), value); } // 解锁脚本 private static final String DEL_LOCK_SCRIPT = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then" + // 如果 KEYS[1] 对应的 Value 值等于 ARGV[1] " return redis.call(\"del\",KEYS[1])" + // 删除 KEYS[1] " else" + // 否则 " return 0" + // 返回 0 " end;"; }

      测试代码:

      /**
       * 测试手动加锁解锁
       * 

      * Created by jie.li on 2021/1/7 2:54 下午 */ @Service public class RedisLockTestService { @Resource private RedisLock1 redisLock1; private int i = 50; /** * 测试手动实现 redis 分布式锁 * * @return int */ public int biz() { String lockName = "redis:lock:1"; String lockValue = UUID.randomUUID().toString(); try { boolean b = redisLock1.tryLock(lockName, lockValue); if (b) { if (i > 0) { i--; } } } catch (Exception e) { e.printStackTrace(); } finally { redisLock1.unLock(lockName, lockValue); } return i; } }

      @Test
      public void testBiz() throws InterruptedException {
          CountDownLatch countDownLatch = new CountDownLatch(200);
          for (int i = 0; i  {
                  try {
                      countDownLatch.await();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
                  int i1 = redisLockTestService.biz();
                  System.out.println(Thread.currentThread().getName() + " -> " + i1);
              }, "Thread" + i).start();
              countDownLatch.countDown();
          }
          TimeUnit.SECONDS.sleep(5);
      }
      

      六、Redisson 实现分布式锁

      Redisson 实现锁简介

      Redisson 实现的分布式锁相对于我们自己实现的锁更加完善,主要有以下两点:

      1、可重入

      2、锁重试

      3、锁自动延期(看门狗机制)

      Redisson 锁的依赖图:

      【Redis 分布式锁】

      Redisson 实现了很多种类型的锁,所有的锁都实现了 JUC 中的 Lock 接口,并且做了扩展( RLock ), 所以使用方法和使用 ReentrantLock 差不多。这里我们只针对 RedissonLock 进行讲解。

      Redisson 源码解析

      尝试加锁
      // waitTime  等待获取锁的时间
      // leaseTime 锁的有效期
      // unit      使用的时间单位
      @Override
      public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
          long time = unit.toMillis(waitTime);
          long current = System.currentTimeMillis();
          long threadId = Thread.currentThread().getId();
          // 1、尝试加锁,如果当前有锁,返回锁的剩余时间ttl,否则返回空
          Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
          // lock acquired
          // 2、加锁成功,返回true
          if (ttl == null) {
              return true;
          }
          // 剩余的等待时间 waitTime
          time -= System.currentTimeMillis() - current;
          // 剩余等待时间已过
          if (time 
              // 获取锁失败
              acquireFailed(waitTime, unit, threadId);
              return false;
          }
          
          current = System.currentTimeMillis();
          // 3、订阅锁释放事件。利用semaphore(信号量),订阅(Redis 发布订阅)锁的释放事件,
          // 锁释放后立即通知等待的线程竞争获取锁。 
          RFuture
              if (!subscribeFuture.cancel(false)) {
                  subscribeFuture.onComplete((res, e) - {
                      if (e == null) {
                          unsubscribe(subscribeFuture, threadId);
                      }
                  });
              }
              acquireFailed(waitTime, unit, threadId);
              return false;
          }
          try {
              time -= System.currentTimeMillis() - current;
              // 剩余等待时间已过,加锁失败
              if (time 
                  acquireFailed(waitTime, unit, threadId);
                  return false;
              }
              // 5、继续以同样的方式获加锁,如果过了最大的等待加锁时间,则加锁失败,返回false
              while (true) {
                  long currentTime = System.currentTimeMillis();
                  ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                  // lock acquired
                  if (ttl == null) {
                      return true;
                  }
                  time -= System.currentTimeMillis() - currentTime;
                  if (time 
                      acquireFailed(waitTime, unit, threadId);
                      return false;
                  }
                  // waiting for message
                  currentTime = System.currentTimeMillis();
                  // 6、通过信号量(共享锁)阻塞,等待释放锁消息
                  // 锁剩余时间小于剩余的waitTime时间
                  if (ttl = 0 && ttl  
      
      protected RFuture renewExpirationAsync(long threadId) {
          return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +  // 如果存在指定的 key 和 filed 
                     "redis.call('pexpire', KEYS[1], ARGV[1]); " +                // 续期
                     "return 1; " +// 返回续期成功
                     "end; " +
                     "return 0;",  // 返回续期失败
                      Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
      }
      
      // 看门狗超时时间默为 30s, 自定义的话可以修改 lockWatchdogTimeout 配置
      this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
      private long lockWatchdogTimeout = 30 * 1000;
      

      锁自动续期总结:

      1. 在没有指定锁超时时间(leaseTime)的情况下,加锁成功后就会执行自动续期。
      2. 如果当前线程持有的是重入锁,则对锁重入次数+1,如果是首次加锁,除了锁次数+1还需要执行锁续期。这里需要清楚是只有首次加锁才会续期,重入锁不会执行续期操作。将锁对应的线程Id及重入次数放入对象ExpirationEntry中,ExpirationEntry对像使用 LinkedHashMap维护了锁的线程Id和重入计数器。然后将ExpirationEntry对象放EXPIRATION_RENEWAL_MAP(ConcurrentHashMap),EXPIRATION_RENEWAL_MAP中存放着所有需要续期的锁。
      3. 新建一个延迟任务,10s(默认)之后执行,在EXPIRATION_RENEWAL_MAP中取出ExpirationEntry对象,拿到第一个线程Id,然后执行Lua脚本,检查线程Id对应的 key 和 filed 是否存在(锁),如果存在则重置锁的超时时间为30s(默认),如果不存在则说明已经解锁了不需要续期。
      4. 续期成功后,继续递归调用步骤3,保证持续锁续期,续期失败则说明锁已经不存在了,停止续期。

      当服务宕机时,看门狗的线程也就不存在了,此时也就不会对锁进行自动续期,到了 30s 锁就会自动过期,其他线程就可以获取锁了,不会造成死锁。

      解锁
      protected RFuture unlockInnerAsync(long threadId) {
          return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                  "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +                  // 如果锁不存在
                          "return nil;" +       // 解锁失败
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +    // 否则将锁的重入计数器-1
                          "if (counter > 0) then " +                                           // 如果重入计数器>0
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +                        // 将锁续期 30s
                          "return 0; " +        // 返回成功
                          "else " + 
                          "redis.call('del', KEYS[1]); " +                                     // 否则删除锁    
                          "redis.call('publish', KEYS[2], ARGV[1]); " +                        // 发布解锁消息
                          "return 1; " +																			                 // 返回解锁成功
                          "end; " +
                          "return nil;",        // 解锁失败
                  Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
      }
      

      流程总结:

      1. 判断锁是否存在,如果锁不存在直接返回null。
      2. 重入计数器减一(因为支持重入锁的缘故,这里不能直接将锁删除)。
      3. 如果重入计数器还是大于零,说明线程还是持有锁的,将锁续期 30s,返回成功。
      4. 否则删除锁,并且发送删除锁的消息(channelName:redisson_lock__channel:{锁key值}),以通知阻塞队列中的线程尝试加锁。
      5. 返回解锁成功。
      总结
      • RedissonLock 实现了锁等待(waitTime),锁重入,锁自动续期等复杂功能。

      • RedissonLock 实现的分布式锁使用的是 Hash 数据结构,其中 Hash key 是我们指定锁的 key 值, filed 是 UUID:threaId,value 是重入锁次数。其中 UUID 是 Redisson 客户端连接管理器实例初始化生成的 UUID。使用 Hash 数据结构,是实现锁重入的关键。

      • RedissonLock 加锁,解锁,看门狗都是用了 Lua 脚本,保证命令执行的原子性。

      • RedissonLock 实现锁等待时间(waitTime)不是使用的 while(true) 手段,而是使用的 Redis 发布订阅,semaphore(信号量)实现的,解决了无效锁申请造成的系统资源浪费问题。

        具体实现是使用 semaphore 进行带期限的阻塞线程,当锁释放时会发布锁释放的消息,收到解锁消息后调用 release() 方法,此时被 semaphore 阻塞的等待队列中的一个线程就可以尝试获取锁了,如果在指定期限内未获得锁,则获取锁失败。

      • 只有未设置锁超时时间(leaseTime),才能使用 Redisson 看门狗机制。

        Redis 高可用架构下的分布式锁问题

        上面讲 Redisson 实现的分布式锁,在单机模式下已经趋近完美了。

        但是单点的话故障的话,那就芭比Q了,所以我们第一点想到的是部署高可用集群。

        目前 Redis 高可用架构主要有主从模式,哨兵模式,集群模式,在这三种模式下使用 Redis 分布式锁存在一个弊端,可能会导致多个客户端同时加锁成功。

        【Redis 分布式锁】

        客户端A加锁成功,由于Reids 主从同步数据是异步执行的,LockA 锁还没来的及同步到 Slave,此时 Master 节点宕机了。

        Slave 节点提升为 Master,客户端B来加锁,发现没有其他客户端占用锁,LockB 加锁成功。

        这时就导致了两个客户端同时获取了锁。

        所以,如果使用 Redis 分布式锁,应尽量避免主从、哨兵或集群模式。

        Redis 红锁(Redlock)

        Redlock 概念

        RedLock 是 Redis 作者提出的一个算法。

        Redlock 官网介绍

        在该算法的分布式版本中,我们假设有N个Redis masters。这些节点是完全独立的,所以我们不使用复制或任何其他隐式协调系统。我们已经描述了如何在单个实例中安全地获取和释放锁。我们想当然地认为,算法将使用这种方法在单个实例中获取和释放锁。在我们的示例中,我们设置了N=5,这是一个合理的值,因此我们需要在不同的计算机或虚拟机上运行5个Redis主机,以确保它们以基本独立的方式失败。

        为了获取锁,客户端执行以下操作:

        1. 以毫秒为单位获取当前时间。
        2. 使用相同的key和随机值在所有Redis实例中顺序获取锁。当在每个实例中获取锁时,客户端使用一个超时,该超时与锁自动释放的总时间相比很小,以便获取它。例如,如果自动释放时间为10秒,则超时时间可能在5-50毫秒范围内。这可以防止客户端在尝试与已关闭的Redis节点通话时长时间处于阻塞状态:如果某个实例不可用,我们应该尽快尝试与下一个实例通话。
        3. 客户端通过从当前时间中减去在步骤1中获得的时间戳来计算获取锁所用的时间。当且仅当客户端能够在大多数实例(至少3个)中获取锁,并且获取锁所用的总时间小于锁有效时间,则认为已获取锁。
        4. 如果获得了锁,其有效时间将被视为初始有效时间减去经过的时间,如步骤3中计算的。
        5. 如果客户端由于某种原因(无法锁定N/2+1实例或有效期为负)未能获取锁,它将尝试解锁所有实例(即使是它认为无法锁定的实例)。

        Redis 作者对红锁的介绍非常详细,点击这里查看。

        简单总结下:

        假设有五个 Redis 实例,这些实例之间是完全独立的,并且部署在不同的计算机上,客户端尝试在这几个实例中获取锁。

        如果客户端能够在大多数实例(N/2+1,至少三个)中获取锁,并且获取锁所有的总时间小于锁有效时间,则认为获取锁成功。

        如果加锁成功,锁的有效期=初始有效时间-获取锁的总时间,假如锁有效期为 10s,获取锁共花了 2s,那么锁的有效期还剩 8s。

        无论客户端获取锁成功还是失败,都需要解锁所有 Redis 实例,以免发生死锁。

        【Redis 分布式锁】

        使用多个完全独立的 Redis 实例,解决了 Redis 主从异步复制造成的锁丢失问题,同时保障了高可用。

        至少N/2+1个实例加锁成功,保证锁的互斥性,防止多个客户端同时获取到锁。

        Redlock 存在问题

        表面上看 RedLock 解决 Redis 分布式锁的痛点,但是真的就万无一失了吗?

        有人就提出了质疑,Martin Kleppmann:How to do distributed locking

        Martin Kleppmann 在效率和正确性方面质疑了红锁,他认为如果是为了效率使用分布式锁,没有必要承担 Redlock 的成本和复杂性,最好还是使用一个 Reids 实例或者主从模式。正确性方面,他认为 Redlock 也绝对保证不了锁的正确性,文章在网络延迟,过程暂停(GC),时钟漂移方面给出了论证。

        Redis 作者(Salvatore)也反驳了该质疑:Is Redlock safe?

        建议大家读下上面两篇文章。

        我个人认为使用 Redlock 要慎重,首先,它的效率比较差,在一些 RT 要求比较高的接口中增加了耗时风险;其次,无法保证绝对的正确性,可能会出现多个客户端同时获取锁的风险(Martin Kleppmann 在他的文章里有举证);再次,成本和复杂性较高。

        Redisson 红锁使用

        使用示例:

        // 在不同Redis实例上获取 RLock
        RLock rLock1 = redisson1.getLock(key);
        RLock rLock2 = redisson2.getLock(key);
        RLock rLock3 = redisson3.getLock(key);
        // 初始化红锁
        RedissonRedLock redissonRedLock = new RedissonRedLock(rLock1, rLock2, rLock3);
        // 加锁
        redissonRedLock.lock();
        // 业务逻辑
        // 解锁
        redissonRedLock.unlock();
        

        【Redis 分布式锁】 Redisson 在新版本中已经弃用了 RedissonRedLock,不建议使用。

        上面说了这么多,我们发现程序无论怎样实现,都保证不了100%的稳定。我们尽力做的就是在99.999…%小数点后面多加几个9,让程序足够的稳定。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]