01 RedissonLock

redisson 基于 org.redisson:redisson-spring-data-27:3.27.2 版本

java 中,操作 redis 一般都会选择 redisson 框架, 我们需要了解常用功能的实现原理, 这次来介绍 RedissonLock

使用方式

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
@Test
void testDistributedLock() {
    RLock lock = redissonClient.getLock("lock");
    try {
        lock.lock();
        ThreadUtil.sleep(30, TimeUnit.SECONDS);
        System.out.println("xxx");
    } finally {
        lock.unlock();
    }
}

上面是最常见分布式锁使用示例, redisson 的锁分为好几种,我们先以 RedissonLock 来说明。

./分布式锁.png
分布式锁

lock

源码位置: org.redisson.RedissonLock#lock()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// leaseTime:续约时间, 默认为 30 秒
// interruptibly: 支持可打断
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    // 获取当前线程 id
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // ttl 为 null,表示获取锁成功
    if (ttl == null) {
        return;
    }

    // 订阅这个锁,一旦锁释放就会得到通知
    CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
    pubSub.timeout(future);
    RedissonLockEntry entry;
    if (interruptibly) {
        entry = commandExecutor.getInterrupted(future);
    } else {
        entry = commandExecutor.get(future);
    }

    try {
        while (true) {
            // 尝试获取锁
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                try {
                    // 等待锁释放
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    entry.getLatch().acquire();
                } else {
                    entry.getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 取消订阅
        unsubscribe(entry, threadId);
    }
}

上面的逻辑可以结合图来理解。

源码位置: org.redisson.RedissonLock#tryAcquireAsync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// leaseTime 为 -1,表示会一直持有锁,除非调用 unlock 解锁
private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        // 执行 lua 脚本进行加锁
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 执行 lua 脚本进行加锁
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    // 处理异常情况,如果加锁失败,就释放锁
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);

    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // ttlRemaining 为 null,加锁成功
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 定期续约锁
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

源码位置: org.redisson.RedissonLock#tryLockInnerAsync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 执行 lua 脚本进行加锁
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, command,
            // 不存在锁
            "if ((redis.call('exists', KEYS[1]) == 0) " +
                        // 可重入锁
                        "or (redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    // 加一
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    // 设置过期时间
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    // 返回 null,表示加锁成功
                    "return nil; " +
                "end; " +
                // 获取锁过期时间
                "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

unlock

源码位置: org.redisson.RedissonBaseLock#unlock

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public void unlock() {
    try {
        // 获取当前线程 id, 然后解锁, 最终调用 unlockInnerAsync
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

源码位置: org.redisson.RedissonBaseLock#unlockInnerAsync(long)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
protected final RFuture<Boolean> unlockInnerAsync(long threadId) {
    String id = getServiceManager().generateId();
    MasterSlaveServersConfig config = getServiceManager().getConfig();
    int timeout = (config.getTimeout() + config.getRetryInterval()) * config.getRetryAttempts();
    timeout = Math.max(timeout, 1);
    // 执行 lua 脚本进行解锁
    RFuture<Boolean> r = unlockInnerAsync(threadId, id, timeout);
    CompletionStage<Boolean> ff = r.thenApply(v -> {
        CommandAsyncExecutor ce = commandExecutor;
        if (ce instanceof CommandBatchService) {
            ce = new CommandBatchService(commandExecutor);
        }
        // 删除标记 
        ce.writeAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.DEL, getUnlockLatchName(id));
        if (ce instanceof CommandBatchService) {
            ((CommandBatchService) ce).executeAsync();
        }
        return v;
    });
    return new CompletableFutureWrapper<>(ff);
}

源码位置: org.redisson.RedissonLock#unlockInnerAsync

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected RFuture<Boolean> unlockInnerAsync(long threadId, String requestId, int timeout) {
    return evalWriteSyncedAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                   // 判断是否已经解锁过
                  "local val = redis.call('get', KEYS[3]); " +
                        "if val ~= false then " +
                            "return tonumber(val);" +
                        "end; " +
                        // 锁已经不存在,无需解锁
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                            "return nil;" +
                        "end; " +
                        // 处理可重入锁
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "redis.call('set', KEYS[3], 0, 'px', ARGV[5]); " +
                            "return 0; " +
                        "else " +
                            // 解锁
                            "redis.call('del', KEYS[1]); " +
                            // 发布解锁消息
                            "redis.call(ARGV[4], KEYS[2], ARGV[1]); " +
                            // 标记已经解锁
                            "redis.call('set', KEYS[3], 1, 'px', ARGV[5]); " +
                            "return 1; " +
                        "end; ",
                    Arrays.asList(getRawName(), getChannelName(), getUnlockLatchName(requestId)),
                    LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime,
                    getLockName(threadId), getSubscribeService().getPublishCommand(), timeout);
}
0%