Skip to content

Commit

Permalink
Adopt source code for the new API
Browse files Browse the repository at this point in the history
  • Loading branch information
ymmt2005 committed Aug 13, 2024
1 parent ca7b97f commit bedf652
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 143 deletions.
106 changes: 55 additions & 51 deletions src/counter/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,68 +24,72 @@ counter_socket::counter_socket(int fd,
g_stats.total_connections.fetch_add(1);

m_recvjob = [this](cybozu::dynbuf& buf) {
// load pending data
if( ! m_pending.empty() ) {
buf.append(m_pending.data(), m_pending.size());
m_pending.reset();
}
with_fd([this, &buf](int fd) {
// load pending data
if( ! m_pending.empty() ) {
buf.append(m_pending.data(), m_pending.size());
m_pending.reset();
}

while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
if( errno == EINTR )
continue;
if( errno == ECONNRESET ) {
while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
if( errno == EINTR )
continue;
if( errno == ECONNRESET ) {
buf.reset();
release_all();
invalidate_and_close();
break;
}
cybozu::throw_unix_error(errno, "recv");
}
if( n == 0 ) {
buf.reset();
release_all();
invalidate_and_close();
break;
}
cybozu::throw_unix_error(errno, "recv");
}
if( n == 0 ) {
buf.reset();
release_all();
invalidate_and_close();
break;
}
// if (n != -1) && (n != 0)
buf.consume(n);
// if (n != -1) && (n != 0)
buf.consume(n);

const char* head = buf.data();
std::size_t len = buf.size();
while( len > 0 ) {
counter::request parser(head, len);
std::size_t c = parser.length();
if( c == 0 ) break;
head += c;
len -= c;
execute(parser);
}
if( len > MAX_REQUEST_LENGTH ) {
cybozu::logger::warning() << "denied too large request of "
<< len << " bytes.";
buf.reset();
release_all();
invalidate_and_close();
break;
const char* head = buf.data();
std::size_t len = buf.size();
while( len > 0 ) {
counter::request parser(head, len);
std::size_t c = parser.length();
if( c == 0 ) break;
head += c;
len -= c;
execute(parser);
}
if( len > MAX_REQUEST_LENGTH ) {
cybozu::logger::warning() << "denied too large request of "
<< len << " bytes.";
buf.reset();
release_all();
invalidate_and_close();
break;
}
buf.erase(head - buf.data());
}
buf.erase(head - buf.data());
}

// recv returns EAGAIN, or some error happens.
if( buf.size() > 0 )
m_pending.append(buf.data(), buf.size());
// recv returns EAGAIN, or some error happens.
if( buf.size() > 0 )
m_pending.append(buf.data(), buf.size());

m_busy.store(false, std::memory_order_release);
m_busy.store(false, std::memory_order_release);
});
};

m_sendjob = [this](cybozu::dynbuf& buf) {
if( ! write_pending_data() )
with_fd([this, &buf](int fd) {
if( ! write_pending_data(fd) )
invalidate_and_close();
});
};
}

Expand All @@ -94,7 +98,7 @@ counter_socket::~counter_socket() {
release_all();
}

bool counter_socket::on_readable() {
bool counter_socket::on_readable(int fd) {
if( m_busy.load(std::memory_order_acquire) ) {
m_reactor->add_readable(*this);
return true;
Expand All @@ -112,11 +116,11 @@ bool counter_socket::on_readable() {
return true;
}

bool counter_socket::on_writable() {
bool counter_socket::on_writable(int fd) {
cybozu::worker* w = m_finder();
if( w == nullptr ) {
// if there is no idle worker, fallback to the default.
return cybozu::tcp_socket::on_writable();
return cybozu::tcp_socket::on_writable(fd);
}

w->post_job(m_sendjob);
Expand Down
8 changes: 4 additions & 4 deletions src/counter/sockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ class counter_socket: public cybozu::tcp_socket {
std::unordered_map<const cybozu::hash_key*, std::uint32_t>
m_acquired_resources;

virtual void on_invalidate() override final {
virtual void on_invalidate(int fd) override final {
g_stats.curr_connections.fetch_sub(1);
cybozu::tcp_socket::on_invalidate();
cybozu::tcp_socket::on_invalidate(fd);
}

bool on_readable() override;
bool on_writable() override;
bool on_readable(int fd) override final;
bool on_writable(int fd) override final;

void cmd_get(const counter::request& cmd, counter::response& r);
void cmd_acquire(const counter::request& cmd, counter::response& r);
Expand Down
12 changes: 4 additions & 8 deletions src/memcache/handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,10 @@ void handler::on_master_interval() {
continue;
}
if( slave->timed_out() ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(slave->fileno()).str();
} catch (...) {
// ignore errors
}
cybozu::logger::info() << "No heartbeats from a slave (" << addr
<< "). Close the replication socket.";
cybozu::logger::info()
<< "No heartbeats from a slave ("
<< slave->peer_ip()
<< "). Close the replication socket.";
// close the socket and release resources
if( ! slave->invalidate() )
m_reactor.remove_resource(*slave);
Expand Down
Loading

0 comments on commit bedf652

Please sign in to comment.