win: fixes in uv__pipe_try_write() (#3825 2/2)
Return `UV_EAGAIN` on `ERROR_OPERATION_ABORTED`. Use the correct format for `overlapped.hEvent`. Some refactoring to always wait for the overlapped result. Modernize tests and some improvements.
This commit is contained in:
parent
244e0e2059
commit
e1143f1265
@ -555,6 +555,7 @@ if(LIBUV_BUILD_TESTS)
|
|||||||
test/test-pipe-server-close.c
|
test/test-pipe-server-close.c
|
||||||
test/test-pipe-set-fchmod.c
|
test/test-pipe-set-fchmod.c
|
||||||
test/test-pipe-set-non-blocking.c
|
test/test-pipe-set-non-blocking.c
|
||||||
|
test/test-pipe-try-write.c
|
||||||
test/test-platform-output.c
|
test/test-platform-output.c
|
||||||
test/test-poll-close-doesnt-corrupt-stack.c
|
test/test-poll-close-doesnt-corrupt-stack.c
|
||||||
test/test-poll-close.c
|
test/test-poll-close.c
|
||||||
|
|||||||
@ -1786,6 +1786,7 @@ int uv__pipe_try_write(uv_pipe_t* handle,
|
|||||||
const uv_buf_t bufs[],
|
const uv_buf_t bufs[],
|
||||||
unsigned int nbufs) {
|
unsigned int nbufs) {
|
||||||
OVERLAPPED overlapped;
|
OVERLAPPED overlapped;
|
||||||
|
HANDLE event;
|
||||||
const uv_buf_t* buf;
|
const uv_buf_t* buf;
|
||||||
int bytes_written;
|
int bytes_written;
|
||||||
unsigned int idx;
|
unsigned int idx;
|
||||||
@ -1807,44 +1808,47 @@ int uv__pipe_try_write(uv_pipe_t* handle,
|
|||||||
|
|
||||||
memset(&overlapped, 0, sizeof(overlapped));
|
memset(&overlapped, 0, sizeof(overlapped));
|
||||||
|
|
||||||
overlapped.hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
|
event = CreateEvent(NULL, FALSE, FALSE, NULL);
|
||||||
if (overlapped.hEvent == NULL) {
|
if (event == NULL) {
|
||||||
uv_fatal_error(GetLastError(), "CreateEvent");
|
uv_fatal_error(GetLastError(), "CreateEvent");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
overlapped.hEvent = (HANDLE)((uintptr_t)event | 1);
|
||||||
|
|
||||||
bytes_written = 0;
|
bytes_written = 0;
|
||||||
for (err = 0, idx = 0; idx < nbufs; err = 0, idx += 1) {
|
for (err = 0, idx = 0; idx < nbufs; err = 0, idx += 1) {
|
||||||
buf = &bufs[idx];
|
buf = &bufs[idx];
|
||||||
|
|
||||||
if (WriteFile(handle->handle, buf->base, buf->len, NULL, &overlapped)) {
|
if (!WriteFile(handle->handle, buf->base, buf->len, NULL, &overlapped)) {
|
||||||
bytes_written += buf->len;
|
err = GetLastError();
|
||||||
continue;
|
if (err != ERROR_IO_PENDING) {
|
||||||
}
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
err = GetLastError();
|
err = WaitForSingleObject(event, timeout);
|
||||||
if (err != ERROR_IO_PENDING) {
|
if (err != WAIT_OBJECT_0) {
|
||||||
break;
|
CancelIoEx(handle->handle, &overlapped);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = WaitForSingleObject(overlapped.hEvent, timeout);
|
if (GetOverlappedResult(handle->handle, &overlapped, &err, TRUE)) {
|
||||||
if (err == WAIT_OBJECT_0) {
|
|
||||||
bytes_written += buf->len;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (err == WAIT_TIMEOUT &&
|
|
||||||
CancelIo(handle->handle) &&
|
|
||||||
GetOverlappedResult(handle->handle, &overlapped, &err, TRUE)) {
|
|
||||||
bytes_written += err;
|
bytes_written += err;
|
||||||
err = WSAEWOULDBLOCK; /* Translates to UV_EAGAIN. */
|
if (err == buf->len)
|
||||||
|
continue;
|
||||||
|
err = WSAEWOULDBLOCK; /* Ignored later. */
|
||||||
} else {
|
} else {
|
||||||
err = GetLastError();
|
err = GetLastError();
|
||||||
|
if (err == ERROR_OPERATION_ABORTED) {
|
||||||
|
err = WSAEWOULDBLOCK; /* Translates to UV_EAGAIN. */
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
CloseHandle(overlapped.hEvent);
|
if (!CloseHandle(event)) {
|
||||||
|
uv_fatal_error(GetLastError(), "CloseHandle");
|
||||||
|
}
|
||||||
|
|
||||||
if (bytes_written == 0 && err != 0) {
|
if (bytes_written == 0 && err != 0) {
|
||||||
return uv_translate_sys_error(err);
|
return uv_translate_sys_error(err);
|
||||||
|
|||||||
@ -1,27 +1,64 @@
|
|||||||
#include "uv.h"
|
#include "uv.h"
|
||||||
#include "task.h"
|
#include "task.h"
|
||||||
|
|
||||||
static void (*spam)(uv_pipe_t* handle);
|
typedef struct pipe_ctx_s {
|
||||||
static uv_pipe_t client_handle;
|
uv_pipe_t handle;
|
||||||
static uv_pipe_t peer_handle;
|
uv_write_t write_req;
|
||||||
static uv_pipe_t server_handle;
|
ssize_t read;
|
||||||
static uv_write_t write_req;
|
ssize_t written;
|
||||||
|
} pipe_ctx_t;
|
||||||
|
|
||||||
|
static void (*spam)(uv_pipe_t* handle);
|
||||||
|
static pipe_ctx_t client;
|
||||||
|
static pipe_ctx_t peer;
|
||||||
|
static uv_pipe_t server_handle;
|
||||||
|
|
||||||
static void write_cb(uv_write_t* req, int status) {
|
static void write_cb(uv_write_t* req, int status) {
|
||||||
ASSERT(0 == status);
|
ASSERT_EQ(0, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int do_try_write(uv_pipe_t* handle, uv_buf_t* buf, size_t size) {
|
||||||
|
int rc;
|
||||||
|
pipe_ctx_t* pc;
|
||||||
|
pc = container_of(handle, struct pipe_ctx_s, handle);
|
||||||
|
rc = 0;
|
||||||
|
do {
|
||||||
|
pc->written += rc;
|
||||||
|
rc = uv_try_write((uv_stream_t*)handle, buf, size);
|
||||||
|
} while (rc > 0);
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
static void handle_read(uv_stream_t* handle,
|
||||||
|
ssize_t nread,
|
||||||
|
const uv_buf_t* buf) {
|
||||||
|
pipe_ctx_t* send_ctx;
|
||||||
|
pipe_ctx_t* recv_ctx;
|
||||||
|
recv_ctx = container_of(handle, struct pipe_ctx_s, handle);
|
||||||
|
send_ctx = recv_ctx == &client ? &peer : &client;
|
||||||
|
ASSERT_UINT64_GT(nread, 0);
|
||||||
|
if (send_ctx->written >= recv_ctx->read + (ssize_t)buf->len) {
|
||||||
|
ASSERT_UINT64_EQ(nread, (ssize_t)buf->len); /* Expect saturation. */
|
||||||
|
}
|
||||||
|
recv_ctx->read += nread;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void spam_0(uv_pipe_t* handle) {
|
static void spam_0(uv_pipe_t* handle) {
|
||||||
uv_buf_t buf;
|
uv_buf_t buf;
|
||||||
|
pipe_ctx_t* pc;
|
||||||
|
|
||||||
buf = uv_buf_init("", 0);
|
buf = uv_buf_init("", 0);
|
||||||
ASSERT(0 == uv_try_write((uv_stream_t*) handle, &buf, 1));
|
ASSERT_EQ(0, uv_try_write((uv_stream_t*) handle, &buf, 1));
|
||||||
|
|
||||||
/* Non-empty write to start the event loop moving. */
|
/* Non-empty write to start the event loop moving. */
|
||||||
|
pc = container_of(handle, struct pipe_ctx_s, handle);
|
||||||
buf = uv_buf_init("hello, world", sizeof("hello, world") - 1);
|
buf = uv_buf_init("hello, world", sizeof("hello, world") - 1);
|
||||||
ASSERT(0 == uv_write(&write_req, (uv_stream_t*) handle, &buf, 1, write_cb));
|
ASSERT_EQ(0,
|
||||||
|
uv_write(&pc->write_req,(uv_stream_t*) handle, &buf, 1, write_cb));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -30,11 +67,9 @@ static void spam_1(uv_pipe_t* handle) {
|
|||||||
int rc;
|
int rc;
|
||||||
|
|
||||||
buf = uv_buf_init("hello, world", sizeof("hello, world") - 1);
|
buf = uv_buf_init("hello, world", sizeof("hello, world") - 1);
|
||||||
do
|
rc = do_try_write(handle, &buf, 1);
|
||||||
rc = uv_try_write((uv_stream_t*) handle, &buf, 1);
|
|
||||||
while (rc > 0);
|
|
||||||
|
|
||||||
ASSERT(rc == UV_EAGAIN);
|
ASSERT_EQ(rc, UV_EAGAIN);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -44,12 +79,9 @@ static void spam_2(uv_pipe_t* handle) {
|
|||||||
|
|
||||||
bufs[0] = uv_buf_init("hello,", sizeof("hello,") - 1);
|
bufs[0] = uv_buf_init("hello,", sizeof("hello,") - 1);
|
||||||
bufs[1] = uv_buf_init(" world", sizeof(" world") - 1);
|
bufs[1] = uv_buf_init(" world", sizeof(" world") - 1);
|
||||||
|
rc = do_try_write(handle, bufs, ARRAY_SIZE(bufs));
|
||||||
|
|
||||||
do
|
ASSERT_EQ(rc, UV_EAGAIN);
|
||||||
rc = uv_try_write((uv_stream_t*) handle, bufs, ARRAY_SIZE(bufs));
|
|
||||||
while (rc > 0);
|
|
||||||
|
|
||||||
ASSERT(rc == UV_EAGAIN);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -62,35 +94,34 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
|
|||||||
|
|
||||||
|
|
||||||
static void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
static void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
|
||||||
if (spam == spam_0) {
|
ASSERT_UINT64_GT(nread, 0); /* Expect some bytes. */
|
||||||
ASSERT(nread > 0); /* Expect some bytes. */
|
if (spam != spam_0) {
|
||||||
} else {
|
handle_read(handle, nread, buf);
|
||||||
ASSERT(nread == (ssize_t) buf->len); /* Expect saturation. */
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle == (uv_stream_t*) &peer_handle) {
|
if (handle == (uv_stream_t*) &client.handle) {
|
||||||
spam(&client_handle);
|
spam(&client.handle);
|
||||||
} else {
|
} else {
|
||||||
uv_close((uv_handle_t*) &peer_handle, NULL);
|
uv_close((uv_handle_t*) &peer.handle, NULL);
|
||||||
uv_close((uv_handle_t*) &client_handle, NULL);
|
uv_close((uv_handle_t*) &client.handle, NULL);
|
||||||
uv_close((uv_handle_t*) &server_handle, NULL);
|
uv_close((uv_handle_t*) &server_handle, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void connection_cb(uv_stream_t* handle, int status) {
|
static void connection_cb(uv_stream_t* handle, int status) {
|
||||||
ASSERT(0 == status);
|
ASSERT_EQ(0, status);
|
||||||
ASSERT(0 == uv_pipe_init(uv_default_loop(), &peer_handle, 0));
|
ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &peer.handle, 0));
|
||||||
ASSERT(0 == uv_accept((uv_stream_t*) &server_handle,
|
ASSERT_EQ(0, uv_accept((uv_stream_t*) &server_handle,
|
||||||
(uv_stream_t*) &peer_handle));
|
(uv_stream_t*) &peer.handle));
|
||||||
ASSERT(0 == uv_read_start((uv_stream_t*) &peer_handle, alloc_cb, read_cb));
|
ASSERT_EQ(0, uv_read_start((uv_stream_t*) &peer.handle, alloc_cb, read_cb));
|
||||||
spam(&peer_handle);
|
spam(&peer.handle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void connect_cb(uv_connect_t* req, int status) {
|
static void connect_cb(uv_connect_t* req, int status) {
|
||||||
ASSERT(0 == status);
|
ASSERT_EQ(0, status);
|
||||||
ASSERT(0 == uv_read_start((uv_stream_t*) &client_handle, alloc_cb, read_cb));
|
ASSERT_EQ(0, uv_read_start((uv_stream_t*) &client.handle, alloc_cb, read_cb));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -98,12 +129,12 @@ static int pipe_try_write(void (*spammer)(uv_pipe_t*)) {
|
|||||||
uv_connect_t connect_req;
|
uv_connect_t connect_req;
|
||||||
|
|
||||||
spam = spammer;
|
spam = spammer;
|
||||||
ASSERT(0 == uv_pipe_init(uv_default_loop(), &client_handle, 0));
|
ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &client.handle, 0));
|
||||||
ASSERT(0 == uv_pipe_init(uv_default_loop(), &server_handle, 0));
|
ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &server_handle, 0));
|
||||||
ASSERT(0 == uv_pipe_bind(&server_handle, TEST_PIPENAME));
|
ASSERT_EQ(0, uv_pipe_bind(&server_handle, TEST_PIPENAME));
|
||||||
ASSERT(0 == uv_listen((uv_stream_t*) &server_handle, 1, connection_cb));
|
ASSERT_EQ(0, uv_listen((uv_stream_t*) &server_handle, 1, connection_cb));
|
||||||
uv_pipe_connect(&connect_req, &client_handle, TEST_PIPENAME, connect_cb);
|
uv_pipe_connect(&connect_req, &client.handle, TEST_PIPENAME, connect_cb);
|
||||||
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
|
ASSERT_EQ(0, uv_run(uv_default_loop(), UV_RUN_DEFAULT));
|
||||||
|
|
||||||
MAKE_VALGRIND_HAPPY();
|
MAKE_VALGRIND_HAPPY();
|
||||||
return 0;
|
return 0;
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user