unix: add relaxed accept() setting

Mitigates unfair scheduling when multiple processes share a single
listen socket.
This commit is contained in:
Ben Noordhuis 2012-07-29 03:07:39 +02:00
parent cfb06db5e5
commit 9f7cdb20aa
5 changed files with 87 additions and 7 deletions

View File

@ -184,8 +184,9 @@ struct uv__io_s {
int fd; \
/* UV_TCP */
#define UV_TCP_PRIVATE_FIELDS
/* UV_TCP, idle_handle is for UV_TCP_SINGLE_ACCEPT handles */
#define UV_TCP_PRIVATE_FIELDS \
uv_idle_t* idle_handle; \
/* UV_UDP */

View File

@ -69,10 +69,13 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
break;
case UV_TTY:
case UV_TCP:
uv__stream_close((uv_stream_t*)handle);
break;
case UV_TCP:
uv__tcp_close((uv_tcp_t*)handle);
break;
case UV_UDP:
uv__udp_close((uv_udp_t*)handle);
break;

View File

@ -93,7 +93,8 @@ enum {
UV_STREAM_WRITABLE = 0x40, /* The stream is writable */
UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */
UV_TCP_NODELAY = 0x100, /* Disable Nagle. */
UV_TCP_KEEPALIVE = 0x200 /* Turn on keep-alive. */
UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */
UV_TCP_SINGLE_ACCEPT = 0x400 /* Only accept() when idle. */
};
inline static void uv__req_init(uv_loop_t* loop,
@ -162,6 +163,7 @@ void uv__poll_close(uv_poll_t* handle);
void uv__prepare_close(uv_prepare_t* handle);
void uv__process_close(uv_process_t* handle);
void uv__stream_close(uv_stream_t* handle);
void uv__tcp_close(uv_tcp_t* handle);
void uv__timer_close(uv_timer_t* handle);
void uv__udp_close(uv_udp_t* handle);
void uv__udp_finish_close(uv_udp_t* handle);

View File

@ -163,6 +163,16 @@ void uv__stream_destroy(uv_stream_t* stream) {
}
/* Idle callback used in single-accept mode: fires once the event loop has
 * gone idle, at which point it is safe to resume accepting connections.
 */
static void uv__next_accept(uv_idle_t* idle, int status) {
  uv_stream_t* server = idle->data;

  /* One-shot: the watcher is restarted by uv__server_io after each accept. */
  uv_idle_stop(idle);

  /* Only re-arm the read watcher if the previously accepted connection
   * has been picked up by the user via uv_accept().
   */
  if (server->accepted_fd != -1)
    return;

  uv__io_start(server->loop, &server->read_watcher);
}
void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
int fd;
uv_stream_t* stream = container_of(w, uv_stream_t, read_watcher);
@ -198,14 +208,44 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
}
} else {
stream->accepted_fd = fd;
stream->connection_cb((uv_stream_t*)stream, 0);
if (stream->accepted_fd >= 0) {
stream->connection_cb(stream, 0);
if (stream->accepted_fd != -1 ||
(stream->type == UV_TCP && stream->flags == UV_TCP_SINGLE_ACCEPT)) {
/* The user hasn't called uv_accept() yet */
uv__io_stop(stream->loop, &stream->read_watcher);
return;
break;
}
}
}
if (stream->fd != -1 &&
stream->accepted_fd == -1 &&
(stream->type == UV_TCP && stream->flags == UV_TCP_SINGLE_ACCEPT))
{
/* Defer the next accept() syscall to the next event loop tick.
* This lets us guarantee fair load balancing in multi-process setups.
* The problem is as follows:
*
* 1. Multiple processes listen on the same socket.
* 2. The OS scheduler commonly gives preference to one process to
* avoid task switches.
* 3. That process therefore accepts most of the new connections,
* leading to a (sometimes very) unevenly distributed load.
*
* Here is how we mitigate this issue:
*
* 1. Accept a connection.
* 2. Start an idle watcher.
* 3. Don't accept new connections until the idle callback fires.
*
* This works because the callback only fires when there have been
* no recent events, i.e. none of the watched file descriptors have
* recently been readable or writable.
*/
uv_tcp_t* tcp = (uv_tcp_t*) stream;
uv_idle_start(tcp->idle_handle, uv__next_accept);
}
}

View File

@ -22,6 +22,7 @@
#include "uv.h"
#include "internal.h"
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
@ -30,6 +31,7 @@
/* Initialize a TCP handle on the given loop.
 *
 * The idle watcher is not allocated here; uv_tcp_listen() creates it
 * lazily when single-accept mode is enabled.  Always returns 0.
 */
int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
  tcp->idle_handle = NULL;
  uv__stream_init(loop, (uv_stream_t*) tcp, UV_TCP);
  loop->counters.tcp_init++;
  return 0;
}
@ -227,9 +229,29 @@ out:
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
static int single_accept = -1;
if (tcp->delayed_error)
return uv__set_sys_error(tcp->loop, tcp->delayed_error);
if (single_accept == -1) {
const char* val = getenv("UV_TCP_SINGLE_ACCEPT");
single_accept = (val == NULL) || (atoi(val) != 0); /* on by default */
}
if (!single_accept)
goto no_single_accept;
tcp->idle_handle = malloc(sizeof(*tcp->idle_handle));
if (tcp->idle_handle == NULL)
return uv__set_sys_error(tcp->loop, ENOMEM);
if (uv_idle_init(tcp->loop, tcp->idle_handle))
abort();
tcp->flags |= UV_TCP_SINGLE_ACCEPT;
no_single_accept:
if (maybe_new_socket(tcp, AF_INET, UV_STREAM_READABLE))
return -1;
@ -356,5 +378,17 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
/* Enable or disable simultaneous asynchronous accept requests.
 *
 * When `enable` is zero the handle is put in single-accept mode
 * (UV_TCP_SINGLE_ACCEPT): at most one connection is accepted per event
 * loop iteration, which spreads load more evenly across processes that
 * share a listen socket.
 *
 * Bug fix: the flag logic was inverted.  Enabling simultaneous accepts
 * must CLEAR the single-accept flag; disabling them must SET it.
 *
 * Always returns 0.
 */
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
  if (enable)
    handle->flags &= ~UV_TCP_SINGLE_ACCEPT;
  else
    handle->flags |= UV_TCP_SINGLE_ACCEPT;
  return 0;
}
/* Close a TCP handle.
 *
 * Tears down the lazily-allocated idle watcher (if listen() created one),
 * using free() as the close callback so the heap allocation is released
 * once the watcher has fully closed, then closes the underlying stream.
 */
void uv__tcp_close(uv_tcp_t* handle) {
  uv_idle_t* idle = handle->idle_handle;

  if (idle != NULL)
    uv_close((uv_handle_t*) idle, (uv_close_cb) free);

  uv__stream_close((uv_stream_t*) handle);
}