unix: fix reopened fd bug
When an fd is closed and a new one (with the same number) is opened inside a kqueue/epoll/event-port loop callback, stale events might invoke callbacks on the wrong watchers. Check whether the watcher changed after the invocation and invalidate all pending events that carry the same fd. Fix #826.
Parent: 0c76cdb98f
Commit: bbccafbe70
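The scenario in the commit message comes down to ordinary POSIX descriptor reuse: close() releases a descriptor number and the next socket()/accept() call hands the same number back, so an event batch collected before the close can still refer to an fd that now belongs to a brand-new handle. A standalone illustration of the reuse itself (not part of the commit; plain POSIX, hypothetical file name fd_reuse_demo.c):

/* fd_reuse_demo.c - not from this commit; assumes a POSIX system.
 * Shows that the kernel hands back the lowest free descriptor number,
 * which is why a handle created inside a callback can end up with the
 * fd of a handle that was just closed. */
#include <assert.h>
#include <stdio.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
  int a;
  int b;

  a = socket(AF_INET, SOCK_STREAM, 0);
  assert(a != -1);

  close(a);                             /* descriptor number released...  */
  b = socket(AF_INET, SOCK_STREAM, 0);  /* ...and immediately handed back */
  assert(b == a);

  printf("descriptor %d was reused\n", b);
  close(b);
  return 0;
}

The patch below deals with this by stashing the event batch currently being dispatched in two extra slots at the end of loop->watchers, and by having uv__io_close() call uv__platform_invalidate_fd() so that every not-yet-processed event carrying the closed fd is marked and skipped.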
build.mk
@@ -114,6 +114,7 @@ TESTS= \
 test/test-tcp-bind6-error.o \
 test/test-tcp-bind-error.o \
 test/test-tcp-close.o \
+test/test-tcp-close-accept.o \
 test/test-tcp-close-while-connecting.o \
 test/test-tcp-connect6-error.o \
 test/test-tcp-connect-error-after-write.o \

@@ -133,6 +133,7 @@ test/test-stdio-over-pipes.c
 test/test-tcp-bind-error.c
 test/test-tcp-bind6-error.c
 test/test-tcp-close-while-connecting.c
+test/test-tcp-close-accept.c
 test/test-tcp-close.c
 test/test-tcp-connect-error-after-write.c
 test/test-tcp-connect-error.c
@@ -595,20 +595,33 @@ static unsigned int next_power_of_two(unsigned int val) {

 static void maybe_resize(uv_loop_t* loop, unsigned int len) {
   uv__io_t** watchers;
+  void* fake_watcher_list;
+  void* fake_watcher_count;
   unsigned int nwatchers;
   unsigned int i;

   if (len <= loop->nwatchers)
     return;

-  nwatchers = next_power_of_two(len);
-  watchers = realloc(loop->watchers, nwatchers * sizeof(loop->watchers[0]));
+  nwatchers = next_power_of_two(len + 2) - 2;
+  watchers = realloc(loop->watchers,
+                     (nwatchers + 2) * sizeof(loop->watchers[0]));

   if (watchers == NULL)
     abort();

+  /* Copy watchers, preserving fake one in the end */
+  if (loop->watchers != NULL) {
+    fake_watcher_list = watchers[loop->nwatchers];
+    fake_watcher_count = watchers[loop->nwatchers + 1];
+  } else {
+    fake_watcher_list = NULL;
+    fake_watcher_count = NULL;
+  }
   for (i = loop->nwatchers; i < nwatchers; i++)
     watchers[i] = NULL;
+  watchers[nwatchers] = fake_watcher_list;
+  watchers[nwatchers + 1] = fake_watcher_count;

   loop->watchers = watchers;
   loop->nwatchers = nwatchers;
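The new sizing expression reserves two extra trailing slots while keeping the total allocation a power of two, the same property the old next_power_of_two(len) call had. A standalone check of the arithmetic (not from the commit; next_power_of_two() is assumed to be the usual round-up-to-power-of-two bit trick, only its name appears in the hunk context above):

#include <assert.h>

/* Assumed to match the static helper referenced in the hunk header above;
 * reproduced here only so the check is self-contained. */
static unsigned int next_power_of_two(unsigned int val) {
  val -= 1;
  val |= val >> 1;
  val |= val >> 2;
  val |= val >> 4;
  val |= val >> 8;
  val |= val >> 16;
  val += 1;
  return val;
}

int main(void) {
  unsigned int len;
  unsigned int nwatchers;
  unsigned int total;

  for (len = 1; len < (1 << 20); len++) {
    nwatchers = next_power_of_two(len + 2) - 2;
    total = nwatchers + 2;               /* pointers actually allocated */

    assert(nwatchers >= len);            /* room for every watched fd   */
    assert((total & (total - 1)) == 0);  /* total stays a power of two  */
  }
  return 0;
}

With that invariant, watchers[nwatchers] and watchers[nwatchers + 1] are always valid storage for the event-batch pointer and count that the hunks below pass between uv__io_poll() and uv__platform_invalidate_fd().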
@@ -700,6 +713,9 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
 void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
   uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT);
   ngx_queue_remove(&w->pending_queue);
+
+  /* Remove stale events for this file descriptor */
+  uv__platform_invalidate_fd(loop, w->fd);
 }

@@ -102,6 +102,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) {
 }


+void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
+  struct kevent* events;
+  uintptr_t i;
+  uintptr_t nfds;
+
+  assert(loop->watchers != NULL);
+
+  events = (struct kevent*) loop->watchers[loop->nwatchers];
+  nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1];
+  if (events == NULL)
+    return;
+
+  /* Invalidate events with same file descriptor */
+  for (i = 0; i < nfds; i++)
+    if ((int) events[i].ident == fd)
+      events[i].ident = -1;
+}
+
+
 static void uv__cf_loop_runner(void* arg) {
   uv_loop_t* loop;

@@ -183,6 +183,7 @@ uint64_t uv__hrtime(void);
 int uv__kqueue_init(uv_loop_t* loop);
 int uv__platform_loop_init(uv_loop_t* loop, int default_loop);
 void uv__platform_loop_delete(uv_loop_t* loop);
+void uv__platform_invalidate_fd(uv_loop_t* loop, int fd);

 /* various */
 void uv__async_close(uv_async_t* handle);
@@ -162,11 +162,18 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {

     nevents = 0;

+    assert(loop->watchers != NULL);
+    loop->watchers[loop->nwatchers] = (void*) events;
+    loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
     for (i = 0; i < nfds; i++) {
       ev = events + i;
       fd = ev->ident;
       w = loop->watchers[fd];

+      /* Skip invalidated events, see uv__platform_invalidate_fd */
+      if (fd == -1)
+        continue;
+
       if (w == NULL) {
         /* File descriptor that we've stopped watching, disarm it. */
         /* TODO batch up */

@@ -227,6 +234,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
       w->cb(loop, w, revents);
       nevents++;
     }
+    loop->watchers[loop->nwatchers] = NULL;
+    loop->watchers[loop->nwatchers + 1] = NULL;

     if (nevents != 0) {
       if (nfds == ARRAY_SIZE(events) && --count != 0) {
@@ -97,6 +97,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) {
 }


+void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
+  struct uv__epoll_event* events;
+  uintptr_t i;
+  uintptr_t nfds;
+
+  assert(loop->watchers != NULL);
+
+  events = (struct uv__epoll_event*) loop->watchers[loop->nwatchers];
+  nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1];
+  if (events == NULL)
+    return;
+
+  /* Invalidate events with same file descriptor */
+  for (i = 0; i < nfds; i++)
+    if ((int) events[i].data == fd)
+      events[i].data = -1;
+}
+
+
 void uv__io_poll(uv_loop_t* loop, int timeout) {
   struct uv__epoll_event events[1024];
   struct uv__epoll_event* pe;
@@ -189,10 +208,17 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {

     nevents = 0;

+    assert(loop->watchers != NULL);
+    loop->watchers[loop->nwatchers] = (void*) events;
+    loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
     for (i = 0; i < nfds; i++) {
       pe = events + i;
       fd = pe->data;

+      /* Skip invalidated events, see uv__platform_invalidate_fd */
+      if (fd == -1)
+        continue;
+
       assert(fd >= 0);
       assert((unsigned) fd < loop->nwatchers);

@@ -238,6 +264,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
         nevents++;
       }
     }
+    loop->watchers[loop->nwatchers] = NULL;
+    loop->watchers[loop->nwatchers + 1] = NULL;

     if (nevents != 0) {
       if (nfds == ARRAY_SIZE(events) && --count != 0) {
@@ -87,6 +87,25 @@ void uv__platform_loop_delete(uv_loop_t* loop) {
 }


+void uv__platform_invalidate_fd(uv_loop_t* loop, int fd) {
+  struct port_event* events;
+  uintptr_t i;
+  uintptr_t nfds;
+
+  assert(loop->watchers != NULL);
+
+  events = (struct port_event*) loop->watchers[loop->nwatchers];
+  nfds = (uintptr_t) loop->watchers[loop->nwatchers + 1];
+  if (events == NULL)
+    return;
+
+  /* Invalidate events with same file descriptor */
+  for (i = 0; i < nfds; i++)
+    if ((int) events[i].portev_object == fd)
+      events[i].portev_object = -1;
+}
+
+
 void uv__io_poll(uv_loop_t* loop, int timeout) {
   struct port_event events[1024];
   struct port_event* pe;
@@ -173,10 +192,17 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {

     nevents = 0;

+    assert(loop->watchers != NULL);
+    loop->watchers[loop->nwatchers] = (void*) events;
+    loop->watchers[loop->nwatchers + 1] = (void*) (uintptr_t) nfds;
     for (i = 0; i < nfds; i++) {
       pe = events + i;
       fd = pe->portev_object;

+      /* Skip invalidated events, see uv__platform_invalidate_fd */
+      if (fd == -1)
+        continue;
+
       assert(fd >= 0);
       assert((unsigned) fd < loop->nwatchers);

@@ -193,6 +219,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
       if (w->pevents != 0 && ngx_queue_empty(&w->watcher_queue))
         ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
     }
+    loop->watchers[loop->nwatchers] = NULL;
+    loop->watchers[loop->nwatchers + 1] = NULL;

     if (nevents != 0) {
       if (nfds == ARRAY_SIZE(events) && --count != 0) {
@@ -64,6 +64,7 @@ TEST_DECLARE (tcp_connect_error_fault)
 TEST_DECLARE (tcp_connect_timeout)
 TEST_DECLARE (tcp_close_while_connecting)
 TEST_DECLARE (tcp_close)
+TEST_DECLARE (tcp_close_accept)
 TEST_DECLARE (tcp_flags)
 TEST_DECLARE (tcp_write_to_half_open_connection)
 TEST_DECLARE (tcp_unexpected_read)

@@ -298,6 +299,7 @@ TASK_LIST_START
 TEST_ENTRY (tcp_connect_timeout)
 TEST_ENTRY (tcp_close_while_connecting)
 TEST_ENTRY (tcp_close)
+TEST_ENTRY (tcp_close_accept)
 TEST_ENTRY (tcp_flags)
 TEST_ENTRY (tcp_write_to_half_open_connection)
 TEST_ENTRY (tcp_unexpected_read)
test/test-tcp-close-accept.c (new file, 177 lines)
@@ -0,0 +1,177 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "uv.h"
#include "task.h"

#include <stdio.h>
#include <string.h>

static struct sockaddr_in addr;
static uv_tcp_t tcp_server;
static uv_tcp_t tcp_outgoing[2];
static uv_tcp_t tcp_incoming[ARRAY_SIZE(tcp_outgoing)];
static uv_connect_t connect_reqs[ARRAY_SIZE(tcp_outgoing)];
static uv_tcp_t tcp_check;
static uv_connect_t tcp_check_req;
static uv_write_t write_reqs[ARRAY_SIZE(tcp_outgoing)];
static unsigned int got_connections;
static unsigned int close_cb_called;
static unsigned int write_cb_called;
static unsigned int read_cb_called;

static void close_cb(uv_handle_t* handle) {
  close_cb_called++;
}

static void write_cb(uv_write_t* req, int status) {
  ASSERT(status == 0);
  write_cb_called++;
}

static void connect_cb(uv_connect_t* req, int status) {
  unsigned int i;
  uv_buf_t buf;
  uv_stream_t* outgoing;

  if (req == &tcp_check_req) {
    ASSERT(status != 0);

    /* Close check and incoming[0], time to finish test */
    uv_close((uv_handle_t*) &tcp_incoming[0], close_cb);
    uv_close((uv_handle_t*) &tcp_check, close_cb);
    return;
  }

  ASSERT(status == 0);
  ASSERT(connect_reqs <= req);
  ASSERT(req <= connect_reqs + ARRAY_SIZE(connect_reqs));
  i = req - connect_reqs;

  buf = uv_buf_init("x", 1);
  outgoing = (uv_stream_t*) &tcp_outgoing[i];
  ASSERT(0 == uv_write(&write_reqs[i], outgoing, &buf, 1, write_cb));
}

static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) {
  static char buf[1];

  return uv_buf_init(buf, sizeof(buf));
}

static void read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t b) {
  uv_loop_t* loop;
  unsigned int i;

  /* Only first stream should receive read events */
  ASSERT(stream == (uv_stream_t*) &tcp_incoming[0]);
  ASSERT(0 == uv_read_stop(stream));
  ASSERT(1 == nread);

  loop = stream->loop;
  read_cb_called++;

  /* Close all active incomings, except current one */
  for (i = 1; i < got_connections; i++)
    uv_close((uv_handle_t*) &tcp_incoming[i], close_cb);

  /* Create new fd that should be one of the closed incomings */
  ASSERT(0 == uv_tcp_init(loop, &tcp_check));
  ASSERT(0 == uv_tcp_connect(&tcp_check_req, &tcp_check, addr, connect_cb));
  ASSERT(0 == uv_read_start((uv_stream_t*) &tcp_check, alloc_cb, read_cb));

  /* Close server, so no one will connect to it */
  uv_close((uv_handle_t*) &tcp_server, close_cb);
}

static void connection_cb(uv_stream_t* server, int status) {
  unsigned int i;
  uv_tcp_t* incoming;

  ASSERT(server == (uv_stream_t*) &tcp_server);

  /* Ignore tcp_check connection */
  if (got_connections == ARRAY_SIZE(tcp_incoming))
    return;

  /* Accept everyone */
  incoming = &tcp_incoming[got_connections++];
  ASSERT(0 == uv_tcp_init(server->loop, incoming));
  ASSERT(0 == uv_accept(server, (uv_stream_t*) incoming));

  if (got_connections != ARRAY_SIZE(tcp_incoming))
    return;

  /* Once all clients are accepted - start reading */
  for (i = 0; i < ARRAY_SIZE(tcp_incoming); i++) {
    incoming = &tcp_incoming[i];
    ASSERT(0 == uv_read_start((uv_stream_t*) incoming, alloc_cb, read_cb));
  }
}

TEST_IMPL(tcp_close_accept) {
  unsigned int i;
  uv_loop_t* loop;
  uv_tcp_t* client;

  /*
   * A little explanation of what goes on below:
   *
   * We'll create server and connect to it using two clients, each writing one
   * byte once connected.
   *
   * When all clients will be accepted by server - we'll start reading from them
   * and, on first client's first byte, will close second client and server.
   * After that, we'll immediately initiate new connection to server using
   * tcp_check handle (thus, reusing fd from second client).
   *
   * In this situation uv__io_poll()'s event list should still contain read
   * event for second client, and, if not cleaned up properly, `tcp_check` will
   * receive stale event of second incoming and invoke `connect_cb` with zero
   * status.
   */

  loop = uv_default_loop();
  addr = uv_ip4_addr("0.0.0.0", TEST_PORT);

  ASSERT(0 == uv_tcp_init(loop, &tcp_server));
  ASSERT(0 == uv_tcp_bind(&tcp_server, addr));
  ASSERT(0 == uv_listen((uv_stream_t*) &tcp_server,
                        ARRAY_SIZE(tcp_outgoing),
                        connection_cb));

  for (i = 0; i < ARRAY_SIZE(tcp_outgoing); i++) {
    client = tcp_outgoing + i;

    ASSERT(0 == uv_tcp_init(loop, client));
    ASSERT(0 == uv_tcp_connect(&connect_reqs[i], client, addr, connect_cb));
  }

  uv_run(loop, UV_RUN_DEFAULT);

  ASSERT(ARRAY_SIZE(tcp_outgoing) == got_connections);
  ASSERT((ARRAY_SIZE(tcp_outgoing) + 2) == close_cb_called);
  ASSERT(ARRAY_SIZE(tcp_outgoing) == write_cb_called);
  ASSERT(1 == read_cb_called);

  MAKE_VALGRIND_HAPPY();
  return 0;
}