C++实现高性能服务器框架(3) | PHM's world

LOADING

歡迎來到烏托邦的世界

C++实现高性能服务器框架(3)

C++实现高性能服务器(3)—-线程与协程

一,线程模块

  • 封装了一些常用的锁机制(信号量、互斥锁、自旋锁、读写锁),确保线程安全
  • 对pthread系列函数封装,生成Thread类

该模块基于pthread实现。sylar说,由于c++11中的thread也是由pthread封装实现的,并且没有提供读写互斥量,读写锁,自旋锁等,所以自己封装了pthread。

锁模块实现了信号量、互斥量、读写锁、自旋锁、原子锁的封装

  • class Semaphore:信号量封装
  • class Mutex:互斥量封装
  • class RWMutex:读写锁封装
  • class Spinlock:自旋锁封装
  • class CASLock:原子锁封装

线程模块主要由Thread类实现

  • class Thread:实现线程的封装

关于线程id的问题,在获取线程id时使用syscall获得唯一的线程id

css复制代码进程pid: getpid()                 
线程tid: pthread_self()     //进程内唯一,但是在不同进程则不唯一。
线程pid: syscall(SYS_gettid)     //系统内是唯一的

Lock

  • 一些常用的锁机制,包括信号量SemaphoreLock、互斥锁Mutex、自旋锁SpinLock、读写锁RWMutex
  • RAII模式,构造时lock,析构时unlcok
 //局部锁
template<class T>
class ScopedLockImpl {
 public:
     //构造函数,自动加锁
     ScopedLockImpl(T& mutex)
         :m_mutex(mutex) {
         lock();
     }

     //析构函数,自动解锁
     ~ScopedLockImpl() {     
         unlock();
     }

     void lock() {
         if (!m_locked) {
             m_mutex.lock();
             m_locked = true;
         }
     }

     void unlock() {
         if (m_locked) {
             m_mutex.unlock();   
             m_locked = false;
         }
     }
     
 private:
     T& m_mutex;
     bool m_locked = false;
 };

Thread

  • 线程模块,主要是对pthread的封装
class Thread : Noncopyable
{
public:
    typedef std::shared_ptr<Thread> ptr;

    /**
     * @brief 构造函数
     * @param[in] cb 线程执行函数
     * @param[in] name 线程名称
     */
    Thread(std::function<void()> cb, const std::string& name);
    ~Thread();
    void join();				// 等待线程执行完成
    static Thread* GetThis();	// 获取当前的线程指针

    // get set 方法

private:
    static void* run(void* arg);	// 线程执行函数 	
private:	
    pid_t m_id = -1;				// 线程id
    std::string m_name;				// 线程名称
    pthread_t m_thread = 0;			// 线程
    std::function<void()> m_cb;		// 线程执行函数
    SemaphoreLock m_semaphore;		// 信号量
};
  • 线程的入口函数,只支持void(void)类型的入口函数,不支持给线程传参数,但实际使用时可以结合std::bind来绑定参数,这样就相当于支持任何类型和数量的参数。
std::function<void()> m_cb;		// 线程执行函数
  • 因为我们是在线程执行函数中完成一些成员变量的初始化,因此在Thread的构造函数中,应当使用信号量(V操作),保证在构造完成之前已经执行了线程执行函数(入口函数中P操作)。
// 构造函数
Thread::Thread(std::function<void()> cb, const std::string& name)
        : m_cb(cb)
        , m_name(name)
{
    if (name.empty())
    {
        m_name = "UNKNOW";
    }
    int rt = pthread_create(&m_thread, nullptr, &Thread::run, this);
    if (rt) {
        LOG_ERROR(g_logger) << "pthread_create thread fail, rt=" << rt
            << " name=" << name;
        throw std::logic_error("pthread_create error");
    }

    //等待,直到创建出的线程开始执行,run()
    m_semaphore.wait();
}

// 线程执行函数
void* Thread::run(void* arg) {
    Thread* thread = (Thread*)arg;
    t_thread = thread;
    t_thread_name = thread->m_name;
    thread->m_id = johnsonli::getThreadId();
    pthread_setname_np(pthread_self(), thread->m_name.substr(0, 15).c_str());

    std::function<void()> cb;
    cb.swap(thread->m_cb);

    //线程初始化完成后,就唤醒 Thread(),完成构造
    thread->m_semaphore.notify();

    cb();
    return 0;
}

同时也保证了在构造完成之后线程函数一定已经处于运行状态

  • 局部线程静态变量,表示当前正在执行的线程。声明为static thread_local
// thread.cpp
// 局部线程静态变量,只在当前线程有用
// 当前运行的线程
static thread_local Thread* t_thread = nullptr;

在线程执行函数run中,会完成初始化,此后在线程入口函数m_cb中就可以调用静态方法GetThis获得当前线程

// thread.h
static Thread* GetThis()

// thread.cpp
Thread* Thread::GetThis() {
    return t_thread;
}

// main.cpp
void fun1()
{
    // 线程入口函数中获取当前线程
    LOG_INFO(g_logger) << " this.name: " << Thread::GetThis()->getName();
}

Thread::ptr thr(new johnsonli::Thread(&fun1, "name"));

二,协程模块

1.每个协程在创建时都会指定一个入口函数,类似线程。协程的本质就是函数和函数运行状态的组合。
2.在普通函数中,函数一旦被调用,只能从头开始执行,直到函数执行结束退出;协程可以执行一半就退出(call),但并未真正结束,只是暂时让出CPU执行权,之后可以恢复运行(back)。在暂停运行期间,其他协程可以获得CPU并运行,因此协程也称为轻量级线程。
3.本协程模块时基于ucontext_t实现,也就是协程上下文,包含了函数在当前执行状态下的全部CPU寄存器的值(函数栈帧,代码执行位置等),具体信息如下所示

1. 主要功能

  • 使用非对称协程模型,简化程序逻辑
  • 由用户控制协程的执行逻辑,实现了主协程与子协程间的自由切换
  • 每个线程有一个主协程t_threadFiber,由主协程创建子协程,通过call()进入子协程运行,back()退出子协程,返回主协程

为什么要使用协程?

从了解进程,线程,协程之间的区别开始。

  1. 从定义来看
    • 进程是资源分配和拥有的基本单位。进程通过内存映射拥有独立的代码和数据空间,若没有内存映射给进程独立的空间,则没有进程的概念了。
    • 线程是程序执行的基本单位。线程都处在一个进程空间中,可以相互访问,没有限制,所以使用线程进行多任务变成十分便利,所以当一个线程崩溃,其他任何一个线程都不能幸免。每个进程中都有唯一的主线程,且只能有一个,主线程和进程是相互依存的关系,主线程结束进程也会结束。
    • 协程是用户态的轻量级线程,线程内部调度的基本单位。协程在线程上执行。
  2. 从系统调用来看
    • 进程由操作系统进行切换,会在用户态与内核态之间来回切换。在切换进程时需要切换虚拟内存空间,切换页表,切换内核栈以及硬件上下文等,开销非常大。
    • 线程由操作系统进行切换,会在用户态与内核态之间来回切换。在切换线程时需要保存和设置少量寄存器内容,开销很小。
    • 协程由用户进行切换,并不会陷入内核态。先将寄存器上下文和栈保存,等切换回来的时候再进行恢复,上下文的切换非常快
  3. 从并发性来看
    • 不同进程之间切换实现并发,各自占有CPU实现并行
    • 一个进程内部的多个线程并发执行
    • 同一时间只能执行一个协程,而其他协程处于休眠状态,适合对任务进行分时处理

通过协程,我们可以让程序按照我们的想法去运行,而不是从头到尾的执行下去。例如我们在执行一个函数时,可以通过yield退出,让出当前的CPU执行权,等到了合适的时候,通过resume重新恢复运行。

因为协程是在单线程上运行的,并不是并发执行的,是顺序执行的,所以不能使用锁来做协程的同步,这样会直接导致线程的死锁。

实现思路

使用非对称协程的设计思路,通过主协程创建新协程,主协程由swapIn()让出执行权执行子协程的任务,子协程可以通过YieldToHold()让出执行权继续执行主协程的任务,不能在子协程之间做相互的转化,这样会导致回不到main函数的上下文。这里使用了两个线程局部变量保存当前协程和主协程,切换协程时调用swapcontext,若两个变量都保存子协程,则无法回到原来的主协程中。

css复制代码Fiber::GetThis() 获得主协程
                  swapIn()        
Thread->man_fiber --------> sub_fiber (new(Fiber(cb)))
            ^
            | Fiber::YieldToHold()
            |
         sub_fiber

class Fiber(协程模块)

arduino复制代码// 用于生成协程id
static std::atomic<uint64_t> s_fiber_id {0};
// 用于统计当前的协程数
static std::atomic<uint64_t> s_fiber_count {0};

// 约定协程栈的大小1MB
static ConfigVar<uint32_t>::ptr g_fiber_stack_size =
    Config::Lookup<uint32_t>("fiber.stack_size", 1024 * 1024, "fiber stack size");

// 当前协程
static thread_local Fiber *t_fiber = nullptr;
// 主协程
static thread_local Fiber::ptr t_threadFiber = nullptr;
  • 创建/释放协程运行栈
arduino复制代码class MallocStackAllocator {
public:
    static void* Alloc(size_t size) {
        return malloc(size);
    }

    static void Dealloc(void* vp, size_t size) {
        return free(vp);
    }
};

using StackAllocator = MallocStackAllocator;
  • 协程有五种状态
arduino复制代码enum State
{
    // 初始化
    INIT,
    // 暂停
    HOLD,
    // 执行
    EXEC,
    // 结束
    TERM,
    // 可执行
    READY,
    // 异常
    EXCEPT
};

mumber(成员变量)

arduino复制代码// 协程id
uint64_t m_id = 0;
// 协程运行栈大小
uint32_t m_stacksize = 0;
// 协程状态
State m_state = INIT;
// 上下文
ucontext_t m_ctx;
// 协程运行栈指针
void* m_stack = nullptr;
// 协程执行方法
std::function<void()> m_cb;

Fiber(构造函数)

  • private无参构造

主协程的构造

scss复制代码Fiber::Fiber() {
    m_state = EXEC;
    // 设置当前协程
    SetThis(this);
    // 获取当前协程的上下文信息保存到m_ctx中
    if (getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }

    ++s_fiber_count;

    SYLAR_LOG_DEBUG(g_logger)  << "Fiber::Fiber(root)";
}
  • public有参构造

子协程的构造

scss复制代码Fiber::Fiber(std::function<void()> cb, size_t stacksize, bool use_caller)
    :m_id(s_fiber_count)
    ,m_cb(cb){
    ++s_fiber_count;
    // 若给了初始化值则用给定值,若没有则用约定值
    m_stacksize = stacksize ? stacksize : g_fiber_stack_size->getValue();
    
    // 获得协程运行指针
    m_stack = StackAllocator::Alloc(m_stacksize);
    // 保存当前协程上下文信息到m_ctx中
    if (getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }
    // uc_link为空,执行完当前context之后退出程序。
    m_ctx.uc_link = nullptr;
    // 初始化栈指针
    m_ctx.uc_stack.ss_sp = m_stack;
    // 初始化栈大小
    m_ctx.uc_stack.ss_size = m_stacksize;
    
    // 指明该context入口函数
    if (!use_caller) {
        makecontext(&m_ctx, &Fiber::MainFunc, 0);
    } else {
        makecontext(&m_ctx, &Fiber::CallerMainFunc, 0);
    }
    
    SYLAR_LOG_DEBUG(g_logger) << "Fiber::Fiber id = " << m_id;
}

~Fiber(析构函数)

释放协程运行栈

scss复制代码Fiber::~Fiber() {
    --s_fiber_count;
    // 子协程
    if (m_stack) {
        // 不在准备和运行状态
        SYLAR_ASSERT(m_state == TERM || m_state == INIT || m_state == EXCEPT);
        // 释放运行栈
        StackAllocator::Dealloc(m_stack, m_stacksize);
    } else {
         // 主协程的释放要保证没有任务并且当前正在运行
        SYLAR_ASSERT(!m_cb);
        SYLAR_ASSERT(m_state == EXEC);
        //若当前协程为主协程,将当前协程置为空
        Fiber *cur = t_fiber;
        if (cur == this) {
            SetThis(nullptr);
        }
    }
    SYLAR_LOG_DEBUG(g_logger) << "Fiber::~Fiber id = " << m_id;
}

reset(重置协程)

ini复制代码void Fiber::reset(std::function<void()> cb) {
    // 主协程不分配栈
    SYLAR_ASSERT(m_stack);
    // 当前协程不在准备和运行态
    SYLAR_ASSERT(m_state == INIT || m_state == TERM || m_state == EXCEPT);
    m_cb = cb;
    if (getcontext(&m_ctx)) {
        SYLAR_ASSERT2(false, "getcontext");
    }
    m_ctx.uc_link = nullptr;
    m_ctx.uc_stack.ss_sp = m_stack;
    m_ctx.uc_stack.ss_size = m_stacksize;

    makecontext(&m_ctx, &Fiber::MainFunc, 0);
    m_state = INIT;
}

call、swapIn(切换到当前协程)

scss复制代码// 从协程主协程切换到当前协程
void Fiber::call() {
    SetThis(this);
    m_state = EXEC;
    if (swapcontext(&t_threadFiber->m_ctx, &m_ctx)) {
        SYLAR_ASSERT2(false, "swapIn_context");
    }
}
// 从调度器的主协程切换到当前协程
void Fiber::swapIn() {
    SetThis(this);
    SYLAR_ASSERT(m_state != EXEC);
    m_state = EXEC;

    if (swapcontext(&Scheduler::GetMainFiber()->m_ctx, &m_ctx)) {
        SYLAR_ASSERT2(false, "swapIn_context");
    }
}

back、swapOut(当前协程切换到后台)

scss复制代码// 从当前协程切换到主协程
void Fiber::back() {
    SetThis(t_threadFiber.get());
    if (swapcontext(&m_ctx, &t_threadFiber->m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}
// 从当前协程切换到调度器主协程
void Fiber::swapOut() {
    SetThis(Scheduler::GetMainFiber());
    // SYLAR_LOG_DEBUG(g_logger) << "change fiber with" << Scheduler::GetMainFiber()->GetFiberId();

    if (swapcontext(&m_ctx, &Scheduler::GetMainFiber()->m_ctx)) {
        SYLAR_ASSERT2(false, "swapcontext");
    }
}

SetThis(设置当前协程)

ini复制代码void Fiber::SetThis(Fiber* f) {
    t_fiber = f;
}

GetThis(返回当前协程并获得主协程)

scss复制代码Fiber::ptr Fiber::GetThis() {
    // 返回当前协程
    if (t_fiber) {
        return t_fiber->shared_from_this();
    }
    // 获得主协程
    Fiber::ptr main_fiber(new Fiber);
    // 此时当前协程应该为主协程
    SYLAR_ASSERT(main_fiber.get() == t_fiber);
    t_threadFiber = main_fiber;

    return t_fiber->shared_from_this();
}

YieldToReady(协程切换到后台, 并且设置为Ready状态)

ini复制代码void Fiber::YieldToReady() {
    Fiber::ptr cur = GetThis();
    cur->m_state = READY;
    cur->swapOut();
}

YieldToHold(协程切换到后台, 并且设置为Hold状态)

ini复制代码void Fiber::YieldToHold() {
    Fiber::ptr cur = GetThis();
    cur->m_state = HOLD;
    cur->swapOut();
}

MainFunc(协程执行函数)

rust复制代码void Fiber::MainFunc() {
    // 获得当前协程
    Fiber::ptr cur = GetThis();
    SYLAR_ASSERT(cur);
    try {
        // 执行任务
        cur->m_cb();
        cur->m_cb = nullptr;
        // 将状态设置为结束
        cur->m_state = TERM;
    } catch (std::exception &ex) {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
                                  << std::endl
                                  << " fiber_id = " << cur->getId()
                                  << std::endl
                                  << sylar::BacktraceToString();
    }
    catch (...)
    {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except: "
                                  << std::endl
                                  << " fiber_id = " << cur->getId()
                                  << std::endl
                                  << sylar::BacktraceToString();
    }
    
    // 获得裸指针
    auto raw_ptr = cur.get();
    // 引用-1,防止fiber释放不掉
    cur.reset();
    //执行完释放执行权
    raw_ptr->swapOut();

    SYLAR_ASSERT2(false, "never reach fiber_id = " + std::to_string(raw_ptr->getId()));
}

void Fiber::CallerMainFunc() {
    Fiber::ptr cur = GetThis();
    SYLAR_ASSERT(cur);
    try {
        cur->m_cb();
        cur->m_cb = nullptr;
        cur->m_state = TERM;
    } catch (std::exception &ex) {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except: " << ex.what()
                                  << std::endl
                                  << " fiber_id = " << cur->getId()
                                  << std::endl
                                  << sylar::BacktraceToString();
    }
    catch (...)
    {
        cur->m_state = EXCEPT;
        SYLAR_LOG_ERROR(g_logger) << "Fiber Except: "
                                  << std::endl
                                  << " fiber_id = " << cur->getId()
                                  << std::endl
                                  << sylar::BacktraceToString();
    }

    auto raw_ptr = cur.get();
    cur.reset();
    raw_ptr->back();

    SYLAR_ASSERT2(false, "never reach fiber_id = " + std::to_string(raw_ptr->getId()));
}