1. 程式人生 > >使用 setcontext 類函式實現 mini 協程庫

使用 setcontext 類函式實現 mini 協程庫

協程實現原理

協程的本質都是通過修改 ESP 和 EIP 指標來實現的。其理論上還是單執行緒在執行.

程式在CPU上執行時依賴3個暫存器:

  1. ESP寄存值指向當前棧頂地址,指向當前指令需要的資料
  2. EBP指向當前活動棧幀的基地址
  3. 指令暫存器IP,指向當前需要執行的指令

其中主要有(IP,ESP)暫存器最重要,這兩個暫存器指標的改變可以修改當前需要載入到 CPU 執行的指令和資料,當某個操作陷入到耗時的等待中時,通過修改這兩個指標即可讓出CPU,交給其他的任務去使用,每個任務都必須主動的讓出CPU,然後等待下一次的排程來繼續未完成的任務,這樣就可以最大程度的利用CPU,當一個任務等待過程非常短的時候,就出現了多個任務並行執行的效果,也就是協程。

實現協程的多種方法

  1. 利用 glibcucontext 元件(雲風的庫)
  2. 使用匯編來切換上下文(實現miniC協程,騰訊libco)
  3. 利用C語言語法switch-case的奇淫技巧來實現(Protothreads)
  4. 利用了 C 語言的 setjmplongjmp一種協程的 C/C++ 實現,要求函式裡面使用 static local 的變數來儲存協程內部的資料)
  5. 利用Boost庫提供的兩種實現,分別是stacklessstackfull
  6. 等等語言和庫....

當然了,最"厲害"的還是去用匯編去實現,因為這樣會避免許多跨平臺等問題.比如在有些平臺就會用不了setcontext

等函式.有些函式可能也慢慢的廢除了,所以用匯編實現是最有效的,也是騰訊公司經過多年的實踐得出的.

glibc庫裡的上下文操作函式:

1.getcontext()  : 獲取當前context
2.setcontext()  : 切換到指定context
3.makecontext() : 設定 函式指標 和 堆疊 到對應context儲存的 sp 和 pc 暫存器中,
			呼叫之前要先呼叫 getcontext()
4.swapcontext() : 儲存當前context,並且切換到指定context

具體使用可以自己查詢一下,最後我會用他們實現一個C++11協程庫.

如何實現儲存和恢復協程執行棧


例項:使用C++11setcontext類函式實現協程庫

main.cpp

#include "Coroutine.h"

#include <iostream>
#include <stdio.h>
#include <unistd.h>
using namespace std;
using namespace Tattoo;

class TEST
{
  public:
	void func3(std::shared_ptr<CoroutineSchedule> s, void *arg)
	{
		for (int i = 0; i < 10; i++)
		{
			cout << "coroutine : " << s->GetCurCoID() << " : " << *(int *)arg + i << endl;
			s->Yield();
		}
	}
};

int main()
{
	std::shared_ptr<CoroutineSchedule> schedule = std::make_shared<CoroutineSchedule>();

	int test1 = 1;
	int test2 = 2;
	int test3 = 3;

	TEST tt;

	int id_co1 = schedule->CreateCoroutine(std::bind(&TEST::func3, &tt, schedule, &test1));
	int id_co2 = schedule->CreateCoroutine(std::bind(&TEST::func3, &tt, schedule, &test2));
	int id_co3 = schedule->CreateCoroutine(std::bind(&TEST::func3, &tt, schedule, &test3));

	printf("main start\n");
	while ((schedule->IsAlive(id_co1)) && (schedule->IsAlive(id_co2)) && (schedule->IsAlive(id_co3)))
	{
		schedule->ResumeCoroutine(id_co1);
		schedule->ResumeCoroutine(id_co2);
		schedule->ResumeCoroutine(id_co3);
	}
	printf("main end\n");

	return 0;
}

CoroutineSchedule.h


#ifndef _COROUTINESCHEDULER_H
#define _COROUTINESCHEDULER_H //B.h

#include <ucontext.h>
#include <unordered_map>
#include <memory>
#include <functional>
#include <iostream>

#define INFO(x, y, z) std::cout << x << " : " << y << " : " << z << std::endl;
namespace Tattoo
{

class Coroutine;
class CoroutineSchedule
{
  public:
	static const int STACK_SIZE = 1024 * 1024;
	static const int MAX_CO = 16;
	using CoFun = std::function<void()>;

	CoroutineSchedule() : cur_co_num_(0), cur_run_id_(-1) {}
	~CoroutineSchedule() {}

	CoroutineSchedule(const CoroutineSchedule &) = delete;
	CoroutineSchedule &operator=(const CoroutineSchedule &) = delete;

	int CreateCoroutine(CoFun func);
	void DestroyCroutine(int cor_id);
	void ResumeCoroutine(int cor_id);
	void Yield();

	bool IsAlive(int cor_id);
	int GetCurCoID() { return cur_run_id_; }

	static void static_fun(void *arg);

  private:
	std::unordered_map<int, std::shared_ptr<Coroutine>> mmap_;
	char SchStack[STACK_SIZE] = {0};
	ucontext_t main_ctx;
	int cur_co_num_; /*實時記錄協程數量,也會控制map下標*/
	int cur_run_id_;
};
} // namespace Tattoo
#endif

CoroutineSchedule.cpp


#include "Coroutine.h"

namespace Tattoo
{

int CoroutineSchedule::CreateCoroutine(CoFun func)
{
	/*這裡可以用 cur_max_num_ 去限制協程數目*/
	cur_co_num_++;
	auto cor = std::make_shared<Coroutine>(this, func, cur_co_num_);
	mmap_[cur_co_num_] = cor;
	return cur_co_num_;
}
void CoroutineSchedule::DestroyCroutine(int cor_id)
{
	if (mmap_.find(cor_id) == mmap_.end())
		return;
	if (cor_id == cur_run_id_)
		cur_run_id_ = -1;
	mmap_[cor_id]->SetStatus(Coroutine::CO_FINSHED);
	cur_co_num_--; /*下次重複使用就行了,不需要 erase */
}
void CoroutineSchedule::Yield()
{
	if (-1 == cur_run_id_)
		return;
	int id = cur_run_id_;
	std::shared_ptr<Coroutine> cor = mmap_[id];
	// assert(reinterpret_cast<char *>(&cor) > );
	cor->save_stack(SchStack + STACK_SIZE);
	cor->SetStatus(Coroutine::CO_SUSPEND);
	cur_run_id_ = -1;
	swapcontext(&cor->ctx_, &main_ctx);
}
bool CoroutineSchedule::IsAlive(int cor_id)
{
	if (mmap_[cor_id]->GetStatus() == Coroutine::CO_FINSHED)
		return false;
	else
		return true;
}
void CoroutineSchedule::ResumeCoroutine(int cor_id)
{
	// INFO("*******************************************ResumeCoroutine", cor_id, cur_run_id_);

	if (mmap_.find(cor_id) == mmap_.end()) /*不存在的 id */
		return;

	assert(this->cur_run_id_ == -1); //保證沒有其他協程執行

	std::shared_ptr<Coroutine> cor = mmap_[cor_id];
	switch (cor->GetStatus())
	{
	case Coroutine::CO_READY:
	{
		getcontext(&cor->ctx_);
		cor->ctx_.uc_stack.ss_sp = SchStack;
		cor->ctx_.uc_stack.ss_size = STACK_SIZE;
		cor->ctx_.uc_link = &main_ctx;

		cur_run_id_ = cor_id;
		cor->SetStatus(Coroutine::CO_RUNNING);
		makecontext(&cor->ctx_, reinterpret_cast<void (*)()>(static_fun), 1, this);
		swapcontext(&this->main_ctx, &cor->ctx_);
	}
	break;
	case Coroutine::CO_SUSPEND:
	{
		memcpy(SchStack + STACK_SIZE - cor->stack_cur_size_, cor->CorStack, cor->stack_cur_size_);
		cur_run_id_ = cor_id;
		cor->SetStatus(Coroutine::CO_RUNNING);
		swapcontext(&main_ctx, &cor->ctx_);
	}
	break;
	default:
		assert(0);
	}
	if (-1 == cur_run_id_ && cor->status_ == Coroutine::CO_FINSHED)
		DestroyCroutine(cor_id);
}
void CoroutineSchedule::static_fun(void *arg)
{
	CoroutineSchedule *sch = reinterpret_cast<CoroutineSchedule *>(arg);
	int id = sch->cur_run_id_;
	std::shared_ptr<Coroutine> cor = sch->mmap_[id];
	cor->func_();

	sch->cur_run_id_ = -1;
	sch->mmap_[id]->SetStatus(Coroutine::CO_FINSHED);
}
} // namespace Tattoo

Coroutine.h


#ifndef _COROUTINE_H
#define _COROUTINE_H
#include <ucontext.h>
#include <memory>
#include <assert.h>
#include <string.h>
#include "CoroutineSchedule.h"

namespace Tattoo
{
class Coroutine
{
  public:
	using CoFun = std::function<void()>;
	enum
	{
		CO_FINSHED,
		CO_READY,
		CO_RUNNING,
		CO_SUSPEND,
	};
	Coroutine(CoroutineSchedule *sch, CoFun func, int id);
	~Coroutine();

	Coroutine(const Coroutine &) = delete;
	Coroutine &operator=(const Coroutine &) = delete;

	void SetStatus(int status);
	int GetStatus();

  private:
	friend class CoroutineSchedule;

	void save_stack(void *top);

	CoFun func_;
	ucontext_t ctx_;
	std::shared_ptr<CoroutineSchedule> sch_;
	std::ptrdiff_t stack_max_size_; //stack最大 大小
	std::ptrdiff_t stack_cur_size_; //協程實際大小
	int status_;
	char *CorStack;
	int id_;
};
} // namespace Tattoo
#endif

Coroutine.cpp


#include <ucontext.h>
#include <memory>
#include <assert.h>
#include <string.h>

#include "Coroutine.h"

namespace Tattoo
{
Coroutine::Coroutine(CoroutineSchedule *sch, CoFun func, int id)
	: func_(func), id_(id), sch_(sch),
	  stack_max_size_(0), stack_cur_size_(0),
	  status_(CO_READY), CorStack(0)
{
}
Coroutine::~Coroutine()
{
	delete[] CorStack;
}

void Coroutine::SetStatus(int status)
{
	status_ = status;
}
int Coroutine::GetStatus()
{
	return status_;
}
void Coroutine::save_stack(void *top)
{
	char dummy = 0;

	assert((char *)top - &dummy <= CoroutineSchedule::STACK_SIZE);

	if (stack_max_size_ < (char *)top - &dummy)
	{
		delete[] CorStack;
		stack_max_size_ = (char *)top - &dummy;
		CorStack = new char[stack_max_size_];
	}
	stack_cur_size_ = (char *)top - &dummy;
	memcpy(CorStack, &dummy, stack_cur_size_);
}
} // namespace Tattoo

執行截圖:

在這裡插入圖片描述

因為我的目標是用匯編去實現,所以也就不花時間去優化程式碼了(比如裡面還存在原始指標,new,delete ,記憶體洩露等).