资讯专栏INFORMATION COLUMN

聊聊redisson的分布式锁

Channe / 1467人阅读

摘要:序本文主要研究一下的分布式锁实例源码解析这里没有设置的话,默认是,使用的是,默认为秒使用的是一段脚本,该脚本有个参数,第一个参数为数组,后面几个参数为数组的元素这里的值为调用方指定的这个的名称,两个变量,第一个为,第二个为锁的

本文主要研究一下redisson的分布式锁

maven
        
            org.redisson
            redisson
            3.8.1
        
实例
    @Test
    public void testDistributedLock(){
        Config config = new Config();
//        config.setTransportMode(TransportMode.EPOLL);
        config.useSingleServer()
                .setAddress("redis://192.168.99.100:6379");
        RedissonClient redisson = Redisson.create(config);


        IntStream.rangeClosed(1,5)
                .parallel()
                .forEach(i -> {
                    executeLock(redisson);
                });

        executeLock(redisson);
    }

    public void executeLock(RedissonClient redisson){
        RLock lock = redisson.getLock("myLock");
        boolean locked = false;
        try{
            LOGGER.info("try lock");
            locked = lock.tryLock();
//            locked = lock.tryLock(1,2,TimeUnit.MINUTES);
            LOGGER.info("get lock result:{}",locked);
            if(locked){
                TimeUnit.HOURS.sleep(1);
                LOGGER.info("get lock and finish");
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            LOGGER.info("enter unlock");
            if(locked){
                lock.unlock();
            }
        }
    }
源码解析 RedissonLock.tryLock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

    @Override
    public boolean tryLock() {
        return get(tryLockAsync());
    }

    @Override
    public RFuture tryLockAsync() {
        return tryLockAsync(Thread.currentThread().getId());
    }

    @Override
    public RFuture tryLockAsync(long threadId) {
        return tryAcquireOnceAsync(-1, null, threadId);
    }

    private RFuture tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

     RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call("exists", KEYS[1]) == 0) then " +
                      "redis.call("hset", KEYS[1], ARGV[2], 1); " +
                      "redis.call("pexpire", KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call("hincrby", KEYS[1], ARGV[2], 1); " +
                      "redis.call("pexpire", KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call("pttl", KEYS[1]);",
                    Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

    protected String getLockName(long threadId) {
        return id + ":" + threadId;
    }

    private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                
                RFuture future = renewExpirationAsync(threadId);
                
                future.addListener(new FutureListener() {
                    @Override
                    public void operationComplete(Future future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can"t update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }

        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), new ExpirationEntry(threadId, task)) != null) {
            task.cancel();
        }
    }

    protected RFuture renewExpirationAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call("pexpire", KEYS[1], ARGV[1]); " +
                    "return 1; " +
                "end; " +
                "return 0;",
            Collections.singletonList(getName()), 
            internalLockLeaseTime, getLockName(threadId));
    }

    private  RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

这里leaseTime没有设置的话,默认是-1,使用的是commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),默认为30秒

tryLockInnerAsync使用的是一段lua脚本,该脚本有3个参数,第一个参数为KEYS数组,后面几个参数为ARGV数组的元素

这里key的值为调用方指定的这个redissonLock的名称,两个变量,第一个为leaseTime,第二个为锁的名称,使用redissonLock的id+线程id

lua脚本第一个方法判断redissonLock的hashmap是否存在,如果不存在则创建,该hashmap有一个entry的key为锁名称,valude为1,之后设置该hashmap失效时间为leaseTime

lua脚本第二个方法是在redissonLock的hashmap存在的情况下,将该锁名的value增1,同时设置失效时间为leaseTime

最后返回该redissonLock名称的key的ttl

执行成功之后判断ttl是否还有值,有的话则调用scheduleExpirationRenewal,防止lock未执行完就失效

scheduleExpirationRenewal是注册一个延时任务,在internalLockLeaseTime / 3的时候触发,执行的方法是renewExpirationAsync,将该锁失效时间重置回internalLockLeaseTime

scheduleExpirationRenewal里头给scheduleExpirationRenewal任务增加listener,如果设置成功之后还会再次递归调用scheduleExpirationRenewal重新注册延时任务

tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId)方法是指定自动解锁时间时调用的方法,它与tryAcquireOnceAsync的区别在于,它对ttl的方回值采用long值来判断,如果是null,才执行延长失效时间的定时任务,而tryAcquireOnceAsync方法采用的是BooleanNullReplayConvertor,只要返回不是null,则返回true

RedissonLock.unlock

redisson-3.8.1-sources.jar!/org/redisson/RedissonLock.java

    @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

    @Override
    public RFuture unlockAsync(final long threadId) {
        final RPromise result = new RedissonPromise();
        RFuture future = unlockInnerAsync(threadId);

        future.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future future) throws Exception {
                if (!future.isSuccess()) {
                    cancelExpirationRenewal(threadId);
                    result.tryFailure(future.cause());
                    return;
                }

                Boolean opStatus = future.getNow();
                if (opStatus == null) {
                    IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                            + id + " thread-id: " + threadId);
                    result.tryFailure(cause);
                    return;
                }
                if (opStatus) {
                    cancelExpirationRenewal(null);
                }
                result.trySuccess(null);
            }
        });

        return result;
    }

    protected RFuture unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call("exists", KEYS[1]) == 0) then " +
                    "redis.call("publish", KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call("hexists", KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call("hincrby", KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call("pexpire", KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call("del", KEYS[1]); " +
                    "redis.call("publish", KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));

    }

    String getChannelName() {
        return prefixName("redisson_lock__channel", getName());
    }

    void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = expirationRenewalMap.get(getEntryName());
        if (task != null && (threadId == null || task.getThreadId() == threadId)) {
            expirationRenewalMap.remove(getEntryName());
            task.getTimeout().cancel();
        }
    }

unlockInnerAsync通过lua脚本来释放锁,该lua使用两个key,一个是redissonLock名称,一个是channelName

该lua使用的变量有三个,一个是pubSub的unlockMessage,默认为0,一个是internalLockLeaseTime,默认为commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),一个是锁名称

如果该redissonLock不存在,则直接发布unlock消息返回1;如果该锁不存在则返回nil;

如果该锁存在则将其计数-1,如果counter大于0,则重置下失效时间,返回0;如果counter不大于0,则删除该redissonLock锁,发布unlockMessage,返回1;如果上面条件都没有命中返回nil

unlockAsync里头对unlockInnerAsync注册了FutureListener,主要是调用cancelExpirationRenewal,取消掉scheduleExpirationRenewal任务

LockPubSub

redisson-3.8.1-sources.jar!/org/redisson/pubsub/LockPubSub.java

public class LockPubSub extends PublishSubscribe {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            Runnable runnableToExecute = value.getListeners().poll();
            if (runnableToExecute != null) {
                runnableToExecute.run();
            }

            value.getLatch().release();
        }
    }

}

接收到unlockMessage的时候,会调用RedissonLockEntry的listener,然后触发latch的release

tryAcquireOnceAsync这个方法默认没有创建LockPubSub,而且没有指定自动解锁时间,则定时任务会一直延长失效时间,这个可能存在锁一直没释放的风险

小结

加锁有如下注意事项:

加锁需要设置超时时间,防止出现死锁

加锁以及设置超时时间的时候,需要保证两个操作的原子性,因而最好使用lua脚本或者使用支持NX以及EX的set方法

加锁的时候需要把加锁的调用方信息,比如线程id给记录下来,这个在解锁的时候需要使用

对于加锁时长不确定的任务,为防止任务未执行完导致超时被释放,需要对尚未运行完的任务延长失效时间

解锁有如下注意事项:

解锁一系列操作(判断key是否存在,存在的话删除key等)需要保证原子性,因而最好使用lua脚本

解锁需要判断调用方是否与加锁时记录的是否一致,防止锁被误删

如果有延续失效时间的延时任务,在解锁的时候,需要终止掉该任务

doc

一分钟实现分布式锁

这才是真正的分布式锁

漫画:什么是分布式锁?

Redis分布式锁的正确实现方式(Java版)

基于Redis的分布式锁到底安全吗(上)?

基于Redis的分布式锁到底安全吗(下)?

Redis分布式锁的正确实现姿势

Redisson分布式锁浅析

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/36853.html

相关文章

  • Redis集群环境下-RedLock(真布式) 实践

    摘要:是官方提出的实现分布式锁管理器的算法。为了避免这种情况的发生,内部提供了一个监控锁的看门狗,它的作用是在实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是秒钟,也可以通过修改来另行指定。 在不同进程需要互斥地访问共享资源时,分布式锁是一种非常有用的技术手段。 有很多三方库和文章描述如何用Redis实现一个分布式锁管理器,但是这些库实现的方式差别很大,而且很多简单...

    zhichangterry 评论0 收藏0
  • 重磅发布- Java商城秒杀系统设计与实战视频教程(SpringBoot版)

    摘要:技术列表缓存中间件服务协调调度中间件消息中间件综合性质的中间件分布式锁分布式唯一生成服务雪花算法邮件服务权限认证授权矿建的登录认证服务以及等等。 概要介绍:历经一个多月的时间,debug亲自录制的Java商城秒杀系统的设计与实战视频教程(SpringBoot版)终于完成了!在本课程中,debug真正的将之前所讲解的相关技术融入到了本课程中,即本课程所介绍的秒杀系统是一个真正意义上的项目...

    崔晓明 评论0 收藏0
  • 基于Reddsion布式实现

    redission用来做分布式锁比zookeeper更方便,简单。 引入依赖 org.redisson redisson 2.10.7 compile 配置redission @Import(SLockAspect.class) //引入AOP配置 @AutoConf...

    ralap 评论0 收藏0
  • 布式和spring事务管理

    摘要:否则数据会出现不同步问题我使用的做分布式锁管理,用注解事务管理。但是出现另外一个问题,锁超时但是事务仍未提交。 最近开发一个小程序遇到一个需求需要实现分布式事务管理 业务需求 用户在使用小程序的过程中可以查看景点,对景点地区或者城市标记是否想去,那么需要统计一个地点被标记的人数,以及记录某个用户对某个地点是否标记为想去,用两个表存储数据,一个地点表记录改地点被标记的次数,一个用户意向表...

    nanfeiyan 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<