C++实现高性能服务器(3)—-线程与协程
一,线程模块
- 封装了一些常用的锁机制(信号量、互斥锁、自旋锁、读写锁),确保线程安全
- 对pthread系列函数封装,生成Thread类
该模块基于pthread
实现。sylar说,由于c++11中的thread也是由pthread封装实现的,并且没有提供读写互斥量,读写锁,自旋锁等,所以自己封装了pthread。
锁模块实现了信号量、互斥量、读写锁、自旋锁、原子锁的封装
- class Semaphore:信号量封装
- class Mutex:互斥量封装
- class RWMutex:读写锁封装
- class Spinlock:自旋锁封装
- class CASLock:原子锁封装
线程模块主要由Thread类实现
- class Thread:实现线程的封装
关于线程id的问题,在获取线程id时使用syscall获得唯一的线程id
css复制代码进程pid: getpid()
线程tid: pthread_self() //进程内唯一,但是在不同进程则不唯一。
线程pid: syscall(SYS_gettid) //系统内是唯一的
Lock
- 一些常用的锁机制,包括信号量
SemaphoreLock
、互斥锁Mutex
、自旋锁SpinLock
、读写锁RWMutex
- RAII模式,构造时lock,析构时unlcok
//局部锁
template<class T>
class ScopedLockImpl {
public:
//构造函数,自动加锁
ScopedLockImpl(T& mutex)
:m_mutex(mutex) {
lock();
}
//析构函数,自动解锁
~ScopedLockImpl() {
unlock();
}
void lock() {
if (!m_locked) {
m_mutex.lock();
m_locked = true;
}
}
void unlock() {
if (m_locked) {
m_mutex.unlock();
m_locked = false;
}
}
private:
T& m_mutex;
bool m_locked = false;
};
Thread
- 线程模块,主要是对pthread的封装
class Thread : Noncopyable
{
public:
typedef std::shared_ptr<Thread> ptr;
/**
* @brief 构造函数
* @param[in] cb 线程执行函数
* @param[in] name 线程名称
*/
Thread(std::function<void()> cb, const std::string& name);
~Thread();
void join(); // 等待线程执行完成
static Thread* GetThis(); // 获取当前的线程指针
// get set 方法
private:
static void* run(void* arg); // 线程执行函数
private:
pid_t m_id = -1; // 线程id
std::string m_name; // 线程名称
pthread_t m_thread = 0; // 线程
std::function<void()> m_cb; // 线程执行函数
SemaphoreLock m_semaphore; // 信号量
};
- 线程的入口函数,只支持void(void)类型的入口函数,不支持给线程传参数,但实际使用时可以结合std::bind来绑定参数,这样就相当于支持任何类型和数量的参数。
std::function<void()> m_cb; // 线程执行函数
- 因为我们是在线程执行函数中完成一些成员变量的初始化,因此在Thread的构造函数中,应当使用信号量(V操作),保证在构造完成之前已经执行了线程执行函数(入口函数中P操作)。
// 构造函数
Thread::Thread(std::function<void()> cb, const std::string& name)
: m_cb(cb)
, m_name(name)
{
if (name.empty())
{
m_name = "UNKNOW";
}
int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
if (rt) {
LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt
<< " name=" << name;
throw std::logic_error("pthread_create error");
}
//等待,直到创建出的线程开始执行,run()
m_semaphore.wait();
}
// 线程执行函数
void* Thread::run(void* arg) {
Thread* thread = (Thread*)arg;
t_thread = thread;
t_thread_name = thread->m_name;
thread->m_id = johnsonli::getThreadId();
pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());
std::function<void()> cb;
cb.swap(thread->m_cb);
//线程初始化完成后,就唤醒 Thread(),完成构造
thread->m_semaphore.notify();
cb();
return 0;
}
同时也保证了在构造完成之后线程函数一定已经处于运行状态
- 局部线程静态变量,表示当前正在执行的线程。声明为
static thread_local
// thread.cpp
// 局部线程静态变量,只在当前线程有用
// 当前运行的线程
static thread_local Thread* t_thread = nullptr;
在线程执行函数run
中,会完成初始化,此后在线程入口函数m_cb
中就可以调用静态方法GetThis
获得当前线程
// thread.h
static Thread* GetThis()
// thread.cpp
Thread* Thread::GetThis() {
return t_thread;
}
// main.cpp
void fun1()
{
// 线程入口函数中获取当前线程
LOG_INFO(g_logger) << " this.name: " << Thread::GetThis()->getName();
}
Thread::ptr thr(new johnsonli::Thread(&fun1, "name"));
二,协程模块
1.每个协程在创建时都会指定一个入口函数,类似线程。协程的本质就是函数和函数运行状态的组合。
2.在普通函数中,函数一旦被调用,只能从头开始执行,直到函数执行结束退出;协程可以执行一半就退出(call),但并未真正结束,只是暂时让出CPU执行权,之后可以恢复运行(back)。在暂停运行期间,其他协程可以获得CPU并运行,因此协程也称为轻量级线程。
3.本协程模块时基于ucontext_t实现,也就是协程上下文,包含了函数在当前执行状态下的全部CPU寄存器的值(函数栈帧,代码执行位置等),具体信息如下所示
1. 主要功能
- 使用非对称协程模型,简化程序逻辑
- 由用户控制协程的执行逻辑,实现了主协程与子协程间的自由切换
- 每个线程有一个主协程t_threadFiber,由主协程创建子协程,通过call()进入子协程运行,back()退出子协程,返回主协程
为什么要使用协程?
从了解进程,线程,协程之间的区别开始。
- 从定义来看
- 进程是资源分配和拥有的基本单位。进程通过内存映射拥有独立的代码和数据空间,若没有内存映射给进程独立的空间,则没有进程的概念了。
- 线程是程序执行的基本单位。线程都处在一个进程空间中,可以相互访问,没有限制,所以使用线程进行多任务变成十分便利,所以当一个线程崩溃,其他任何一个线程都不能幸免。每个进程中都有唯一的主线程,且只能有一个,主线程和进程是相互依存的关系,主线程结束进程也会结束。
- 协程是用户态的轻量级线程,线程内部调度的基本单位。协程在线程上执行。
- 从系统调用来看
- 进程由操作系统进行切换,会在用户态与内核态之间来回切换。在切换进程时需要切换虚拟内存空间,切换页表,切换内核栈以及硬件上下文等,开销非常大。
- 线程由操作系统进行切换,会在用户态与内核态之间来回切换。在切换线程时需要保存和设置少量寄存器内容,开销很小。
- 协程由用户进行切换,并不会陷入内核态。先将寄存器上下文和栈保存,等切换回来的时候再进行恢复,上下文的切换非常快
- 从并发性来看
- 不同进程之间切换实现并发,各自占有CPU实现并行
- 一个进程内部的多个线程并发执行
- 同一时间只能执行一个协程,而其他协程处于休眠状态,适合对任务进行分时处理
通过协程,我们可以让程序按照我们的想法去运行,而不是从头到尾的执行下去。例如我们在执行一个函数时,可以通过yield
退出,让出当前的CPU执行权,等到了合适的时候,通过resume
重新恢复运行。
因为协程是在单线程上运行的,并不是并发执行的,是顺序执行的,所以不能使用锁来做协程的同步,这样会直接导致线程的死锁。
实现思路
使用非对称协程的设计思路,通过主协程创建新协程,主协程由swapIn()
让出执行权执行子协程的任务,子协程可以通过YieldToHold()
让出执行权继续执行主协程的任务,不能在子协程之间做相互的转化,这样会导致回不到main
函数的上下文。这里使用了两个线程局部变量保存当前协程和主协程,切换协程时调用swapcontext
,若两个变量都保存子协程,则无法回到原来的主协程中。
css复制代码Fiber::GetThis() 获得主协程
swapIn()
Thread->man_fiber --------> sub_fiber (new(Fiber(cb)))
^
| Fiber::YieldToHold()
|
sub_fiber
class Fiber(协程模块)
arduino复制代码// 用于生成协程id
static std::atomic<uint64_t> s_fiber_id {0};
// 用于统计当前的协程数
static std::atomic<uint64_t> s_fiber_count {0};
// 约定协程栈的大小1MB
static ConfigVar<uint32_t>::ptr g_fiber_stack_size =
Config::Lookup<uint32_t>("fiber.stack_size", 1024 * 1024, "fiber stack size");
// 当前协程
static thread_local Fiber *t_fiber = nullptr;
// 主协程
static thread_local Fiber::ptr t_threadFiber = nullptr;
- 创建/释放协程运行栈
arduino复制代码class MallocStackAllocator {
public:
static void* Alloc(size_t size) {
return malloc(size);
}
static void Dealloc(void* vp, size_t size) {
return free(vp);
}
};
using StackAllocator = MallocStackAllocator;
- 协程有五种状态
arduino复制代码enum State
{
// 初始化
INIT,
// 暂停
HOLD,
// 执行
EXEC,
// 结束
TERM,
// 可执行
READY,
// 异常
EXCEPT
};
mumber(成员变量)
arduino复制代码// 协程id
uint64_t m_id = 0;
// 协程运行栈大小
uint32_t m_stacksize = 0;
// 协程状态
State m_state = INIT;
// 上下文
ucontext_t m_ctx;
// 协程运行栈指针
void* m_stack = nullptr;
// 协程执行方法
std::function<void()> m_cb;
Fiber(构造函数)
- private无参构造
主协程的构造
scss复制代码Fiber::Fiber() {
m_state = EXEC;
// 设置当前协程
SetThis(this);
// 获取当前协程的上下文信息保存到m_ctx中
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
++s_fiber_count;
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber(root)";
}
- public有参构造
子协程的构造
scss复制代码Fiber::Fiber(std::function<void()> cb, size_t stacksize, bool use_caller)
:m_id(s_fiber_count)
,m_cb(cb){
++s_fiber_count;
// 若给了初始化值则用给定值,若没有则用约定值
m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();
// 获得协程运行指针
m_stack = StackAllocator::Alloc(m_stacksize);
// 保存当前协程上下文信息到m_ctx中
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
// uc_link为空,执行完当前context之后退出程序。
m_ctx.uc_link = nullptr;
// 初始化栈指针
m_ctx.uc_stack.ss_sp = m_stack;
// 初始化栈大小
m_ctx.uc_stack.ss_size = m_stacksize;
// 指明该context入口函数
if (!use_caller) {
makecontext(&m_ctx, &Fiber::MainFunc, 0);
} else {
makecontext(&m_ctx, &Fiber::CallerMainFunc, 0);
}
SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber id = " << m_id;
}
~Fiber(析构函数)
释放协程运行栈
scss复制代码Fiber::~Fiber() {
--s_fiber_count;
// 子协程
if (m_stack) {
// 不在准备和运行状态
SYLAR_ASSERT(m_state == TERM || m_state == INIT || m_state == EXCEPT);
// 释放运行栈
StackAllocator::Dealloc(m_stack, m_stacksize);
} else {
// 主协程的释放要保证没有任务并且当前正在运行
SYLAR_ASSERT(!m_cb);
SYLAR_ASSERT(m_state == EXEC);
//若当前协程为主协程,将当前协程置为空
Fiber *cur = t_fiber;
if (cur == this) {
SetThis(nullptr);
}
}
SYLAR_LOG_DEBUG(g_logger) << "Fiber::~Fiber id = " << m_id;
}
reset(重置协程)
ini复制代码void Fiber::reset(std::function<void()> cb) {
// 主协程不分配栈
SYLAR_ASSERT(m_stack);
// 当前协程不在准备和运行态
SYLAR_ASSERT(m_state == INIT || m_state == TERM || m_state == EXCEPT);
m_cb = cb;
if (getcontext(&m_ctx)) {
SYLAR_ASSERT2(false, "getcontext");
}
m_ctx.uc_link = nullptr;
m_ctx.uc_stack.ss_sp = m_stack;
m_ctx.uc_stack.ss_size = m_stacksize;
makecontext(&m_ctx, &Fiber::MainFunc, 0);
m_state = INIT;
}
call、swapIn(切换到当前协程)
scss复制代码// 从协程主协程切换到当前协程
void Fiber::call() {
SetThis(this);
m_state = EXEC;
if (swapcontext(&t_threadFiber->m_ctx, &m_ctx)) {
SYLAR_ASSERT2(false, "swapIn_context");
}
}
// 从调度器的主协程切换到当前协程
void Fiber::swapIn() {
SetThis(this);
SYLAR_ASSERT(m_state != EXEC);
m_state = EXEC;
if (swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)) {
SYLAR_ASSERT2(false, "swapIn_context");
}
}
back、swapOut(当前协程切换到后台)
scss复制代码// 从当前协程切换到主协程
void Fiber::back() {
SetThis(t_threadFiber.get());
if (swapcontext(&m_ctx, &t_threadFiber->m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
// 从当前协程切换到调度器主协程
void Fiber::swapOut() {
SetThis(Scheduler::GetMainFiber());
// SYLAR_LOG_DEBUG(g_logger) << "change fiber with" << Scheduler::GetMainFiber()->GetFiberId();
if (swapcontext(&m_ctx, &Scheduler::GetMainFiber()->m_ctx)) {
SYLAR_ASSERT2(false, "swapcontext");
}
}
SetThis(设置当前协程)
ini复制代码void Fiber::SetThis(Fiber* f) {
t_fiber = f;
}
GetThis(返回当前协程并获得主协程)
scss复制代码Fiber::ptr Fiber::GetThis() {
// 返回当前协程
if (t_fiber) {
return t_fiber->shared_from_this();
}
// 获得主协程
Fiber::ptr main_fiber(new Fiber);
// 此时当前协程应该为主协程
SYLAR_ASSERT(main_fiber.get() == t_fiber);
t_threadFiber = main_fiber;
return t_fiber->shared_from_this();
}
YieldToReady(协程切换到后台, 并且设置为Ready状态)
ini复制代码void Fiber::YieldToReady() {
Fiber::ptr cur = GetThis();
cur->m_state = READY;
cur->swapOut();
}
YieldToHold(协程切换到后台, 并且设置为Hold状态)
ini复制代码void Fiber::YieldToHold() {
Fiber::ptr cur = GetThis();
cur->m_state = HOLD;
cur->swapOut();
}
MainFunc(协程执行函数)
rust复制代码void Fiber::MainFunc() {
// 获得当前协程
Fiber::ptr cur = GetThis();
SYLAR_ASSERT(cur);
try {
// 执行任务
cur->m_cb();
cur->m_cb = nullptr;
// 将状态设置为结束
cur->m_state = TERM;
} catch (std::exception &ex) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
<< std::endl
<< " fiber_id = " << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}
catch (...)
{
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except: "
<< std::endl
<< " fiber_id = " << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}
// 获得裸指针
auto raw_ptr = cur.get();
// 引用-1,防止fiber释放不掉
cur.reset();
//执行完释放执行权
raw_ptr->swapOut();
SYLAR_ASSERT2(false, "never reach fiber_id = " + std::to_string(raw_ptr->getId()));
}
void Fiber::CallerMainFunc() {
Fiber::ptr cur = GetThis();
SYLAR_ASSERT(cur);
try {
cur->m_cb();
cur->m_cb = nullptr;
cur->m_state = TERM;
} catch (std::exception &ex) {
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
<< std::endl
<< " fiber_id = " << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}
catch (...)
{
cur->m_state = EXCEPT;
SYLAR_LOG_ERROR(g_logger) << "Fiber Except: "
<< std::endl
<< " fiber_id = " << cur->getId()
<< std::endl
<< sylar::BacktraceToString();
}
auto raw_ptr = cur.get();
cur.reset();
raw_ptr->back();
SYLAR_ASSERT2(false, "never reach fiber_id = " + std::to_string(raw_ptr->getId()));
}