Skip to content

Commit

Permalink
Early close file descriptors
Browse files Browse the repository at this point in the history
Resolves #93.

`cybozu::resource` now keeps the file descriptor private and adds
a new private method `close()` to close the file descriptor from
the friend class `cybozu::reactor`. Subclasses can get the file
descriptor via `int fileno()` method that returns -1 after closed.

`cybozu::reactor` calls `cybozu::resource::close` when it removes
the resource from the active set of resources and puts it to the
pending destruction list. By this, the file descriptor of the
closed resource is closed earlier.

Other classes are updated to reference `fileno()`.
  • Loading branch information
ymmt2005 committed Aug 5, 2024
1 parent 11b9bbd commit 26adb7a
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 20 deletions.
2 changes: 2 additions & 0 deletions cybozu/reactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ void reactor::remove_resource(int fd) {
throw_unix_error(errno, "epoll_ctl(EPOLL_CTL_DEL)");
m_readables.erase(std::remove(m_readables.begin(), m_readables.end(), fd),
m_readables.end());

it->second->close();
}

void reactor::poll() {
Expand Down
19 changes: 16 additions & 3 deletions cybozu/reactor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ class resource {

// Close the file descriptor.
virtual ~resource() {
::close(m_fd);
close();
}

// Return the UNIX file descriptor for this resource.
int fileno() const { return m_fd; }
// This returns -1 after the resource is invalidated.
int fileno() const {
lock_guard g(m_lock);
if( m_closed ) return -1;
return m_fd;
}

// `true` if this resource is still valid.
bool valid() const {
Expand Down Expand Up @@ -118,13 +123,14 @@ class resource {
}

protected:
const int m_fd;
reactor* m_reactor = nullptr;

friend class reactor;

private:
const int m_fd;
bool m_valid = true;
bool m_closed = false;
mutable spinlock m_lock;
typedef std::unique_lock<spinlock> lock_guard;

Expand All @@ -133,6 +139,13 @@ class resource {
g.unlock();
on_invalidate();
}

void close() {
lock_guard g(m_lock);
if( m_closed ) return;
::close(m_fd);
m_closed = true;
}
};


Expand Down
7 changes: 5 additions & 2 deletions cybozu/signal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class signal_reader: public resource {
signal_reader(const sigset_t *mask, callback_t callback):
resource( signalfd(-1, mask, SFD_NONBLOCK|SFD_CLOEXEC) ),
m_callback(callback) {
if( m_fd == -1 )
if( fileno() == -1 )
throw_unix_error(errno, "signalfd");
}
// Constructor.
Expand All @@ -54,8 +54,11 @@ class signal_reader: public resource {

virtual bool on_readable() override final {
while( true ) {
int fd = fileno();
if( fd == -1 ) return true;

struct signalfd_siginfo si;
ssize_t n = read(m_fd, &si, sizeof(si));
ssize_t n = read(fd, &si, sizeof(si));
if( n == -1 ) {
if( errno == EINTR ) continue;
if( errno == EAGAIN || errno ==EWOULDBLOCK ) return true;
Expand Down
24 changes: 18 additions & 6 deletions cybozu/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ void tcp_socket::free_buffers() {
}

bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) {
int fd = fileno();
if( fd == -1 ) return false;

while( ! can_send(len) ) {
on_buffer_full();
m_cond_write.wait(g);
Expand All @@ -168,7 +171,7 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) {

if( m_pending.empty() ) {
while( len > 0 ) {
ssize_t n = ::send(m_fd, p, len, 0);
ssize_t n = ::send(fd, p, len, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK ) break;
if( errno == EINTR ) continue;
Expand Down Expand Up @@ -226,6 +229,9 @@ bool tcp_socket::_send(const char* p, std::size_t len, lock_guard& g) {
}

bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) {
int fd = fileno();
if( fd == -1 ) return false;

std::size_t total = 0;
for( int i = 0; i < iovcnt; ++i ) {
total += iov[i].len;
Expand All @@ -250,7 +256,7 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) {

if( m_pending.empty() ) {
while( ind < v_size ) {
ssize_t n = ::writev(m_fd, &(v[ind]), v_size - ind);
ssize_t n = ::writev(fd, &(v[ind]), v_size - ind);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK ) break;
if( errno == EINTR ) continue;
Expand Down Expand Up @@ -329,8 +335,11 @@ bool tcp_socket::_sendv(const iovec* iov, const int iovcnt, lock_guard& g) {
bool tcp_socket::write_pending_data() {
lock_guard g(m_lock);

int fd = fileno();
if( fd == -1 ) return true;

while( ! m_tmpbuf.empty() ) {
ssize_t n = ::send(m_fd, m_tmpbuf.data(), m_tmpbuf.size(), 0);
ssize_t n = ::send(fd, m_tmpbuf.data(), m_tmpbuf.size(), 0);
if( n == -1 ) {
if( errno == EINTR ) continue;
if( errno == EAGAIN || errno == EWOULDBLOCK ) return true;
Expand All @@ -353,7 +362,7 @@ bool tcp_socket::write_pending_data() {
std::tie(p, len, sent) = t;

while( len != sent ) {
ssize_t n = ::send(m_fd, p+sent, len-sent, 0);
ssize_t n = ::send(fd, p+sent, len-sent, 0);
if( n == -1 ) {
if( errno == EINTR ) continue;
if( errno == EAGAIN || errno == EWOULDBLOCK ) break;
Expand Down Expand Up @@ -453,17 +462,20 @@ setup_server_socket(const char* bind_addr, std::uint16_t port, bool freebind) {
}

bool tcp_server_socket::on_readable() {
int fd = fileno();
if( fd == -1 ) return true;

while( true ) {
union {
struct sockaddr sa;
struct sockaddr_storage ss;
} addr;
socklen_t addrlen = sizeof(addr);
#ifdef _GNU_SOURCE
int s = ::accept4(m_fd, &(addr.sa), &addrlen,
int s = ::accept4(fd, &(addr.sa), &addrlen,
SOCK_NONBLOCK|SOCK_CLOEXEC);
#else
int s = ::accept(m_fd, &(addr.sa), &addrlen);
int s = ::accept(fd, &(addr.sa), &addrlen);
if( s != -1 ) {
int fl = fcntl(s, F_GETFL, 0);
if( fl == -1 ) fl = 0;
Expand Down
7 changes: 5 additions & 2 deletions cybozu/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class tcp_socket: public resource {
}

virtual void on_invalidate() override {
::shutdown(m_fd, SHUT_RDWR);
::shutdown(fileno(), SHUT_RDWR);
free_buffers();
m_cond_write.notify_all();
}
Expand Down Expand Up @@ -223,10 +223,13 @@ class tcp_socket: public resource {
return m_pending.empty() && m_tmpbuf.empty();
}
void _flush() {
int fd = fileno();
if( fd == -1 ) return;

// with TCP_CORK, setting TCP_NODELAY effectively flushes
// the kernel send buffer.
int v = 1;
if( setsockopt(m_fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 )
if( setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v)) == -1 )
throw_unix_error(errno, "setsockopt(TCP_NODELAY)");
}
void free_buffers();
Expand Down
5 changes: 4 additions & 1 deletion src/counter/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ counter_socket::counter_socket(int fd,
g_stats.total_connections.fetch_add(1);

m_recvjob = [this](cybozu::dynbuf& buf) {
int fd = fileno();
if( fd == -1 ) return;

// load pending data
if( ! m_pending.empty() ) {
buf.append(m_pending.data(), m_pending.size());
Expand All @@ -32,7 +35,7 @@ counter_socket::counter_socket(int fd,

while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand Down
21 changes: 15 additions & 6 deletions src/memcache/sockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,11 @@ memcache_socket::memcache_socket(int fd,
g_stats.total_connections.fetch_add(1, relaxed);

m_recvjob = [this](cybozu::dynbuf& buf) {
int fd = fileno();
if( fd == -1 ) return;

// set lock context for objects.
g_context = m_fd;
g_context = fd;

// load pending data
if( ! m_pending.empty() ) {
Expand All @@ -54,7 +57,7 @@ memcache_socket::memcache_socket(int fd,

while( true ) {
char* p = buf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand Down Expand Up @@ -956,9 +959,12 @@ void memcache_socket::cmd_text(const memcache::text_request& cmd) {
}

bool repl_socket::on_readable() {
int fd = fileno();
if( fd == -1 ) return true;

// recv and drop.
while( true ) {
ssize_t n = ::recv(m_fd, &m_recvbuf[0], MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, &m_recvbuf[0], MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand All @@ -967,7 +973,7 @@ bool repl_socket::on_readable() {
if( errno == ECONNRESET ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(m_fd).str();
addr = cybozu::get_peer_ip_address(fd).str();
} catch (...) {
// ignore errors
}
Expand All @@ -979,7 +985,7 @@ bool repl_socket::on_readable() {
if( n == 0 ) {
std::string addr = "unknown address";
try {
addr = cybozu::get_peer_ip_address(m_fd).str();
addr = cybozu::get_peer_ip_address(fd).str();
} catch (...) {
// ignore errors
}
Expand All @@ -1003,6 +1009,9 @@ bool repl_socket::on_writable() {
}

bool repl_client_socket::on_readable() {
int fd = fileno();
if( fd == -1 ) return true;

// This function is executed in the same thread as the function that sends
// heartbeats. If this function takes a very long time, no heartbeats will
// be sent, and this process will be judged dead by the master. To prevent
Expand All @@ -1012,7 +1021,7 @@ bool repl_client_socket::on_readable() {
size_t n_iter = 0;
while( true ) {
char* p = m_recvbuf.prepare(MAX_RECVSIZE);
ssize_t n = ::recv(m_fd, p, MAX_RECVSIZE, 0);
ssize_t n = ::recv(fd, p, MAX_RECVSIZE, 0);
if( n == -1 ) {
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
Expand Down

0 comments on commit 26adb7a

Please sign in to comment.