diff --git a/src/win/internal.h b/src/win/internal.h index 8d4081bd..783f21af 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -327,6 +327,7 @@ void uv__util_init(); uint64_t uv__hrtime(double scale); int uv_parent_pid(); +int uv_current_pid(); __declspec(noreturn) void uv_fatal_error(const int errorno, const char* syscall); diff --git a/src/win/pipe.c b/src/win/pipe.c index 796d06dc..d57d77a4 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -1246,6 +1246,10 @@ static int uv_pipe_write_impl(uv_loop_t* loop, if (send_handle) { tcp_send_handle = (uv_tcp_t*)send_handle; + if (handle->pipe.conn.ipc_pid == 0) { + handle->pipe.conn.ipc_pid = uv_current_pid(); + } + err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid, &ipc_frame.socket_info_ex.socket_info); if (err) { diff --git a/src/win/util.c b/src/win/util.c index a0d1307f..cb247513 100644 --- a/src/win/util.c +++ b/src/win/util.c @@ -59,6 +59,10 @@ static char *process_title; static CRITICAL_SECTION process_title_lock; +/* Cached copy of the process id, written once. */ +static DWORD current_pid = 0; + + /* Interval (in seconds) of the high-resolution clock. */ static double hrtime_interval_ = 0; @@ -359,6 +363,14 @@ int uv_parent_pid() { } +int uv_current_pid() { + if (current_pid == 0) { + current_pid = GetCurrentProcessId(); + } + return current_pid; +} + + char** uv_setup_args(int argc, char** argv) { return argv; } diff --git a/test/runner.c b/test/runner.c index e094defc..914a067f 100644 --- a/test/runner.c +++ b/test/runner.c @@ -210,6 +210,7 @@ int run_test(const char* test, #ifndef _WIN32 /* Clean up stale socket from previous run. */ remove(TEST_PIPENAME); + remove(TEST_PIPENAME_2); #endif /* If it's a helper the user asks for, start it directly. */ diff --git a/test/test-ipc-send-recv.c b/test/test-ipc-send-recv.c index d9b91333..d0558a94 100644 --- a/test/test-ipc-send-recv.c +++ b/test/test-ipc-send-recv.c @@ -30,6 +30,8 @@ void spawn_helper(uv_pipe_t* channel, uv_process_t* process, const char* helper); +void ipc_send_recv_helper_threadproc(void* arg); + union handles { uv_handle_t handle; uv_stream_t stream; @@ -38,15 +40,26 @@ union handles { uv_tty_t tty; }; -struct echo_ctx { +struct test_ctx { uv_pipe_t channel; + uv_connect_t connect_req; uv_write_t write_req; uv_handle_type expected_type; union handles send; union handles recv; }; -static struct echo_ctx ctx; +struct echo_ctx { + uv_pipe_t listen; + uv_pipe_t channel; + uv_write_t write_req; + uv_handle_type expected_type; + union handles recv; +}; + +static struct test_ctx ctx; +static struct echo_ctx ctx2; + static int num_recv_handles; @@ -92,13 +105,12 @@ static void recv_cb(uv_stream_t* handle, num_recv_handles++; } - -static int run_test(void) { - uv_process_t process; - uv_buf_t buf; +static void connect_cb(uv_connect_t* req, int status) { int r; + uv_buf_t buf; - spawn_helper(&ctx.channel, &process, "ipc_send_recv_helper"); + ASSERT(req == &ctx.connect_req); + ASSERT(status == 0); buf = uv_buf_init(".", 1); r = uv_write2(&ctx.write_req, @@ -110,17 +122,43 @@ static int run_test(void) { r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb); ASSERT(r == 0); +} + +static int run_test(int inprocess) { + uv_process_t process; + uv_thread_t tid; + int r; + + if (inprocess) { + r = uv_thread_create(&tid, ipc_send_recv_helper_threadproc, (void *) 42); + ASSERT(r == 0); + + uv_sleep(1000); + + r = uv_pipe_init(uv_default_loop(), &ctx.channel, 1); + ASSERT(r == 0); + + uv_pipe_connect(&ctx.connect_req, &ctx.channel, TEST_PIPENAME_2, connect_cb); + } else { + spawn_helper(&ctx.channel, &process, "ipc_send_recv_helper"); + + connect_cb(&ctx.connect_req, 0); + } r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); ASSERT(r == 0); ASSERT(num_recv_handles == 1); + if (inprocess) { + r = uv_thread_join(&tid); + ASSERT(r == 0); + } + return 0; } - -TEST_IMPL(ipc_send_recv_pipe) { +static int run_ipc_send_recv_pipe(int inprocess) { int r; ctx.expected_type = UV_NAMED_PIPE; @@ -131,15 +169,22 @@ TEST_IMPL(ipc_send_recv_pipe) { r = uv_pipe_bind(&ctx.send.pipe, TEST_PIPENAME); ASSERT(r == 0); - r = run_test(); + r = run_test(inprocess); ASSERT(r == 0); MAKE_VALGRIND_HAPPY(); return 0; } +TEST_IMPL(ipc_send_recv_pipe) { + return run_ipc_send_recv_pipe(0); +} -TEST_IMPL(ipc_send_recv_tcp) { +TEST_IMPL(ipc_send_recv_pipe_inprocess) { + return run_ipc_send_recv_pipe(1); +} + +static int run_ipc_send_recv_tcp(int inprocess) { struct sockaddr_in addr; int r; @@ -153,23 +198,31 @@ TEST_IMPL(ipc_send_recv_tcp) { r = uv_tcp_bind(&ctx.send.tcp, (const struct sockaddr*) &addr, 0); ASSERT(r == 0); - r = run_test(); + r = run_test(inprocess); ASSERT(r == 0); MAKE_VALGRIND_HAPPY(); return 0; } +TEST_IMPL(ipc_send_recv_tcp) { + return run_ipc_send_recv_tcp(0); +} -/* Everything here runs in a child process. */ +TEST_IMPL(ipc_send_recv_tcp_inprocess) { + return run_ipc_send_recv_tcp(1); +} + + +/* Everything here runs in a child process or second thread. */ static void write2_cb(uv_write_t* req, int status) { ASSERT(status == 0); - uv_close(&ctx.recv.handle, NULL); - uv_close((uv_handle_t*)&ctx.channel, NULL); + uv_close(&ctx2.recv.handle, NULL); + uv_close((uv_handle_t*)&ctx2.channel, NULL); + uv_close((uv_handle_t*)&ctx2.listen, NULL); } - static void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* rdbuf) { @@ -178,8 +231,12 @@ static void read_cb(uv_stream_t* handle, uv_handle_type pending; int r; + if (nread == UV__EOF || nread == UV__ECONNABORTED) { + return; + } + pipe = (uv_pipe_t*) handle; - ASSERT(pipe == &ctx.channel); + ASSERT(pipe == &ctx2.channel); ASSERT(nread >= 0); ASSERT(1 == uv_pipe_pending_count(pipe)); @@ -189,25 +246,75 @@ static void read_cb(uv_stream_t* handle, wrbuf = uv_buf_init(".", 1); if (pending == UV_NAMED_PIPE) - r = uv_pipe_init(ctx.channel.loop, &ctx.recv.pipe, 0); + r = uv_pipe_init(ctx2.channel.loop, &ctx2.recv.pipe, 0); else if (pending == UV_TCP) - r = uv_tcp_init(ctx.channel.loop, &ctx.recv.tcp); + r = uv_tcp_init(ctx2.channel.loop, &ctx2.recv.tcp); else abort(); ASSERT(r == 0); - r = uv_accept(handle, &ctx.recv.stream); + r = uv_accept(handle, &ctx2.recv.stream); ASSERT(r == 0); - r = uv_write2(&ctx.write_req, - (uv_stream_t*)&ctx.channel, + r = uv_write2(&ctx2.write_req, + (uv_stream_t*)&ctx2.channel, &wrbuf, 1, - &ctx.recv.stream, + &ctx2.recv.stream, write2_cb); ASSERT(r == 0); } +static void send_recv_start() { + int r; + ASSERT(1 == uv_is_readable((uv_stream_t*)&ctx2.channel)); + ASSERT(1 == uv_is_writable((uv_stream_t*)&ctx2.channel)); + ASSERT(0 == uv_is_closing((uv_handle_t*)&ctx2.channel)); + + r = uv_read_start((uv_stream_t*)&ctx2.channel, alloc_cb, read_cb); + ASSERT(r == 0); +} + +static void listen_cb(uv_stream_t* handle, int status) { + int r; + ASSERT(handle == (uv_stream_t*)&ctx2.listen); + ASSERT(status == 0); + + r = uv_accept((uv_stream_t*)&ctx2.listen, (uv_stream_t*)&ctx2.channel); + ASSERT(r == 0); + + send_recv_start(); +} + +int run_ipc_send_recv_helper(uv_loop_t* loop, int inprocess) { + int r; + + memset(&ctx2, 0, sizeof(ctx2)); + + r = uv_pipe_init(loop, &ctx2.listen, 0); + ASSERT(r == 0); + + r = uv_pipe_init(loop, &ctx2.channel, 1); + ASSERT(r == 0); + + if (inprocess) { + r = uv_pipe_bind(&ctx2.listen, TEST_PIPENAME_2); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&ctx2.listen, SOMAXCONN, listen_cb); + ASSERT(r == 0); + } else { + r = uv_pipe_open(&ctx2.channel, 0); + ASSERT(r == 0); + + send_recv_start(); + } + + r = uv_run(loop, UV_RUN_DEFAULT); + ASSERT(r == 0); + + return 0; +} /* stdin is a duplex channel over which a handle is sent. * We receive it and send it back where it came from. @@ -215,22 +322,23 @@ static void read_cb(uv_stream_t* handle, int ipc_send_recv_helper(void) { int r; - memset(&ctx, 0, sizeof(ctx)); - - r = uv_pipe_init(uv_default_loop(), &ctx.channel, 1); - ASSERT(r == 0); - - uv_pipe_open(&ctx.channel, 0); - ASSERT(1 == uv_is_readable((uv_stream_t*)&ctx.channel)); - ASSERT(1 == uv_is_writable((uv_stream_t*)&ctx.channel)); - ASSERT(0 == uv_is_closing((uv_handle_t*)&ctx.channel)); - - r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb); - ASSERT(r == 0); - - r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); + r = run_ipc_send_recv_helper(uv_default_loop(), 0); ASSERT(r == 0); MAKE_VALGRIND_HAPPY(); return 0; } + +void ipc_send_recv_helper_threadproc(void* arg) { + int r; + uv_loop_t loop; + + r = uv_loop_init(&loop); + ASSERT(r == 0); + + r = run_ipc_send_recv_helper(&loop, 1); + ASSERT(r == 0); + + r = uv_loop_close(&loop); + ASSERT(r == 0); +} diff --git a/test/test-list.h b/test/test-list.h index 8ee74391..040f3c00 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -50,8 +50,10 @@ TEST_DECLARE (ipc_listen_before_write) TEST_DECLARE (ipc_listen_after_write) #ifndef _WIN32 TEST_DECLARE (ipc_send_recv_pipe) +TEST_DECLARE (ipc_send_recv_pipe_inprocess) #endif TEST_DECLARE (ipc_send_recv_tcp) +TEST_DECLARE (ipc_send_recv_tcp_inprocess) TEST_DECLARE (ipc_tcp_connection) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) @@ -380,8 +382,10 @@ TASK_LIST_START TEST_ENTRY (ipc_listen_after_write) #ifndef _WIN32 TEST_ENTRY (ipc_send_recv_pipe) + TEST_ENTRY (ipc_send_recv_pipe_inprocess) #endif TEST_ENTRY (ipc_send_recv_tcp) + TEST_ENTRY (ipc_send_recv_tcp_inprocess) TEST_ENTRY (ipc_tcp_connection) TEST_ENTRY (tcp_ping_pong)