Skip to content

Commit

Permalink
refactor coroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
idealvin committed Jul 1, 2020
1 parent 545fe89 commit 39ccf57
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 75 deletions.
28 changes: 14 additions & 14 deletions include/co/co.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,25 +155,21 @@ class Pool {
Pool();
~Pool();

// @ccb: a create callback []() { return (void*) new T; }
// when pop from an empty pool, this callback is used to create an element
//
// @dcb: a destroy callback [](void* p) { delete (T*)p; }
// this callback is used to destroy an element when needed
//
// @cap: max capacity of the pool for each thread
// this argument is ignored if the destory callback is not set
Pool(std::function<void*()>&& ccb, std::function<void(void*)>&& dcb, size_t cap=(size_t)-1);

Pool(Pool&& p) : _p(p._p) { p._p = 0; }

Pool(const Pool&) = delete;
void operator=(const Pool&) = delete;

// set a create callback
// when pop from an empty pool, this callback is used to create an element
// set_create_cb([]() { return (void*) new T; });
void set_create_cb(std::function<void*()>&& cb);

// set a destroy callback
// this callback is used to destroy an element when needed
// set_destroy_cb([](void* p) { delete (T*)p; });
void set_destroy_cb(std::function<void(void*)>&& cb);

// set max capacity of the pool for each thread
// this argument is ignored if the destory callback is not set
void set_max_capacity(size_t cap);

// pop an element from the pool
// return NULL if the pool is empty and the create callback is not set
// MUST be called in coroutine
Expand Down Expand Up @@ -343,26 +339,30 @@ inline void set_cloexec(sock_t fd) {
}
#endif

// fill in ipv4 addr with ip & port
inline bool init_ip_addr(struct sockaddr_in* addr, const char* ip, int port) {
memset(addr, 0, sizeof(*addr));
addr->sin_family = AF_INET;
addr->sin_port = hton16((uint16) port);
return inet_pton(AF_INET, ip, &addr->sin_addr) == 1;
}

// fill in ipv6 addr with ip & port
inline bool init_ip_addr(struct sockaddr_in6* addr, const char* ip, int port) {
memset(addr, 0, sizeof(*addr));
addr->sin6_family = AF_INET6;
addr->sin6_port = hton16((uint16) port);
return inet_pton(AF_INET6, ip, &addr->sin6_addr) == 1;
}

// get ip string from ipv4 addr
inline fastring ip_str(struct sockaddr_in* addr) {
char s[INET_ADDRSTRLEN] = { 0 };
inet_ntop(AF_INET, &addr->sin_addr, s, sizeof(s));
return fastring(s);
}

// get ip string from ipv6 addr
inline fastring ip_str(struct sockaddr_in6* addr) {
char s[INET6_ADDRSTRLEN] = { 0 };
inet_ntop(AF_INET6, &addr->sin6_addr, s, sizeof(s));
Expand Down
88 changes: 44 additions & 44 deletions src/co/impl/co.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@
namespace co {

void go(Closure* cb) {
sched_mgr().next()->add_task(cb);
sched_mgr()->next()->add_task(cb);
}

void sleep(uint32 ms) {
gSched ? gSched->sleep(ms) : sleep::ms(ms);
}

void stop() {
sched_mgr().stop();
sched_mgr()->stop();
}

int max_sched_num() {
Expand Down Expand Up @@ -89,6 +89,7 @@ void EventImpl::signal() {
if (!_co_wait.empty()) _co_wait.swap(co_wait);
}

// using atomic operation here, as the timeout-checker may also modify the state
for (auto it = co_wait.begin(); it != co_wait.end(); ++it) {
Coroutine* co = it->first;
if (atomic_compare_swap(&co->state, S_wait, S_ready) == S_wait) {
Expand Down Expand Up @@ -193,31 +194,31 @@ bool Mutex::try_lock() {

class PoolImpl {
public:
PoolImpl()
: _pool(co::max_sched_num()), _maxcap((size_t)-1) {
}

~PoolImpl();
typedef std::vector<void*> T;

void set_create_cb(std::function<void*()>&& cb) {
_create_cb = std::move(cb);
PoolImpl()
: _pools(co::max_sched_num()), _maxcap((size_t)-1) {
}

void set_destroy_cb(std::function<void(void*)>&& cb) {
_destroy_cb = std::move(cb);
// @ccb: a create callback []() { return (void*) new T; }
// @dcb: a destroy callback [](void* p) { delete (T*)p; }
// @cap: max capacity for each pool
PoolImpl(std::function<void*()>&& ccb, std::function<void(void*)>&& dcb, size_t cap)
: _pools(co::max_sched_num()), _maxcap(cap) {
_create_cb = std::move(ccb);
_destroy_cb = std::move(dcb);
}

void set_max_capacity(size_t cap) {
_maxcap = cap;
}
~PoolImpl() = default;

void* pop() {
CHECK(gSched) << "must be called in coroutine..";
auto& v = _pool[gSched->id()];
auto& v = _pools[gSched->id()];
if (v == NULL) v = this->create_pool();

if (!v.empty()) {
void* p = v.back();
v.pop_back();
if (!v->empty()) {
void* p = v->back();
v->pop_back();
return p;
} else {
return _create_cb ? _create_cb() : 0;
Expand All @@ -228,34 +229,41 @@ class PoolImpl {
if (!p) return; // ignore null pointer

CHECK(gSched) << "must be called in coroutine..";
auto& v = _pool[gSched->id()];
auto& v = _pools[gSched->id()];
if (v == NULL) v = this->create_pool();

if (!_destroy_cb || v.size() < _maxcap) {
v.push_back(p);
if (!_destroy_cb || v->size() < _maxcap) {
v->push_back(p);
} else {
_destroy_cb(p);
}
}

private:
std::vector<std::vector<void*>> _pool;
// It is not safe to cleanup the pool from outside the Scheduler.
// So we add a cleanup callback to the Scheduler. It will be called
// at the end of Scheduler::loop().
T* create_pool() {
T* v = new T();
v->reserve(1024);
gSched->add_cleanup_cb(std::bind(&PoolImpl::cleanup, v, _destroy_cb));
return v;
}

static void cleanup(T* p, const std::function<void(void*)>& dcb) {
if (dcb) {
for (size_t i = 0; i < p->size(); ++i) dcb((*p)[i]);
}
delete p;
}

private:
std::vector<T*> _pools;
std::function<void*()> _create_cb;
std::function<void(void*)> _destroy_cb;
size_t _maxcap;
};

PoolImpl::~PoolImpl() {
if (!_destroy_cb) return;

for (size_t i = 0; i < _pool.size(); ++i) {
auto& v = _pool[i];
for (size_t k = 0; k < v.size(); ++k) {
_destroy_cb(v[k]);
}
v.clear();
}
}

Pool::Pool() {
_p = new PoolImpl;
}
Expand All @@ -264,16 +272,8 @@ Pool::~Pool() {
delete (PoolImpl*) _p;
}

void Pool::set_create_cb(std::function<void*()>&& cb) {
((PoolImpl*)_p)->set_create_cb(std::move(cb));
}

void Pool::set_destroy_cb(std::function<void(void*)>&& cb) {
((PoolImpl*)_p)->set_destroy_cb(std::move(cb));
}

void Pool::set_max_capacity(size_t cap) {
((PoolImpl*)_p)->set_max_capacity(cap);
Pool::Pool(std::function<void*()>&& ccb, std::function<void(void*)>&& dcb, size_t cap) {
_p = new PoolImpl(std::move(ccb), std::move(dcb), cap);
}

void* Pool::pop() {
Expand Down
1 change: 1 addition & 0 deletions src/co/impl/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ void Scheduler::loop() {
} while (0);
}
this->cleanup();
_ev.signal();
}
Expand Down
21 changes: 17 additions & 4 deletions src/co/impl/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <assert.h>
#include <string.h>
#include <memory>
#include <functional>
#include <vector>
#include <map>
#include <unordered_map>
Expand Down Expand Up @@ -52,7 +53,7 @@ struct Coroutine {
int id; // coroutine id
int state; // coroutine state
tb_context_t ctx; // context, a pointer points to the stack bottom
fastream stack; // save stack data for this coroutine
fastream stack; // save stack data for this coroutine
union {
Closure* cb; // coroutine function
Scheduler* s; // scheduler this coroutines runs in
Expand Down Expand Up @@ -161,7 +162,7 @@ class TimerManager {

private:
std::multimap<int64, Coroutine*> _timer; // timed-wait tasks: <time_ms, co>
std::multimap<int64, Coroutine*>::iterator _it; // add_timer() may be faster with it
std::multimap<int64, Coroutine*>::iterator _it; // make insert faster with this hint

::Mutex _mtx;
std::unordered_map<Coroutine*, timer_id_t> _timer_tasks; // timer tasks ready to resume
Expand Down Expand Up @@ -263,6 +264,11 @@ class Scheduler {
return (_stack <= (char*)p) && ((char*)p < _stack + _stack_size);
}

// the cleanup callbacks are called at the end of loop()
void add_cleanup_cb(std::function<void()>&& cb) {
_cbs.push_back(std::move(cb));
}

private:
void save_stack(Coroutine* co) {
co->stack.clear();
Expand All @@ -275,6 +281,11 @@ class Scheduler {
return co;
}

void cleanup() {
for (size_t i = 0; i < _cbs.size(); ++i) _cbs[i]();
_cbs.clear();
}

private:
uint32 _id; // scheduler id
uint32 _stack_size; // size of stack
Expand All @@ -287,6 +298,7 @@ class Scheduler {
Copool _co_pool;
TaskManager _task_mgr;
TimerManager _timer_mgr;
std::vector<std::function<void()>> _cbs;

SyncEvent _ev;
bool _stop;
Expand All @@ -306,6 +318,7 @@ class SchedManager {
return _scheds[now::us() % _scheds.size()];
}

// stop all schedulers
void stop();

private:
Expand All @@ -315,9 +328,9 @@ class SchedManager {
uint32 _s; // _r = 0, _s = sched_num-1; _r != 0, _s = -1;
};

inline SchedManager& sched_mgr() {
inline SchedManager* sched_mgr() {
static SchedManager kSchedMgr;
return kSchedMgr;
return &kSchedMgr;
}

} // co
1 change: 1 addition & 0 deletions src/co/impl/sock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ int sendto(sock_t fd, const void* buf, int n, const void* addr, int addrlen, int
} while (true);
}

// a thread-safe wrapper for strerror()
const char* strerror(int err) {
static __thread std::unordered_map<int, const char*>* kErrStr = 0;
if (!kErrStr) kErrStr = new std::unordered_map<int, const char*>();
Expand Down
13 changes: 0 additions & 13 deletions test/xx_test.cc
Original file line number Diff line number Diff line change
@@ -1,19 +1,6 @@
#include "test.h"
#include "co/all.h"

fastring fa(int v) {
return str::from(v);
}

fastring fb(const fastring& s) {
if (!s.empty() && s.front() != 's') {
fastring x(s);
x.front() = 's';
return x;
}
return s;
}

int main(int argc, char** argv) {
flag::init(argc, argv);
log::init();
Expand Down

0 comments on commit 39ccf57

Please sign in to comment.