资讯专栏INFORMATION COLUMN

Redis分布式锁

LeoHsiun / 3179人阅读

摘要:之分布式锁的实现方案如何优雅地实现分布式锁博客地址分布式锁关键词分布式锁是控制分布式系统之间同步访问共享资源的一种方式。

</>复制代码

  1. Redis之分布式锁的实现方案 - 如何优雅地实现分布式锁(JAVA)

  2. 博客地址 https://blog.piaoruiqing.cn/2019/05/19/redis分布式锁/

关键词

分布式锁: 是控制分布式系统之间同步访问共享资源的一种方式。

spring-data-redis: Spring针对redis的封装, 配置简单, 提供了与Redis存储交互的抽象封装, 十分优雅, 也极具扩展性, 推荐读一读源码

Lua: Lua 是一种轻量小巧的脚本语言, 可在redis执行.

前言

本文阐述了Redis分布式锁的一种简单JAVA实现及优化进阶, 实现了自动解锁、自定义异常、重试、注解锁等功能, 尝试用更优雅简洁的代码完成分布式锁.

需求

互斥性: 在分布式系统环境下, 一个锁只能被一个线程持有.

高可用: 不会发生死锁、即使客户端崩溃也可超时释放锁.

非阻塞: 获取锁失败即返回.

方案

Redis具有极高的性能, 且其命令对分布式锁支持友好, 借助SET命令即可实现加锁处理.

</>复制代码

  1. SET

  2. EX seconds -- Set the specified expire time, in seconds.

  3. PX milliseconds -- Set the specified expire time, in milliseconds.

  4. NX -- Only set the key if it does not already exist.

  5. XX -- Only set the key if it already exist.

实现

</>复制代码

  1. 简单实现

做法为set if not exist(如果不存在则赋值), redis命令为原子操作, 所以多带带使用set命令时不用担心并发导致异常.

具体代码实现如下: (spring-data-redis:2.1.6)

依赖引入

</>复制代码

  1. <dependency>
  2. <groupId>org.springframework.bootgroupId>
  3. <artifactId>spring-boot-starter-data-redisartifactId>
  4. <version>2.1.4.RELEASEversion>
  5. dependency>
配置RedisTemplate

</>复制代码

  1. @Bean
  2. @ConditionalOnMissingBean
  3. public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
  4. StringRedisSerializer keySerializer = new StringRedisSerializer();
  5. RedisSerializer<");new StringRedisSerializer();
  6. StringRedisTemplate template = new StringRedisTemplate();
  7. template.setConnectionFactory(factory);
  8. template.setKeySerializer(keySerializer);
  9. template.setHashKeySerializer(keySerializer);
  10. template.setValueSerializer(serializer);
  11. template.setHashValueSerializer(serializer);
  12. template.afterPropertiesSet();
  13. return template;
  14. }
简单的分布式锁实现

</>复制代码

  1. /**
  2. * try lock
  3. * @author piaoruiqing
  4. *
  5. * @param key lock key
  6. * @param value value
  7. * @param timeout timeout
  8. * @param unit
  9. time unit
  10. * @return
  11. */
  12. public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
  13. return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
  14. }

以上代码即完成了一个简单的分布式锁功能:

其中redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); 即为执行redis命令:

</>复制代码

  1. redis> set dlock:test-try-lock a EX 10 NX
  2. OK
  3. redis> set dlock:test-try-lock a EX 10 NX
  4. null
早期版本spring-data-redis分布式锁实现及注意事项

</>复制代码

  1. 方法Boolean setIfAbsent(K key, V value, long timeout, TimeUnit unit);是在2.1版本中新增的, 早期版本中setIfAbsent无法同时指定过期时间, 若先使用setIfAbsent再设置key的过期时间, 会存在产生死锁的风险, 故旧版本中需要使用另外的写法进行实现. 以spring-data-redis:1.8.20为例

</>复制代码

  1. /**
  2. * try lock
  3. * @author piaoruiqing
  4. *
  5. * @param key lock key
  6. * @param value value
  7. * @param timeout timeout
  8. * @param unit
  9. time unit
  10. * @return
  11. */
  12. public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
  13. return redisTemplate.execute(new RedisCallback() {
  14. @Override
  15. public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
  16. JedisCommands commands = (JedisCommands)connection.getNativeConnection();
  17. String result = commands.set(key, value, "NX", "PX", unit.toMillis(timeout));
  18. return "OK".equals(result);
  19. }
  20. });
  21. }

spring-data-redis:1.8.20默认redis客户端为jedis, 可通过getNativeConnection直接调用jedis方法进行操作. 新旧版本实现方式最终效果相同.

优化进阶

</>复制代码

  1. 基于AOP实现分布式锁注解工具 - 不仅能用, 而且好用

优化一 (自动解锁及重试)

</>复制代码

  1. 自动解锁、重试: 上一节针对分布式锁的简单实现可满足基本需求, 但仍有较多可优化改进之处, 本小节将针对分布式锁自动解锁及重试进行优化

分布式锁抽象类

</>复制代码

  1. 实现AutoCloseable接口, 可使用try-with-resource方便地完成自动解锁.

</>复制代码

  1. /**
  2. * distributed lock
  3. * @author piaoruiqing
  4. *
  5. * @since JDK 1.8
  6. */
  7. abstract public class DistributedLock implements AutoCloseable {
  8. private final Logger LOGGER = LoggerFactory.getLogger(getClass());
  9. /**
  10. * release lock
  11. * @author piaoruiqing
  12. */
  13. abstract public void release();
  14. /*
  15. * (non-Javadoc)
  16. * @see java.lang.AutoCloseable#close()
  17. */
  18. @Override
  19. public void close() throws Exception {
  20. LOGGER.debug("distributed lock close , {}", this.toString());
  21. this.unlock();
  22. }
  23. }

封装Redis分布式锁

</>复制代码

  1. RedisDistributedLock是Redis分布式锁的抽象, 继承了DistributedLock并实现了unlock接口.

</>复制代码

  1. /**
  2. * redis distributed lock
  3. *
  4. * @author piaoruiqing
  5. * @date: 2019/01/12 23:20
  6. *
  7. * @since JDK 1.8
  8. */
  9. public class RedisDistributedLock extends DistributedLock {
  10. private RedisOperations operations;
  11. private String key;
  12. private String value;
  13. private static final String COMPARE_AND_DELETE =
  14. // (一)
  15. "if redis.call("get",KEYS[1]) == ARGV[1]
  16. " +
  17. "then
  18. " +
  19. " return redis.call("del",KEYS[1])
  20. " +
  21. "else
  22. " +
  23. " return 0
  24. " +
  25. "end";
  26. /**
  27. * @param operations
  28. * @param key
  29. * @param value
  30. */
  31. public RedisDistributedLock(RedisOperations operations, String key, String value) {
  32. this.operations = operations;
  33. this.key = key;
  34. this.value = value;
  35. }
  36. /*
  37. * (non-Javadoc)
  38. * @see com.piaoruiqing.demo.distributed.lock.DistributedLock#release()
  39. */
  40. @Override
  41. public void release() {
  42. // (二)
  43. List keys = Collections.singletonList(key);
  44. operations.execute(new DefaultRedisScript(COMPARE_AND_DELETE), keys, value);
  45. }
  46. /*
  47. * (non-Javadoc)
  48. * @see java.lang.Object#toString()
  49. */
  50. @Override
  51. public String toString() {
  52. return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
  53. }
  54. }

(一): 通过Lua脚本进行解锁, 使对比锁的值+删除成为原子操作, 确保解锁操作的正确性. 简单来说就是防止删了别人的锁. 例如: 线程A方法未执行完毕时锁超时了, 随后B线程也获取到了该锁(key相同), 但此时如果A线程方法执行完毕尝试解锁, 如果不比对value, 那么A将删掉B的锁, 这时候C线程又能加锁, 业务将产生更严重的混乱.(不要过分依赖分布式锁, 在数据一致性要求较高的情况下, 数据库层面也要进行一定的处理, 例如唯一键约束、事务等来确保数据的正确)

(二): 使用RedisOperations执行Lua脚本进行解锁操作.

可参阅redis官方文档

加锁方法实现

</>复制代码

  1. /**
  2. * @author piaoruiqing
  3. * @param key lock key
  4. * @param timeout timeout
  5. * @param retries number of retries
  6. * @param waitingTime retry interval
  7. * @return
  8. * @throws InterruptedException
  9. */
  10. public DistributedLock acquire(String key, long timeout, int retries, long waitingTime) throws InterruptedException {
  11. final String value
  12. = RandomStringUtils.randomAlphanumeric(4) + System.currentTimeMillis(); // (一)
  13. do {
  14. Boolean result
  15. = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); // (二)
  16. if (result) {
  17. return new RedisDistributedLock(stringRedisTemplate, key, value);
  18. }
  19. if (retries > NumberUtils.INTEGER_ZERO) {
  20. TimeUnit.MILLISECONDS.sleep(waitingTime);
  21. }
  22. if(Thread.currentThread().isInterrupted()){
  23. break;
  24. }
  25. } while (retries-- > NumberUtils.INTEGER_ZERO);
  26. return null;
  27. }

(一): 锁值要保证唯一, 使用4位随机字符串+时间戳基本可满足需求 注: UUID.randomUUID()在高并发情况下性能不佳.

(二): 尝试加锁, 代码中是2.1版本的做法, 早起版本参考上一节的实现.

此代码已经可以满足自动解锁和重试的需求了, 使用方法:

</>复制代码

  1. // 根据key加锁, 超时时间10000ms, 重试2次, 重试间隔500ms
  2. try(DistributedLock lock = redisLockService.acquire(key, 10000, 2, 500);){
  3. // do something
  4. }

但还可以再优雅一点, 将模板代码封装起来, 可支持Lambda表达式:

</>复制代码

  1. /**
  2. * lock handler
  3. * @author piaoruiqing
  4. *
  5. * @since JDK 1.8
  6. */
  7. @FunctionalInterface
  8. // (一)
  9. public interface LockHandler<T> {
  10. /**
  11. * the logic you want to execute
  12. *
  13. * @author piaoruiqing
  14. *
  15. * @return
  16. * @throws Throwable
  17. */
  18. T handle() throws Throwable;
  19. // (二)
  20. }

(一): 定义函数式接口, 将业务逻辑放入Lambda表达式使代码更加简洁.

(二): 业务中的异常不建议在分布式锁中处理, 直接抛出来更合理.

使用LockHandler完成加锁的实现:

</>复制代码

  1. public T tryLock(String key, LockHandler handler, long timeout, boolean autoUnlock, int retries, long waitingTime) throws Throwable {
  2. try (DistributedLock lock = this.acquire(key, timeout, retries, waitingTime);) {
  3. if (lock != null) {
  4. LOGGER.debug("get lock success, key: {}", key);
  5. return handler.handle();
  6. }
  7. LOGGER.debug("get lock fail, key: {}", key);
  8. return null;
  9. }
  10. }

此时可以通过比较优雅的方式使用分布式锁来完成编码:

</>复制代码

  1. @Test
  2. public void testTryLock() throws Throwable {
  3. final String key = "dlock:test-try-lock";
  4. AnyObject anyObject = redisLockService.tryLock(key, () -> {
  5. // do something
  6. return new AnyObject();
  7. }, 10000, true, 0, 0);
  8. }
[版权声明]
本文发布于朴瑞卿的博客, 允许非商业用途转载, 但转载必须保留原作者朴瑞卿 及链接:blog.piaoruiqing.cn. 如有授权方面的协商或合作, 请联系邮箱: piaoruiqing@gmail.com.

优化二 (自定义异常)

</>复制代码

  1. 自定义异常: 前文中针对分布式锁的封装可满足多数业务场景, 但是考虑这样一种情况, 如果业务本身会返回NULL当前的实现方式可能会存在错误的处理, 因为获取锁失败也会返回NULL. 避免返回NULL固然是一种解决方式, 但无法满足所有的场景, 此时支持自定义异常或许是个不错的选择.

实现起来很容易, 在原代码的基础之上增加onFailure参数, 如果锁为空直接抛出异常即可.

加锁方法实现

</>复制代码

  1. public T tryLock(String key, LockHandler handler, long timeout, boolean autoUnlock, int retries, long waitingTime, Class<"); throws Throwable {
  2. // (一)
  3. try (DistributedLock lock = this.getLock(key, timeout, retries, waitingTime);) {
  4. if (lock != null) {
  5. LOGGER.debug("get lock success, key: {}", key);
  6. return handler.handle();
  7. }
  8. LOGGER.debug("get lock fail, key: {}", key);
  9. if (null != onFailure) {
  10. throw onFailure.newInstance();
  11. // (二)
  12. }
  13. return null;
  14. }
  15. }

(一): Class<");限定onFailure必须是RuntimeException或其子类. 笔者认为使用RuntimeException在语义上更容易理解. 如有需要使用其他异常也未尝不可(如获取锁失败需要统一处理等情况).

(二): 反射

优化三 (优雅地使用注解)

</>复制代码

  1. 结合APO优雅地使用注解完成分布式锁:

定义注解

</>复制代码

  1. 为了减小篇幅折叠部分注释

</>复制代码

  1. /**
  2. * distributed lock
  3. * @author piaoruiqing
  4. * @date: 2019/01/12 23:15
  5. *
  6. * @since JDK 1.8
  7. */
  8. @Documented
  9. @Retention(RetentionPolicy.RUNTIME)
  10. @Target(ElementType.METHOD)
  11. public @interface DistributedLockable {
  12. /** timeout of the lock */
  13. long timeout() default 5L;
  14. /** time unit */
  15. TimeUnit unit() default TimeUnit.MILLISECONDS;
  16. /** number of retries */
  17. int retries() default 0;
  18. /** interval of each retry */
  19. long waitingTime() default 0L;
  20. /** key prefix */
  21. String prefix() default "";
  22. /** parameters that construct a key */
  23. String[] argNames() default {};
  24. /** construct a key with parameters */
  25. boolean argsAssociated() default true;
  26. /** whether unlock when completed */
  27. boolean autoUnlock() default true;
  28. /** throw an runtime exception while fail to get lock */
  29. Class<");default NoException.class;
  30. /** no exception */
  31. public static final class NoException extends RuntimeException {
  32. private static final long serialVersionUID = -7821936618527445658L;
  33. }
  34. }

timeout: 超时时间

unit: 时间单位

retries: 重试次数

waitingTime: 重试间隔时间

prefix: key前缀, 默认为包名+类名+方法名

argNames: 组成key的参数

注解可使用在方法上, 需要注意的是, 本文注解通过spring AOP实现, 故对象内部方法间调用将无效.

切面实现

</>复制代码

  1. /**
  2. * distributed lock aspect
  3. * @author piaoruiqing
  4. * @date: 2019/02/02 22:35
  5. *
  6. * @since JDK 1.8
  7. */
  8. @Aspect
  9. @Order(10)
  10. // (一)
  11. public class DistributedLockableAspect implements KeyGenerator {
  12. // (二)
  13. private final Logger LOGGER = LoggerFactory.getLogger(getClass());
  14. @Resource
  15. private RedisLockClient redisLockClient;
  16. /**
  17. * {@link DistributedLockable}
  18. * @author piaoruiqing
  19. */
  20. @Pointcut(value = "execution(* *(..)) && @annotation(com.github.piaoruiqing.dlock.annotation.DistributedLockable)")
  21. public void distributedLockable() {}
  22. /**
  23. * @author piaoruiqing
  24. *
  25. * @param joinPoint
  26. * @param lockable
  27. * @return
  28. * @throws Throwable
  29. */
  30. @Around(value = "distributedLockable() && @annotation(lockable)")
  31. public Object around(ProceedingJoinPoint joinPoint, DistributedLockable lockable) throws Throwable {
  32. long start = System.nanoTime();
  33. final String key = this.generate(joinPoint, lockable.prefix(), lockable.argNames(), lockable.argsAssociated()).toString();
  34. Object result = redisLockClient.tryLock(
  35. key, () -> {
  36. return joinPoint.proceed();
  37. },
  38. lockable.unit().toMillis(lockable.timeout()), lockable.autoUnlock(),
  39. lockable.retries(), lockable.unit().toMillis(lockable.waitingTime()),
  40. lockable.onFailure()
  41. );
  42. long end = System.nanoTime();
  43. LOGGER.debug("distributed lockable cost: {} ns", end - start);
  44. return result;
  45. }
  46. }

(一): 切面优先级

(二): KeyGenerator为自定义的key生成策略, 使用 prefix+argName+arg作为key, 具体实现见源码.

此时可以通过注解的方式使用分布式锁, 这种方式对代码入侵较小, 且简洁.

</>复制代码

  1. @DistributedLockable(
  2. argNames = {"anyObject.id", "anyObject.name", "param1"},
  3. timeout = 20, unit = TimeUnit.SECONDS,
  4. onFailure = RuntimeException.class
  5. )
  6. public Long distributedLockableOnFaiFailure(AnyObject anyObject, String param1, Object param2, Long timeout) {
  7. try {
  8. TimeUnit.SECONDS.sleep(timeout);
  9. LOGGER.info("distributed-lockable: " + System.nanoTime());
  10. } catch (InterruptedException e) {
  11. }
  12. return System.nanoTime();
  13. }
扩展

分布式锁的实现有多种方式, 可根据实际场景和需求选择不同的介质进行实现:

Redis: 性能高, 对分布式锁支持友好, 实现简单, 多数场景下表现较好.

Zookeeper: 可靠性较高, 对分布式锁支持友好, 实现较复杂但有现成的实现可以使用.

数据库: 实现简单, 可使用乐观锁/悲观锁实现, 性能一般, 高并发场景下不推荐

结语

本文阐述了Redis分布式锁的JAVA实现, 完成了自动解锁、自定义异常、重试、注解锁等功能, 源码见地址.

本实现还有诸多可以优化之处, 如:

重入锁的实现

优化重试策略为订阅Redis事件: 订阅Redis事件可以进一步优化锁的性能, 可通过wait+notifyAll来替代文中的sleep.

篇幅有限, 后续再行阐述.

参考文献

redis.io/topics/dist…

martin.kleppmann.com/2016/02/08/…


[版权声明]
本文发布于朴瑞卿的博客, 允许非商业用途转载, 但转载必须保留原作者朴瑞卿 及链接:blog.piaoruiqing.cn. 如有授权方面的协商或合作, 请联系邮箱: piaoruiqing@gmail.com.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/7228.html

相关文章

  • 百度社招面试题——如何用Redis实现布式

    摘要:集群实现分布式锁上面的讨论中我们有一个非常重要的假设是单点的。但是其实这已经超出了实现分布式锁的范围,单纯用没有命令来实现生成。这个问题用实现分布式锁暂时无解。结论并不能实现严格意义上的分布式锁。 关于Redis实现分布式锁的问题,网络上很多,但是很多人的讨论基本就是把原来博主的贴过来,甚至很多面试官也是一知半解经不起推敲就来面候选人,最近结合我自己的学习和资料查阅,整理一下用Redi...

    YanceyOfficial 评论0 收藏0
  • Redis 布式--PHP

    摘要:分布式锁的作用在单机环境下,有个秒杀商品的活动,在短时间内,服务器压力和流量会陡然上升。分布式集群业务业务场景下,每台服务器是独立存在的。这里就用到了分布式锁这里简单介绍一下,以的事务机制来延生。 Redis 分布式锁的作用 在单机环境下,有个秒杀商品的活动,在短时间内,服务器压力和流量会陡然上升。这个就会存在并发的问题。想要解决并发需要解决以下问题 1、提高系统吞吐率也就是qps 每...

    canger 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<