点个关注👆跟腾讯工程师学技术
导语 | 本文我们将尝试对整个 C++的协程做深入浅出的剖析,方便大家的理解。再结合上层的封装,最终给出一个 C++异步框架实际业务使用的一种形态,方便大家更好的在实际项目中应用无栈协程。
在开始展开协程前,我们先来看一下一些非 C++语言中的协程实现。
(一)其他语言中的协程实现
很多语言里面,协程是作为 "一类公民" 直接加入到语言特性中的, 比如:
-
Dart1.9示例代码
Future<int> getPage(t) async {var c = new http.Client();try {var r = await c.get('http://xxx');print(r);return r.length();} finally {await c.close();}}
-
Python示例代码
async def abinary(n):if n <= 0:return 1l = await abinary(n-1)r = await abinary(n-1)return l + 1 + r
-
C#示例代码
aysnc Task<string> WaitAsync(){await Task.Delay(10000);return "Finished";}
-
小结
众多语言都实现了自己的协程机制, 通过上面的例子, 我们也能看到, 相关的机制使函数的执行特殊化了, 变成了可以多次中断和重入的结构. 那么如果 C++要支持这种机制, 会是一个什么情况呢? 接下来我们将先从最基本的原理逐步展开相关的探讨。
-
协作式多任务操作系统
-
抢占式多任务操作系统
聊到中断, 其中比较重要的就是执行环境的保存和恢复了, 而上下文的保存能力可以是操作系统直接提供的, 也可以是程序机制自身所提供的了, 综上所述, 我们大致可以将 c++中的协程的实现方案的迭代看成如下情况:
-
最早利用 setjump 来实现的协作式任务调度器
-
系统级实现, 如 linux 提供的 ucontext 相关 API, Windows 提供的 Fiber 相关的 Api
-
由系统级实现所衍生出的高性能方案, 一般是借签系统级的实现, 移除一些非必须的操作所达成的, 代表的方案有大家熟知的libco和 boost::context, 也就是我们通常所说的有栈协程实现
-
无栈实现, 最开始是纯粹使用 duff device hack 出来的方案, 后续被 MS 规整, 部分特性依赖 compiler 实现, 逐步演化成现在的 c++20 coroutine 机制了。
了解了协程在 C++中的部分历史, 我们来简单了解一下协程的执行机制, 这里我们直接以 C++20 为例, 先来看一下概览图:
关于协程的执行, 我们主要关注以下这些地方:
-
中断点和重入点的定义
-
协程执行到哪了。
-
协程当前使用的 context 进行保存, 并将程序的执行权归还给外界. 此时我们也可以返回必要的值给外界, 方便外界更好的对协程的后续执行进行控制。
-
恢复到上次挂起执行的地方继续执行
-
恢复保存的 context
-
传递必要的值到协程
-
怎么保存和恢复当前的执行位置。
-
怎么保存和恢复当前协程引用到的内存(变量等) 本篇主要侧重无栈协程, 无栈协程相关的机制后续会具体展开. 对有栈协程相关机制感兴趣的可以翻阅 libco 或 boost.context 相关的内容进行了解。
(四) 小议无栈协程的出现
-
业务复杂度膨胀带来的爆栈问题
-
使用过大的栈, 又会导致协程本身的切换开销上升或者占用内存过多.
-
Duff Device Hack实现
-
C++20 的 Coroutine
(五) 小结
(一)项目的背景介绍
-
避免大量中间类的定义和使用。
-
基于逻辑过程本身用串行的方式实现相关代码即可(可参考后续切场景的例子)
-
更容易写出数据驱动向的实现。
-
还有比较关键的一点, 可以有效避免过多的异步 Callback 导致的逻辑混乱和难于跟踪调试的问题。
(二)为何从 C++17 说起
(三)实现概述
(四)执行流程概述
整体的执行流程通过上面的分析我们也能比较简单的整理出来:
-
宏展开形成一个跨越协程函数首尾的大的 swith case 状态机。
-
协程执行时构建新的 CoPromise 对象, 正确的处理输入参数, 输入参数会被存储在 CoPromise 对象的 std::tuple<>上, 并且每次重入时作为函数的入口参数以引用的方式转入函数内部
-
每次 Resume()时根据当前 CoPromise 记录的 state, 跳转到正确的 case label 继续往下执行.
-
执行到下一个挂起点返回控制权到调度器
(五) 另外一个示例代码
mScheduler.CreateTask([](int& c, LocalStruct& locals) -> logic::CoTaskForScheduler {rco_begin();{locals.local_i = 1024;auto* task = rco_self_task();printf("step1 %d\n", locals.local_i);}rco_yield_next_frame();{c = 0;while(c < 5) {printf("in while loop c = %d\n", c);rco_yield_sleep(1000);c++;}rco_yield_next_frame();}rco_end();}, 3, LocalStruct{});
(六)绕开栈变量限制的方法
rco_begin();{rco_set_value("id", 35567);}rco_yield_next_frame();{{int64_t& val = rco_ref_value("id", int64_t);val = 5;}locals.local_i = rco_to_value("id", int);}rco_end();
(七)一个内部项目中后台切场景的代码示例
rco_begin();{locals.clientReq = req;locals.session = CServerUtil::GetSessionObj(sessionId);// ...SSTbusppInstanceKey emptyInstKey;emptyInstKey.Init();if (locals.session->GetTargetGameSvrID() != emptyInstKey) {// ...rco_await(locals.gameSceneService->CheckChangeScene(locals.playerId, locals.checkChangeSceneReq));// ...// 保存大世界信息// ...rco_await(locals.gameSceneService->ExitMainland(locals.playerId, locals.exitMainlandReq));// ...}auto gameMgrClient = GServer->GetRpcClient(TbusppInstanceKey{TBUSPP_SERVER_GAMEMGRSVR, ""});locals.gameMgrService = rstudio::rpc_proxy::GameMgrService_Proxy::Create(gameMgrClient, GServer->GetRpcScheduler());// ...LOG_DEBUG(locals.playerId, "[CHANGE SCENE] ready to Queryline group");}rco_await(locals.gameMgrService->QueryMainland2(locals.playerId, locals.querySpaceReq));{// ...rco_await(locals.gameSceneService->ChangeMainland(locals.playerId, locals.localInstanceKey, locals.changeMainlandReq));// ...}// ...LOG_DEBUG(locals.playerId, "[CHANGE SCENE] send change mainland_conf");rco_emit_finish_event(rstudio::logic::CoRpcFinishEvent(rstudio::reflection::Value(locals.clientRes)));rco_return;rco_end();
C++20 Coroutine 机制简介
-
Function Body: 通常普通函数添加 co_await 等协程关键字处理返回值就可以作为一个协程函数。
-
coroutine_handle<>: 对协程的生命周期进行控制。
-
promise_type: 异常处理, 结果接收, 同时也可以对协程部分行为进行配置, 如协程初始化时的状态, 结束时的状态等。
-
Awaitable 对象: 业务侧的中断重入点定义和数据传输定制点, 结合 co_await 关键字, 我们就能借助 compiler 实现正确的中断, 重入语义了。
(一) 一个简单的示例 - 并不简单
#include <iostream>#include <coroutine>using namespace std;struct resumable_thing{struct promise_type{resumable_thing get_return_object(){return resumable_thing(coroutine_handle<promise_type>::from_promise(*this));}auto initial_suspend() { return suspend_never{}; }auto final_suspend() noexcept { return suspend_never{}; }void return_void() {}void unhandled_exception() {}};coroutine_handle<promise_type> _coroutine = nullptr;resumable_thing() = default;resumable_thing(resumable_thing const&) = delete;resumable_thing& operator=(resumable_thing const&) = delete;resumable_thing(resumable_thing&& other): _coroutine(other._coroutine) {other._coroutine = nullptr;}resumable_thing& operator = (resumable_thing&& other) {if (&other != this) {_coroutine = other._coroutine;other._coroutine = nullptr;}}explicit resumable_thing(coroutine_handle<promise_type> coroutine) : _coroutine(coroutine){}~resumable_thing(){if (_coroutine) { _coroutine.destroy(); }}void resume() { _coroutine.resume(); }};resumable_thing counter() {cout << "counter: called\n";for (unsigned i = 1; ; i++){co_await std::suspend_always{};cout << "counter:: resumed (#" << i << ")\n";}}int main(){cout << "main: calling counter\n";resumable_thing the_counter = counter();cout << "main: resuming counter\n";the_counter.resume();the_counter.resume();the_counter.resume();the_counter.resume();the_counter.resume();cout << "main: done\n";return 0;}
main: calling countercounter: calledmain: resuming countercounter: resumed (#1)counter: resumed (#2)counter: resumed (#3)counter: resumed (#4)counter: resumed (#5)main: done
(二)Coroutine20 的实现猜想
这其实也是 C++20 Coroutine 使用的一大难点, 除了前文提到的, 特性通过 Awaitable 定制点开放给你的地方, 整体的运作机制, 我们是很难直接得出的. 另外, 在一些多线程协程混用的复杂情况下, 整体运作机制对于我们实现正确的框架, 正确的分析解决碰到的问题至关重要. 那么我们现在的问题就变成了, 怎么去补全出包含编译器处理的整体代码?
(三)借助 "cppinsights"
/************************************************************************************** NOTE: The coroutine transformation you've enabled is a hand coded transformation! ** Most of it is _not_ present in the AST. What you see is an approximation. **************************************************************************************/#include <iostream>#include <coroutine>using namespace std;struct resumable_thing{struct promise_type{inline resumable_thing get_return_object(){return resumable_thing(resumable_thing(std::coroutine_handle<promise_type>::from_promise(*this)));}inline std::suspend_never initial_suspend(){return std::suspend_never{};}inline std::suspend_never final_suspend() noexcept{return std::suspend_never{};}inline void return_void(){}inline void unhandled_exception(){}// inline constexpr promise_type() noexcept = default;};std::coroutine_handle<promise_type> _coroutine;inline constexpr resumable_thing() /* noexcept */ = default;// inline resumable_thing(const resumable_thing &) = delete;// inline resumable_thing & operator=(const resumable_thing &) = delete;inline resumable_thing(resumable_thing && other): _coroutine{std::coroutine_handle<promise_type>(other._coroutine)}{other._coroutine.operator=(nullptr);}inline resumable_thing & operator=(resumable_thing && other){if(&other != this) {this->_coroutine.operator=(other._coroutine);other._coroutine.operator=(nullptr);}}inline explicit resumable_thing(std::coroutine_handle<promise_type> coroutine): _coroutine{std::coroutine_handle<promise_type>(coroutine)}{}inline ~resumable_thing() noexcept{if(static_cast<bool>(this->_coroutine.operator bool())) {this->_coroutine.destroy();}}inline void resume(){this->_coroutine.resume();}};struct __counterFrame{void (*resume_fn)(__counterFrame *);void (*destroy_fn)(__counterFrame *);std::__coroutine_traits_impl<resumable_thing>::promise_type __promise;int __suspend_index;bool __initial_await_suspend_called;unsigned int i;std::suspend_never __suspend_44_17;std::suspend_always __suspend_48_14;std::suspend_never __suspend_44_17_1;};resumable_thing counter(){/* Allocate the frame including the promise */__counterFrame * __f = reinterpret_cast<__counterFrame *>(operator new(__builtin_coro_size()));__f->__suspend_index = 0;__f->__initial_await_suspend_called = false;/* Construct the promise. */new (&__f->__promise)std::__coroutine_traits_impl<resumable_thing>::promise_type{};resumable_thing __coro_gro = __f->__promise.get_return_object() /* NRVO variable */;/* Forward declare the resume and destroy function. */void __counterResume(__counterFrame * __f);void __counterDestroy(__counterFrame * __f);/* Assign the resume and destroy function pointers. */__f->resume_fn = &__counterResume;__f->destroy_fn = &__counterDestroy;/* Call the made up function with the coroutine body for initial suspend.This function will be called subsequently by coroutine_handle<>::resume()which calls __builtin_coro_resume(__handle_) */__counterResume(__f);return __coro_gro;}/* This function invoked by coroutine_handle<>::resume() */void __counterResume(__counterFrame * __f){try{/* Create a switch to get to the correct resume point */switch(__f->__suspend_index) {case 0: break;case 1: goto __resume_counter_1;case 2: goto __resume_counter_2;}/* co_await insights.cpp:44 */__f->__suspend_44_17 = __f->__promise.initial_suspend();if(!__f->__suspend_44_17.await_ready()) {__f->__suspend_44_17.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());__f->__suspend_index = 1;__f->__initial_await_suspend_called = true;return;}__resume_counter_1:__f->__suspend_44_17.await_resume();std::operator<<(std::cout, "counter: called\n");for( __f->i = 1; ; __f->i++) {/* co_await insights.cpp:48 */__f->__suspend_48_14 = std::suspend_always{};if(!__f->__suspend_48_14.await_ready()) {__f->__suspend_48_14.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());__f->__suspend_index = 2;return;}__resume_counter_2:__f->__suspend_48_14.await_resume();std::operator<<(std::operator<<(std::cout, "counter:: resumed (#").operator<<(__f->i), ")\n");}goto __final_suspend;} catch(...) {if(!__f->__initial_await_suspend_called) {throw ;}__f->__promise.unhandled_exception();}__final_suspend:/* co_await insights.cpp:44 */__f->__suspend_44_17_1 = __f->__promise.final_suspend();if(!__f->__suspend_44_17_1.await_ready()) {__f->__suspend_44_17_1.await_suspend(std::coroutine_handle<resumable_thing::promise_type>::from_address(static_cast<void *>(__f)).operator coroutine_handle());};}/* This function invoked by coroutine_handle<>::destroy() */void __counterDestroy(__counterFrame * __f){/* destroy all variables with dtors */__f->~__counterFrame();/* Deallocating the coroutine frame */operator delete(__builtin_coro_free(static_cast<void *>(__f)));}int main(){std::operator<<(std::cout, "main: calling counter\n");resumable_thing the_counter = counter();std::operator<<(std::cout, "main: resuming counter\n");the_counter.resume();the_counter.resume();the_counter.resume();the_counter.resume();the_counter.resume();std::operator<<(std::cout, "main: done\n");return 0;}
(四) Coroutine20 基本结构 - Compiler 视角
-
virtual table 部分, 正确的告知你协程使用的 resume 函数以及 destroy 函数
-
自动处理的栈变量, 如下图中所示的 i
-
各种使用到的 awaitable object, 这是因为 awaitable object 本身也是有状态的, 需要正确记录
-
当前执行到的位置, 这个是通过整形的__suspend_index来记录的.
(五)Compiler 视角重新分析示例代码
-
couter() - Function Body
我们知道, couter()会被编译器改写, 最终其实是变成了三个函数:
-
单纯负责生命周期以及生成正确的__counterFrame对象的counter(), 只是一个协程入口函数.
-
负责真正执行逻辑的 __counterResume()函数, 它的输入参数就是__counterFrame对象.
-
负责删除**counterFrame 对象的 **counterDestroy()函数.
通过一拆三, 编译器很好的解决了协程的入口, 协程的中断重入, 和协程以及相关对象的销毁的问题.
-
coroutine_handle<>
template <> struct coroutine_handle<void>{constexpr coroutine_handle() noexcept;constexpr coroutine_handle(nullptr_t) noexcept;coroutine_handle& operator=(nullptr_t) noexcept;constexpr void* address() const noexcept;constexpr static coroutine_handle from_address(void* addr);constexpr explicit operator bool() const noexcept;bool done() const;void operator()();void resume();void destroy();private:void* ptr;// exposition only};
template <typename Promise>struct coroutine_handle: coroutine_handle<void>{Promise& promise() const noexcept;static coroutine_handle from_promise(Promise&) noexcept;};
-
promise_type
/* This function invoked by coroutine_handle<>::resume() */void __counterResume(__counterFrame * __f){try{/* Create a switch to get to the correct resume point */switch(__f->__suspend_index) {case 0: break;case 1: goto __resume_counter_1;case 2: goto __resume_counter_2;}/* initial suspend handle here~~ */__f->__suspend_44_17 = __f->__promise.initial_suspend();__resume_counter_1:/* do somthing for yield~~ */__resume_counter_2:/* do somthing for resume~~ */goto __final_suspend;} catch(...) {if(!__f->__initial_await_suspend_called) {throw ;}__f->__promise.unhandled_exception();}__final_suspend:/* final suspend here~~ */__f->__suspend_44_17_1 = __f->__promise.final_suspend();}
-
Awaitable 对象
-
std:suspend_always
-
std::suspend_never另外我们也能通过多种方式定义 awaitable 对象
-
通过重载promise_type的await_transform() - 这是 asio 所使用的方式, 侵入性比较强
-
通过为对象实现operator co_await()
-
通过实现 awaitable 对象需要的三个子函数await_ready(), await_suspend(), await_resume() - 推荐的方式 那么当我们调用co_await awaitable的时候, 发生的事情是什么呢, 我们同样通过预处理的代码来进行了解:
__resume_counter_1:__f->__suspend_44_17.await_resume();std::operator<<(std::cout, "counter: called\n");for( __f->i = 1; ; __f->i++) {/* co_await insights.cpp:48 */__f->__suspend_48_14 = std::suspend_always{};if(!__f->__suspend_48_14.await_ready()) {__f->__suspend_48_14.await_suspend(coroutine_handle);__f->__suspend_index = 2;return;}__resume_counter_2:__f->__suspend_48_14.await_resume();std::cout << "counter:: resumed (#" << __f->i << ")\n";}
-
await_ready() - 判断是否需要挂起, 如不需要挂起, 则直接执行后续逻辑, 这里也就是继续到__resume_counter_2这个 label 执行重入点的逻辑
-
await_suspend() - 中断点触发的时候执行的逻辑, 业务中我们一般在此处发起异步操作
-
await_resume() - 重入点触发的时候执行的逻辑. 整体的机制是不是清晰了很多?
(六) 小结 - C++20 协程的特点总结
-
一套理解上稍显复杂, 需要结合 cppinsights 等工具才能了解整体的运行机制
-
适当封装, 还是能够很好的满足业务需求
-
对比 17 版本的实现, 20 版基本上没有什么使用上的限制
-
自动栈变量的处理, 可以让业务侧以更低的心智负担来进行开发
-
通过 Awaitable 对象, 我们能够扩展co_await支持的业务, 这种实现侵入性低, 实际使用负担小
-
对于异步操作较多, 多节点较多, 特别是多个异步操作级联的使用场景, 很值得实装.
-
最后我们讲解使用的是 clang, 但对于 gcc, msvc, 这些同样适用, 标准的提案来源是一致的, 都是 msvc 发起的那份, compiler 实现上存在一些细微的差异, 但基本不影响使用.
(一)Sheduler 实现的动机
(二)Scheduler 核心机制
-
Awaitable 机制: 前面也介绍了利用 c++20 的 co_await 关键字和 awaitable 对象, 我们可以很好的定义挂起点, 以及交换协程和外部系统的数据。
-
Return Callback 机制: 部分协程执行完后需要向外界反馈执行结果(如协程模式执行的 Rpc Service).
(三)Scheduler 核心对象
-
ISchedTask & SchedTaskCpp20
using CoReturnFunction = std::function<void(const CoReturnObject*)>;class ISchedTask{ friend class Scheduler; public: ISchedTask() = delete; ISchedTask(const SchedTaskCpp17&) = delete; ISchedTask(uint64_t taskId, Scheduler* manager); virtual ~ISchedTask(); uint64_t GetId() const; virtual int Run() = 0; virtual bool IsDone() const = 0; virtual CO_TASK_STATE GetCoState() const = 0; void BindSleepHandle(uint64_t handle); AwaitMode GetAwaitMode() const; int GetAwaitTimeout() const; template<typename AwaitEventType> auto BindResumeObject(AwaitEventType&& awaitEvent)->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value>; template<typename AwaitEventType> auto GetResumeObjectAsType()->std::enable_if_t<std::is_base_of<ResumeObject, AwaitEventType>::value, AwaitEventType*>; bool HasResumeObject() const noexcept; void ClearResumeObject(); bool IsLastInvokeSuc() const noexcept; bool IsLastInvokeTimeOut() const noexcept; bool IsLastInvokeFailed() const noexcept; void AddChildTask(uint64_t tid); void AddWaitNofityTask(uint64_t tid); const auto& GetChildTaskArray() const; const auto& GetWaitNotifyArray() const; void Terminate(); Scheduler* GetManager() const; static ISchedTask* CurrentTask(); void DoYield(AwaitMode mode, int awaitTimeMs = 0); void SetReturnFunction(CoReturnFunction&& func); void DoReturn(const CoReturnObject& obj); void DoReturn(); protected: uint64_t mTaskId; Scheduler* mManager; std::vector<uint64_t> mChildArray; std::vector<uint64_t> mWaitNotifyArray; //value used to return from coroutine AwaitMode mAwaitMode = AwaitMode::AwaitDoNothing; int mAwaitTimeout = 0; //value used to send to coroutine(now as a AwaitEvent) reflection::UserObject mResumeObject; uint64_t mSleepHandle = 0; bool mIsTerminate = false; CoReturnFunction mCoReturnFunc;};class SchedTaskCpp20: public ISchedTask{ public: SchedTaskCpp20(uint64_t taskId, CoTaskFunction&& taskFunc, Scheduler* manager); ~SchedTaskCpp20(); int Run() override; bool IsDone() const override; CO_TASK_STATE GetCoState() const override; void BindSelfToCoTask(); const CoResumingTaskCpp20& GetResumingTask() const; protected: CoResumingTaskCpp20 mCoResumingTask; CoTaskFunction mTaskFuncion;};
C++20 的 SchedTaskCpp20 主要完成对协程对象的封装, CoTaskFunction 用于存储相关的函数对象, 而 CoResumingTaskCpp20 则如同前面示例中的 resumable_thing 对象,内部有需要的 promise_type 实现, 我们对协程的访问也是通过它来完成的。
此处需要注意的是我们保存了协程对象外, 还额外保存了相关的函数对象, 这是因为如果协程本身是一个 lambda, compiler 并不会帮我们正确维护 lambda 的生命周期以及 lambda 所捕获的函数, 尚未清楚是实现缺陷还是功能就是如此, 所以此处需要一个额外存在的 std::function<>对象, 来保证对应 lambda 的生命周期是正确的。
对比 17 的实现, 我们的 SchedTask 对象中主要保留了:reflection::UserObject mResumeObject: 主要用于异步等待的执行, 当一个异步等待成功执行的时候, 向协程传递值。
原来利用事件去处理最终返回值的机制也替换成了 Return 回调的方式,相对来说更简单直接, 利用 lambda 本身也能很方便的保存需要最终回传的临时值了。
-
Scheduler
Scheduler 的代码比较多, 主要就是 SchedTask 的管理器, 另外也完成对前面提到的三种机制的支持, 文章重点分析一下三种机制的实现代码.
-
Yield 处理
void Scheduler::Update(){RSTUDIO_PROFILER_METHOD_INFO(sUpdate, "Scheduler::Update()", rstudio::ProfilerGroupType::kLogicJob);RSTUDIO_PROFILER_AUTO_SCOPE(sUpdate);//Handle need kill task firstwhile(!mNeedKillArray.empty()){auto tid = mNeedKillArray.front();mNeedKillArray.pop();auto* tmpTask = GetTaskById(tid);if (tmpTask != nullptr){DestroyTask(tmpTask);}}//Keep a temp queue for not excute next frame task right nowdecltype(mFrameStartTasks) tmpFrameTasks;mFrameStartTasks.swap(tmpFrameTasks);while (!tmpFrameTasks.empty()){auto task_id = tmpFrameTasks.front();tmpFrameTasks.pop();auto* task = GetTaskById(task_id);LOG_CHECK_ERROR(task);if (task){AddToImmRun(task);}}}void Scheduler::AddToImmRun(ISchedTask* schedTask){LOG_PROCESS_ERROR(schedTask);schedTask->Run();if (schedTask->IsDone()){DestroyTask(schedTask);return;}{auto awaitMode = schedTask->GetAwaitMode();auto awaitTimeoutMs = schedTask->GetAwaitTimeout();switch (schedTask->GetAwaitMode()){case rstudio::logic::AwaitMode::AwaitNever:AddToImmRun(schedTask);break;case rstudio::logic::AwaitMode::AwaitNextframe:AddToNextFrameRun(schedTask);break;case rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout:case rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:{HandleTaskAwaitForNotify(schedTask, awaitMode, awaitTimeoutMs);}break;case rstudio::logic::AwaitMode::AwaitDoNothing:break;default:RSTUDIO_ERROR(CanNotRunToHereError());break;}}Exit0:return;}
-
rstudio::logic::AwaitMode::AwaitNever: 立即将协程加入回 mReadyTask 队列, 对应协程会被马上唤醒执行
-
rstudio::logic::AwaitMode::AwaitNextframe: 将协程加入到下一帧执行的队列, 协程将会在下一帧被唤醒执行
-
rstudio::logic::AwaitMode::AwaitForNotifyNoTimeout: 等待外界通知后再唤醒执行(无超时模式), 注意该模式下如果一直没收到通知, 相关协程会一直在队列中存在.
-
rstudio::logic::AwaitMode::AwaitForNotifyWithTimeout:同 3, 差别是存在一个超时时间, 超时时间到了也会唤醒协程, 业务方可以通过 ResumeObject 判断协程是被超时唤醒的.
-
**rstudio::logic::AwaitMode::AwaitDoNothing:**特殊的 AwaitHandle 实现会使用该模式, 比如删除 Task 的实现, 都要删除 Task 了, 我们肯定不需要再将 Task 加入任何可唤醒队列了.
-
Resume处理
Resume 机制主要是通过唤醒在 Await 队列中的协程的时候向关联的 Task 对象传递 ResumeObject 实现的:
//Not a real event notify here, just do need thingstemplate <typename E>auto ResumeTaskByAwaitObject(E&& awaitObj)-> std::enable_if_t<std::is_base_of<ResumeObject, E>::value>{auto tid = awaitObj.taskId;if (IsTaskInAwaitSet(tid)){//Only in await set task can be resumeauto* task = GetTaskById(tid);if (RSTUDIO_LIKELY(task != nullptr)){task->BindResumeObject(std::forward<E>(awaitObj));AddToImmRun(task);}OnTaskAwaitNotifyFinish(tid);}}
#define rco_get_resume_object(ResumeObjectType) rco_self_task()->GetResumeObjectAsType<ResumeObjectType>()
本身就是一个简单的传值取值的过程. 注意传递 ResumeObject 后, 我们也会马上将协程加入到 mReadTasks 队列中以方便在接下来的 Update 中唤醒它.
-
一个 Awaitable 实现的范例
我们以 Rpc 的协程化 Caller 实现为例, 看看一个 awaitable 对象应该如何构造:
class RSTUDIO_APP_SERVICE_API RpcRequest{public:RpcRequest() = delete;////RpcRequest(const RpcRequest&) = delete;~RpcRequest() = default;RpcRequest(const logic::GameServiceCallerPtr& proxy,const std::string_view funcName,reflection::Args&& arg, int timeoutMs) :mProxy(proxy), mFuncName(funcName), mArgs(std::forward<reflection::Args>(arg)), mTimeoutMs(timeoutMs){}bool await_ready(){return false;}void await_suspend(coroutine_handle<>) const noexcept{auto* task = rco_self_task();auto context = std::make_shared<ServiceContext>();context->TaskId = task->GetId();context->Timeout = mTimeoutMs;auto args = mArgs;mProxy->DoDynamicCall(mFuncName, std::move(args), context);task->DoYield(AwaitMode::AwaitForNotifyNoTimeout);}::rstudio::logic::RpcResumeObject* await_resume() const noexcept{return rco_get_resume_object(logic::RpcResumeObject);}private:logic::GameServiceCallerPtr mProxy;std::string mFuncName;reflection::Args mArgs;int mTimeoutMs;};
重点是前面说到的 await_ready(), await_suspend(), await_resume()函数的实现。
-
ReturnCallback 机制
有一些场合, 可能需要协程执行完成后向业务系统发起通知并传递返回值, 比如 Rpc Service 的协程支持实现等, 这个特性其实比较类似 go 的 defer, 只是这里的实现更简单, 只支持单一函数的指定而不是队列. 我们直接以 RpcService 的协程支持为例来看一下这一块的具体使用.
首先是业务侧, 在创建完协程后, 需要给协程绑定后续协程执行完成后做进一步操作需要的数据:
task->SetReturnFunction([this, server, entity, cmdHead, routerAddr,reqHead, context](const CoReturnObject* obj) {const auto* returnObj = dynamic_cast<const CoRpcReturnObject*>(obj);if (RSTUDIO_LIKELY(returnObj)){DoRpcResponse(server, entity.get(), routerAddr, &cmdHead,reqHead, const_cast<ServiceContext&>(context),returnObj->rpcResultType,returnObj->totalRet, returnObj->retValue);}});
CoTaskInfo HeartBeatService::DoHeartBeat(logic::Scheduler& scheduler, int testVal){return scheduler.CreateTask20([testVal]() -> logic::CoResumingTaskCpp20 {co_await logic::cotasks::Sleep(1000);printf("service yield call finish!\n");co_return CoRpcReturnObject(reflection::Value(testVal + 1));});}
void CoResumingTaskCpp20::promise_type::return_value(const CoReturnObject& obj){auto* task = rco_self_task();task->DoReturn(obj);}
(四) 示例代码
//C++ 20 coroutineauto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");mScheduler.CreateTask20([clientProxy]()-> rstudio::logic::CoResumingTaskCpp20 {auto* task = rco_self_task();printf("step1: task is %llu\n", task->GetId());co_await rstudio::logic::cotasks::NextFrame{};printf("step2 after yield!\n");int c = 0;while (c < 5) {printf("in while loop c=%d\n", c);co_await rstudio::logic::cotasks::Sleep(1000);c++;}for (c = 0; c < 5; c++) {printf("in for loop c=%d\n", c);co_await rstudio::logic::cotasks::NextFrame{};}printf("step3 %d\n", c);auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false,[]()-> logic::CoResumingTaskCpp20 {printf("from child coroutine!\n");co_await rstudio::logic::cotasks::Sleep(2000);printf("after child coroutine sleep\n");});printf("new task create in coroutine: %llu\n", newTaskId);printf("Begin wait for task!\n");co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 };printf("After wait for task!\n");rstudio::logic::cotasks::RpcRequestrpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000};auto* rpcret = co_await rpcReq;if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) {assert(rpcret->totalRet == 1);auto retval = rpcret->retValue.to<int>();assert(retval == 4);printf("rpc coroutine run suc, val = %d!\n", retval);}else {printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType);}co_await rstudio::logic::cotasks::Sleep(5000);printf("step4, after 5s sleep\n");co_return rstudio::logic::CoNil;} );
step1: task is 1step2 after yield!in while loop c=0in while loop c=1in while loop c=2in while loop c=3in while loop c=4in for loop c=0in for loop c=1in for loop c=2in for loop c=3in for loop c=4step3 5new task create in coroutine: 2Begin wait for task!from child coroutine!after child coroutine sleepAfter wait for task!service yield call finish!rpc coroutine run suc, val = 4!step4, after 5s sleep
-
代码更精简了
-
Stack 变量可以被 Compiler 自动处理, 正常使用了。
-
co_await 可以直接返回值, 并有强制的类型约束了。
-
一个协程函数就是一个返回值为 logic::CoResumingTaskCpp20 类型的 lambda, 可以充分利用 lambda 本身的特性还实现正确的逻辑了。
(一) 示例代码
//C++ 20 coroutineauto clientProxy = mRpcClient->CreateServiceProxy("mmo.HeartBeat");mScheduler.CreateTask20([clientProxy]() -> rstudio::logic::CoResumingTaskCpp20 { auto* task = rco_self_task(); printf("step1: task is %llu\n", task->GetId()); co_await rstudio::logic::cotasks::NextFrame{}; printf("step2 after yield!\n"); int c = 0; while (c < 5) { printf("in while loop c=%d\n", c); co_await rstudio::logic::cotasks::Sleep(1000); c++; } for (c = 0; c < 5; c++) { printf("in for loop c=%d\n", c); co_await rstudio::logic::cotasks::NextFrame{}; } printf("step3 %d\n", c); auto newTaskId = co_await rstudio::logic::cotasks::CreateTask(false, []()-> logic::CoResumingTaskCpp20 { printf("from child coroutine!\n"); co_await rstudio::logic::cotasks::Sleep(2000); printf("after child coroutine sleep\n"); }); printf("new task create in coroutine: %llu\n", newTaskId); printf("Begin wait for task!\n"); co_await rstudio::logic::cotasks::WaitTaskFinish{ newTaskId, 10000 }; printf("After wait for task!\n"); rstudio::logic::cotasks::RpcRequest rpcReq{clientProxy, "DoHeartBeat", rstudio::reflection::Args{ 3 }, 5000}; auto* rpcret = co_await rpcReq; if (rpcret->rpcResultType == rstudio::network::RpcResponseResultType::RequestSuc) { assert(rpcret->totalRet == 1); auto retval = rpcret->retValue.to<int>(); assert(retval == 4); printf("rpc coroutine run suc, val = %d!\n", retval); } else { printf("rpc coroutine run failed! result = %d \n", (int)rpcret->rpcResultType); } co_await rstudio::logic::cotasks::Sleep(5000); printf("step4, after 5s sleep\n"); co_return rstudio::logic::CoNil;} );
(二)小议 C++20 Coroutine 对比 C++17 Coroutine 带来的改进
-
原生关键字 co_await, co_return 的支持, 业务侧使用代码更加精简, 也进一步统一了大家对无栈协程的标准理解.
-
Stack 变量可以被 compiler 自动处理, 这点对比 C++17 需要自行组织状态变量来说是非常节约心智负责的.
-
co_await可以直接返回对应类型的值, 这样协程本身就有了强制的类型约束, 整体业务的表达也会因为不需要从类型擦除的对象获取需要的类型, 变得更顺畅.
(一) 一个 Python 实现的技能示例
-
实现效果
-
技能主流程代码
# handle one skill instance createdef skill_instance_run_func(instance, user, skill_data, target, target_pos, finish_func):# set return callback hereyield TaskSetExitCallback(finish_func)# ... some code ignore herefrom common.gametime import GameTimeinit_time = GameTime.now_timefor skill_step in step_list:step_start_time = GameTime.now_time# ... some code ignore here### 1. period task handleif skill_step.cast_type == CastSkillStep.CAST_TYPE_PERIOD:#... some code ignore here### 2. missle skillelif skill_step.cast_type == CastSkillStep.CAST_TYPE_MISSLE_TO_TARGET:if len(skill_step.cast_action_group_list) > 0:action_group = skill_step.cast_action_group_list[0]for i in range(skill_step.cast_count):# yield for sleepyield TaskSleep(skill_step.cast_period)ret_val = do_skill_spend(skill_data, user, instance)if not ret_val:return# sub coroutine(missle_handle_func)task_id = yield TaskNew(missle_handle_func(skill_data, instance, user, skill_step, action_group, target_id, target_pos))instance.add_child_task_id(task_id)### 3. guide skillelif skill_step.cast_type == CastSkillStep.CAST_TYPE_GUIDE_TO_TARGET:#... some code ignore herenow_time = GameTime.now_timestep_pass_time = now_time - step_start_timeneed_sleep_time = skill_step.step_total_time - step_pass_timeif need_sleep_time > 0:yield TaskSleep(need_sleep_time)instance.on_one_step_finish(skill_step)if skill_data.delay_end_time > 0:yield TaskSleep(skill_data.delay_end_time)# wait for child finish~~for task_id in instance.child_task_list:yield TaskWait(task_id)instance.task_id = 0
-
CastSkillStep.CAST_TYPE_PERIOD:周期性触发的技能, 主要使用 yield TaskSleep()
-
CastSkillStep.CAST_TYPE_MISSLE_TO_TARGET:导弹类技能, 使用子协程功能
-
CastSkillStep.CAST_TYPE_GUIDE_TO_TARGET:引导类技能, 使用子协程功能
-
子任务 - 导弹类技能相关代码
### 1. handle for missle skill(etc: fire ball)def missle_handle_func(skill_data, instance, user, skill_step, action_group, target_id, target_pos): effect = instance.create_effect(skill_step.missle_info.missle_fx_path) effect.set_scale(skill_step.missle_info.missle_scale) cur_target_pos, is_target_valid = skill_step.missle_info.get_target_position( user, target_id, target_pos) start_pos = skill_step.missle_info.get_start_position(user, target_id, target_pos) is_reach_target = False from common.gametime import GameTime init_time = GameTime.now_time while True: # ... some code ignore here fly_distance = skill_step.missle_info.fly_speed*GameTime.elapse_time if fly_distance < total_distance: start_pos += fly_direction*math3d.vector(fly_distance, fly_distance, fly_distance) effect.set_position(start_pos) else: is_reach_target = True break # do yield util next frame yield effect.destroy() if is_reach_target: target_list = skill_data.get_target_list(user.caster, target_id, target_pos) for target in target_list: action_group.do(user.caster, target)
-
子任务 - 引导类技能代码
### 2. handle for guide skill(etc: lighting chain)def guide_handle_func(skill_data, instance, user, skill_step, start_pos, target_id, target_pos):effect = instance.create_effect(skill_step.guide_info.guide_fx_path)effect.set_scale(skill_step.guide_info.guide_scale)effect.set_position(start_pos)effect.set_guide_end_pos(target_pos - start_pos)# yield for sleepyield TaskSleep(skill_step.guide_info.guide_time)effect.destroy()
(二)对应的 C++实现
//C++ 20 skill test coroutinemScheduler.CreateTask20([instance]() -> rstudio::logic::CoResumingTaskCpp20 {rstudio::logic::ISchedTask* task = rco_self_task();task->SetReturnFunction([](const rstudio::logic::CoReturnObject*) {//ToDo: return handle code add here});for (auto& skill_step : step_list) {auto step_start_time = GGame->GetTimeManager().GetTimeHardwareMS();switch (skill_step.cast_type) {case CastSkillStep::CAST_TYPE_PERIOD: {//... some code ignore here}break;case CastSkillStep::CAST_TYPE_MISSLE_TO_TARGET: {if (skill_step.cast_action_group_list.size() > 0) {auto& action_group = skill_step.cast_action_group_list[0];for (int i = 0; i < skill_step.cast_count; i++) {co_await rstudio::logic::cotasks::Sleep(skill_step.cast_period);bool ret_val = do_skill_spend(skill_data, user, instance);if (!ret_val) {co_return rstudio::logic::CoNil;}auto task_id = co_await rstudio::logic::cotasks::CreateTask(true,[&skill_step]()->rstudio::logic::CoResumingTaskCpp20 {auto cur_target_pos = skill_step.missle_info.get_target_position(user, target_id, target_pos);auto start_pos = skill_step.missle_info.get_start_position(user, target_id, target_pos);bool is_reach_target = false;auto init_time = GGame->GetTimeManager().GetTimeHardwareMS();auto last_time = init_time;do {auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();auto elapse_time = now_time - last_time;last_time = now_time;if (now_time - init_time >= skill_step.missle_info.long_fly_time) {break;}auto cur_target_pos = skill_step.missle_info.get_target_position(user, target_id, target_pos);rstudio::math::Vector3 fly_direction = cur_target_pos - start_pos;auto total_distance = fly_direction.Normalise();auto fly_distance = skill_step.missle_info.fly_speed * elapse_time;if (fly_distance < total_distance) {start_pos += fly_direction * fly_distance;}else {is_reach_target = true;break;}co_await rstudio::logic::cotasks::NextFrame{};} while (true);if (is_reach_target) {//ToDo: add damage calculate here~~}});instance.add_child_task_id(task_id);}}}break;case CastSkillStep::CAST_TYPE_GUIDE_TO_TARGET: {//... some code ignore here}break;default:break;}auto now_time = GGame->GetTimeManager().GetTimeHardwareMS();auto step_pass_time = now_time - step_start_time;auto need_sleep_time = skill_step.step_total_time - step_pass_time;if (need_sleep_time > 0) {co_await rstudio::logic::cotasks::Sleep(need_sleep_time);}instance.on_one_step_finish(skill_step);}if (skill_data.delay_end_time > 0) {co_await rstudio::logic::cotasks::Sleep(skill_data.delay_end_time);}for (auto tid :instance.child_task_list) {co_await rstudio::logic::cotasks::WaitTaskFinish(tid, 10000);}});
(三) 小结
-
结合调度器, C++ Coroutine 的实现与脚本一样具备简洁性, 这得益于 Compiler 对 Stack 变量的自动处理, 以及规整的co_await等关键字支持, 从某种程度上, 我们可以认为这种处理提供了一个简单的类 GC 的能力, 我们可以更低心智负担的开发相关代码.
-
协程的使用同时也会带来其他一些好处, 像避免多级 Callback 带来的代码分散逻辑混乱等问题, 这个在 C++17 协程使用的范例中已经提到过, 此处不再重复.
(一) 对 asio coroutine20 实现部分的思考
-
低使用成本的经典 callback 兼容方案
asio::awaitable<void> watchdog(asio::io_context& ctx) {asio::steady_timer timer(ctx);timer.expires_after(1s);co_await timer.async_wait(asio::use_awaitable);co_return;}
-
利用操作符定义复合任务
auto [e] = co_await server.async_connect(target, use_nothrow_awaitable);if (!e){co_await ((transfer(client, server, client_to_server_deadline) ||watchdog(client_to_server_deadline))&&(transfer(server, client, server_to_client_deadline) ||watchdog(server_to_client_deadline)));}
通过这种机制, 我们一定程度拥有了对任务的复合关系进行表达的能力, 比如对一个原本不支持超时的异步任务, 我们可以非常简单的||上一个超时异步任务, 来解决它的超时支持问题. 这种设计也是很值得参考的.
(二) 关于 executions
(三) 关于后续的迭代
推荐阅读
点击下方空白 ▼ 查看明日开发者黄历
本文来自微信公众号“腾讯云开发者”(ID:QcloudCommunity)。大作社经授权转载,该文观点仅代表作者本人,大作社平台仅提供信息存储空间服务。










