From 0c76cdb98ffa9a43f169e24b59a3df95f18803f3 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Fri, 8 Nov 2013 05:10:44 +0100 Subject: [PATCH 1/3] linux: handle EPOLLHUP without EPOLLIN/EPOLLOUT Work around an epoll quirk where it sometimes reports just the EPOLLERR or EPOLLHUP event. In order to force the event loop to move forward, we merge in the read/write events that the watcher is interested in; uv__read() and uv__write() will then deal with the error or hangup in the usual fashion. Fixes #982. This is a back-port of commit 24bfef2 from the master branch. --- src/unix/linux-core.c | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/unix/linux-core.c b/src/unix/linux-core.c index cb6df789..6659dc4d 100644 --- a/src/unix/linux-core.c +++ b/src/unix/linux-core.c @@ -105,7 +105,6 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { uv__io_t* w; uint64_t base; uint64_t diff; - unsigned int masked_events; int nevents; int count; int nfds; @@ -209,16 +208,35 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { continue; } - /* - * Give users only events they're interested in. Prevents spurious + /* Give users only events they're interested in. Prevents spurious * callbacks when previous callback invocation in this loop has stopped * the current watcher. Also, filters out events that users has not * requested us to watch. */ - masked_events = pe->events & w->pevents; - if (masked_events != 0) - w->cb(loop, w, masked_events); - nevents++; + pe->events &= w->pevents | UV__POLLERR | UV__POLLHUP; + + /* Work around an epoll quirk where it sometimes reports just the + * EPOLLERR or EPOLLHUP event. In order to force the event loop to + * move forward, we merge in the read/write events that the watcher + * is interested in; uv__read() and uv__write() will then deal with + * the error or hangup in the usual fashion. + * + * Note to self: happens when epoll reports EPOLLIN|EPOLLHUP, the user + * reads the available data, calls uv_read_stop(), then sometime later + * calls uv_read_start() again. By then, libuv has forgotten about the + * hangup and the kernel won't report EPOLLIN again because there's + * nothing left to read. If anything, libuv is to blame here. The + * current hack is just a quick bandaid; to properly fix it, libuv + * needs to remember the error/hangup event. We should get that for + * free when we switch over to edge-triggered I/O. + */ + if (pe->events == UV__EPOLLERR || pe->events == UV__EPOLLHUP) + pe->events |= w->pevents & (UV__EPOLLIN | UV__EPOLLOUT); + + if (pe->events != 0) { + w->cb(loop, w, pe->events); + nevents++; + } } if (nevents != 0) { From bbccafbe704090e294481d95f73862bc83f33026 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Fri, 25 Oct 2013 12:56:37 +0400 Subject: [PATCH 2/3] unix: fix reopened fd bug When fd is closed and new one (with the same number) is opened inside kqueue/epoll/port loop's callback - stale events might invoke callbacks on wrong watchers. Check if watcher was changed after invocation and invalidate all events with the same fd. fix #826 --- build.mk | 1 + checksparse.sh | 1 + src/unix/core.c | 20 +++- src/unix/darwin.c | 19 ++++ src/unix/internal.h | 1 + src/unix/kqueue.c | 9 ++ src/unix/linux-core.c | 28 ++++++ src/unix/sunos.c | 28 ++++++ test/test-list.h | 2 + test/test-tcp-close-accept.c | 177 +++++++++++++++++++++++++++++++++++ uv.gyp | 1 + 11 files changed, 285 insertions(+), 2 deletions(-) create mode 100644 test/test-tcp-close-accept.c diff --git a/build.mk b/build.mk index 2b86ba0e..2f2dabe8 100644 --- a/build.mk +++ b/build.mk @@ -114,6 +114,7 @@ TESTS= \ test/test-tcp-bind6-error.o \ test/test-tcp-bind-error.o \ test/test-tcp-close.o \ + test/test-tcp-close-accept.o \ test/test-tcp-close-while-connecting.o \ test/test-tcp-connect6-error.o \ test/test-tcp-connect-error-after-write.o \ diff --git a/checksparse.sh b/checksparse.sh index 4a508e26..43c9441e 100755 --- a/checksparse.sh +++ b/checksparse.sh @@ -133,6 +133,7 @@ test/test-stdio-over-pipes.c test/test-tcp-bind-error.c test/test-tcp-bind6-error.c test/test-tcp-close-while-connecting.c +test/test-tcp-close-accept.c test/test-tcp-close.c test/test-tcp-connect-error-after-write.c test/test-tcp-connect-error.c diff --git a/src/unix/core.c b/src/unix/core.c index e8e5f9e6..991b18fa 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -595,20 +595,33 @@ static unsigned int next_power_of_two(unsigned int val) { static void maybe_resize(uv_loop_t* loop, unsigned int len) { uv__io_t** watchers; + void* fake_watcher_list; + void* fake_watcher_count; unsigned int nwatchers; unsigned int i; if (len <= loop->nwatchers) return; - nwatchers = next_power_of_two(len); - watchers = realloc(loop->watchers, nwatchers * sizeof(loop->watchers[0])); + nwatchers = next_power_of_two(len + 2) - 2; + watchers = realloc(loop->watchers, + (nwatchers + 2) * sizeof(loop->watchers[0])); if (watchers == NULL) abort(); + /* Copy watchers, preserving fake one in the end */ + if (loop->watchers == NULL) { + fake_watcher_list = watchers[loop->nwatchers]; + fake_watcher_count = watchers[loop->nwatchers + 1]; + } else { + fake_watcher_list = NULL; + fake_watcher_count = NULL; + } for (i = loop->nwatchers; i < nwatchers; i++) watchers[i] = NULL; + watchers[nwatchers] = fake_watcher_list; + watchers[nwatchers + 1] = fake_watcher_count; loop->watchers = watchers; loop->nwatchers = nwatchers; @@ -700,6 +713,9 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) { void uv__io_close(uv_loop_t* loop, uv__io_t* w) { uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT); ngx_queue_remove(&w->pending_queue); + + /* Remove stale events for this file descriptor */ + uv__platform_invalidate_fd(loop, w->fd); } diff --git a/src/unix/darwin.c b/src/unix/darwin.c index 77e662f4..03102543 100644 --- a/src/unix/darwin.c +++ b/src/unix/darwin.c @@ -102,6 +102,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) { } +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) { + struct kevent* events; + uintptr_t i; + uintptr_t nfds; + + assert(loop->watchers != NULL); + + events = (struct kevent*) loop->watchers[loop->nwatchers]; + nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1]; + if (events == NULL) + return; + + /* Invalidate events with same file descriptor */ + for (i = 0; i < nfds; i++) + if ((int) events[i].ident == fd) + events[i].ident = -1; +} + + static void uv__cf_loop_runner(void* arg) { uv_loop_t* loop; diff --git a/src/unix/internal.h b/src/unix/internal.h index 61cb1ec1..35aa894a 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -183,6 +183,7 @@ uint64_t uv__hrtime(void); int uv__kqueue_init(uv_loop_t* loop); int uv__platform_loop_init(uv_loop_t* loop, int default_loop); void uv__platform_loop_delete(uv_loop_t* loop); +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd); /* various */ void uv__async_close(uv_async_t* handle); diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index d99d6e1b..02564598 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -162,11 +162,18 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents = 0; + assert(loop->watchers != NULL); + loop->watchers[loop->nwatchers] = (void*) events; + loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds; for (i = 0; i < nfds; i++) { ev = events + i; fd = ev->ident; w = loop->watchers[fd]; + /* Skip invalidated events, see uv__platform_invalidate_fd */ + if (fd == -1) + continue; + if (w == NULL) { /* File descriptor that we've stopped watching, disarm it. */ /* TODO batch up */ @@ -227,6 +234,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { w->cb(loop, w, revents); nevents++; } + loop->watchers[loop->nwatchers] = NULL; + loop->watchers[loop->nwatchers + 1] = NULL; if (nevents != 0) { if (nfds == ARRAY_SIZE(events) && --count != 0) { diff --git a/src/unix/linux-core.c b/src/unix/linux-core.c index 6659dc4d..7c34ea16 100644 --- a/src/unix/linux-core.c +++ b/src/unix/linux-core.c @@ -97,6 +97,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) { } +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) { + struct uv__epoll_event* events; + uintptr_t i; + uintptr_t nfds; + + assert(loop->watchers != NULL); + + events = (struct uv__epoll_event*) loop->watchers[loop->nwatchers]; + nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1]; + if (events == NULL) + return; + + /* Invalidate events with same file descriptor */ + for (i = 0; i < nfds; i++) + if ((int) events[i].data == fd) + events[i].data = -1; +} + + void uv__io_poll(uv_loop_t* loop, int timeout) { struct uv__epoll_event events[1024]; struct uv__epoll_event* pe; @@ -189,10 +208,17 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents = 0; + assert(loop->watchers != NULL); + loop->watchers[loop->nwatchers] = (void*) events; + loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds; for (i = 0; i < nfds; i++) { pe = events + i; fd = pe->data; + /* Skip invalidated events, see uv__platform_invalidate_fd */ + if (fd == -1) + continue; + assert(fd >= 0); assert((unsigned) fd < loop->nwatchers); @@ -238,6 +264,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents++; } } + loop->watchers[loop->nwatchers] = NULL; + loop->watchers[loop->nwatchers + 1] = NULL; if (nevents != 0) { if (nfds == ARRAY_SIZE(events) && --count != 0) { diff --git a/src/unix/sunos.c b/src/unix/sunos.c index 3fbb50c9..e2a72dbc 100644 --- a/src/unix/sunos.c +++ b/src/unix/sunos.c @@ -87,6 +87,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) { } +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) { + struct port_event* events; + uintptr_t i; + uintptr_t nfds; + + assert(loop->watchers != NULL); + + events = (struct port_event*) loop->watchers[loop->nwatchers]; + nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1]; + if (events == NULL) + return; + + /* Invalidate events with same file descriptor */ + for (i = 0; i < nfds; i++) + if ((int) events[i].portev_object == fd) + events[i].portev_object = -1; +} + + void uv__io_poll(uv_loop_t* loop, int timeout) { struct port_event events[1024]; struct port_event* pe; @@ -173,10 +192,17 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents = 0; + assert(loop->watchers != NULL); + loop->watchers[loop->nwatchers] = (void*) events; + loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds; for (i = 0; i < nfds; i++) { pe = events + i; fd = pe->portev_object; + /* Skip invalidated events, see uv__platform_invalidate_fd */ + if (fd == -1) + continue; + assert(fd >= 0); assert((unsigned) fd < loop->nwatchers); @@ -193,6 +219,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (w->pevents != 0 && ngx_queue_empty(&w->watcher_queue)) ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); } + loop->watchers[loop->nwatchers] = NULL; + loop->watchers[loop->nwatchers + 1] = NULL; if (nevents != 0) { if (nfds == ARRAY_SIZE(events) && --count != 0) { diff --git a/test/test-list.h b/test/test-list.h index d883496f..f1e950e5 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -64,6 +64,7 @@ TEST_DECLARE (tcp_connect_error_fault) TEST_DECLARE (tcp_connect_timeout) TEST_DECLARE (tcp_close_while_connecting) TEST_DECLARE (tcp_close) +TEST_DECLARE (tcp_close_accept) TEST_DECLARE (tcp_flags) TEST_DECLARE (tcp_write_to_half_open_connection) TEST_DECLARE (tcp_unexpected_read) @@ -298,6 +299,7 @@ TASK_LIST_START TEST_ENTRY (tcp_connect_timeout) TEST_ENTRY (tcp_close_while_connecting) TEST_ENTRY (tcp_close) + TEST_ENTRY (tcp_close_accept) TEST_ENTRY (tcp_flags) TEST_ENTRY (tcp_write_to_half_open_connection) TEST_ENTRY (tcp_unexpected_read) diff --git a/test/test-tcp-close-accept.c b/test/test-tcp-close-accept.c new file mode 100644 index 00000000..f5336a58 --- /dev/null +++ b/test/test-tcp-close-accept.c @@ -0,0 +1,177 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include + +static struct sockaddr_in addr; +static uv_tcp_t tcp_server; +static uv_tcp_t tcp_outgoing[2]; +static uv_tcp_t tcp_incoming[ARRAY_SIZE(tcp_outgoing)]; +static uv_connect_t connect_reqs[ARRAY_SIZE(tcp_outgoing)]; +static uv_tcp_t tcp_check; +static uv_connect_t tcp_check_req; +static uv_write_t write_reqs[ARRAY_SIZE(tcp_outgoing)]; +static unsigned int got_connections; +static unsigned int close_cb_called; +static unsigned int write_cb_called; +static unsigned int read_cb_called; + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + +static void write_cb(uv_write_t* req, int status) { + ASSERT(status == 0); + write_cb_called++; +} + +static void connect_cb(uv_connect_t* req, int status) { + unsigned int i; + uv_buf_t buf; + uv_stream_t* outgoing; + + if (req == &tcp_check_req) { + ASSERT(status != 0); + + /* Close check and incoming[0], time to finish test */ + uv_close((uv_handle_t*) &tcp_incoming[0], close_cb); + uv_close((uv_handle_t*) &tcp_check, close_cb); + return; + } + + ASSERT(status == 0); + ASSERT(connect_reqs <= req); + ASSERT(req <= connect_reqs + ARRAY_SIZE(connect_reqs)); + i = req - connect_reqs; + + buf = uv_buf_init("x", 1); + outgoing = (uv_stream_t*) &tcp_outgoing[i]; + ASSERT(0 == uv_write(&write_reqs[i], outgoing, &buf, 1, write_cb)); +} + +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) { + static char buf[1]; + + return uv_buf_init(buf, sizeof(buf)); +} + +static void read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t b) { + uv_loop_t* loop; + unsigned int i; + + /* Only first stream should receive read events */ + ASSERT(stream == (uv_stream_t*) &tcp_incoming[0]); + ASSERT(0 == uv_read_stop(stream)); + ASSERT(1 == nread); + + loop = stream->loop; + read_cb_called++; + + /* Close all active incomings, except current one */ + for (i = 1; i < got_connections; i++) + uv_close((uv_handle_t*) &tcp_incoming[i], close_cb); + + /* Create new fd that should be one of the closed incomings */ + ASSERT(0 == uv_tcp_init(loop, &tcp_check)); + ASSERT(0 == uv_tcp_connect(&tcp_check_req, &tcp_check, addr, connect_cb)); + ASSERT(0 == uv_read_start((uv_stream_t*) &tcp_check, alloc_cb, read_cb)); + + /* Close server, so no one will connect to it */ + uv_close((uv_handle_t*) &tcp_server, close_cb); +} + +static void connection_cb(uv_stream_t* server, int status) { + unsigned int i; + uv_tcp_t* incoming; + + ASSERT(server == (uv_stream_t*) &tcp_server); + + /* Ignore tcp_check connection */ + if (got_connections == ARRAY_SIZE(tcp_incoming)) + return; + + /* Accept everyone */ + incoming = &tcp_incoming[got_connections++]; + ASSERT(0 == uv_tcp_init(server->loop, incoming)); + ASSERT(0 == uv_accept(server, (uv_stream_t*) incoming)); + + if (got_connections != ARRAY_SIZE(tcp_incoming)) + return; + + /* Once all clients are accepted - start reading */ + for (i = 0; i < ARRAY_SIZE(tcp_incoming); i++) { + incoming = &tcp_incoming[i]; + ASSERT(0 == uv_read_start((uv_stream_t*) incoming, alloc_cb, read_cb)); + } +} + +TEST_IMPL(tcp_close_accept) { + unsigned int i; + uv_loop_t* loop; + uv_tcp_t* client; + + /* + * A little explanation of what goes on below: + * + * We'll create server and connect to it using two clients, each writing one + * byte once connected. + * + * When all clients will be accepted by server - we'll start reading from them + * and, on first client's first byte, will close second client and server. + * After that, we'll immediately initiate new connection to server using + * tcp_check handle (thus, reusing fd from second client). + * + * In this situation uv__io_poll()'s event list should still contain read + * event for second client, and, if not cleaned up properly, `tcp_check` will + * receive stale event of second incoming and invoke `connect_cb` with zero + * status. + */ + + loop = uv_default_loop(); + addr = uv_ip4_addr("0.0.0.0", TEST_PORT); + + ASSERT(0 == uv_tcp_init(loop, &tcp_server)); + ASSERT(0 == uv_tcp_bind(&tcp_server, addr)); + ASSERT(0 == uv_listen((uv_stream_t*) &tcp_server, + ARRAY_SIZE(tcp_outgoing), + connection_cb)); + + for (i = 0; i < ARRAY_SIZE(tcp_outgoing); i++) { + client = tcp_outgoing + i; + + ASSERT(0 == uv_tcp_init(loop, client)); + ASSERT(0 == uv_tcp_connect(&connect_reqs[i], client, addr, connect_cb)); + } + + uv_run(loop, UV_RUN_DEFAULT); + + ASSERT(ARRAY_SIZE(tcp_outgoing) == got_connections); + ASSERT((ARRAY_SIZE(tcp_outgoing) + 2) == close_cb_called); + ASSERT(ARRAY_SIZE(tcp_outgoing) == write_cb_called); + ASSERT(1 == read_cb_called); + + MAKE_VALGRIND_HAPPY(); + return 0; +} diff --git a/uv.gyp b/uv.gyp index 7d43d87e..96a57609 100644 --- a/uv.gyp +++ b/uv.gyp @@ -335,6 +335,7 @@ 'test/test-tcp-bind-error.c', 'test/test-tcp-bind6-error.c', 'test/test-tcp-close.c', + 'test/test-tcp-close-accept.c', 'test/test-tcp-close-while-connecting.c', 'test/test-tcp-connect-error-after-write.c', 'test/test-tcp-shutdown-after-write.c', From f50ccd52388ffbcbbf0cd21ef7d6562300ef7ebb Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Tue, 12 Nov 2013 15:24:33 +0400 Subject: [PATCH 3/3] core: fix fake watcher list and count preservation Fake watcher list and count should be preserved only if `loop->watchers` was already allocated. --- src/unix/core.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/unix/core.c b/src/unix/core.c index 991b18fa..22c48dd7 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -603,21 +603,21 @@ static void maybe_resize(uv_loop_t* loop, unsigned int len) { if (len <= loop->nwatchers) return; + /* Preserve fake watcher list and count at the end of the watchers */ + if (loop->watchers != NULL) { + fake_watcher_list = loop->watchers[loop->nwatchers]; + fake_watcher_count = loop->watchers[loop->nwatchers + 1]; + } else { + fake_watcher_list = NULL; + fake_watcher_count = NULL; + } + nwatchers = next_power_of_two(len + 2) - 2; watchers = realloc(loop->watchers, (nwatchers + 2) * sizeof(loop->watchers[0])); if (watchers == NULL) abort(); - - /* Copy watchers, preserving fake one in the end */ - if (loop->watchers == NULL) { - fake_watcher_list = watchers[loop->nwatchers]; - fake_watcher_count = watchers[loop->nwatchers + 1]; - } else { - fake_watcher_list = NULL; - fake_watcher_count = NULL; - } for (i = loop->nwatchers; i < nwatchers; i++) watchers[i] = NULL; watchers[nwatchers] = fake_watcher_list;