• 作者:老汪软件技巧
  • 发表时间:2024-10-16 11:03
  • 浏览量:

开心一刻

一男人站在楼顶准备跳楼,楼下有个劝解员拿个喇叭准备劝解

劝解员:兄弟,别跳

跳楼人:我不想活了

劝解员:你想想你媳妇

跳楼人:媳妇跟人跑了

劝解员:你还有兄弟

跳楼人:就是跟我兄弟跑的

劝解员:你想想你家孩子

跳楼人:孩子是他俩的

劝解员:死吧,妈的,你活着也没啥意义了

写在前面

关于锁,相信大家都不陌生,一般我们用其在多线程环境中控制对共享资源的并发访问;单服务下,用 JDK 中的 synchronized 或 Lock 的实现类可实现对共享资源的并发访问,分布式服务下,JDK 中的锁就显得力不从心了,分布式锁也就应运而生了;分布式锁的实现方式有很多,常见的有如下几种

基于 MySQL,利用行级悲观锁(select ... for update)基于 Redis,利用其 (setnx + expire) 或 set基于 Zookeeper,利用其临时目录和事件回调机制

本文不讲这些,网上资料很多,感兴趣的小伙伴自行去查阅;本文的重点是基于 Redis 的 Redisson,从源码的角度来看看为什么推荐用 Redisson 来实现分布式锁;推荐大家先去看看

搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了

有助于理解后文

分布式锁特点

可以类比 JDK 中的锁

互斥

不仅要保证同个服务中不同线程的互斥,还需要保证不同服务间、不同线程的互斥;如何处理互斥,是自旋、还是阻塞 ,还是其他 ?

超时

锁超时设置,防止程序异常奔溃而导致锁一直存在,后续同把锁一直加不上

续期

程序具体执行的时长无法确定,所以过期时间只能是个估值,那么就不能保证程序在过期时间内百分百能运行完,所以需要进行锁续期,保证业务是在加锁的情况下完成的

可重入

可重入锁又名递归锁,是指同一个线程在外层方法已经获得锁,再进入该线程的中层或内层方法会自动获取锁;简单点来说,就是同个线程可以反复获取同一把锁

专一释放

通俗点来讲:谁加的锁就只有它能释放这把锁;为什么会出现这种错乱释放的问题了,举个例子就理解了

线程 T1 对资源 lock_zhangsan 加了锁,由于某些原因,业务还未执行完,锁已经过期自动释放了,此时线程 T2 对资源 lock_zhangsan 加锁成功,T2 还在执行业务的过程中,T1 业务执行完后释放资源 lock_zhangsan 的锁,结果把 T2 加的锁给释放了

公平与非公平

公平锁:多个线程按照申请锁的顺序去获得锁,所有线程都在队列里排队,这样就保证了队列中的第一个先得到锁

非公平锁:多个线程不按照申请锁的顺序去获得锁,而是同时直接去尝试获取锁

JDK 中的 ReentrantLock 就有公平和非公平两种实现,有兴趣的可以去看看它的源码;多数情况下用的是非公平锁,但有些特殊情况下需要用公平锁

你们可能会有这样的疑问

引入一个简单的分布式锁而已,有必要考虑这么多吗?

虽然绝大部分情况下,我们的程序都是在跑正常流程,但不能保证异常情况 100% 跑不到,出于健壮性考虑,异常情况都需要考虑到;下面我们就来看看 Redisson 是如何实现这些特点的

Redisson实现分布式锁

分布式锁如何实现锁等待_分布式锁的场景_

关于 Redisson,更多详细信息可查看官方文档,它提供了非常丰富的功能,分布式锁 只是其中之一;我们基于 Redisson 3.13.6,来看看分布式锁的实现

先将 Redis 信息配置给 Redisson,创建出 RedissonClient 实例

Redis 的部署方式不同,Redisson 配置模式也会不同,详细信息可查看:Configuration,我们就以最简单的 Single mode 来配置

@Before
public void before() {
    Config config = new Config();
    config.useSingleServer()
            .setAddress("redis://192.168.1.110:6379");
    redissonClient = Redisson.create(config);
}

通过 RedissonClient 实例获取锁

RedissonClient 实例创建出来后,就可以通过它来获取锁

/**
 * 多线程
 * @throws Exception
 */
@Test
public void multiLock() throws Exception {
    RLock testLock = redissonClient.getLock("multi_lock");
    int count = 5;
    CountDownLatch latch = new CountDownLatch(count);
    for (int i=1; i<=count; i++) {
        new Thread(() -> {
            try {
                System.out.println("线程 " + Thread.currentThread().getName() + " 尝试获取锁");
                testLock.lock();
                System.out.println(String.format("线程 %s 获取到锁, 执行业务中...", Thread.currentThread().getName()));
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(String.format("线程 %s 业务执行完成", Thread.currentThread().getName()));
                latch.countDown();
            } finally {
                testLock.unlock();
                System.out.println(String.format("线程 %s 释放锁完成", Thread.currentThread().getName()));
            }
        }, "t" + i).start();
    }
    latch.await();
    System.out.println("结束");
}

完整示例代码:redisson-demo

用 Redisson 实现分布式锁就是这么简单,但光会使用肯定是不够的,我们还得知道其底层实现原理

知其然,并知其所以然!

那如何知道其原理呢?当然是看其源码实现

客户端创建

客服端的创建过程中,会生成一个 id 作为唯一标识,用以区分分布式下不同节点中的客户端

id 值就是一个 UUID,客户端启动时生成;至于这个 id 有什么用,大家暂且在脑中留下这个疑问,我们接着往下看

锁获取

我们从 lock 开始跟源码

最终会来到有三个参数的 lock 方法

private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    
    // 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间
    Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // 锁被其他线程占用而获取失败,使用redis的发布订阅功能来等待锁的释放通知,而非自旋监测锁的释放
    RFuture future = subscribe(threadId);
    
    // 当前线程会阻塞,直到锁被释放时当前线程被唤醒(有超时等待,默认 7.5s,而不会一直等待)
    // 持有锁的线程释放锁之后,redis会发布消息,所有等待该锁的线程都会被唤醒,包括当前线程
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    try {
        while (true) {
            // 尝试获取锁;ttl为null表示锁获取成功; ttl不为null表示获取锁失败,其值为其他线程占用该锁的剩余时间
            ttl = tryAcquire(-1, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }
            // waiting for message
            if (ttl >= 0) {
                try {
                    // future.getNow().getLatch() 返回的是 Semaphore 对象,其初始许可证为 0,以此来控制线程获取锁的顺序
                    // 通过 Semaphore 控制当前服务节点竞争锁的线程数量
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 退出锁竞争(锁获取成功或者放弃获取锁),则取消锁的释放订阅
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}

主要三个点:尝试获取锁、订阅、取消订阅

尝试获取锁

尝试获取锁主要做了两件事:1、尝试获取锁,2、锁续期;尝试获取锁主要涉及到一段 Lua 代码

结合 搞懂了 Redis 的订阅、发布与Lua,Redisson的加锁原理就好理解了 来看这段 Lua 脚本,还是很好理解的

用 exists 判断 key 不存在,则用 hash 结构来存放锁,key = 资源名,field = uuid + : + threadId,value 自增 1;设置锁的过期时间(默认是 lockWatchdogTimeout = 30 * 1000 毫秒),并返回 nil

用 hexists 判断 field = uuid + : + threadId 存在,则该 field 的 value 自增 1,并重置过期时间,最后返回 nil

这里相当于实现了锁的重入

上面两种情况都不满足,则说明锁被其他线程占用了,直接返回锁的过期时间

给你们提个问题

为什么 field = uuid + : + threadId,而不是 field = threadId

友情提示:从多个服务(也就是多个 Redisson 客户端)来考虑

这个问题想清楚了,那么前面提到的:在 Redisson 客户端创建的过程中生成的 id(一个随机的 uuid 值),它的作用也就清楚了

尝试获取锁成功之后,会启动一个定时任务(即 WatchDog,亦称 看门狗)实现锁续期,也涉及到一段 Lua 脚本

这段脚本很简单,相信你们都能看懂

默认情况下,锁的过期时间是 30s,锁获取成功之后每隔 10s 进行一次锁续期,重置过期时间成 30s

若锁已经被释放了,则定时任务也会停止,不会再续期