Skip to content

Commit

Permalink
Merge pull request #96 from idealvin/dev
Browse files Browse the repository at this point in the history
merge from dev
  • Loading branch information
idealvin authored Jul 19, 2020
2 parents 1608dc8 + 034c070 commit a3a4ca8
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 44 deletions.
2 changes: 1 addition & 1 deletion include/co/so/tcp.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Client {
// MUST be called in the thread where it is connected.
void disconnect() {
if (this->connected()) {
assert(_sched_id == -1 || _sched_id == co::sched_id());
assert(_sched_id == co::sched_id());
co::close(_fd);
_fd = (sock_t)-1;
_sched_id = -1;
Expand Down
25 changes: 14 additions & 11 deletions src/co/impl/epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "co/co.h"
#include "co/atomic.h"
#include "co/log.h"
#include <vector>
#include <unordered_map>

#ifndef _WIN32
Expand Down Expand Up @@ -73,27 +74,28 @@ class Epoll {
return false;
}

bool add_event(sock_t fd, int ev) {
int& x = _ev_map[fd];
if (x) return true;
return x = EV_read | EV_write; // 3
}

void del_event(sock_t fd) { _ev_map.erase(fd); }

void del_event(sock_t fd, int ev) {
auto it = _ev_map.find(fd);
if (it != _ev_map.end() && it->second & ev) it->second &= ~ev;
}

void close();
void on_timeout(sock_t fd, PerIoInfo* p) {
CancelIo((HANDLE)fd);
p->co = 0;
_timeout.push_back(p);
}

// for unitest/co
bool assert_ev(sock_t fd, int ev) {
auto it = _ev_map.find(fd);
return ev == (it != _ev_map.end() ? it->second : 0);
void clear_timeout() {
if (!_timeout.empty()) {
for (size_t i = 0; i < _timeout.size(); ++i) delete _timeout[i];
_timeout.clear();
}
}

void close();

int wait(int ms) {
ULONG n = 0;
int r = GetQueuedCompletionStatusEx(_iocp, _ev, 1024, &n, ms, false);
Expand Down Expand Up @@ -130,6 +132,7 @@ class Epoll {
HANDLE _iocp;
epoll_event _ev[1024];
std::unordered_map<sock_t, int> _ev_map;
std::vector<PerIoInfo*> _timeout;
int _signaled;
};

Expand Down
2 changes: 1 addition & 1 deletion src/co/impl/io_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class IoEvent {
gSched->add_event(fd);
}

// We needn't delete the event on windows.
~IoEvent() {
if (_id != null_timer_id) gSched->del_timer(_id);
}
Expand All @@ -24,7 +25,6 @@ class IoEvent {
gSched->yield();
if (!gSched->timeout()) return true;

ELOG_IF(!CancelIo((HANDLE)_fd)) << "cancel io for fd " << _fd << " failed..";
_id = null_timer_id;
WSASetLastError(ETIMEDOUT);
return false;
Expand Down
23 changes: 16 additions & 7 deletions src/co/impl/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const timer_id_t null_timer_id;
__thread Scheduler* gSched = 0;
Scheduler::Scheduler(uint32 id, uint32 stack_size)
: _id(id), _stack_size(stack_size), _stack(0), _running(0),
: _id(id), _stack_size(stack_size), _stack(0), _stack_top(0), _running(0),
_wait_ms(-1), _co_pool(), _stop(false), _timeout(false) {
_main_co = _co_pool.pop();
}
Expand Down Expand Up @@ -46,25 +46,30 @@ static void main_func(tb_context_from_t from) {
* <-------- co->cb->run(): run on _stack
*/
void Scheduler::resume(Coroutine* co) {
SOLOG << ">>> resume co: " << co->id;
tb_context_from_t from;
_running = co;
if (_stack == 0) _stack = (char*) malloc(_stack_size);
if (_stack == 0) {
_stack = (char*) malloc(_stack_size);
_stack_top = _stack + _stack_size;
}
if (co->ctx == 0) {
co->ctx = tb_context_make(_stack, _stack_size, main_func);
SOLOG << "resume new co: " << co->id << ", ctx: " << co->ctx;
from = tb_context_jump(co->ctx, _main_co);
} else {
// restore stack for the coroutine
assert((_stack + _stack_size) == (char*)co->ctx + co->stack.size());
memcpy(co->ctx, co->stack.data(), co->stack.size());
SOLOG << "resume co: " << co->id << ", ctx: " << co->ctx << ", sd: " << co->stack.size();
CHECK(_stack_top == (char*)co->ctx + co->stack.size());
memcpy(co->ctx, co->stack.data(), co->stack.size()); // restore stack data
from = tb_context_jump(co->ctx, _main_co);
}
if (from.priv) {
assert(_running == from.priv);
_running->ctx = from.ctx; // update context for the coroutine
this->save_stack(_running); // save stack of the coroutine
SOLOG << "yield co: " << _running->id << ", ctx: " << from.ctx
<< ", sd: " << (size_t)(_stack_top - (char*)from.ctx);
this->save_stack(_running); // save stack data for the coroutine
}
}
Expand Down Expand Up @@ -103,6 +108,10 @@ void Scheduler::loop() {
#endif
}
#ifdef _WIN32
_epoll.clear_timeout();
#endif
SOLOG << "> check tasks ready to resume..";
do {
_task_mgr.get_all_tasks(new_tasks, ready_tasks, ready_timer_tasks);
Expand Down
12 changes: 6 additions & 6 deletions src/co/impl/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class Copool {

Coroutine* pop() {
if (!_ids.empty()) {
// We must reset the Coroutine here.
Coroutine* co = _pool[_ids.back()];
assert(co->state == S_init);
co->stack.clear();
Expand Down Expand Up @@ -275,10 +274,11 @@ class Scheduler {
_timer_mgr.del_timer(id);
}

// =========================================================================
// add or delete io event
// =========================================================================
#if defined(_WIN32)
void on_timeout(sock_t fd, PerIoInfo* p) {
_epoll.on_timeout(fd, p);
}

bool add_event(sock_t fd) {
return _epoll.add_event(fd);
}
Expand All @@ -303,7 +303,7 @@ class Scheduler {
private:
void save_stack(Coroutine* co) {
co->stack.clear();
co->stack.append(co->ctx, _stack + _stack_size - (char*)co->ctx);
co->stack.append(co->ctx, _stack_top - (char*)co->ctx);
}

Coroutine* new_coroutine(Closure* cb) {
Expand All @@ -321,6 +321,7 @@ class Scheduler {
uint32 _id; // scheduler id
uint32 _stack_size; // size of stack
char* _stack; // stack shared by coroutines in this scheduler
char* _stack_top; // stack top, equal to _stack + _stack_size
Coroutine* _main_co; // save the main context
Coroutine* _running; // the current running coroutine
Epoll _epoll;
Expand All @@ -341,7 +342,6 @@ class SchedManager {
SchedManager();
~SchedManager();

// return next scheduler
Scheduler* next() {
if (_s != (uint32)-1) return _scheds[atomic_inc(&_n) & _s];
uint32 n = atomic_inc(&_n);
Expand Down
20 changes: 7 additions & 13 deletions src/co/impl/sock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ int connect(sock_t fd, const void* addr, int addrlen, int ms) {

if (r == FALSE) {
if (co::error() != ERROR_IO_PENDING) return -1;
if (!ev.wait(ms)) { info->co = 0; return -1; } // timeout
if (!ev.wait(ms)) { gSched->on_timeout(fd, info.release()); return -1; } // timeout
}

r = setsockopt(fd, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, 0, 0);
Expand Down Expand Up @@ -212,13 +212,12 @@ int recv(sock_t fd, void* buf, int n, int ms) {
if (r == 0) {
if (!can_skip_iocp_on_success) ev.wait();
} else if (co::error() == WSA_IO_PENDING) {
if (!ev.wait(ms)) { info->co = 0; return -1; }
if (!ev.wait(ms)) { gSched->on_timeout(fd, info.release()); return -1; }
} else {
return -1;
}

if (info->s && info->n > 0) memcpy(buf, info->s, info->n);
if (info->n == 0) info->co = 0;
return (int) info->n;
}

Expand All @@ -234,7 +233,7 @@ int _Recvn(sock_t fd, void* buf, int n, int ms) {
if (r == 0) {
if (!can_skip_iocp_on_success) ev.wait();
} else if (co::error() == WSA_IO_PENDING) {
if (!ev.wait(ms)) { info->co = 0; return -1; }
if (!ev.wait(ms)) { gSched->on_timeout(fd, info.release()); return -1; }
} else {
return -1;
}
Expand All @@ -245,7 +244,7 @@ int _Recvn(sock_t fd, void* buf, int n, int ms) {
return n;
}

if (info->n == 0) { info->co = 0; return 0; }
if (info->n == 0) return 0;

info->move(info->n);
info->resetol();
Expand Down Expand Up @@ -296,7 +295,7 @@ int recvfrom(sock_t fd, void* buf, int n, void* addr, int* addrlen, int ms) {
if (r == 0) {
if (!can_skip_iocp_on_success) ev.wait();
} else if (co::error() == WSA_IO_PENDING) {
if (!ev.wait(ms)) { info->co = 0; return -1; }
if (!ev.wait(ms)) { gSched->on_timeout(fd, info.release()); return -1; }
} else {
return -1;
}
Expand All @@ -308,7 +307,6 @@ int recvfrom(sock_t fd, void* buf, int n, void* addr, int* addrlen, int ms) {
}

if (info->s && info->n > 0) memcpy(buf, info->s, info->n);
if (info->n == 0) info->co = 0;
return (int) info->n;
}

Expand All @@ -329,7 +327,7 @@ int _Send(sock_t fd, const void* buf, int n, int ms) {
if (r == 0) {
if (!can_skip_iocp_on_success) ev.wait();
} else if (co::error() == WSA_IO_PENDING) {
if (!ev.wait(ms)) { info->co = 0; return -1; }
if (!ev.wait(ms)) { gSched->on_timeout(fd, info.release()); return -1; }
} else {
return -1;
}
Expand Down Expand Up @@ -380,11 +378,7 @@ int sendto(sock_t fd, const void* buf, int n, const void* addr, int addrlen, int
if (r == 0) {
if (!can_skip_iocp_on_success) ev.wait();
} else if (co::error() == WSA_IO_PENDING) {
if (ms >= 0) {
if (!ev.wait(ms)) { info->co = 0; return -1; }
} else {
ev.wait();
}
if (!ev.wait(ms)) { gSched->on_timeout(fd, info.release()); return -1; }
} else {
return -1;
}
Expand Down
10 changes: 5 additions & 5 deletions src/so/tcp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ static void on_new_connection(void* p) {
void Server::loop() {
sock_t fd, connfd;

// use union here to support both ipv4 and ipv6
union {
struct sockaddr_in v4;
struct sockaddr_in6 v6;
} addr;
} addr; // use union here to support both ipv4 and ipv6

int addrlen = sizeof(addr);

Expand Down Expand Up @@ -60,14 +59,14 @@ void Server::loop() {

Connection* conn = new Connection;
conn->fd = connfd;
conn->p = this;
if (addrlen == sizeof(sockaddr_in)) {
conn->ip = co::ip_str(&addr.v4);
conn->port = ntoh16(addr.v4.sin_port);
} else {
conn->ip = co::ip_str(&addr.v6);
conn->port = ntoh16(addr.v6.sin6_port);
}
conn->p = this;

go(on_new_connection, conn);
}
Expand All @@ -91,14 +90,15 @@ bool Client::connect(int ms) {

r = co::connect(_fd, info->ai_addr, (int) info->ai_addrlen, ms);
if (r == -1) {
this->disconnect();
co::close(_fd);
_fd = (sock_t)-1;
ELOG << "connect to " << _ip << ':' << _port << " failed: " << co::strerror();
freeaddrinfo(info);
return false;
}

_sched_id = co::sched_id();
co::set_tcp_nodelay(_fd);

freeaddrinfo(info);
return true;
}
Expand Down

0 comments on commit a3a4ca8

Please sign in to comment.