diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h
index b330830b..a8dda72a 100644
--- a/include/uv-private/uv-unix.h
+++ b/include/uv-private/uv-unix.h
@@ -184,8 +184,9 @@ struct uv__io_s {
   int fd; \
 
-/* UV_TCP */
-#define UV_TCP_PRIVATE_FIELDS
+/* UV_TCP, idle_handle is used by UV_TCP_SINGLE_ACCEPT handles */
+#define UV_TCP_PRIVATE_FIELDS \
+  uv_idle_t* idle_handle; \
 
 /* UV_UDP */
diff --git a/src/unix/core.c b/src/unix/core.c
index 0e4919c6..4bc6b77d 100644
--- a/src/unix/core.c
+++ b/src/unix/core.c
@@ -69,10 +69,13 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
       break;
 
     case UV_TTY:
-    case UV_TCP:
       uv__stream_close((uv_stream_t*)handle);
       break;
 
+    case UV_TCP:
+      uv__tcp_close((uv_tcp_t*)handle);
+      break;
+
     case UV_UDP:
       uv__udp_close((uv_udp_t*)handle);
       break;
diff --git a/src/unix/internal.h b/src/unix/internal.h
index dd46c95e..233f7428 100644
--- a/src/unix/internal.h
+++ b/src/unix/internal.h
@@ -93,7 +93,8 @@ enum {
   UV_STREAM_WRITABLE = 0x40,   /* The stream is writable */
   UV_STREAM_BLOCKING = 0x80,   /* Synchronous writes. */
   UV_TCP_NODELAY = 0x100,      /* Disable Nagle. */
-  UV_TCP_KEEPALIVE = 0x200     /* Turn on keep-alive. */
+  UV_TCP_KEEPALIVE = 0x200,    /* Turn on keep-alive. */
+  UV_TCP_SINGLE_ACCEPT = 0x400 /* Only accept() when idle. */
 };
 
 inline static void uv__req_init(uv_loop_t* loop,
@@ -162,6 +163,7 @@ void uv__poll_close(uv_poll_t* handle);
 void uv__prepare_close(uv_prepare_t* handle);
 void uv__process_close(uv_process_t* handle);
 void uv__stream_close(uv_stream_t* handle);
+void uv__tcp_close(uv_tcp_t* handle);
 void uv__timer_close(uv_timer_t* handle);
 void uv__udp_close(uv_udp_t* handle);
 void uv__udp_finish_close(uv_udp_t* handle);
diff --git a/src/unix/stream.c b/src/unix/stream.c
index 47e32b24..26875a7c 100644
--- a/src/unix/stream.c
+++ b/src/unix/stream.c
@@ -163,6 +163,16 @@ void uv__stream_destroy(uv_stream_t* stream) {
 }
 
 
+static void uv__next_accept(uv_idle_t* idle, int status) {
+  uv_stream_t* stream = idle->data;
+
+  uv_idle_stop(idle);
+
+  if (stream->accepted_fd == -1)
+    uv__io_start(stream->loop, &stream->read_watcher);
+}
+
+
 void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
   int fd;
   uv_stream_t* stream = container_of(w, uv_stream_t, read_watcher);
@@ -198,14 +208,44 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
       }
     } else {
      stream->accepted_fd = fd;
-      stream->connection_cb((uv_stream_t*)stream, 0);
-      if (stream->accepted_fd >= 0) {
+      stream->connection_cb(stream, 0);
+
+      if (stream->accepted_fd != -1 ||
+          (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT))) {
         /* The user hasn't yet called uv_accept(). */
         uv__io_stop(stream->loop, &stream->read_watcher);
-        return;
+        break;
       }
     }
   }
+
+  if (stream->fd != -1 &&
+      stream->accepted_fd == -1 &&
+      (stream->type == UV_TCP && (stream->flags & UV_TCP_SINGLE_ACCEPT)))
+  {
+    /* Defer the next accept() syscall to the next event loop tick.
+     * This lets us guarantee fair load balancing in multi-process setups.
+     * The problem is as follows:
+     *
+     * 1. Multiple processes listen on the same socket.
+     * 2. The OS scheduler commonly gives preference to one process to
+     *    avoid task switches.
+     * 3. That process therefore accepts most of the new connections,
+     *    leading to a (sometimes very) unevenly distributed load.
+     *
+     * Here is how we mitigate this issue:
+     *
+     * 1. Accept a connection.
+     * 2. Start an idle watcher.
+     * 3. Don't accept new connections until the idle callback fires.
+     *
+     * This works because the callback only fires when there have been
+     * no recent events, i.e. none of the watched file descriptors have
+     * recently been readable or writable.
+     */
+    uv_tcp_t* tcp = (uv_tcp_t*) stream;
+    uv_idle_start(tcp->idle_handle, uv__next_accept);
+  }
 }
diff --git a/src/unix/tcp.c b/src/unix/tcp.c
index 233be825..d9cbd0b4 100644
--- a/src/unix/tcp.c
+++ b/src/unix/tcp.c
@@ -22,6 +22,7 @@
 #include "uv.h"
 #include "internal.h"
 
+#include <stdlib.h>
 #include <unistd.h>
 #include <assert.h>
 #include <errno.h>
@@ -30,6 +31,7 @@
 int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
   uv__stream_init(loop, (uv_stream_t*)tcp, UV_TCP);
   loop->counters.tcp_init++;
+  tcp->idle_handle = NULL;
   return 0;
 }
@@ -227,9 +229,30 @@
 int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
+  static int single_accept = -1;
+
   if (tcp->delayed_error)
     return uv__set_sys_error(tcp->loop, tcp->delayed_error);
 
+  if (single_accept == -1) {
+    const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
+    single_accept = (val == NULL) || (atoi(val) != 0); /* On by default. */
+  }
+
+  if (!single_accept)
+    goto no_single_accept;
+
+  tcp->idle_handle = malloc(sizeof(*tcp->idle_handle));
+  if (tcp->idle_handle == NULL)
+    return uv__set_sys_error(tcp->loop, ENOMEM);
+
+  if (uv_idle_init(tcp->loop, tcp->idle_handle))
+    abort();
+
+  tcp->idle_handle->data = tcp;
+  tcp->flags |= UV_TCP_SINGLE_ACCEPT;
+
+no_single_accept:
   if (maybe_new_socket(tcp, AF_INET, UV_STREAM_READABLE))
     return -1;
@@ -356,5 +379,17 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
 
 
 int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
+  if (enable)
+    handle->flags &= ~UV_TCP_SINGLE_ACCEPT;
+  else
+    handle->flags |= UV_TCP_SINGLE_ACCEPT;
   return 0;
 }
+
+
+void uv__tcp_close(uv_tcp_t* handle) {
+  if (handle->idle_handle)
+    uv_close((uv_handle_t*)handle->idle_handle, (uv_close_cb)free);
+
+  uv__stream_close((uv_stream_t*)handle);
+}
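
A usage note, not part of the patch: after this change, single-accept mode is on by default for every listening TCP handle, and the diff exposes two opt-outs (the UV_TCP_SINGLE_ACCEPT environment variable and uv_tcp_simultaneous_accepts()). Below is a minimal, hypothetical sketch of a server against the pre-1.0 libuv API this patch targets (uv_ip4_addr() returning a struct sockaddr_in, uv_run() taking only a loop); the port number and the accept-then-close callback are placeholders, not anything the patch prescribes.

/* sketch.c -- hypothetical example, pre-1.0 libuv API assumed */
#include <stdlib.h>
#include "uv.h"

static uv_tcp_t server;

static void on_connection(uv_stream_t* srv, int status) {
  uv_tcp_t* client;

  if (status != 0)
    return;

  client = malloc(sizeof(*client));
  uv_tcp_init(srv->loop, client);

  /* Accept the pending connection. With UV_TCP_SINGLE_ACCEPT set, the
   * next accept() is deferred to the next loop tick via the idle handle. */
  if (uv_accept(srv, (uv_stream_t*) client))
    abort();

  uv_close((uv_handle_t*) client, (uv_close_cb) free); /* demo only */
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();

  uv_tcp_init(loop, &server);
  uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", 7000)); /* placeholder port */

  if (uv_listen((uv_stream_t*) &server, 128, on_connection))
    return 1;

  /* Opt-out #1: per handle, after uv_listen(), restore greedy accepts:
   *   uv_tcp_simultaneous_accepts(&server, 1);
   *
   * Opt-out #2: process-wide, set before uv_tcp_listen() first runs:
   *   UV_TCP_SINGLE_ACCEPT=0 ./server
   */

  return uv_run(loop);
}

Running several such processes against the same inherited listening socket is where the deferred accept() in uv__server_io pays off: each process takes at most one connection per tick, so the kernel wakes a sibling process instead of letting one process drain the whole backlog.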