• 作者:老汪软件技巧
  • 发表时间:2024-11-14 17:02
  • 浏览量:

锁定一个常规的互斥锁(参见章节“锁定:互斥锁和读写锁”)时,如果该互斥锁已经被锁定,你的线程将会被挂起。这种机制可以避免在等待锁释放时浪费资源。如果锁只会在极短的时间内被持有,并且多个锁定它的线程可以在不同的处理器核心上并行运行,那么让线程重复尝试获取锁而不是挂起可能会更好。

自旋锁是一种正好实现了上述功能的互斥锁。当尝试锁定已经被锁住的自旋锁时,线程会进入忙等待或“自旋”状态:不断重复尝试,直到最终成功。这虽然会浪费处理器的周期,但有时可以在锁定时获得更低的延迟。

注意:

许多实际应用中的互斥锁实现,包括在某些平台上的 std::sync::Mutex,在最初锁定时会短暂表现得像自旋锁,然后才请求操作系统挂起线程。这种尝试结合两者优点的做法是否有利,完全取决于具体的使用场景。

在本章中,我们将构建我们自己的 SpinLock 类型,应用我们在第 2 章和第 3 章中学到的内容,并看看如何利用 Rust 的类型系统为 SpinLock 的用户提供一个安全且有用的接口。

最小实现

让我们从头开始实现一个自旋锁。

最简单的版本其实很容易实现,如下所示:

pub struct SpinLock {
    locked: AtomicBool,
}

我们只需要一个布尔值来指示自旋锁是否被锁定。由于我们希望多个线程能够同时与它交互,因此使用了原子布尔类型。

接下来,我们需要一个构造函数,以及 lock 和 unlock 方法:

impl SpinLock {
    pub const fn new() -> Self {
        Self { locked: AtomicBool::new(false) }
    }
    pub fn lock(&self) {
        while self.locked.swap(true, Acquire) {
            std::hint::spin_loop();
        }
    }
    pub fn unlock(&self) {
        self.locked.store(false, Release);
    }
}

locked 布尔值初始为 false,lock 方法将其交换为 true 并在已经是 true 时不断重试,而 unlock 方法则将其重置为 false。

注意:

我们也可以使用比较并交换(compare-and-exchange)操作来原子地检查布尔值是否为 false 并在其为 false 时将其设为 true:

rust
复制代码
while self.locked.compare_exchange_weak(false, true, Acquire, Relaxed).is_err()

虽然代码略显冗长,但根据个人喜好,这种写法可能更易于理解,因为它更清晰地表达了操作可能会失败或成功的概念。然而,这可能会导致生成略有不同的指令,关于这一点我们将在第 7 章中进行探讨。

在 while 循环中,我们使用了一个自旋循环提示(spin loop hint),告诉处理器我们正在等待某些事情的变化。在大多数主要平台上,这个提示会触发特殊指令,使处理器核心优化其行为,例如暂时减速或优先处理其他有用的事情。然而,与 thread::sleep 或 thread::park 等阻塞操作不同,自旋循环提示不会调用操作系统将线程挂起以让出给其他线程。

提示:

通常,在自旋循环中包含这样的提示是个好主意。根据具体情况,甚至可以在再次尝试访问原子变量之前多次执行这个提示。如果您关心最后几纳秒的性能并想找到最优策略,您需要为您的具体用例进行基准测试。不幸的是,这种基准测试的结论可能高度依赖于硬件,关于这点我们将在第 7 章进一步讨论。

我们使用获取(Acquire)和释放(Release)内存排序来确保每次 unlock() 调用与随后的 lock() 调用建立了一个发生先后(happens-before)关系。换句话说,我们确保在锁定它之后,可以安全地假设上一次锁定期间发生的事情已经发生完毕。这是获取和释放排序最经典的用例:获取和释放锁。

图 4-1 展示了一个使用我们的 SpinLock 来保护共享数据的场景,其中两个线程并发地尝试获取锁。请注意,第一个线程的解锁操作与第二个线程的加锁操作形成了一个发生先后关系,这确保了线程不能同时访问数据。

image.png

不安全的自旋锁

上面的 SpinLock 类型有一个完全安全的接口,因为它本身在误用时不会导致未定义行为。然而,在大多数用例中,它会被用来保护对共享变量的修改,这意味着用户仍然需要使用不安全的、未经检查的代码。

为了提供一个更方便的接口,我们可以更改 lock 方法,让它返回一个对被锁保护的数据的独占引用(&mut T),因为在大多数用例中,正是锁的操作保证了独占访问是安全的。

为此,我们必须将类型修改为泛型,以保护不同类型的数据,并添加一个字段来保存该数据。由于即使自旋锁本身是共享的,这些数据也可以被修改(或被独占访问),我们需要使用内部可变性(详见“内部可变性”一节),为此我们将使用 UnsafeCell:

use std::cell::UnsafeCell;
pub struct SpinLock {
    locked: AtomicBool,
    value: UnsafeCell,
}

出于安全考虑,UnsafeCell 没有实现 Sync,这意味着我们的类型现在无法在线程之间共享,这使得它几乎无用。为了解决这个问题,我们需要向编译器保证,将我们的类型在线程之间共享实际上是安全的。然而,由于自旋锁可以用于将类型为 T 的值从一个线程发送到另一个线程,我们必须将这一承诺限制在可以在线程之间安全发送的类型上。因此,我们(不安全地)为所有实现了 Send 的 T 实现 Sync:

unsafe impl Sync for SpinLock where T: Send {}

请注意,我们不需要要求 T 是 Sync,因为我们的 SpinLock 只允许一个线程一次访问被保护的 T。只有当我们允许多个线程同时访问它时(例如像读写锁对读者那样),我们才需要额外要求 T: Sync。

接下来,我们的新构造函数需要一个类型为 T 的值来初始化 UnsafeCell:

impl SpinLock {
    pub const fn new(value: T) -> Self {
        Self {
            locked: AtomicBool::new(false),
            value: UnsafeCell::new(value),
        }
    }
    …
}

现在进入有趣的部分:lock 和 unlock 方法。我们之所以做这一切,是为了使 lock() 能返回一个 &mut T,这样用户在使用自旋锁保护他们的数据时就不需要编写不安全的、未经检查的代码。这意味着我们现在必须在 lock 实现内部使用不安全代码。UnsafeCell 可以通过 get() 方法为我们提供一个指向其内容的原始指针(*mut T),然后我们可以在不安全块中将其转换为引用,如下所示:

pub fn lock(&self) -> &mut T {
    while self.locked.swap(true, Acquire) {
        std::hint::spin_loop();
    }
    unsafe { &mut *self.value.get() }
}

由于 lock 的函数签名在输入和输出中都包含了引用,因此 &self 和 &mut T 的生命周期被省略并假定是相同的(详见《Rust 编程语言》中的“生命周期省略”部分)。我们可以通过手动写出生命周期来使其显式,如下所示:

pub fn lock<'a>(&'a self) -> &'a mut T { … }

这清楚地表明返回的引用的生命周期与 &self 的生命周期相同。这意味着我们声称返回的引用在锁本身存在的情况下是有效的。

如果我们假装 unlock() 方法不存在,那么这个接口是完全安全和合理的。SpinLock 可以被锁定,返回一个 &mut T,然后再也不能被锁定,这保证了这个独占引用确实是独占的。

但是,如果我们尝试重新添加 unlock() 方法,则需要一种方法来限制返回引用的生命周期,直到下一次调用 unlock() 为止。如果编译器能理解英文,也许以下代码会起作用:

pub fn lock<'a>(&self) -> &'a mut T
where
    'a ends at the next call to unlock() on self,
    even if that's done by another thread.
    Oh, and it also ends when self is dropped, of course.
    (Thanks!)
{ … }

不幸的是,这在 Rust 中并不有效。我们无法向编译器解释这一限制,但可以向用户解释。为了将责任转移给用户,我们将 unlock 函数标记为不安全的,并留下说明,告知用户他们需要做些什么来保证安全:

/// 安全注意:`lock()` 返回的 `&mut T` 必须已销毁!
/// (不要作弊,保留对 `T` 字段的引用!)
pub unsafe fn unlock(&self) {
    self.locked.store(false, Release);
}

使用锁守卫实现安全接口

为了提供一个完全安全的接口,我们需要将解锁操作与 &mut T 的生命周期绑定在一起。我们可以通过将此引用包装在我们自己的类型中来实现,这种类型不仅表现得像一个引用,还实现了 Drop 特性,在它被销毁时执行某些操作。

这种类型通常被称为守卫(guard),因为它有效地守卫了锁的状态,并在被销毁之前负责该状态。

我们的 Guard 类型将简单地包含对 SpinLock 的引用,以便它既可以访问其 UnsafeCell,也可以稍后重置 AtomicBool:

pub struct Guard {
    lock: &SpinLock,
}

但是,当我们尝试编译时,编译器告诉我们:

error[E0106]: missing lifetime specifier
   --> src/lib.rs
    |
    |         lock: &SpinLock,
    |               ^ expected named lifetime parameter
    |
help: consider introducing a named lifetime parameter
    |
    ~     pub struct Guard<'a, T> {
    |                      ^^^
    ~         lock: &'a SpinLock,
    |                ^^
    |

显然,这不是可以省略生命周期的地方。我们必须显式地说明引用的生命周期,正如编译器所建议的那样:

pub struct Guard<'a, T> {
    lock: &'a SpinLock,
}

这样就保证了 Guard 不能比 SpinLock 存在得更久。

接下来,我们修改 SpinLock 的 lock 方法以返回一个 Guard:

pub fn lock(&self) -> Guard {
    while self.locked.swap(true, Acquire) {
        std::hint::spin_loop();
    }
    Guard { lock: self }
}

我们的 Guard 类型没有构造函数,并且它的字段是私有的,因此这是用户获取 Guard 的唯一方式。因此,我们可以安全地假设 Guard 的存在意味着 SpinLock 已被锁定。

为了使 Guard 像一个(独占)引用一样工作,并透明地访问 T,我们需要实现特殊的 Deref 和 DerefMut 特性,如下所示:

use std::ops::{Deref, DerefMut};
impl Deref for Guard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        // 安全性:Guard 的存在保证了我们独占地锁住了锁。
        unsafe { &*self.lock.value.get() }
    }
}
impl DerefMut for Guard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // 安全性:Guard 的存在保证了我们独占地锁住了锁。
        unsafe { &mut *self.lock.value.get() }
    }
}

最后一步,我们为 Guard 实现 Drop,这样就可以完全移除不安全的 unlock 方法:

impl Drop for Guard<'_, T> {
    fn drop(&mut self) {
        self.lock.locked.store(false, Release);
    }
}

就这样,通过 Drop 和 Rust 的类型系统的魔力,我们为 SpinLock 类型提供了一个完全安全(且实用)的接口。

让我们来试试:

fn main() {
    let x = SpinLock::new(Vec::new());
    thread::scope(|s| {
        s.spawn(|| x.lock().push(1));
        s.spawn(|| {
            let mut g = x.lock();
            g.push(2);
            g.push(2);
        });
    });
    let g = x.lock();
    assert!(g.as_slice() == [1, 2, 2] || g.as_slice() == [2, 2, 1]);
}

上面的程序演示了我们的 SpinLock 是多么容易使用。得益于 Deref 和 DerefMut,我们可以直接在守卫上调用 Vec::push 方法。而且由于有了 Drop,我们不必担心解锁。

也可以显式地解锁,通过调用 drop(g) 来销毁守卫。如果您尝试过早地解锁,您会看到守卫通过编译器错误在做它的工作。例如,如果您在两个 push(2) 之间插入 drop(g);,第二个 push 将无法编译,因为此时您已经销毁了 g:

error[E0382]: borrow of moved value: `g`
   --> src/lib.rs
    |
    |     drop(g);
    |          - value moved here
    |     g.push(2);
    |     ^^^^^^^^^ value borrowed here after move

得益于 Rust 的类型系统,我们可以放心地知道这种错误在运行程序之前就会被捕获。

总结