diff --git a/build.mk b/build.mk index 2b86ba0e..2f2dabe8 100644 --- a/build.mk +++ b/build.mk @@ -114,6 +114,7 @@ TESTS= \ test/test-tcp-bind6-error.o \ test/test-tcp-bind-error.o \ test/test-tcp-close.o \ + test/test-tcp-close-accept.o \ test/test-tcp-close-while-connecting.o \ test/test-tcp-connect6-error.o \ test/test-tcp-connect-error-after-write.o \ diff --git a/checksparse.sh b/checksparse.sh index 4a508e26..43c9441e 100755 --- a/checksparse.sh +++ b/checksparse.sh @@ -133,6 +133,7 @@ test/test-stdio-over-pipes.c test/test-tcp-bind-error.c test/test-tcp-bind6-error.c test/test-tcp-close-while-connecting.c +test/test-tcp-close-accept.c test/test-tcp-close.c test/test-tcp-connect-error-after-write.c test/test-tcp-connect-error.c diff --git a/src/unix/core.c b/src/unix/core.c index e8e5f9e6..991b18fa 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -595,20 +595,33 @@ static unsigned int next_power_of_two(unsigned int val) { static void maybe_resize(uv_loop_t* loop, unsigned int len) { uv__io_t** watchers; + void* fake_watcher_list; + void* fake_watcher_count; unsigned int nwatchers; unsigned int i; if (len <= loop->nwatchers) return; - nwatchers = next_power_of_two(len); - watchers = realloc(loop->watchers, nwatchers * sizeof(loop->watchers[0])); + nwatchers = next_power_of_two(len + 2) - 2; + watchers = realloc(loop->watchers, + (nwatchers + 2) * sizeof(loop->watchers[0])); if (watchers == NULL) abort(); + /* Copy watchers, preserving fake one in the end */ + if (loop->watchers == NULL) { + fake_watcher_list = watchers[loop->nwatchers]; + fake_watcher_count = watchers[loop->nwatchers + 1]; + } else { + fake_watcher_list = NULL; + fake_watcher_count = NULL; + } for (i = loop->nwatchers; i < nwatchers; i++) watchers[i] = NULL; + watchers[nwatchers] = fake_watcher_list; + watchers[nwatchers + 1] = fake_watcher_count; loop->watchers = watchers; loop->nwatchers = nwatchers; @@ -700,6 +713,9 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) { void uv__io_close(uv_loop_t* loop, uv__io_t* w) { uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT); ngx_queue_remove(&w->pending_queue); + + /* Remove stale events for this file descriptor */ + uv__platform_invalidate_fd(loop, w->fd); } diff --git a/src/unix/darwin.c b/src/unix/darwin.c index 77e662f4..03102543 100644 --- a/src/unix/darwin.c +++ b/src/unix/darwin.c @@ -102,6 +102,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) { } +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) { + struct kevent* events; + uintptr_t i; + uintptr_t nfds; + + assert(loop->watchers != NULL); + + events = (struct kevent*) loop->watchers[loop->nwatchers]; + nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1]; + if (events == NULL) + return; + + /* Invalidate events with same file descriptor */ + for (i = 0; i < nfds; i++) + if ((int) events[i].ident == fd) + events[i].ident = -1; +} + + static void uv__cf_loop_runner(void* arg) { uv_loop_t* loop; diff --git a/src/unix/internal.h b/src/unix/internal.h index 61cb1ec1..35aa894a 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -183,6 +183,7 @@ uint64_t uv__hrtime(void); int uv__kqueue_init(uv_loop_t* loop); int uv__platform_loop_init(uv_loop_t* loop, int default_loop); void uv__platform_loop_delete(uv_loop_t* loop); +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd); /* various */ void uv__async_close(uv_async_t* handle); diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index d99d6e1b..02564598 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -162,11 +162,18 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents = 0; + assert(loop->watchers != NULL); + loop->watchers[loop->nwatchers] = (void*) events; + loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds; for (i = 0; i < nfds; i++) { ev = events + i; fd = ev->ident; w = loop->watchers[fd]; + /* Skip invalidated events, see uv__platform_invalidate_fd */ + if (fd == -1) + continue; + if (w == NULL) { /* File descriptor that we've stopped watching, disarm it. */ /* TODO batch up */ @@ -227,6 +234,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { w->cb(loop, w, revents); nevents++; } + loop->watchers[loop->nwatchers] = NULL; + loop->watchers[loop->nwatchers + 1] = NULL; if (nevents != 0) { if (nfds == ARRAY_SIZE(events) && --count != 0) { diff --git a/src/unix/linux-core.c b/src/unix/linux-core.c index 6659dc4d..7c34ea16 100644 --- a/src/unix/linux-core.c +++ b/src/unix/linux-core.c @@ -97,6 +97,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) { } +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) { + struct uv__epoll_event* events; + uintptr_t i; + uintptr_t nfds; + + assert(loop->watchers != NULL); + + events = (struct uv__epoll_event*) loop->watchers[loop->nwatchers]; + nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1]; + if (events == NULL) + return; + + /* Invalidate events with same file descriptor */ + for (i = 0; i < nfds; i++) + if ((int) events[i].data == fd) + events[i].data = -1; +} + + void uv__io_poll(uv_loop_t* loop, int timeout) { struct uv__epoll_event events[1024]; struct uv__epoll_event* pe; @@ -189,10 +208,17 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents = 0; + assert(loop->watchers != NULL); + loop->watchers[loop->nwatchers] = (void*) events; + loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds; for (i = 0; i < nfds; i++) { pe = events + i; fd = pe->data; + /* Skip invalidated events, see uv__platform_invalidate_fd */ + if (fd == -1) + continue; + assert(fd >= 0); assert((unsigned) fd < loop->nwatchers); @@ -238,6 +264,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents++; } } + loop->watchers[loop->nwatchers] = NULL; + loop->watchers[loop->nwatchers + 1] = NULL; if (nevents != 0) { if (nfds == ARRAY_SIZE(events) && --count != 0) { diff --git a/src/unix/sunos.c b/src/unix/sunos.c index 3fbb50c9..e2a72dbc 100644 --- a/src/unix/sunos.c +++ b/src/unix/sunos.c @@ -87,6 +87,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) { } +void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) { + struct port_event* events; + uintptr_t i; + uintptr_t nfds; + + assert(loop->watchers != NULL); + + events = (struct port_event*) loop->watchers[loop->nwatchers]; + nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1]; + if (events == NULL) + return; + + /* Invalidate events with same file descriptor */ + for (i = 0; i < nfds; i++) + if ((int) events[i].portev_object == fd) + events[i].portev_object = -1; +} + + void uv__io_poll(uv_loop_t* loop, int timeout) { struct port_event events[1024]; struct port_event* pe; @@ -173,10 +192,17 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { nevents = 0; + assert(loop->watchers != NULL); + loop->watchers[loop->nwatchers] = (void*) events; + loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds; for (i = 0; i < nfds; i++) { pe = events + i; fd = pe->portev_object; + /* Skip invalidated events, see uv__platform_invalidate_fd */ + if (fd == -1) + continue; + assert(fd >= 0); assert((unsigned) fd < loop->nwatchers); @@ -193,6 +219,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (w->pevents != 0 && ngx_queue_empty(&w->watcher_queue)) ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); } + loop->watchers[loop->nwatchers] = NULL; + loop->watchers[loop->nwatchers + 1] = NULL; if (nevents != 0) { if (nfds == ARRAY_SIZE(events) && --count != 0) { diff --git a/test/test-list.h b/test/test-list.h index d883496f..f1e950e5 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -64,6 +64,7 @@ TEST_DECLARE (tcp_connect_error_fault) TEST_DECLARE (tcp_connect_timeout) TEST_DECLARE (tcp_close_while_connecting) TEST_DECLARE (tcp_close) +TEST_DECLARE (tcp_close_accept) TEST_DECLARE (tcp_flags) TEST_DECLARE (tcp_write_to_half_open_connection) TEST_DECLARE (tcp_unexpected_read) @@ -298,6 +299,7 @@ TASK_LIST_START TEST_ENTRY (tcp_connect_timeout) TEST_ENTRY (tcp_close_while_connecting) TEST_ENTRY (tcp_close) + TEST_ENTRY (tcp_close_accept) TEST_ENTRY (tcp_flags) TEST_ENTRY (tcp_write_to_half_open_connection) TEST_ENTRY (tcp_unexpected_read) diff --git a/test/test-tcp-close-accept.c b/test/test-tcp-close-accept.c new file mode 100644 index 00000000..f5336a58 --- /dev/null +++ b/test/test-tcp-close-accept.c @@ -0,0 +1,177 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include + +static struct sockaddr_in addr; +static uv_tcp_t tcp_server; +static uv_tcp_t tcp_outgoing[2]; +static uv_tcp_t tcp_incoming[ARRAY_SIZE(tcp_outgoing)]; +static uv_connect_t connect_reqs[ARRAY_SIZE(tcp_outgoing)]; +static uv_tcp_t tcp_check; +static uv_connect_t tcp_check_req; +static uv_write_t write_reqs[ARRAY_SIZE(tcp_outgoing)]; +static unsigned int got_connections; +static unsigned int close_cb_called; +static unsigned int write_cb_called; +static unsigned int read_cb_called; + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + +static void write_cb(uv_write_t* req, int status) { + ASSERT(status == 0); + write_cb_called++; +} + +static void connect_cb(uv_connect_t* req, int status) { + unsigned int i; + uv_buf_t buf; + uv_stream_t* outgoing; + + if (req == &tcp_check_req) { + ASSERT(status != 0); + + /* Close check and incoming[0], time to finish test */ + uv_close((uv_handle_t*) &tcp_incoming[0], close_cb); + uv_close((uv_handle_t*) &tcp_check, close_cb); + return; + } + + ASSERT(status == 0); + ASSERT(connect_reqs <= req); + ASSERT(req <= connect_reqs + ARRAY_SIZE(connect_reqs)); + i = req - connect_reqs; + + buf = uv_buf_init("x", 1); + outgoing = (uv_stream_t*) &tcp_outgoing[i]; + ASSERT(0 == uv_write(&write_reqs[i], outgoing, &buf, 1, write_cb)); +} + +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) { + static char buf[1]; + + return uv_buf_init(buf, sizeof(buf)); +} + +static void read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t b) { + uv_loop_t* loop; + unsigned int i; + + /* Only first stream should receive read events */ + ASSERT(stream == (uv_stream_t*) &tcp_incoming[0]); + ASSERT(0 == uv_read_stop(stream)); + ASSERT(1 == nread); + + loop = stream->loop; + read_cb_called++; + + /* Close all active incomings, except current one */ + for (i = 1; i < got_connections; i++) + uv_close((uv_handle_t*) &tcp_incoming[i], close_cb); + + /* Create new fd that should be one of the closed incomings */ + ASSERT(0 == uv_tcp_init(loop, &tcp_check)); + ASSERT(0 == uv_tcp_connect(&tcp_check_req, &tcp_check, addr, connect_cb)); + ASSERT(0 == uv_read_start((uv_stream_t*) &tcp_check, alloc_cb, read_cb)); + + /* Close server, so no one will connect to it */ + uv_close((uv_handle_t*) &tcp_server, close_cb); +} + +static void connection_cb(uv_stream_t* server, int status) { + unsigned int i; + uv_tcp_t* incoming; + + ASSERT(server == (uv_stream_t*) &tcp_server); + + /* Ignore tcp_check connection */ + if (got_connections == ARRAY_SIZE(tcp_incoming)) + return; + + /* Accept everyone */ + incoming = &tcp_incoming[got_connections++]; + ASSERT(0 == uv_tcp_init(server->loop, incoming)); + ASSERT(0 == uv_accept(server, (uv_stream_t*) incoming)); + + if (got_connections != ARRAY_SIZE(tcp_incoming)) + return; + + /* Once all clients are accepted - start reading */ + for (i = 0; i < ARRAY_SIZE(tcp_incoming); i++) { + incoming = &tcp_incoming[i]; + ASSERT(0 == uv_read_start((uv_stream_t*) incoming, alloc_cb, read_cb)); + } +} + +TEST_IMPL(tcp_close_accept) { + unsigned int i; + uv_loop_t* loop; + uv_tcp_t* client; + + /* + * A little explanation of what goes on below: + * + * We'll create server and connect to it using two clients, each writing one + * byte once connected. + * + * When all clients will be accepted by server - we'll start reading from them + * and, on first client's first byte, will close second client and server. + * After that, we'll immediately initiate new connection to server using + * tcp_check handle (thus, reusing fd from second client). + * + * In this situation uv__io_poll()'s event list should still contain read + * event for second client, and, if not cleaned up properly, `tcp_check` will + * receive stale event of second incoming and invoke `connect_cb` with zero + * status. + */ + + loop = uv_default_loop(); + addr = uv_ip4_addr("0.0.0.0", TEST_PORT); + + ASSERT(0 == uv_tcp_init(loop, &tcp_server)); + ASSERT(0 == uv_tcp_bind(&tcp_server, addr)); + ASSERT(0 == uv_listen((uv_stream_t*) &tcp_server, + ARRAY_SIZE(tcp_outgoing), + connection_cb)); + + for (i = 0; i < ARRAY_SIZE(tcp_outgoing); i++) { + client = tcp_outgoing + i; + + ASSERT(0 == uv_tcp_init(loop, client)); + ASSERT(0 == uv_tcp_connect(&connect_reqs[i], client, addr, connect_cb)); + } + + uv_run(loop, UV_RUN_DEFAULT); + + ASSERT(ARRAY_SIZE(tcp_outgoing) == got_connections); + ASSERT((ARRAY_SIZE(tcp_outgoing) + 2) == close_cb_called); + ASSERT(ARRAY_SIZE(tcp_outgoing) == write_cb_called); + ASSERT(1 == read_cb_called); + + MAKE_VALGRIND_HAPPY(); + return 0; +} diff --git a/uv.gyp b/uv.gyp index 7d43d87e..96a57609 100644 --- a/uv.gyp +++ b/uv.gyp @@ -335,6 +335,7 @@ 'test/test-tcp-bind-error.c', 'test/test-tcp-bind6-error.c', 'test/test-tcp-close.c', + 'test/test-tcp-close-accept.c', 'test/test-tcp-close-while-connecting.c', 'test/test-tcp-connect-error-after-write.c', 'test/test-tcp-shutdown-after-write.c',