Redis实现分布式锁
前言
随着时代的发展,分布式系统的运用越来越多,而在分布式系统中,本地锁已经无法解决数据安全问题,分布式锁能够很好的解决这个问题.
一、分布式锁是什么?
在分布式系统中,由于多个节点同时访问一个资源,可能会出现脏数据、数据冲突等问题,分布式锁通过加锁、解锁的方式,保证在同一时刻只有一个节点能够访问该资源,从而避免了数据冲突和错误操作。分布式锁的实现方式有很多种,常见的包括基于Redis、Zookeeper、数据库等分布式系统的实现方式。这里主要介绍Redis的方式
二、本地锁示例
1.本地锁代码示例:
//controller层 @GetMapping("/testLock") public Result testLock(){ testService.testLock1(); return Result.ok(); } //service层 public synchronized void testLock1(){ String num = redisTemplate.opsForValue().get("num").toString(); if (!StringUtils.isEmpty(num)){ int i = Integer.parseInt(num); redisTemplate.opsForValue().set("num",String.valueOf(++i)); } }
2.开启两个相同的服务 模拟分布式(代码一致,端口号不一致)开启网关作为统一访问路径
进行负载均衡
3.利用ab进行网关压力测试
4.拿到redis中num的值
从上述实验可以发现:我们进行了1000次请求发送给网关,而num最终的值等于613,而不是我们想要看到的1000,因此可以发现,在分布式系统里,本地锁无法解决数据安全问题,这主要是由于分布式系统中存在多个节点,每个节点拥有自己的本地资源和本地锁。当多个请求同时访问同一份数据时,就会出现数据的并发访问和修改,而本地锁只能控制本地的并发访问,无法控制分布式系统中其他节点的并发访问
三、分布式锁的使用
1.前言:因为分布式集群系统微服务多分布在不同的机器上,这使得原来单机部署下的并发控制锁失效,单纯的javaAPI无法实现分布式锁,因此我们需要一种可以跨JVM的方式来控制共享数据的访问
可以利用Redis中的setnx操作来实现分布式锁
2.setnx有如下优点
2.1.setnx是一个原子性操作,只有一个客户端设置键值能成功,其他客户端再来设置,均会失效
2.2.在分布式环境下可以把setnx这个操作当作锁,如果一个客户端已经获取到锁,那么它将会返回true,就可以往下执行业务逻辑,在这个时候其他客户端又想来获取这把锁就会返回false
3.使用步骤
3.1.导入依赖 写配置文件
org.springframework.boot spring-boot-starter-data-redis org.apache.commons commons-pool2
spring: redis: host: 192.168.72.166 port: 6379 database: 0 timeout: 1800000 password: lettuce: pool: max-active: 20 #最大连接数 max-wait: -1 #最大阻塞等待时间(负数表示没限制) max-idle: 5 #最大空闲 min-idle: 0 #最小空闲
3.2.代码实现
//controller层 @GetMapping("/testLock") public Result testLock(){ testService.testRedisLock(); return Result.ok(); } //service层 @Autowired private RedisTemplate redisTemplate; public void testRedisLock() { //获取锁 //加上uuid防止误删除锁 String uuid = System.currentTimeMillis() + UUID.randomUUID().toString().replaceAll("-", ""); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS); //如果获取到锁执行步骤 //最后释放锁 if (lock) { String num = redisTemplate.opsForValue().get("num").toString(); if (!StringUtils.isEmpty(num)) { int i = Integer.parseInt(num); redisTemplate.opsForValue().set("num", String.valueOf(++i)); } else { return; } //在极端情况下仍然会误删除锁 //因此使用lua脚本的方式来防止误删除 String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); defaultRedisScript.setScriptText(script); defaultRedisScript.setResultType(Long.class); redisTemplate.execute(defaultRedisScript, Arrays.asList("lock"), uuid); /* if (redisTemplate.opsForValue().get("lock").toString().equals(uuid)){ redisTemplate.delete("lock"); }*/ } else { //如果没有获取到锁 //重试 try { Thread.sleep(100); testRedisLock();; } catch (InterruptedException e) { e.printStackTrace(); } } }
3.3.结果截图
3.4.总结 :
由此可见,使用redis中setnx的方式实现了分布式锁,解决了数据安全问题
这里有三个问题需要注意:
3.4.1.为什么要将锁加上过期时间
示例代码:
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
当一个请求进入到该方法里面时,正在一行行的执行业务逻辑,如果在某一行出现了问题报了异常,而这时还没有将释放锁那一行代码执行完毕,这时就会导致锁无法释放,从而导致其他请求也无法进入.而在这时如果设置了过期时间,那么就会在时间到了之后自动释放锁,其他请求就也能获取锁了
3.4.2.为什么要给锁设置UUID
示例代码:
String uuid = System.currentTimeMillis() + UUID.randomUUID().toString().replaceAll("-", ""); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 10, TimeUnit.SECONDS);
这是为了防止误删锁的发生,比如业务逻辑执行的时间是7s,而锁的过期时间是3s,现在有A1和A2两个请求,A1正在执行业务逻辑,还没有执行到释放锁的那一行代码时,锁的过期时间已经超过3s了,这时A1的锁释放,A2就能拿到锁了,A2在执行的过程中,A1又执行之后的代码,最终导致A1进行了释放锁的操作,由于它们连接的是同一个redis,使用的锁的键名也相同,因此A1成功的将A2的锁给释放掉了。为了防止这种情况的发生,我们可以将每一把锁都设置唯一的UUID,这样在不同的请求到来时就会生成不同的UUID并将它存入redis中,在释放锁的时候,就可以根据UUID来判断是否是自己的锁来进行删除
3.4.3为什么要使用lua脚本
示例代码:
//在极端情况下仍然会误删除锁 //因此使用lua脚本的方式来防止误删除 String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" + "then\n" + " return redis.call(\"del\",KEYS[1])\n" + "else\n" + " return 0\n" + "end"; DefaultRedisScript defaultRedisScript = new DefaultRedisScript(); defaultRedisScript.setScriptText(script); defaultRedisScript.setResultType(Long.class); redisTemplate.execute(defaultRedisScript, Arrays.asList("lock"), uuid);
使用lua脚本也是为了保证原子性操作,保证判断uuid是否相等和释放锁一起执行,防止极端情况的发生,我们先来看使用lua脚本前的代码是如果进行防误删的:
if (redisTemplate.opsForValue().get("lock").toString().equals(uuid)){ redisTemplate.delete("lock");
这里所存在的问题是删除操作缺乏原子性,我们还保持和3.4.2一样的条件,有A1和A2两个请求,假如A1在执行完了上图的if代码后,锁因为超过过期时间而被释放了,这时A2获取到锁执行业务逻辑,生成了自己的uuid,在执行的过程中,A1又接着往下进行,尽管此时两者uuid已经不同,但是由于A1已经进行过if判断,所以可以直接删除掉A2的锁。因此我们需要用lua脚本的方式,将判断和删除合为一步,保证其原子性,这样就可以解决锁的误删问题,加强锁的健壮性
4.还可以使用Redisson实现分布式锁
4.1简单介绍:
Redisson是一个在Redis的基础上实现的Java驻内存数据网格,它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。提供了使用Redis的最简单和最便捷的方法。
4.2 使用步骤
4.2.1: 导入依赖并配置RedissonClient对象,配置文件上文已给出
org.redisson redisson 3.15.3 //配置redisson package com.atguigu.gmall.common.config; @Data @Configuration @ConfigurationProperties("spring.redis") public class RedissonConfig { private String host; private String password; private String port; private int timeout = 3000; private static String ADDRESS_PREFIX = "redis://"; /** * 自动装配 */ @Bean RedissonClient redissonSingle() { Config config = new Config(); if(StringUtils.isEmpty(host)){ throw new RuntimeException("host is empty"); } SingleServerConfig serverConfig = config.useSingleServer() .setAddress(ADDRESS_PREFIX + this.host + ":" + port) .setTimeout(this.timeout); if(!StringUtils.isEmpty(this.password)) { serverConfig.setPassword(this.password); } return Redisson.create(config); } }
4.2.2.实现代码
public void testRedissonLock() { //获取锁 RLock lock = redissonClient.getLock("lock"); //开始加锁 try { boolean tryLock = lock.tryLock(100, 10, TimeUnit.SECONDS); if (tryLock){ String num = redisTemplate.opsForValue().get("num").toString(); if (!StringUtils.isEmpty(num)) { int i = Integer.parseInt(num); redisTemplate.opsForValue().set("num", String.valueOf(++i)); } else { return; } } } catch (InterruptedException e) { e.printStackTrace(); }finally { if (lock.isLocked()){ lock.unlock(); } } }
总结
以上就是今天所总结的内容,主要学习的是分布式锁的实现,希望大神指正哪里有错误之处!!!