- 作者:老汪软件技巧
- 发表时间:2024-11-18 15:06
- 浏览量:
什么是线程
线程与进程,你真得理解了吗_进程和线程的区别-CSDN博客
sylar 中线程模块的组成(一)信号量类(class Semaphore)
// 信号量类
class Semaphore{
public:
Semaphore(uint32_t count = 0);
~Semaphore();
void wait(); // 等待信号量
void notify(); // 发送信号量通知
private:
Semaphore(const Semaphore&) = delete;
Semaphore(Semaphore&&) = delete;
Semaphore& operator=(const Semaphore&) = delete;
Semaphore& operator=(Semaphore&&) = delete;
private:
sem_t m_semaphore;
};
Semaphore::Semaphore(uint32_t count)
{
if(sem_init(&m_semaphore, 0, count))
{
throw std::logic_error("sem_init error");
}
}
Semaphore::~Semaphore()
{
sem_destroy(&m_semaphore);
}
void Semaphore::wait()
{
// sem_wait 用于阻塞线程,直到信号量的值大于0
if(sem_wait(&m_semaphore))
{
throw std::logic_error("sem_wait error");
}
}
void Semaphore::notify()
{
// sem_post 调用时信号量的值增加1
if(sem_post(&m_semaphore))
{
throw std::logic_error("sem_post error");
}
}
POSIX,全称为 “Portable Operating System Interface” ,是由 IEEE(Institute of Electrical and Electronics Engineers) 制定的一系列标准,旨在定义操作系统的 API(应用程序编程接口),以实现应用程序在不同的 Unix 系统和类 Unix 系统之间的可移植性。就是说 POSIX 标准定义了一套操作系统必须实现的 api,这样的话,当你写的代码只使用了POSIX 标准定义的接口时,那么你的代码相对于所有支持 POSIX 标准的操作系统来说,都是可移植的,最多重新编译一下就可以使用。
Semaphore::Semaphore(uint32_t count)
{
if(sem_init(&m_semaphore, 0, count))
{
throw std::logic_error("sem_init error");
}
}
int sem_destroy(sem_t *sem) :
Semaphore::~Semaphore()
{
sem_destroy(&m_semaphore);
}
int sem_wait(sem_t *sem) :
void Semaphore::wait()
{
// sem_wait 用于阻塞线程,直到信号量的值大于0
if(sem_wait(&m_semaphore))
{
throw std::logic_error("sem_wait error");
}
}
int sem_post(sem_t *sem) :
void Semaphore::notify()
{
// sem_post 调用时信号量的值增加1
if(sem_post(&m_semaphore))
{
throw std::logic_error("sem_post error");
}
}
int sem_trywait(sem_t *sem) :
int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout) :
删除拷贝和移动构造函数,以及赋值运算符:
禁止信号量对象的拷贝和移动,以确保线程同步资源的唯一性和一致性。
(二)封装互斥锁(1)三个模板类
ScopedLockImpl:
用于封装常规锁的 RAII 实现。在构造函数中加锁,析构函数中解锁,确保锁在作用域结束时被释放。提供 lock() 和 unlock() 方法,允许在特定情况下手动控制锁定和解锁。
// 对互斥锁的简单封装,基于RAII原则
template<class T>
struct ScopedLockImpl{
public:
ScopedLockImpl(T& mutex) : m_mutex(mutex)
{
lock();
}
~ScopedLockImpl()
{
unlock();
}
void lock()
{
if(!m_locked)
{
m_locked = true;
m_mutex.lock();
}
}
void unlock()
{
if(m_locked)
{
m_locked = false;
m_mutex.unlock();
}
}
private:
T& m_mutex;
bool m_locked = false;
};
ReadScopedLockImpl:
专为读锁设计,封装读操作的加锁和解锁。
template<class T>
struct ReadScopedLockImpl{
public:
ReadScopedLockImpl(T& mutex) : m_mutex(mutex)
{
lock();
}
~ReadScopedLockImpl()
{
unlock();
}
void lock()
{
if(!m_locked)
{
m_locked = true;
m_mutex.rdlock();
}
}
void unlock()
{
if(m_locked)
{
m_locked = false;
m_mutex.unlock();
}
}
private:
T& m_mutex;
bool m_locked = false;
};
WriteScopedLockImpl:
专为写锁设计,确保写操作的加锁和解锁。
template<class T>
struct WriteScopedLockImpl{
public:
WriteScopedLockImpl(T& mutex) : m_mutex(mutex)
{
lock();
}
~WriteScopedLockImpl()
{
unlock();
}
void lock()
{
if(!m_locked)
{
// 下面两条语句不能调换顺序
m_locked = true;
m_mutex.wrlock();
}
}
void unlock()
{
if(m_locked)
{
m_locked = false;
m_mutex.unlock();
}
}
private:
T& m_mutex;
bool m_locked = false;
};
(2)不同类型的锁
Mutex:
基于 pthread_mutex_t 的普通互斥锁,提供了加锁和解锁功能。
class Mutex{
public:
typedef ScopedLockImpl Lock;
Mutex()
{
pthread_mutex_init(&m_mutex, nullptr);
}
~Mutex()
{
pthread_mutex_destroy(&m_mutex);
}
void lock()
{
pthread_mutex_lock(&m_mutex);
}
void unlock()
{
pthread_mutex_unlock(&m_mutex);
}

private:
pthread_mutex_t m_mutex;
};
NullMutex:
一个 “空”互斥锁,不会执行任何操作,适用于不需要锁的地方,避免代码条件编译。
class NullMutex{
public:
typedef ScopedLockImpl Lock;
NullMutex() {}
~NullMutex() {}
void lock() {}
void unlock() {}
};
RWMutex:
基于 pthread_rwlock_t 的读写锁。提供了读锁 (rdlock()) 和写锁 (wrlock()) 的支持,适用于读多写少的并发场景。
// 读写锁
class RWMutex{
public:
typedef ReadScopedLockImpl ReadLock;
typedef WriteScopedLockImpl WriteLock;
RWMutex()
{
pthread_rwlock_init(&m_lock, nullptr);
}
~RWMutex()
{
pthread_rwlock_destroy(&m_lock);
}
void rdlock()
{
pthread_rwlock_rdlock(&m_lock);
}
void wrlock()
{
pthread_rwlock_wrlock(&m_lock);
}
void unlock()
{
pthread_rwlock_unlock(&m_lock);
}
private:
pthread_rwlock_t m_lock;
};
NullRWMutex:
“空”读写锁,与 NullMutex 类似,用于无需锁的情形。
class NullRWMutex{
public:
typedef ReadScopedLockImpl ReadLock;
typedef WriteScopedLockImpl WriteLock;
NullRWMutex() {}
~NullRWMutex() {}
void rdlock() {}
void wrlock() {}
void unlock() {}
};
Spinlock:
基于 pthread_spinlock_t 实现的自旋锁。适合短时间的锁定,避免了线程上下文切换的开销。
// 自旋锁
class Spinlock{
public:
typedef ScopedLockImpl Lock;
Spinlock()
{
pthread_spin_init(&m_mutex, 0);
}
~Spinlock()
{
pthread_spin_destroy(&m_mutex);
}
void lock()
{
pthread_spin_lock(&m_mutex);
}
void unlock()
{
pthread_spin_unlock(&m_mutex);
}
private:
pthread_spinlock_t m_mutex;
};
CASLock:
// 原子锁
class CASLock {
public:
typedef ScopedLockImpl Lock;
CASLock()
{
m_mutex.clear();
}
~CASLock()
{
}
void lock()
{
while(std::atomic_flag_test_and_set_explicit(&m_mutex, std::memory_order_acquire));
}
void unlock()
{
std::atomic_flag_clear_explicit(&m_mutex, std::memory_order_release);
}
private:
volatile std::atomic_flag m_mutex;
};
(三)线程类
封装 POSIX 线程(pthread)的实现,并提供线程创建和管理功能。
class Thread{
public:
typedef std::shared_ptr ptr; // 定义共享指针类型
// 构造函数,接受一个可调用对象 cb 和线程名称 name。cb是线程运行时执行的回调函数
Thread(std::function<void()> cb, const std::string& name);
~Thread();
pid_t getId() const { return m_id;} // 返回线程 id
const std::string& getName() const { return m_name;} // 返回线程名称
void join(); // 等待线程结束
static Thread* GetThis(); // 指向当前正在执行的 Thread 对象指针
static const std::string& GetName(); // 返回当前线程的名称
static void SetName (const std::string& name); // 设置当前线程的名称
private:
Thread(const Thread&) = delete; // 禁止拷贝构造
Thread(Thread&&) = delete; // 禁止移动构造
Thread& operator=(const Thread&) = delete; // 禁止拷贝赋值
Thread& operator=(Thread&&) = delete; // 禁止移动赋值
static void* run(void* arg);
private:
pid_t m_id = -1;
pthread_t m_thread = 0;
std:: function<void()> m_cb;
std::string m_name;
Semaphore m_semaphore;
};
Thread(const Thread&) = delete; // 禁止拷贝构造
Thread(Thread&&) = delete; // 禁止移动构造
Thread& operator=(const Thread&) = delete; // 禁止拷贝赋值
Thread& operator=(Thread&&) = delete; // 禁止移动赋值
拷贝语义不适用于线程:
移动操作的复杂性:
简化类的设计:
pthread_t:
// 阻塞当前线程
pthread_join(pthread_t thread, void** retval)
// 将线程标记为分离状态
pthread_detach(pthread_t thread)
(四)线程类的函数解析(1)构造函数
Thread::Thread(std::function<void()> cb, const std::string &name)
:m_cb(std::move(cb)), m_name(name) // 移动赋值能避免不必要的开销
{
if(name.empty())
{
m_name = "UNKNOW";
}
// 创建一个新线程
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
if(rt) // 非零表示创建失败
{
SYLAR_LOG_ERROR(g_logger) << "pthread_create thread fail, rt="
<< rt << " name=" << name;
throw std::logic_error("pthread_create error");
}
// 等待子线程通知初始化完成。
// 确保在主线程使用子线程时,子线程已经初始化完成
m_semaphore.wait();
}
(2)析构函数
Thread::~Thread()
{
if(m_thread)
{
pthread_detach(m_thread);
}
}
(3)线程执行入口函数 run
static thread_local Thread* t_thread = nullptr; // 指向当前执行的 Thread 对象
static thread_local std::string t_thread_name = "UNKNOW"; // 存储当前线程名称
...
void *Thread::run(void *arg)
{
Thread* thread = (Thread*)arg; // 将传入的参数 arg 转换为 Thread 指针
t_thread = thread; // 将当前线程对象指针赋值给线程局部变量 t_thread
t_thread_name = thread->m_name; // 设置线程局部变量 t_thread_name 为线程的名称
thread->m_id = sylar::GetThreadId(); // 获取当前线程 ID,并赋值给线程对象的 m_id
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str()); // 设置系统线程名称
std::function<void()> cb; // 声明一个函数对象 cb
cb.swap(thread->m_cb); // 获取并交换回调函数,保证回调函数的安全执行
// run()是静态成员函数,无法直接访问类的非静态成员函数
thread->m_semaphore.notify(); // 子线程完成初始化,通知主线程
cb(); // 调用回调函数
return 0; // 返回 0,表示线程正常结束
}
cb.swap(thread->m_cb)
在 run() 函数中,当子线程开始执行时,首先会进行一些初始化操作。完成初始化后,子线程调用 thread->m_semaphore.notify(),它通过 notify() 操作将信号量的值增加1,此时如果有线程(主线程)在 wait() 上阻塞,它会被唤醒。即当信号量被通知(值变为大于0),主线程从 m_semaphore.wait() 处返回,继续执行后续的代码。
thread->m_semaphore.notify(); 与 cb(); 顺序不能变,cb() 执行回调,实际上执行了子线程的核心任务。只有在通知主线程子线程初始化完成之后,才能确保回调执行时子线程的状态是安全和一致的。