diff --git a/src/unix/stream.c b/src/unix/stream.c index 4d62a23f..5a96b66b 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -950,10 +950,16 @@ error: static void uv__write_callbacks(uv_stream_t* stream) { uv_write_t* req; QUEUE* q; + QUEUE pq; - while (!QUEUE_EMPTY(&stream->write_completed_queue)) { + if (QUEUE_EMPTY(&stream->write_completed_queue)) + return; + + QUEUE_MOVE(&stream->write_completed_queue, &pq); + + while (!QUEUE_EMPTY(&pq)) { /* Pop a req off write_completed_queue. */ - q = QUEUE_HEAD(&stream->write_completed_queue); + q = QUEUE_HEAD(&pq); req = QUEUE_DATA(q, uv_write_t, queue); QUEUE_REMOVE(q); uv__req_unregister(stream->loop, req); @@ -969,8 +975,6 @@ static void uv__write_callbacks(uv_stream_t* stream) { if (req->cb) req->cb(req, req->error); } - - assert(QUEUE_EMPTY(&stream->write_completed_queue)); } diff --git a/test/test-list.h b/test/test-list.h index 6c490e1d..ae4ad76f 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -121,6 +121,7 @@ TEST_DECLARE (tcp_bind6_error_addrnotavail) TEST_DECLARE (tcp_bind6_error_fault) TEST_DECLARE (tcp_bind6_error_inval) TEST_DECLARE (tcp_bind6_localhost_ok) +TEST_DECLARE (tcp_write_ready) TEST_DECLARE (udp_alloc_cb_fail) TEST_DECLARE (udp_bind) TEST_DECLARE (udp_bind_reuseaddr) @@ -545,6 +546,8 @@ TASK_LIST_START TEST_ENTRY (tcp_open_bound) TEST_ENTRY (tcp_open_connected) TEST_HELPER (tcp_open_connected, tcp4_echo_server) + TEST_ENTRY (tcp_write_ready) + TEST_HELPER (tcp_write_ready, tcp4_echo_server) TEST_ENTRY (tcp_shutdown_after_write) TEST_HELPER (tcp_shutdown_after_write, tcp4_echo_server) diff --git a/test/test-tcp-open.c b/test/test-tcp-open.c index f5d8f136..fc94c846 100644 --- a/test/test-tcp-open.c +++ b/test/test-tcp-open.c @@ -30,6 +30,7 @@ #endif static int shutdown_cb_called = 0; +static int shutdown_requested = 0; static int connect_cb_called = 0; static int write_cb_called = 0; static int close_cb_called = 0; @@ -37,6 +38,8 @@ static int close_cb_called = 0; static uv_connect_t connect_req; static uv_shutdown_t shutdown_req; static uv_write_t write_req; +static uv_timer_t tm; +static uv_tcp_t client; static void startup(void) { @@ -115,6 +118,20 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) { ASSERT(memcmp("PING", buf->base, nread) == 0); } else { + ASSERT(nread == UV_EOF); + uv_close((uv_handle_t*)tcp, close_cb); + } +} + + +static void read1_cb(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) { + int i; + ASSERT(tcp != NULL); + + if (nread >= 0) { + for (i = 0; i < nread; ++i) + ASSERT(buf->base[i] == 'P'); + } else { ASSERT(nread == UV_EOF); printf("GOT EOF\n"); uv_close((uv_handle_t*)tcp, close_cb); @@ -134,6 +151,37 @@ static void write_cb(uv_write_t* req, int status) { } +static void write1_cb(uv_write_t* req, int status) { + uv_buf_t buf; + int r; + + ASSERT(req != NULL); + if (status) { + ASSERT(shutdown_cb_called); + return; + } + + if (shutdown_requested) + return; + + buf = uv_buf_init("P", 1); + r = uv_write(&write_req, req->handle, &buf, 1, write1_cb); + ASSERT(r == 0); + + write_cb_called++; +} + + +static void timer_cb(uv_timer_t* handle) { + int r; + + /* Shutdown on drain. */ + r = uv_shutdown(&shutdown_req, &client, shutdown_cb); + ASSERT(r == 0); + shutdown_requested++; +} + + static void connect_cb(uv_connect_t* req, int status) { uv_buf_t buf = uv_buf_init("PING", 4); uv_stream_t* stream; @@ -158,9 +206,35 @@ static void connect_cb(uv_connect_t* req, int status) { } +static void connect1_cb(uv_connect_t* req, int status) { + uv_buf_t buf; + uv_stream_t* stream; + int r; + + ASSERT(req == &connect_req); + ASSERT(status == 0); + + stream = req->handle; + connect_cb_called++; + + r = uv_timer_init(uv_default_loop(), &tm); + ASSERT(r == 0); + + r = uv_timer_start(&tm, timer_cb, 2000, 0); + ASSERT(r == 0); + + buf = uv_buf_init("P", 1); + r = uv_write(&write_req, stream, &buf, 1, write1_cb); + ASSERT(r == 0); + + /* Start reading */ + r = uv_read_start(stream, alloc_cb, read1_cb); + ASSERT(r == 0); +} + + TEST_IMPL(tcp_open) { struct sockaddr_in addr; - uv_tcp_t client; uv_os_sock_t sock; int r; @@ -289,3 +363,38 @@ TEST_IMPL(tcp_open_connected) { MAKE_VALGRIND_HAPPY(); return 0; } + + +TEST_IMPL(tcp_write_ready) { + struct sockaddr_in addr; + uv_os_sock_t sock; + int r; + + ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + startup(); + sock = create_tcp_socket(); + + r = uv_tcp_init(uv_default_loop(), &client); + ASSERT(r == 0); + + r = uv_tcp_open(&client, sock); + ASSERT(r == 0); + + r = uv_tcp_connect(&connect_req, + &client, + (const struct sockaddr*) &addr, + connect1_cb); + ASSERT(r == 0); + + uv_run(uv_default_loop(), UV_RUN_DEFAULT); + + ASSERT(shutdown_cb_called == 1); + ASSERT(shutdown_requested == 1); + ASSERT(connect_cb_called == 1); + ASSERT(write_cb_called > 0); + ASSERT(close_cb_called == 1); + + MAKE_VALGRIND_HAPPY(); + return 0; +}