• 作者:老汪软件技巧
  • 发表时间:2024-11-18 15:06
  • 浏览量:

什么是线程

线程与进程,你真得理解了吗_进程和线程的区别-CSDN博客

sylar 中线程模块的组成(一)信号量类(class Semaphore)

// 信号量类
class Semaphore{
public:
    Semaphore(uint32_t count = 0);
    ~Semaphore();
    void wait();    // 等待信号量
    void notify();  // 发送信号量通知
private:
    Semaphore(const Semaphore&) = delete;
    Semaphore(Semaphore&&) = delete;
    Semaphore& operator=(const Semaphore&) = delete;
    Semaphore& operator=(Semaphore&&) = delete;
private:
    sem_t m_semaphore;
};

Semaphore::Semaphore(uint32_t count)
{
    if(sem_init(&m_semaphore, 0, count))
    {
        throw std::logic_error("sem_init error");
    }
}
Semaphore::~Semaphore()
{
    sem_destroy(&m_semaphore);
}
void Semaphore::wait()
{
    // sem_wait 用于阻塞线程,直到信号量的值大于0
    if(sem_wait(&m_semaphore))
    {
        throw std::logic_error("sem_wait error");
    }
}
void Semaphore::notify()
{
    // sem_post 调用时信号量的值增加1
    if(sem_post(&m_semaphore))
    {
        throw std::logic_error("sem_post error");
    }
}

POSIX,全称为 “Portable Operating System Interface” ,是由 IEEE(Institute of Electrical and Electronics Engineers) 制定的一系列标准,旨在定义操作系统的 API(应用程序编程接口),以实现应用程序在不同的 Unix 系统和类 Unix 系统之间的可移植性。就是说 POSIX 标准定义了一套操作系统必须实现的 api,这样的话,当你写的代码只使用了POSIX 标准定义的接口时,那么你的代码相对于所有支持 POSIX 标准的操作系统来说,都是可移植的,最多重新编译一下就可以使用。

Semaphore::Semaphore(uint32_t count)
{
    if(sem_init(&m_semaphore, 0, count))
    {
        throw std::logic_error("sem_init error");
    }
}

int sem_destroy(sem_t *sem) :

Semaphore::~Semaphore()
{
    sem_destroy(&m_semaphore);
}

int sem_wait(sem_t *sem) :

void Semaphore::wait()
{
    // sem_wait 用于阻塞线程,直到信号量的值大于0
    if(sem_wait(&m_semaphore))
    {
        throw std::logic_error("sem_wait error");
    }
}

int sem_post(sem_t *sem) :

void Semaphore::notify()
{
    // sem_post 调用时信号量的值增加1
    if(sem_post(&m_semaphore))
    {
        throw std::logic_error("sem_post error");
    }
}

int sem_trywait(sem_t *sem) :

int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout) :

删除拷贝和移动构造函数,以及赋值运算符:

禁止信号量对象的拷贝和移动,以确保线程同步资源的唯一性和一致性。

(二)封装互斥锁(1)三个模板类

ScopedLockImpl:

用于封装常规锁的 RAII 实现。在构造函数中加锁,析构函数中解锁,确保锁在作用域结束时被释放。提供 lock() 和 unlock() 方法,允许在特定情况下手动控制锁定和解锁。

// 对互斥锁的简单封装,基于RAII原则
template<class T>
struct ScopedLockImpl{
public:
    ScopedLockImpl(T& mutex) : m_mutex(mutex)
    {
        lock();
    }
    ~ScopedLockImpl()
    {
        unlock();
    }
    void lock()
    {
        if(!m_locked)
        {
            m_locked = true;
            m_mutex.lock();
        }
    }
    void unlock()
    {
        if(m_locked)
        {
            m_locked = false;
            m_mutex.unlock();
        }
    }
private:
    T& m_mutex;
    bool m_locked = false;
};

ReadScopedLockImpl:

专为读锁设计,封装读操作的加锁和解锁。

template<class T>
struct ReadScopedLockImpl{
public:
    ReadScopedLockImpl(T& mutex) : m_mutex(mutex)
    {
        lock();
    }
    ~ReadScopedLockImpl()
    {
        unlock();
    }
    void lock()
    {
        if(!m_locked)
        {
            m_locked = true;
            m_mutex.rdlock();
        }
    }
    void unlock()
    {
        if(m_locked)
        {
            m_locked = false;
            m_mutex.unlock();
        }
    }
private:
    T& m_mutex;
    bool m_locked = false;
};

WriteScopedLockImpl:

专为写锁设计,确保写操作的加锁和解锁。

template<class T>
struct WriteScopedLockImpl{
public:
    WriteScopedLockImpl(T& mutex) : m_mutex(mutex)
    {
        lock();
    }
    ~WriteScopedLockImpl()
    {
        unlock();
    }
    void lock()
    {
        if(!m_locked)
        {
            // 下面两条语句不能调换顺序
            m_locked = true;    
            m_mutex.wrlock();
        }
    }
    void unlock()
    {
        if(m_locked)
        {
            m_locked = false;
            m_mutex.unlock();
        }
    }
private:
    T& m_mutex;
    bool m_locked = false;
};

(2)不同类型的锁

Mutex:

基于 pthread_mutex_t 的普通互斥锁,提供了加锁和解锁功能。

class Mutex{
public:
    typedef ScopedLockImpl Lock;
    Mutex()
    {
        pthread_mutex_init(&m_mutex, nullptr);
    }
    ~Mutex()
    {
        pthread_mutex_destroy(&m_mutex);
    }
    void lock()
    {
        pthread_mutex_lock(&m_mutex);
    }
    void unlock()
    {
        pthread_mutex_unlock(&m_mutex);
    }

线程池框架__线程管理框架

private: pthread_mutex_t m_mutex; };

NullMutex:

一个 “空”互斥锁,不会执行任何操作,适用于不需要锁的地方,避免代码条件编译。

class NullMutex{
public:
    typedef ScopedLockImpl Lock;
    NullMutex() {}
    ~NullMutex() {}
    void lock() {}
    void unlock() {}
};

RWMutex:

基于 pthread_rwlock_t 的读写锁。提供了读锁 (rdlock()) 和写锁 (wrlock()) 的支持,适用于读多写少的并发场景。

// 读写锁
class RWMutex{
public:
    typedef ReadScopedLockImpl ReadLock;
    typedef WriteScopedLockImpl WriteLock;
    RWMutex()
    {
        pthread_rwlock_init(&m_lock, nullptr);
    }
    ~RWMutex()
    {
        pthread_rwlock_destroy(&m_lock);
    }
    void rdlock()
    {
        pthread_rwlock_rdlock(&m_lock);
    }
    void wrlock()
    {
        pthread_rwlock_wrlock(&m_lock);
    }
    void unlock()
    {
        pthread_rwlock_unlock(&m_lock);
    }
private:
    pthread_rwlock_t m_lock;
};

NullRWMutex:

“空”读写锁,与 NullMutex 类似,用于无需锁的情形。

class NullRWMutex{
public:
    typedef ReadScopedLockImpl ReadLock;
    typedef WriteScopedLockImpl WriteLock;
    NullRWMutex() {}
    ~NullRWMutex() {}
    void rdlock() {}
    void wrlock() {}
    void unlock() {}
};

Spinlock:

基于 pthread_spinlock_t 实现的自旋锁。适合短时间的锁定,避免了线程上下文切换的开销。

// 自旋锁
class Spinlock{
public:
    typedef ScopedLockImpl Lock;
    Spinlock()
    {
        pthread_spin_init(&m_mutex, 0);
    }
    ~Spinlock()
    {
        pthread_spin_destroy(&m_mutex);
    }
    void lock()
    {
        pthread_spin_lock(&m_mutex);
    }
    void unlock()
    {
        pthread_spin_unlock(&m_mutex);
    }
private:
    pthread_spinlock_t m_mutex;
};

CASLock:

// 原子锁
class CASLock {
public:
    typedef ScopedLockImpl Lock;
    CASLock() 
    {
        m_mutex.clear();
    }
    ~CASLock() 
    {
    }
    void lock() 
    {
        while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
    }
    void unlock() 
    {
        std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
    }
private:
    volatile std::atomic_flag m_mutex;
};

(三)线程类

封装 POSIX 线程(pthread)的实现,并提供线程创建和管理功能。

class Thread{
public:
    typedef std::shared_ptr ptr;    // 定义共享指针类型
    // 构造函数,接受一个可调用对象 cb 和线程名称 name。cb是线程运行时执行的回调函数
    Thread(std::function<void()> cb, const std::string& name);
    ~Thread();
    pid_t getId() const { return m_id;}                     // 返回线程 id
    const std::string& getName() const { return m_name;}    // 返回线程名称
    void join();                                            // 等待线程结束
    static Thread* GetThis();                               // 指向当前正在执行的 Thread 对象指针
    static const std::string& GetName();                    // 返回当前线程的名称
    static void SetName (const std::string& name);          // 设置当前线程的名称
private:
    Thread(const Thread&) = delete;                 // 禁止拷贝构造
    Thread(Thread&&) = delete;                      // 禁止移动构造
    Thread& operator=(const Thread&) = delete;      // 禁止拷贝赋值
    Thread& operator=(Thread&&) = delete;           // 禁止移动赋值
    static void* run(void* arg);
private:
    pid_t m_id = -1;
    pthread_t m_thread = 0;
    std:: function<void()> m_cb;
    std::string m_name;
    Semaphore m_semaphore;
};

Thread(const Thread&) = delete;                 // 禁止拷贝构造
Thread(Thread&&) = delete;                      // 禁止移动构造
Thread& operator=(const Thread&) = delete;      // 禁止拷贝赋值
Thread& operator=(Thread&&) = delete;           // 禁止移动赋值

拷贝语义不适用于线程:

移动操作的复杂性:

简化类的设计:

pthread_t:

// 阻塞当前线程
pthread_join(pthread_t thread, void** retval)

// 将线程标记为分离状态
pthread_detach(pthread_t thread)

(四)线程类的函数解析(1)构造函数

Thread::Thread(std::function<void()> cb, const std::string &name)
    :m_cb(std::move(cb)), m_name(name)  // 移动赋值能避免不必要的开销
{
    if(name.empty())
    {
        m_name = "UNKNOW";
    }
    // 创建一个新线程
    int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
    if(rt)  // 非零表示创建失败
    {
        SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt="
            << rt << " name=" << name;
        throw std::logic_error("pthread_create error");
    }
    // 等待子线程通知初始化完成。
    // 确保在主线程使用子线程时,子线程已经初始化完成
    m_semaphore.wait();
}

(2)析构函数

Thread::~Thread()
{  
    if(m_thread)
    {
        pthread_detach(m_thread);
    }
}

(3)线程执行入口函数 run

static thread_local Thread* t_thread = nullptr;             // 指向当前执行的 Thread 对象
static thread_local std::string t_thread_name = "UNKNOW";   // 存储当前线程名称
...
void *Thread::run(void *arg)
{
    Thread* thread = (Thread*)arg;  // 将传入的参数 arg 转换为 Thread 指针
    t_thread = thread;              // 将当前线程对象指针赋值给线程局部变量 t_thread
    t_thread_name = thread->m_name; // 设置线程局部变量 t_thread_name 为线程的名称
    thread->m_id = sylar::GetThreadId(); // 获取当前线程 ID,并赋值给线程对象的 m_id
    pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str()); // 设置系统线程名称
    std::function<void()> cb;       // 声明一个函数对象 cb
    cb.swap(thread->m_cb);          // 获取并交换回调函数,保证回调函数的安全执行
    
    // run()是静态成员函数,无法直接访问类的非静态成员函数
    thread->m_semaphore.notify();   // 子线程完成初始化,通知主线程
    cb();                           // 调用回调函数
    return 0;                       // 返回 0,表示线程正常结束
}

cb.swap(thread->m_cb)

在 run() 函数中,当子线程开始执行时,首先会进行一些初始化操作。完成初始化后,子线程调用 thread->m_semaphore.notify(),它通过 notify() 操作将信号量的值增加1,此时如果有线程(主线程)在 wait() 上阻塞,它会被唤醒。即当信号量被通知(值变为大于0),主线程从 m_semaphore.wait() 处返回,继续执行后续的代码。

thread->m_semaphore.notify(); 与 cb(); 顺序不能变,cb() 执行回调,实际上执行了子线程的核心任务。只有在通知主线程子线程初始化完成之后,才能确保回调执行时子线程的状态是安全和一致的。


上一条查看详情 +Electron 优雅的进行进程间通讯
下一条 查看详情 +没有了