From 738b31eb3aff440ae75ff9f32ba61086a948c3f4 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Tue, 5 Jun 2012 15:09:05 +0200 Subject: [PATCH] unix: fix loop starvation under high network load uv__read() and uv__udp_recvmsg() read incoming data in a loop. If data comes in at high speeds, the kernel receive buffer never drains and said functions never terminate, stalling the event loop indefinitely. Limit the number of consecutive reads to 32 to stop that from happening. The number 32 was chosen at random. Empirically, it seems to maintain a high throughput while still making the event loop move forward at a reasonable pace. --- src/unix/stream.c | 11 +++++++++-- src/unix/udp.c | 8 +++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/src/unix/stream.c b/src/unix/stream.c index f5d3041d..602e9488 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -561,12 +561,19 @@ static void uv__read(uv_stream_t* stream) { struct msghdr msg; struct cmsghdr* cmsg; char cmsg_space[64]; + int count; + + /* Prevent loop starvation when the data comes in as fast as (or faster than) + * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. + */ + count = 32; /* XXX: Maybe instead of having UV_STREAM_READING we just test if * tcp->read_cb is NULL or not? */ - while ((stream->read_cb || stream->read2_cb) && - stream->flags & UV_STREAM_READING) { + while ((stream->read_cb || stream->read2_cb) + && (stream->flags & UV_STREAM_READING) + && (count-- > 0)) { assert(stream->alloc_cb); buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); diff --git a/src/unix/udp.c b/src/unix/udp.c index 523d2823..3c784962 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -196,6 +196,7 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { ssize_t nread; uv_buf_t buf; int flags; + int count; handle = container_of(w, uv_udp_t, read_watcher); assert(handle->type == UV_UDP); @@ -204,8 +205,12 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { assert(handle->recv_cb != NULL); assert(handle->alloc_cb != NULL); + /* Prevent loop starvation when the data comes in as fast as (or faster than) + * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. + */ + count = 32; + do { - /* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */ buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024); assert(buf.len > 0); assert(buf.base != NULL); @@ -246,6 +251,7 @@ static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) { } /* recv_cb callback may decide to pause or close the handle */ while (nread != -1 + && count-- > 0 && handle->fd != -1 && handle->recv_cb != NULL); }