diff --git a/CMakeLists.txt b/CMakeLists.txt index 650d6ca0..6502f6e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -555,6 +555,7 @@ if(LIBUV_BUILD_TESTS) test/test-pipe-server-close.c test/test-pipe-set-fchmod.c test/test-pipe-set-non-blocking.c + test/test-pipe-try-write.c test/test-platform-output.c test/test-poll-close-doesnt-corrupt-stack.c test/test-poll-close.c diff --git a/src/win/pipe.c b/src/win/pipe.c index 94233455..85b3e7da 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -1786,6 +1786,7 @@ int uv__pipe_try_write(uv_pipe_t* handle, const uv_buf_t bufs[], unsigned int nbufs) { OVERLAPPED overlapped; + HANDLE event; const uv_buf_t* buf; int bytes_written; unsigned int idx; @@ -1807,44 +1808,47 @@ int uv__pipe_try_write(uv_pipe_t* handle, memset(&overlapped, 0, sizeof(overlapped)); - overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); - if (overlapped.hEvent == NULL) { + event = CreateEvent(NULL, FALSE, FALSE, NULL); + if (event == NULL) { uv_fatal_error(GetLastError(), "CreateEvent"); } + overlapped.hEvent = (HANDLE)((uintptr_t)event | 1); + bytes_written = 0; for (err = 0, idx = 0; idx < nbufs; err = 0, idx += 1) { buf = &bufs[idx]; - if (WriteFile(handle->handle, buf->base, buf->len, NULL, &overlapped)) { - bytes_written += buf->len; - continue; - } + if (!WriteFile(handle->handle, buf->base, buf->len, NULL, &overlapped)) { + err = GetLastError(); + if (err != ERROR_IO_PENDING) { + break; + } - err = GetLastError(); - if (err != ERROR_IO_PENDING) { - break; + err = WaitForSingleObject(event, timeout); + if (err != WAIT_OBJECT_0) { + CancelIoEx(handle->handle, &overlapped); + } } - - err = WaitForSingleObject(overlapped.hEvent, timeout); - if (err == WAIT_OBJECT_0) { - bytes_written += buf->len; - continue; - } - - if (err == WAIT_TIMEOUT && - CancelIo(handle->handle) && - GetOverlappedResult(handle->handle, &overlapped, &err, TRUE)) { + + if (GetOverlappedResult(handle->handle, &overlapped, &err, TRUE)) { bytes_written += err; - err = WSAEWOULDBLOCK; /* Translates to UV_EAGAIN. */ + if (err == buf->len) + continue; + err = WSAEWOULDBLOCK; /* Ignored later. */ } else { err = GetLastError(); + if (err == ERROR_OPERATION_ABORTED) { + err = WSAEWOULDBLOCK; /* Translates to UV_EAGAIN. */ + } } break; } - CloseHandle(overlapped.hEvent); + if (!CloseHandle(event)) { + uv_fatal_error(GetLastError(), "CloseHandle"); + } if (bytes_written == 0 && err != 0) { return uv_translate_sys_error(err); diff --git a/test/test-pipe-try-write.c b/test/test-pipe-try-write.c index 346a1c39..cacc1244 100644 --- a/test/test-pipe-try-write.c +++ b/test/test-pipe-try-write.c @@ -1,27 +1,64 @@ #include "uv.h" #include "task.h" -static void (*spam)(uv_pipe_t* handle); -static uv_pipe_t client_handle; -static uv_pipe_t peer_handle; -static uv_pipe_t server_handle; -static uv_write_t write_req; +typedef struct pipe_ctx_s { + uv_pipe_t handle; + uv_write_t write_req; + ssize_t read; + ssize_t written; +} pipe_ctx_t; +static void (*spam)(uv_pipe_t* handle); +static pipe_ctx_t client; +static pipe_ctx_t peer; +static uv_pipe_t server_handle; static void write_cb(uv_write_t* req, int status) { - ASSERT(0 == status); + ASSERT_EQ(0, status); +} + + +static int do_try_write(uv_pipe_t* handle, uv_buf_t* buf, size_t size) { + int rc; + pipe_ctx_t* pc; + pc = container_of(handle, struct pipe_ctx_s, handle); + rc = 0; + do { + pc->written += rc; + rc = uv_try_write((uv_stream_t*)handle, buf, size); + } while (rc > 0); + + return rc; +}; + + +static void handle_read(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf) { + pipe_ctx_t* send_ctx; + pipe_ctx_t* recv_ctx; + recv_ctx = container_of(handle, struct pipe_ctx_s, handle); + send_ctx = recv_ctx == &client ? &peer : &client; + ASSERT_UINT64_GT(nread, 0); + if (send_ctx->written >= recv_ctx->read + (ssize_t)buf->len) { + ASSERT_UINT64_EQ(nread, (ssize_t)buf->len); /* Expect saturation. */ + } + recv_ctx->read += nread; } static void spam_0(uv_pipe_t* handle) { uv_buf_t buf; + pipe_ctx_t* pc; buf = uv_buf_init("", 0); - ASSERT(0 == uv_try_write((uv_stream_t*) handle, &buf, 1)); + ASSERT_EQ(0, uv_try_write((uv_stream_t*) handle, &buf, 1)); /* Non-empty write to start the event loop moving. */ + pc = container_of(handle, struct pipe_ctx_s, handle); buf = uv_buf_init("hello, world", sizeof("hello, world") - 1); - ASSERT(0 == uv_write(&write_req, (uv_stream_t*) handle, &buf, 1, write_cb)); + ASSERT_EQ(0, + uv_write(&pc->write_req,(uv_stream_t*) handle, &buf, 1, write_cb)); } @@ -30,11 +67,9 @@ static void spam_1(uv_pipe_t* handle) { int rc; buf = uv_buf_init("hello, world", sizeof("hello, world") - 1); - do - rc = uv_try_write((uv_stream_t*) handle, &buf, 1); - while (rc > 0); + rc = do_try_write(handle, &buf, 1); - ASSERT(rc == UV_EAGAIN); + ASSERT_EQ(rc, UV_EAGAIN); } @@ -44,12 +79,9 @@ static void spam_2(uv_pipe_t* handle) { bufs[0] = uv_buf_init("hello,", sizeof("hello,") - 1); bufs[1] = uv_buf_init(" world", sizeof(" world") - 1); + rc = do_try_write(handle, bufs, ARRAY_SIZE(bufs)); - do - rc = uv_try_write((uv_stream_t*) handle, bufs, ARRAY_SIZE(bufs)); - while (rc > 0); - - ASSERT(rc == UV_EAGAIN); + ASSERT_EQ(rc, UV_EAGAIN); } @@ -62,35 +94,34 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { static void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { - if (spam == spam_0) { - ASSERT(nread > 0); /* Expect some bytes. */ - } else { - ASSERT(nread == (ssize_t) buf->len); /* Expect saturation. */ + ASSERT_UINT64_GT(nread, 0); /* Expect some bytes. */ + if (spam != spam_0) { + handle_read(handle, nread, buf); } - if (handle == (uv_stream_t*) &peer_handle) { - spam(&client_handle); + if (handle == (uv_stream_t*) &client.handle) { + spam(&client.handle); } else { - uv_close((uv_handle_t*) &peer_handle, NULL); - uv_close((uv_handle_t*) &client_handle, NULL); + uv_close((uv_handle_t*) &peer.handle, NULL); + uv_close((uv_handle_t*) &client.handle, NULL); uv_close((uv_handle_t*) &server_handle, NULL); } } static void connection_cb(uv_stream_t* handle, int status) { - ASSERT(0 == status); - ASSERT(0 == uv_pipe_init(uv_default_loop(), &peer_handle, 0)); - ASSERT(0 == uv_accept((uv_stream_t*) &server_handle, - (uv_stream_t*) &peer_handle)); - ASSERT(0 == uv_read_start((uv_stream_t*) &peer_handle, alloc_cb, read_cb)); - spam(&peer_handle); + ASSERT_EQ(0, status); + ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &peer.handle, 0)); + ASSERT_EQ(0, uv_accept((uv_stream_t*) &server_handle, + (uv_stream_t*) &peer.handle)); + ASSERT_EQ(0, uv_read_start((uv_stream_t*) &peer.handle, alloc_cb, read_cb)); + spam(&peer.handle); } static void connect_cb(uv_connect_t* req, int status) { - ASSERT(0 == status); - ASSERT(0 == uv_read_start((uv_stream_t*) &client_handle, alloc_cb, read_cb)); + ASSERT_EQ(0, status); + ASSERT_EQ(0, uv_read_start((uv_stream_t*) &client.handle, alloc_cb, read_cb)); } @@ -98,12 +129,12 @@ static int pipe_try_write(void (*spammer)(uv_pipe_t*)) { uv_connect_t connect_req; spam = spammer; - ASSERT(0 == uv_pipe_init(uv_default_loop(), &client_handle, 0)); - ASSERT(0 == uv_pipe_init(uv_default_loop(), &server_handle, 0)); - ASSERT(0 == uv_pipe_bind(&server_handle, TEST_PIPENAME)); - ASSERT(0 == uv_listen((uv_stream_t*) &server_handle, 1, connection_cb)); - uv_pipe_connect(&connect_req, &client_handle, TEST_PIPENAME, connect_cb); - ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &client.handle, 0)); + ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &server_handle, 0)); + ASSERT_EQ(0, uv_pipe_bind(&server_handle, TEST_PIPENAME)); + ASSERT_EQ(0, uv_listen((uv_stream_t*) &server_handle, 1, connection_cb)); + uv_pipe_connect(&connect_req, &client.handle, TEST_PIPENAME, connect_cb); + ASSERT_EQ(0, uv_run(uv_default_loop(), UV_RUN_DEFAULT)); MAKE_VALGRIND_HAPPY(); return 0;