diff --git a/CMakeLists.txt b/CMakeLists.txt index de1272a6..624bf2b1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -525,6 +525,7 @@ if(LIBUV_BUILD_TESTS) test/test-semaphore.c test/test-shutdown-close.c test/test-shutdown-eof.c + test/test-shutdown-simultaneous.c test/test-shutdown-twice.c test/test-signal-multiple-loops.c test/test-signal-pending-on-close.c diff --git a/Makefile.am b/Makefile.am index 4d8fb40d..02ba5468 100644 --- a/Makefile.am +++ b/Makefile.am @@ -241,6 +241,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-semaphore.c \ test/test-shutdown-close.c \ test/test-shutdown-eof.c \ + test/test-shutdown-simultaneous.c \ test/test-shutdown-twice.c \ test/test-signal-multiple-loops.c \ test/test-signal-pending-on-close.c \ diff --git a/src/unix/stream.c b/src/unix/stream.c index f64c01cf..bc64fe8f 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -926,10 +926,9 @@ static void uv__write(uv_stream_t* stream) { } req->error = n; + // XXX(jwn): this must call uv__stream_flush_write_queue(stream, n) here, since we won't generate any more events uv__write_req_finish(req); uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); - if (!uv__io_active(&stream->io_watcher, POLLIN)) - uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); } @@ -1013,8 +1012,7 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { stream->flags &= ~UV_HANDLE_READING; stream->flags &= ~UV_HANDLE_READABLE; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); - if (!uv__io_active(&stream->io_watcher, POLLOUT)) - uv__handle_stop(stream); + uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); stream->read_cb(stream, UV_EOF, buf); } @@ -1204,8 +1202,7 @@ static void uv__read(uv_stream_t* stream) { if (stream->flags & UV_HANDLE_READING) { stream->flags &= ~UV_HANDLE_READING; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); - if (!uv__io_active(&stream->io_watcher, POLLOUT)) - uv__handle_stop(stream); + uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); } } @@ -1582,8 +1579,7 @@ int uv_read_stop(uv_stream_t* stream) { stream->flags &= ~UV_HANDLE_READING; uv__io_stop(stream->loop, &stream->io_watcher, POLLIN); - if (!uv__io_active(&stream->io_watcher, POLLOUT)) - uv__handle_stop(stream); + uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); stream->read_cb = NULL; diff --git a/test/echo-server.c b/test/echo-server.c index e69c0e26..058c9925 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -65,24 +65,35 @@ static void after_write(uv_write_t* req, int status) { static void after_shutdown(uv_shutdown_t* req, int status) { + ASSERT_EQ(status, 0); uv_close((uv_handle_t*) req->handle, on_close); free(req); } + +static void on_shutdown(uv_shutdown_t* req, int status) { + ASSERT_EQ(status, 0); + free(req); +} + + static void after_read(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { int i; write_req_t *wr; uv_shutdown_t* sreq; + int shutdown = 0; if (nread < 0) { /* Error or EOF */ - ASSERT(nread == UV_EOF); + ASSERT_EQ(nread, UV_EOF); free(buf->base); sreq = malloc(sizeof* sreq); - ASSERT(0 == uv_shutdown(sreq, handle, after_shutdown)); + if (uv_is_writable(handle)) { + ASSERT_EQ(0, uv_shutdown(sreq, handle, after_shutdown)); + } return; } @@ -95,25 +106,28 @@ static void after_read(uv_stream_t* handle, /* * Scan for the letter Q which signals that we should quit the server. * If we get QS it means close the stream. + * If we get QSS it means shutdown the stream. * If we get QSH it means disable linger before close the socket. */ - if (!server_closed) { - for (i = 0; i < nread; i++) { - if (buf->base[i] == 'Q') { - if (i + 1 < nread && buf->base[i + 1] == 'S') { - int reset = 0; - if (i + 2 < nread && buf->base[i + 2] == 'H') - reset = 1; - free(buf->base); - if (reset && handle->type == UV_TCP) - ASSERT(0 == uv_tcp_close_reset((uv_tcp_t*) handle, on_close)); - else - uv_close((uv_handle_t*) handle, on_close); - return; - } else { - uv_close(server, on_server_close); - server_closed = 1; - } + for (i = 0; i < nread; i++) { + if (buf->base[i] == 'Q') { + if (i + 1 < nread && buf->base[i + 1] == 'S') { + int reset = 0; + if (i + 2 < nread && buf->base[i + 2] == 'S') + shutdown = 1; + if (i + 2 < nread && buf->base[i + 2] == 'H') + reset = 1; + if (reset && handle->type == UV_TCP) + ASSERT_EQ(0, uv_tcp_close_reset((uv_tcp_t*) handle, on_close)); + else if (shutdown) + break; + else + uv_close((uv_handle_t*) handle, on_close); + free(buf->base); + return; + } else if (!server_closed) { + uv_close(server, on_server_close); + server_closed = 1; } } } @@ -125,6 +139,9 @@ static void after_read(uv_stream_t* handle, if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) { FATAL("uv_write failed"); } + + if (shutdown) + ASSERT_EQ(0, uv_shutdown(malloc(sizeof* sreq), handle, on_shutdown)); } diff --git a/test/test-list.h b/test/test-list.h index 424eea07..59b95da9 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -206,6 +206,7 @@ TEST_DECLARE (connection_fail_doesnt_auto_close) TEST_DECLARE (shutdown_close_tcp) TEST_DECLARE (shutdown_close_pipe) TEST_DECLARE (shutdown_eof) +TEST_DECLARE (shutdown_simultaneous) TEST_DECLARE (shutdown_twice) TEST_DECLARE (callback_stack) TEST_DECLARE (env_vars) @@ -784,6 +785,9 @@ TASK_LIST_START TEST_ENTRY (shutdown_eof) TEST_HELPER (shutdown_eof, tcp4_echo_server) + TEST_ENTRY (shutdown_simultaneous) + TEST_HELPER (shutdown_simultaneous, tcp4_echo_server) + TEST_ENTRY (shutdown_twice) TEST_HELPER (shutdown_twice, tcp4_echo_server) diff --git a/test/test-shutdown-simultaneous.c b/test/test-shutdown-simultaneous.c new file mode 100644 index 00000000..7de3bd42 --- /dev/null +++ b/test/test-shutdown-simultaneous.c @@ -0,0 +1,135 @@ +/* Copyright libuv project and contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" +#include +#include + +static uv_tcp_t tcp; +static uv_connect_t connect_req; +static uv_shutdown_t shutdown_req; +static uv_buf_t qbuf; +static int got_q; +static int got_eof; +static int called_connect_cb; +static int called_shutdown_cb; +static int called_tcp_close_cb; + + +static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { + buf->base = malloc(size); + buf->len = size; +} + + +static void shutdown_cb(uv_shutdown_t *req, int status) { + ASSERT_PTR_EQ(req, &shutdown_req); + + ASSERT_EQ(called_connect_cb, 1); + ASSERT_EQ(called_tcp_close_cb, 0); +} + + +static void read_cb(uv_stream_t* t, ssize_t nread, const uv_buf_t* buf) { + ASSERT_PTR_EQ((uv_tcp_t*)t, &tcp); + + if (nread == 0) { + free(buf->base); + return; + } + + if (!got_q) { + ASSERT_EQ(nread, 4); + ASSERT_EQ(got_eof, 0); + ASSERT_MEM_EQ(buf->base, "QQSS", 4); + free(buf->base); + got_q = 1; + puts("got QQSS"); + /* Shutdown our end of the connection simultaneously */ + uv_shutdown(&shutdown_req, (uv_stream_t*) &tcp, shutdown_cb); + called_shutdown_cb++; + } else { + ASSERT_EQ(nread, UV_EOF); + if (buf->base) { + free(buf->base); + } + got_eof = 1; + puts("got EOF"); + } +} + + +static void connect_cb(uv_connect_t *req, int status) { + ASSERT_EQ(status, 0); + ASSERT_PTR_EQ(req, &connect_req); + + /* Start reading from our connection so we can receive the EOF. */ + ASSERT_EQ(0, uv_read_start((uv_stream_t*)&tcp, alloc_cb, read_cb)); + + /* Check error handling. */ + ASSERT_EQ(UV_EALREADY, uv_read_start((uv_stream_t*)&tcp, alloc_cb, read_cb)); + ASSERT_EQ(UV_EINVAL, uv_read_start(NULL, alloc_cb, read_cb)); + ASSERT_EQ(UV_EINVAL, uv_read_start((uv_stream_t*)&tcp, NULL, read_cb)); + ASSERT_EQ(UV_EINVAL, uv_read_start((uv_stream_t*)&tcp, alloc_cb, NULL)); + + /* + * Write the letter 'Q' and 'QSS` to gracefully kill the echo-server. This + * will not effect our connection. + */ + ASSERT_EQ(qbuf.len, uv_try_write((uv_stream_t*) &tcp, &qbuf, 1)); + + called_connect_cb++; + ASSERT_EQ(called_shutdown_cb, 0); +} + + +/* + * This test has a client which connects to the echo_server and immediately + * issues a shutdown. We check that this does not cause libuv to hang. + */ +TEST_IMPL(shutdown_simultaneous) { + struct sockaddr_in server_addr; + int r; + + qbuf.base = "QQSS"; + qbuf.len = 4; + + ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr)); + r = uv_tcp_init(uv_default_loop(), &tcp); + ASSERT_EQ(r, 0); + + r = uv_tcp_connect(&connect_req, + &tcp, + (const struct sockaddr*) &server_addr, + connect_cb); + ASSERT_EQ(r, 0); + + uv_run(uv_default_loop(), UV_RUN_DEFAULT); + + ASSERT_EQ(called_connect_cb, 1); + ASSERT_EQ(called_shutdown_cb, 1); + ASSERT_EQ(got_eof, 1); + ASSERT_EQ(got_q, 1); + + MAKE_VALGRIND_HAPPY(); + return 0; +}