diff --git a/CMakeLists.txt b/CMakeLists.txt index 3d5e3759..640f8a19 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -584,6 +584,7 @@ if(LIBUV_BUILD_TESTS) test/test-tcp-rst.c test/test-tcp-shutdown-after-write.c test/test-tcp-try-write.c + test/test-tcp-write-in-a-row.c test/test-tcp-try-write-error.c test/test-tcp-unexpected-read.c test/test-tcp-write-after-connect.c diff --git a/Makefile.am b/Makefile.am index 66e18b1d..b0b4f252 100644 --- a/Makefile.am +++ b/Makefile.am @@ -277,6 +277,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-tcp-writealot.c \ test/test-tcp-write-fail.c \ test/test-tcp-try-write.c \ + test/test-tcp-write-in-a-row.c \ test/test-tcp-try-write-error.c \ test/test-tcp-write-queue-order.c \ test/test-test-macros.c \ diff --git a/src/unix/stream.c b/src/unix/stream.c index 95ca92d5..0b755de6 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -841,9 +841,16 @@ static void uv__write(uv_stream_t* stream) { QUEUE* q; uv_write_t* req; ssize_t n; + int count; assert(uv__stream_fd(stream) >= 0); + /* Prevent loop starvation when the consumer of this stream read as fast as + * (or faster than) we can write it. This `count` mechanism does not need to + * change even if we switch to edge-triggered I/O. + */ + count = 32; + for (;;) { if (QUEUE_EMPTY(&stream->write_queue)) return; @@ -862,10 +869,13 @@ static void uv__write(uv_stream_t* stream) { req->send_handle = NULL; if (uv__write_req_update(stream, req, n)) { uv__write_req_finish(req); - return; /* TODO(bnoordhuis) Start trying to write the next request. */ + if (count-- > 0) + continue; /* Start trying to write the next request. */ + + return; } } else if (n != UV_EAGAIN) - break; + goto error; /* If this is a blocking stream, try again. */ if (stream->flags & UV_HANDLE_BLOCKING_WRITES) @@ -880,6 +890,7 @@ static void uv__write(uv_stream_t* stream) { return; } +error: req->error = n; uv__write_req_finish(req); uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); diff --git a/test/test-list.h b/test/test-list.h index ce4d4920..8983cbc3 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -105,6 +105,7 @@ TEST_DECLARE (tcp_write_after_connect) TEST_DECLARE (tcp_writealot) TEST_DECLARE (tcp_write_fail) TEST_DECLARE (tcp_try_write) +TEST_DECLARE (tcp_write_in_a_row) TEST_DECLARE (tcp_try_write_error) TEST_DECLARE (tcp_write_queue_order) TEST_DECLARE (tcp_open) @@ -670,6 +671,7 @@ TASK_LIST_START TEST_HELPER (tcp_write_fail, tcp4_echo_server) TEST_ENTRY (tcp_try_write) + TEST_ENTRY (tcp_write_in_a_row) TEST_ENTRY (tcp_try_write_error) TEST_ENTRY (tcp_write_queue_order) diff --git a/test/test-tcp-write-in-a-row.c b/test/test-tcp-write-in-a-row.c new file mode 100644 index 00000000..f04d48fc --- /dev/null +++ b/test/test-tcp-write-in-a-row.c @@ -0,0 +1,141 @@ +/* Copyright libuv project contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include +#include +#include + +#include "task.h" +#include "uv.h" + +static uv_tcp_t server; +static uv_tcp_t client; +static uv_tcp_t incoming; +static int connect_cb_called; +static int close_cb_called; +static int connection_cb_called; +static int write_cb_called; +static uv_write_t small_write; +static uv_write_t big_write; + +/* 10 MB, which is large than the send buffer size and the recv buffer */ +static char data[1024 * 1024 * 10]; + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + +static void write_cb(uv_write_t* w, int status) { + /* the small write should finish immediately after the big write */ + ASSERT_EQ(0, uv_stream_get_write_queue_size((uv_stream_t*) &client)); + + write_cb_called++; + + if (write_cb_called == 2) { + /* we are done */ + uv_close((uv_handle_t*) &client, close_cb); + uv_close((uv_handle_t*) &incoming, close_cb); + uv_close((uv_handle_t*) &server, close_cb); + } +} + +static void connect_cb(uv_connect_t* _, int status) { + int r; + uv_buf_t buf; + size_t write_queue_size0, write_queue_size1; + + ASSERT_EQ(0, status); + connect_cb_called++; + + /* fire a big write */ + buf = uv_buf_init(data, sizeof(data)); + r = uv_write(&small_write, (uv_stream_t*) &client, &buf, 1, write_cb); + ASSERT_EQ(0, r); + + /* check that the write process gets stuck */ + write_queue_size0 = uv_stream_get_write_queue_size((uv_stream_t*) &client); + ASSERT_GT(write_queue_size0, 0); + + /* fire a small write, which should be queued */ + buf = uv_buf_init("A", 1); + r = uv_write(&big_write, (uv_stream_t*) &client, &buf, 1, write_cb); + ASSERT_EQ(0, r); + + write_queue_size1 = uv_stream_get_write_queue_size((uv_stream_t*) &client); + ASSERT_EQ(write_queue_size1, write_queue_size0 + 1); +} + +static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { + static char base[1024]; + + buf->base = base; + buf->len = sizeof(base); +} + +static void read_cb(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {} + +static void connection_cb(uv_stream_t* tcp, int status) { + ASSERT_EQ(0, status); + connection_cb_called++; + + ASSERT_EQ(0, uv_tcp_init(tcp->loop, &incoming)); + ASSERT_EQ(0, uv_accept(tcp, (uv_stream_t*) &incoming)); + ASSERT_EQ(0, uv_read_start((uv_stream_t*) &incoming, alloc_cb, read_cb)); +} + +static void start_server(void) { + struct sockaddr_in addr; + + ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); + + ASSERT_EQ(0, uv_tcp_init(uv_default_loop(), &server)); + ASSERT_EQ(0, uv_tcp_bind(&server, (struct sockaddr*) &addr, 0)); + ASSERT_EQ(0, uv_listen((uv_stream_t*) &server, 128, connection_cb)); +} + +TEST_IMPL(tcp_write_in_a_row) { +#if defined(_WIN32) + RETURN_SKIP("tcp_write_in_a_row does not work on Windows"); +#endif + + uv_connect_t connect_req; + struct sockaddr_in addr; + + start_server(); + + ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + ASSERT_EQ(0, uv_tcp_init(uv_default_loop(), &client)); + ASSERT_EQ(0, uv_tcp_connect(&connect_req, + &client, + (struct sockaddr*) &addr, + connect_cb)); + + ASSERT_EQ(0, uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + + ASSERT_EQ(1, connect_cb_called); + ASSERT_EQ(3, close_cb_called); + ASSERT_EQ(1, connection_cb_called); + ASSERT_EQ(2, write_cb_called); + + MAKE_VALGRIND_HAPPY(); + return 0; +}