From 6921d2fc075df8933748e7543402b0ae6d686fa3 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 26 Sep 2011 09:42:41 -0700 Subject: [PATCH 01/10] Add argument to uv_pipe_init for IPC, unix impl --- include/uv.h | 7 ++- src/unix/pipe.c | 4 +- src/unix/process.c | 73 +++++++++++++------------- src/win/pipe.c | 3 +- test/benchmark-pound.c | 2 +- test/benchmark-pump.c | 6 +-- test/benchmark-spawn.c | 2 +- test/echo-server.c | 4 +- test/run-tests.c | 33 ++++++++++++ test/test-ipc.c | 102 ++++++++++++++++++++++++++++++++++++ test/test-list.h | 2 + test/test-ping-pong.c | 2 +- test/test-pipe-bind-error.c | 10 ++-- test/test-spawn.c | 8 +-- 14 files changed, 203 insertions(+), 55 deletions(-) create mode 100644 test/test-ipc.c diff --git a/include/uv.h b/include/uv.h index 824b3c41..6151bd7a 100644 --- a/include/uv.h +++ b/include/uv.h @@ -648,9 +648,14 @@ struct uv_pipe_s { UV_HANDLE_FIELDS UV_STREAM_FIELDS UV_PIPE_PRIVATE_FIELDS + int ipc; /* non-zero if this pipe is used for passing handles */ }; -int uv_pipe_init(uv_loop_t*, uv_pipe_t* handle); +/* + * Initialize a pipe. The last argument is a boolean to indicate if + * this pipe will be used for handle passing between processes. + */ +int uv_pipe_init(uv_loop_t*, uv_pipe_t* handle, int ipc); /* * Opens an existing file descriptor or HANDLE as a pipe. diff --git a/src/unix/pipe.c b/src/unix/pipe.c index 86c11dea..dabdcd6c 100644 --- a/src/unix/pipe.c +++ b/src/unix/pipe.c @@ -29,10 +29,12 @@ #include #include -int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle) { + +int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); loop->counters.pipe_init++; handle->pipe_fname = NULL; + handle->ipc = ipc; return 0; } diff --git a/src/unix/process.c b/src/unix/process.c index 487f2075..06af65d5 100644 --- a/src/unix/process.c +++ b/src/unix/process.c @@ -1,4 +1,3 @@ - /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy @@ -63,6 +62,34 @@ static void uv__chld(EV_P_ ev_child* watcher, int revents) { } } + +/* + * Used for initializing stdio streams like options.stdin_stream. Returns + * zero on success. + */ +static int uv__process_init_pipe(uv_pipe_t* handle, int fds[2]) { + if (handle->type != UV_NAMED_PIPE) { + errno = EINVAL; + return -1; + } + + if (handle->ipc) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + return -1; + } + } else { + if (pipe(fds) < 0) { + return -1; + } + } + + uv__cloexec(fds[0], 1); + uv__cloexec(fds[1], 1); + + return 0; +} + + #ifndef SPAWN_WAIT_EXEC # define SPAWN_WAIT_EXEC 1 #endif @@ -89,43 +116,19 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, process->exit_cb = options.exit_cb; - if (options.stdin_stream) { - if (options.stdin_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stdin_pipe) < 0) { - goto error; - } - uv__cloexec(stdin_pipe[0], 1); - uv__cloexec(stdin_pipe[1], 1); + if (options.stdin_stream && + uv__process_init_pipe(options.stdin_stream, stdin_pipe)) { + goto error; } - if (options.stdout_stream) { - if (options.stdout_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stdout_pipe) < 0) { - goto error; - } - uv__cloexec(stdout_pipe[0], 1); - uv__cloexec(stdout_pipe[1], 1); + if (options.stdout_stream && + uv__process_init_pipe(options.stdout_stream, stdout_pipe)) { + goto error; } - if (options.stderr_stream) { - if (options.stderr_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stderr_pipe) < 0) { - goto error; - } - uv__cloexec(stderr_pipe[0], 1); - uv__cloexec(stderr_pipe[1], 1); + if (options.stderr_stream && + uv__process_init_pipe(options.stderr_stream, stderr_pipe)) { + goto error; } /* This pipe is used by the parent to wait until @@ -154,7 +157,7 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, goto error; } # else - if (pipe(signal_pipe) < 0) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, signal_pipe) < 0) { goto error; } uv__cloexec(signal_pipe[0], 1); diff --git a/src/win/pipe.c b/src/win/pipe.c index c997a1e2..4c855058 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -51,13 +51,14 @@ static void uv_unique_pipe_name(char* ptr, char* name, size_t size) { } -int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle) { +int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv_stream_init(loop, (uv_stream_t*)handle); handle->type = UV_NAMED_PIPE; handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; handle->name = NULL; + handle->ipc = ipc; loop->counters.pipe_init++; diff --git a/test/benchmark-pound.c b/test/benchmark-pound.c index 1f56e27f..af7ce247 100644 --- a/test/benchmark-pound.c +++ b/test/benchmark-pound.c @@ -222,7 +222,7 @@ static void tcp_make_connect(conn_rec* p) { static void pipe_make_connect(conn_rec* p) { int r; - r = uv_pipe_init(loop, (uv_pipe_t*)&p->stream); + r = uv_pipe_init(loop, (uv_pipe_t*)&p->stream, 0); ASSERT(r == 0); r = uv_pipe_connect(&((pipe_conn_rec*)p)->conn_req, (uv_pipe_t*)&p->stream, TEST_PIPENAME, connect_cb); diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index d0b09301..27e8abe0 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -253,7 +253,7 @@ static void maybe_connect_some() { } else { pipe = &pipe_write_handles[max_connect_socket++]; - r = uv_pipe_init(loop, pipe); + r = uv_pipe_init(loop, pipe, 0); ASSERT(r == 0); req = (uv_connect_t*) req_alloc(); @@ -277,7 +277,7 @@ static void connection_cb(uv_stream_t* s, int status) { ASSERT(r == 0); } else { stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t)); - r = uv_pipe_init(loop, (uv_pipe_t*)stream); + r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); ASSERT(r == 0); } @@ -396,7 +396,7 @@ HELPER_IMPL(pipe_pump_server) { /* Server */ server = (uv_stream_t*)&pipeServer; - r = uv_pipe_init(loop, &pipeServer); + r = uv_pipe_init(loop, &pipeServer, 0); ASSERT(r == 0); r = uv_pipe_bind(&pipeServer, TEST_PIPENAME); ASSERT(r == 0); diff --git a/test/benchmark-spawn.c b/test/benchmark-spawn.c index 6e5493d5..d34f42b9 100644 --- a/test/benchmark-spawn.c +++ b/test/benchmark-spawn.c @@ -113,7 +113,7 @@ static void spawn() { options.args = args; options.exit_cb = exit_cb; - uv_pipe_init(loop, &out); + uv_pipe_init(loop, &out, 0); options.stdout_stream = &out; r = uv_spawn(loop, &process, options); diff --git a/test/echo-server.c b/test/echo-server.c index 453ada66..8b175441 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -151,7 +151,7 @@ static void on_connection(uv_stream_t* server, int status) { case PIPE: stream = malloc(sizeof(uv_pipe_t)); ASSERT(stream != NULL); - r = uv_pipe_init(loop, (uv_pipe_t*)stream); + r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); ASSERT(r == 0); break; @@ -248,7 +248,7 @@ static int pipe_echo_start(char* pipeName) { server = (uv_handle_t*)&pipeServer; serverType = PIPE; - r = uv_pipe_init(loop, &pipeServer); + r = uv_pipe_init(loop, &pipeServer, 0); if (r) { fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(uv_last_error(loop))); diff --git a/test/run-tests.c b/test/run-tests.c index a1878691..a081081b 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -24,6 +24,7 @@ #include "runner.h" #include "task.h" +#include "uv.h" /* Actual tests and helpers are defined in test-list.h */ #include "test-list.h" @@ -48,12 +49,44 @@ int main(int argc, char **argv) { } +static int ipc_helper() { + /* + * This is launched from test-ipc.c. stdin is a duplex channel that we + * over which a handle will be transmitted. In this initial version only + * data is transfered over the channel. XXX edit this comment after handle + * transfer is added. + */ + uv_pipe_t channel; + uv_write_t write_req; + int r; + uv_buf_t buf; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + uv_pipe_open(&channel, 0); + + buf = uv_buf_init("hello\n", 6); + r = uv_write(&write_req, (uv_stream_t*)&channel, &buf, 1, NULL); + ASSERT(r == 0); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + return 0; +} + + static int maybe_run_test(int argc, char **argv) { if (strcmp(argv[1], "--list") == 0) { print_tests(stdout); return 0; } + if (strcmp(argv[1], "ipc_helper") == 0) { + return ipc_helper(); + } + if (strcmp(argv[1], "spawn_helper1") == 0) { return 1; } diff --git a/test/test-ipc.c b/test/test-ipc.c new file mode 100644 index 00000000..26c84ff7 --- /dev/null +++ b/test/test-ipc.c @@ -0,0 +1,102 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include + +static char exepath[1024]; +static size_t exepath_size = 1024; +static char* args[3]; +static uv_pipe_t channel; + +static int exit_cb_called; + +static uv_write_t write_req; + + +static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { + printf("exit_cb\n"); + exit_cb_called++; + ASSERT(exit_status == 1); + ASSERT(term_signal == 0); + uv_close((uv_handle_t*)process, NULL); +} + + +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { + return uv_buf_init(malloc(suggested_size), suggested_size); +} + + +static void on_read(uv_stream_t* pipe, ssize_t nread, uv_buf_t buf) { + int r; + uv_buf_t outbuf; + /* listen on the handle provided.... */ + + if (nread) { + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, pipe, &outbuf, 1, NULL); + ASSERT(r == 0); + + fprintf(stderr, "got %d bytes\n", (int)nread); + } + + if (buf.base) { + free(buf.base); + } +} + + +TEST_IMPL(ipc) { + int r; + uv_process_options_t options; + uv_process_t process; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + memset(&options, 0, sizeof(uv_process_options_t)); + + r = uv_exepath(exepath, &exepath_size); + ASSERT(r == 0); + exepath[exepath_size] = '\0'; + args[0] = exepath; + args[1] = "ipc_helper"; + args[2] = NULL; + options.file = exepath; + options.args = args; + options.exit_cb = exit_cb; + options.stdin_stream = &channel; + + r = uv_spawn(uv_default_loop(), &process, options); + ASSERT(r == 0); + + uv_read_start((uv_stream_t*)&channel, on_alloc, on_read); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(exit_cb_called == 1); + return 0; +} diff --git a/test/test-list.h b/test/test-list.h index f137493b..d9f06cf4 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -20,6 +20,7 @@ */ TEST_DECLARE (tty) +TEST_DECLARE (ipc) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) TEST_DECLARE (tcp_ref) @@ -110,6 +111,7 @@ HELPER_DECLARE (pipe_echo_server) TASK_LIST_START TEST_ENTRY (tty) + TEST_ENTRY (ipc) TEST_ENTRY (tcp_ref) diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index f452fce5..0e59166c 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -204,7 +204,7 @@ static void pipe_pinger_new() { pinger->pongs = 0; /* Try to connec to the server and do NUM_PINGS ping-pongs. */ - r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe); + r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe, 0); pinger->stream.pipe.data = pinger; ASSERT(!r); diff --git a/test/test-pipe-bind-error.c b/test/test-pipe-bind-error.c index 832ce023..3443f19d 100644 --- a/test/test-pipe-bind-error.c +++ b/test/test-pipe-bind-error.c @@ -45,12 +45,12 @@ TEST_IMPL(pipe_bind_error_addrinuse) { uv_pipe_t server1, server2; int r; - r = uv_pipe_init(uv_default_loop(), &server1); + r = uv_pipe_init(uv_default_loop(), &server1, 0); ASSERT(r == 0); r = uv_pipe_bind(&server1, TEST_PIPENAME); ASSERT(r == 0); - r = uv_pipe_init(uv_default_loop(), &server2); + r = uv_pipe_init(uv_default_loop(), &server2, 0); ASSERT(r == 0); r = uv_pipe_bind(&server2, TEST_PIPENAME); ASSERT(r == -1); @@ -79,7 +79,7 @@ TEST_IMPL(pipe_bind_error_addrnotavail) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_pipe_bind(&server, BAD_PIPENAME); @@ -100,7 +100,7 @@ TEST_IMPL(pipe_bind_error_inval) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_pipe_bind(&server, TEST_PIPENAME); ASSERT(r == 0); @@ -123,7 +123,7 @@ TEST_IMPL(pipe_listen_without_bind) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); ASSERT(r == -1); diff --git a/test/test-spawn.c b/test/test-spawn.c index 653f9ac9..5bb6ed62 100644 --- a/test/test-spawn.c +++ b/test/test-spawn.c @@ -67,7 +67,7 @@ static void kill_cb(uv_process_t* process, int exit_status, int term_signal) { } -uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { uv_buf_t buf; buf.base = output + output_used; buf.len = OUTPUT_SIZE - output_used; @@ -138,7 +138,7 @@ TEST_IMPL(spawn_stdout) { init_process_options("spawn_helper2", exit_cb); - uv_pipe_init(uv_default_loop(), &out); + uv_pipe_init(uv_default_loop(), &out, 0); options.stdout_stream = &out; r = uv_spawn(uv_default_loop(), &process, options); @@ -169,8 +169,8 @@ int r; init_process_options("spawn_helper3", exit_cb); - uv_pipe_init(uv_default_loop(), &out); - uv_pipe_init(uv_default_loop(), &in); + uv_pipe_init(uv_default_loop(), &out, 0); + uv_pipe_init(uv_default_loop(), &in, 0); options.stdout_stream = &out; options.stdin_stream = ∈ From dc0f17d3e3e02e3ca80a7fe127aa5d7c4c7374dc Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 29 Sep 2011 10:37:59 -0700 Subject: [PATCH 02/10] Add server to ipc_helper --- test/run-tests.c | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/test/run-tests.c b/test/run-tests.c index a081081b..4595d4df 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -49,6 +49,13 @@ int main(int argc, char **argv) { } +static uv_tcp_t server; + + +static void ipc_on_connection(uv_stream_t* server, int status) { +} + + static int ipc_helper() { /* * This is launched from test-ipc.c. stdin is a duplex channel that we @@ -66,6 +73,15 @@ static int ipc_helper() { uv_pipe_open(&channel, 0); + r = uv_tcp_init(uv_default_loop(), &server); + ASSERT(r == 0); + + r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&server, 12, ipc_on_connection); + ASSERT(r == 0); + buf = uv_buf_init("hello\n", 6); r = uv_write(&write_req, (uv_stream_t*)&channel, &buf, 1, NULL); ASSERT(r == 0); From e5e6efe317f39606c4d190ea32269ea125eb93a6 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 29 Sep 2011 10:43:11 -0700 Subject: [PATCH 03/10] Add uv_write2 and uv_read2_start to header file --- include/uv.h | 237 +++++++++++++++++++++++++++------------------------ 1 file changed, 127 insertions(+), 110 deletions(-) diff --git a/include/uv.h b/include/uv.h index 6151bd7a..63284e10 100644 --- a/include/uv.h +++ b/include/uv.h @@ -41,114 +41,6 @@ extern "C" { typedef intptr_t ssize_t; #endif -typedef struct uv_loop_s uv_loop_t; -typedef struct uv_ares_task_s uv_ares_task_t; -typedef struct uv_err_s uv_err_t; -typedef struct uv_handle_s uv_handle_t; -typedef struct uv_stream_s uv_stream_t; -typedef struct uv_tcp_s uv_tcp_t; -typedef struct uv_udp_s uv_udp_t; -typedef struct uv_pipe_s uv_pipe_t; -typedef struct uv_tty_s uv_tty_t; -typedef struct uv_timer_s uv_timer_t; -typedef struct uv_prepare_s uv_prepare_t; -typedef struct uv_check_s uv_check_t; -typedef struct uv_idle_s uv_idle_t; -typedef struct uv_async_s uv_async_t; -typedef struct uv_getaddrinfo_s uv_getaddrinfo_t; -typedef struct uv_process_s uv_process_t; -typedef struct uv_counters_s uv_counters_t; -/* Request types */ -typedef struct uv_req_s uv_req_t; -typedef struct uv_shutdown_s uv_shutdown_t; -typedef struct uv_write_s uv_write_t; -typedef struct uv_connect_s uv_connect_t; -typedef struct uv_udp_send_s uv_udp_send_t; -typedef struct uv_fs_s uv_fs_t; -/* uv_fs_event_t is a subclass of uv_handle_t. */ -typedef struct uv_fs_event_s uv_fs_event_t; -typedef struct uv_work_s uv_work_t; - -#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) -# include "uv-private/uv-unix.h" -#else -# include "uv-private/uv-win.h" -#endif - - -/* - * This function must be called before any other functions in libuv. - * - * All functions besides uv_run() are non-blocking. - * - * All callbacks in libuv are made asynchronously. That is they are never - * made by the function that takes them as a parameter. - */ -uv_loop_t* uv_loop_new(); -void uv_loop_delete(uv_loop_t*); - - -/* - * Returns the default loop. - */ -uv_loop_t* uv_default_loop(); - -/* - * This function starts the event loop. It blocks until the reference count - * of the loop drops to zero. - */ -int uv_run(uv_loop_t*); - -/* - * Manually modify the event loop's reference count. Useful if the user wants - * to have a handle or timeout that doesn't keep the loop alive. - */ -void uv_ref(uv_loop_t*); -void uv_unref(uv_loop_t*); - -void uv_update_time(uv_loop_t*); -int64_t uv_now(uv_loop_t*); - - -/* - * The status parameter is 0 if the request completed successfully, - * and should be -1 if the request was cancelled or failed. - * For uv_close_cb, -1 means that the handle was closed due to an error. - * Error details can be obtained by calling uv_last_error(). - * - * In the case of uv_read_cb the uv_buf_t returned should be freed by the - * user. - */ -typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size); -typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); -typedef void (*uv_write_cb)(uv_write_t* req, int status); -typedef void (*uv_connect_cb)(uv_connect_t* req, int status); -typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); -typedef void (*uv_connection_cb)(uv_stream_t* server, int status); -typedef void (*uv_close_cb)(uv_handle_t* handle); -typedef void (*uv_timer_cb)(uv_timer_t* handle, int status); -/* TODO: do these really need a status argument? */ -typedef void (*uv_async_cb)(uv_async_t* handle, int status); -typedef void (*uv_prepare_cb)(uv_prepare_t* handle, int status); -typedef void (*uv_check_cb)(uv_check_t* handle, int status); -typedef void (*uv_idle_cb)(uv_idle_t* handle, int status); -typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* handle, int status, - struct addrinfo* res); -typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal); -typedef void (*uv_fs_cb)(uv_fs_t* req); -typedef void (*uv_work_cb)(uv_work_t* req); -typedef void (*uv_after_work_cb)(uv_work_t* req); - -/* -* This will be called repeatedly after the uv_fs_event_t is initialized. -* If uv_fs_event_t was initialized with a directory the filename parameter -* will be a relative path to a file contained in the directory. -* The events paramenter is an ORed mask of enum uv_fs_event elements. -*/ -typedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char* filename, - int events, int status); - - /* Expand this list if necessary. */ typedef enum { UV_UNKNOWN = -1, @@ -232,6 +124,122 @@ typedef enum { } uv_req_type; + +typedef struct uv_loop_s uv_loop_t; +typedef struct uv_ares_task_s uv_ares_task_t; +typedef struct uv_err_s uv_err_t; +typedef struct uv_handle_s uv_handle_t; +typedef struct uv_stream_s uv_stream_t; +typedef struct uv_tcp_s uv_tcp_t; +typedef struct uv_udp_s uv_udp_t; +typedef struct uv_pipe_s uv_pipe_t; +typedef struct uv_tty_s uv_tty_t; +typedef struct uv_timer_s uv_timer_t; +typedef struct uv_prepare_s uv_prepare_t; +typedef struct uv_check_s uv_check_t; +typedef struct uv_idle_s uv_idle_t; +typedef struct uv_async_s uv_async_t; +typedef struct uv_getaddrinfo_s uv_getaddrinfo_t; +typedef struct uv_process_s uv_process_t; +typedef struct uv_counters_s uv_counters_t; +/* Request types */ +typedef struct uv_req_s uv_req_t; +typedef struct uv_shutdown_s uv_shutdown_t; +typedef struct uv_write_s uv_write_t; +typedef struct uv_connect_s uv_connect_t; +typedef struct uv_udp_send_s uv_udp_send_t; +typedef struct uv_fs_s uv_fs_t; +/* uv_fs_event_t is a subclass of uv_handle_t. */ +typedef struct uv_fs_event_s uv_fs_event_t; +typedef struct uv_work_s uv_work_t; + +#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) +# include "uv-private/uv-unix.h" +#else +# include "uv-private/uv-win.h" +#endif + + +/* + * This function must be called before any other functions in libuv. + * + * All functions besides uv_run() are non-blocking. + * + * All callbacks in libuv are made asynchronously. That is they are never + * made by the function that takes them as a parameter. + */ +uv_loop_t* uv_loop_new(); +void uv_loop_delete(uv_loop_t*); + + +/* + * Returns the default loop. + */ +uv_loop_t* uv_default_loop(); + +/* + * This function starts the event loop. It blocks until the reference count + * of the loop drops to zero. + */ +int uv_run(uv_loop_t*); + +/* + * Manually modify the event loop's reference count. Useful if the user wants + * to have a handle or timeout that doesn't keep the loop alive. + */ +void uv_ref(uv_loop_t*); +void uv_unref(uv_loop_t*); + +void uv_update_time(uv_loop_t*); +int64_t uv_now(uv_loop_t*); + + +/* + * The status parameter is 0 if the request completed successfully, + * and should be -1 if the request was cancelled or failed. + * For uv_close_cb, -1 means that the handle was closed due to an error. + * Error details can be obtained by calling uv_last_error(). + * + * In the case of uv_read_cb the uv_buf_t returned should be freed by the + * user. + */ +typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size); +typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); +/* + * Just like the uv_read_cb except that if the pending parameter is true + * then you can use uv_accept() to pull the new handle into the process. + * If no handle is pending then pending will be UV_UNKNOWN_HANDLE. + */ +typedef void (*uv_read2_cb)(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending); +typedef void (*uv_write_cb)(uv_write_t* req, int status); +typedef void (*uv_connect_cb)(uv_connect_t* req, int status); +typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); +typedef void (*uv_connection_cb)(uv_stream_t* server, int status); +typedef void (*uv_close_cb)(uv_handle_t* handle); +typedef void (*uv_timer_cb)(uv_timer_t* handle, int status); +/* TODO: do these really need a status argument? */ +typedef void (*uv_async_cb)(uv_async_t* handle, int status); +typedef void (*uv_prepare_cb)(uv_prepare_t* handle, int status); +typedef void (*uv_check_cb)(uv_check_t* handle, int status); +typedef void (*uv_idle_cb)(uv_idle_t* handle, int status); +typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* handle, int status, + struct addrinfo* res); +typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal); +typedef void (*uv_fs_cb)(uv_fs_t* req); +typedef void (*uv_work_cb)(uv_work_t* req); +typedef void (*uv_after_work_cb)(uv_work_t* req); + +/* +* This will be called repeatedly after the uv_fs_event_t is initialized. +* If uv_fs_event_t was initialized with a directory the filename parameter +* will be a relative path to a file contained in the directory. +* The events paramenter is an ORed mask of enum uv_fs_event elements. +*/ +typedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char* filename, + int events, int status); + + struct uv_err_s { /* read-only */ uv_err_code code; @@ -338,8 +346,8 @@ uv_buf_t uv_buf_init(char* base, size_t len); * * uv_stream is an abstract class. * - * uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t - * and soon uv_file_t. + * uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t, and + * soon uv_file_t. */ struct uv_stream_s { UV_HANDLE_FIELDS @@ -375,6 +383,12 @@ int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb); int uv_read_stop(uv_stream_t*); +/* + * Extended read methods for receiving handles over a pipe. The pipe must be + * initialized with ipc == 1. + */ +int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read2_cb read_cb); + typedef enum { UV_STDIN = 0, UV_STDOUT, @@ -404,6 +418,9 @@ uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type); int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb); +int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb); + /* uv_write_t is a subclass of uv_req_t */ struct uv_write_s { UV_REQ_FIELDS From 45306f2e7fef7bd37606e3059472a88b85100c4b Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Thu, 29 Sep 2011 11:49:56 -0700 Subject: [PATCH 04/10] unix: implement uv_write2 --- include/uv.h | 1 + src/unix/stream.c | 63 ++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 11 deletions(-) diff --git a/include/uv.h b/include/uv.h index 63284e10..747f3851 100644 --- a/include/uv.h +++ b/include/uv.h @@ -426,6 +426,7 @@ struct uv_write_s { UV_REQ_FIELDS uv_write_cb cb; uv_stream_t* handle; + uv_stream_t* send_handle; UV_WRITE_PRIVATE_FIELDS }; diff --git a/src/unix/stream.c b/src/unix/stream.c index f7c0a684..2cd98867 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -349,14 +349,41 @@ static void uv__write(uv_stream_t* stream) { * inside the iov each time we write. So there is no need to offset it. */ - do { - if (iovcnt == 1) { - n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); - } else { - n = writev(stream->fd, iov, iovcnt); + if (req->send_handle) { + struct msghdr msg; + char scratch[64]; + struct cmsghdr *cmsg; + int fd_to_send = req->send_handle->fd; + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + msg.msg_flags = 0; + + msg.msg_control = (void*) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + + do { + n = sendmsg(stream->fd, &msg, 0); } + while (n == -1 && errno == EINTR); + } else { + do { + if (iovcnt == 1) { + n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); + } else { + n = writev(stream->fd, iov, iovcnt); + } + } + while (n == -1 && errno == EINTR); } - while (n == -1 && errno == EINTR); if (n < 0) { if (errno != EAGAIN) { @@ -672,11 +699,8 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr, } -/* The buffers to be written must remain valid until the callback is called. - * This is not required for the uv_buf_t array. - */ -int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, - uv_write_cb cb) { +int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { int empty_queue; assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || @@ -688,6 +712,13 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, return -1; } + if (send_handle) { + if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) { + uv__set_sys_error(stream->loop, EOPNOTSUPP); + return -1; + } + } + empty_queue = (stream->write_queue_size == 0); /* Initialize the req */ @@ -695,6 +726,7 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, req->cb = cb; req->handle = stream; req->error = 0; + req->send_handle = send_handle; req->type = UV_WRITE; ngx_queue_init(&req->queue); @@ -737,6 +769,15 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, } +/* The buffers to be written must remain valid until the callback is called. + * This is not required for the uv_buf_t array. + */ +int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { + return uv_write2(req, stream, bufs, bufcnt, NULL, cb); +} + + int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); From bb6b629e6a1d46ed77196cfc9802bdea7d524119 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 30 Sep 2011 09:11:51 -0700 Subject: [PATCH 05/10] make test-ipc accept the pending tcp server --- include/uv-private/uv-unix.h | 2 -- include/uv-private/uv-win.h | 2 -- include/uv.h | 5 +++- src/unix/stream.c | 16 +++++++++++- test/run-tests.c | 3 ++- test/test-ipc.c | 50 +++++++++++++++++++++++++++--------- test/test-spawn.c | 2 +- uv.gyp | 1 + 8 files changed, 61 insertions(+), 20 deletions(-) diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 1b6d86df..72709c8a 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -91,8 +91,6 @@ typedef int uv_file; #define UV_STREAM_PRIVATE_FIELDS \ - uv_read_cb read_cb; \ - uv_alloc_cb alloc_cb; \ uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ ev_io read_watcher; \ diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 6610e016..656e85df 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -132,8 +132,6 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define UV_STREAM_PRIVATE_FIELDS \ unsigned int reqs_pending; \ - uv_alloc_cb alloc_cb; \ - uv_read_cb read_cb; \ uv_req_t read_req; \ union { \ struct { uv_stream_connection_fields }; \ diff --git a/include/uv.h b/include/uv.h index 747f3851..1358e4b9 100644 --- a/include/uv.h +++ b/include/uv.h @@ -338,6 +338,9 @@ uv_buf_t uv_buf_init(char* base, size_t len); #define UV_STREAM_FIELDS \ /* number of bytes queued for writing */ \ size_t write_queue_size; \ + uv_alloc_cb alloc_cb; \ + uv_read_cb read_cb; \ + uv_read2_cb read2_cb; \ /* private */ \ UV_STREAM_PRIVATE_FIELDS @@ -425,8 +428,8 @@ int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, struct uv_write_s { UV_REQ_FIELDS uv_write_cb cb; - uv_stream_t* handle; uv_stream_t* send_handle; + uv_stream_t* handle; UV_WRITE_PRIVATE_FIELDS }; diff --git a/src/unix/stream.c b/src/unix/stream.c index 2cd98867..a2580888 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -355,6 +355,8 @@ static void uv__write(uv_stream_t* stream) { struct cmsghdr *cmsg; int fd_to_send = req->send_handle->fd; + assert(fd_to_send >= 0); + msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; @@ -778,7 +780,8 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { +int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -810,6 +813,16 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) } +int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + int r; + r = uv_read_start(stream, alloc_cb, NULL); + assert(stream->read_cb == NULL); + stream->read2_cb = read_cb; + return r; +} + + int uv_read_stop(uv_stream_t* stream) { uv_tcp_t* tcp = (uv_tcp_t*)stream; @@ -817,6 +830,7 @@ int uv_read_stop(uv_stream_t* stream) { ev_io_stop(tcp->loop->ev, &tcp->read_watcher); tcp->read_cb = NULL; + tcp->read2_cb = NULL; tcp->alloc_cb = NULL; return 0; } diff --git a/test/run-tests.c b/test/run-tests.c index 4595d4df..f80dfbcb 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -83,7 +83,8 @@ static int ipc_helper() { ASSERT(r == 0); buf = uv_buf_init("hello\n", 6); - r = uv_write(&write_req, (uv_stream_t*)&channel, &buf, 1, NULL); + r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, + (uv_stream_t*)&server, NULL); ASSERT(r == 0); r = uv_run(uv_default_loop()); diff --git a/test/test-ipc.c b/test/test-ipc.c index 26c84ff7..c16f9ce7 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -29,11 +29,18 @@ static char exepath[1024]; static size_t exepath_size = 1024; static char* args[3]; static uv_pipe_t channel; +static uv_tcp_t tcp_server; static int exit_cb_called; +static int read2_cb_called; static uv_write_t write_req; +static void ipc_on_connection(uv_stream_t* server, int status) { + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); +} + static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { printf("exit_cb\n"); @@ -49,22 +56,40 @@ static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { } -static void on_read(uv_stream_t* pipe, ssize_t nread, uv_buf_t buf) { +static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending) { int r; uv_buf_t outbuf; - /* listen on the handle provided.... */ - if (nread) { - outbuf = uv_buf_init("world\n", 6); - r = uv_write(&write_req, pipe, &outbuf, 1, NULL); - ASSERT(r == 0); - - fprintf(stderr, "got %d bytes\n", (int)nread); - } - - if (buf.base) { + if (nread == 0) { + /* Everything OK, but nothing read. */ free(buf.base); + return; } + + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); + read2_cb_called++; + + /* Accept the pending TCP server, and start listening on it. */ + ASSERT(pending == UV_TCP); + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + /* Make sure that the expected data is correctly multiplexed. */ + ASSERT(memcmp("hello\n", buf.base, buf.len) == 0); + fprintf(stderr, "got %d bytes\n", (int)nread); + + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); + ASSERT(r == 0); + + free(buf.base); } @@ -92,11 +117,12 @@ TEST_IMPL(ipc) { r = uv_spawn(uv_default_loop(), &process, options); ASSERT(r == 0); - uv_read_start((uv_stream_t*)&channel, on_alloc, on_read); + uv_read2_start((uv_stream_t*)&channel, on_alloc, on_read); r = uv_run(uv_default_loop()); ASSERT(r == 0); + ASSERT(read2_cb_called == 1); ASSERT(exit_cb_called == 1); return 0; } diff --git a/test/test-spawn.c b/test/test-spawn.c index 5bb6ed62..238e6c9c 100644 --- a/test/test-spawn.c +++ b/test/test-spawn.c @@ -229,7 +229,7 @@ TEST_IMPL(spawn_detect_pipe_name_collisions_on_windows) { init_process_options("spawn_helper2", exit_cb); - uv_pipe_init(uv_default_loop(), &out); + uv_pipe_init(uv_default_loop(), &out, 0); options.stdout_stream = &out; /* Create a pipe that'll cause a collision. */ diff --git a/uv.gyp b/uv.gyp index e85927aa..0db92a93 100644 --- a/uv.gyp +++ b/uv.gyp @@ -260,6 +260,7 @@ 'test/test-getsockname.c', 'test/test-hrtime.c', 'test/test-idle.c', + 'test/test-ipc.c', 'test/test-list.h', 'test/test-loop-handles.c', 'test/test-pass-always.c', From c920db9fd15bba9a2b652bc8b0b8364090c915b3 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Mon, 3 Oct 2011 14:49:21 -0700 Subject: [PATCH 06/10] unix: handle passing kind of working --- src/unix/stream.c | 108 +++++++++++++++++++++++++++++++++++++++------- test/test-ipc.c | 17 ++++++-- 2 files changed, 107 insertions(+), 18 deletions(-) diff --git a/src/unix/stream.c b/src/unix/stream.c index a2580888..83e2ddbe 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -29,6 +29,8 @@ #include #include +#include + static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); @@ -476,12 +478,17 @@ static void uv__write_callbacks(uv_stream_t* stream) { static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; + struct msghdr msg; + struct cmsghdr* cmsg; + char cmsg_space[64]; + int received_fd = -1; struct ev_loop* ev = stream->loop->ev; /* XXX: Maybe instead of having UV_READING we just test if * tcp->read_cb is NULL or not? */ - while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) { + while ((stream->read_cb || stream->read2_cb) && + stream->flags & UV_READING) { assert(stream->alloc_cb); buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); @@ -489,10 +496,29 @@ static void uv__read(uv_stream_t* stream) { assert(buf.base); assert(stream->fd >= 0); - do { - nread = read(stream->fd, buf.base, buf.len); + if (stream->read_cb) { + do { + nread = read(stream->fd, buf.base, buf.len); + } + while (nread < 0 && errno == EINTR); + } else { + assert(stream->read2_cb); + /* read2_cb uses recvmsg */ + msg.msg_flags = 0; + msg.msg_iov = (struct iovec*) &buf; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + /* Set up to receive a descriptor even if one isn't in the message */ + msg.msg_controllen = 64; + msg.msg_control = (void *) cmsg_space; + + do { + nread = recvmsg(stream->fd, &msg, 0); + } + while (nread < 0 && errno == EINTR); } - while (nread < 0 && errno == EINTR); + if (nread < 0) { /* Error */ @@ -502,24 +528,73 @@ static void uv__read(uv_stream_t* stream) { ev_io_start(ev, &stream->read_watcher); } uv__set_sys_error(stream->loop, EAGAIN); - stream->read_cb(stream, 0, buf); + + if (stream->read_cb) { + stream->read_cb(stream, 0, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, 0, buf, UV_UNKNOWN_HANDLE); + } + return; } else { /* Error. User should call uv_close(). */ uv__set_sys_error(stream->loop, errno); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } + assert(!ev_is_active(&stream->read_watcher)); return; } + } else if (nread == 0) { /* EOF */ uv__set_artificial_error(stream->loop, UV_EOF); ev_io_stop(ev, &stream->read_watcher); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } return; } else { /* Successful read */ - stream->read_cb(stream, nread, buf); + + if (stream->read_cb) { + stream->read_cb(stream, nread, buf); + } else { + assert(stream->read2_cb); + + /* + * XXX: Some implementations can send multiple file descriptors in a + * single message. We should be using CMSG_NXTHDR() to walk the + * chain to get at them all. This would require changing the API to + * hand these back up the caller, is a pain. + */ + + for (cmsg = CMSG_FIRSTHDR(&msg); + msg.msg_controllen > 0 && cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + + if (cmsg->cmsg_type == SCM_RIGHTS) { + if (stream->accepted_fd != -1) { + fprintf(stderr, "(libuv) ignoring extra FD received\n"); + } + + stream->accepted_fd = *(int *) CMSG_DATA(cmsg); + + } else { + fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", + cmsg->cmsg_type); + } + } + + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP); + } } } } @@ -780,8 +855,8 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -803,6 +878,7 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, assert(alloc_cb); stream->read_cb = read_cb; + stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; /* These should have been set by uv_tcp_init. */ @@ -813,13 +889,15 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, } +int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv__read_start_common(stream, alloc_cb, read_cb, NULL); +} + + int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read2_cb read_cb) { - int r; - r = uv_read_start(stream, alloc_cb, NULL); - assert(stream->read_cb == NULL); - stream->read2_cb = read_cb; - return r; + return uv__read_start_common(stream, alloc_cb, NULL, read_cb); } diff --git a/test/test-ipc.c b/test/test-ipc.c index c16f9ce7..ed263c10 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -45,8 +45,7 @@ static void ipc_on_connection(uv_stream_t* server, int status) { static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { printf("exit_cb\n"); exit_cb_called++; - ASSERT(exit_status == 1); - ASSERT(term_signal == 0); + ASSERT(exit_status == 0); uv_close((uv_handle_t*)process, NULL); } @@ -60,6 +59,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, uv_handle_type pending) { int r; uv_buf_t outbuf; + uv_err_t err; if (nread == 0) { /* Everything OK, but nothing read. */ @@ -67,6 +67,17 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, return; } + if (nread < 0) { + err = uv_last_error(pipe->loop); + if (err.code == UV_EOF) { + free(buf.base); + return; + } + + printf("error recving on channel: %s\n", uv_strerror(err)); + abort(); + } + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); read2_cb_called++; @@ -82,7 +93,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, ASSERT(r == 0); /* Make sure that the expected data is correctly multiplexed. */ - ASSERT(memcmp("hello\n", buf.base, buf.len) == 0); + ASSERT(memcmp("hello\n", buf.base, nread) == 0); fprintf(stderr, "got %d bytes\n", (int)nread); outbuf = uv_buf_init("world\n", 6); From 61fab8d1ba749e02e41350c1e1acb7af1c5e6a1c Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 4 Oct 2011 14:44:16 -0700 Subject: [PATCH 07/10] unix: return UV_UNKNOWN_HANDLE when read2 doesn't recv one unix passes ipc test on this comment. --- src/unix/stream.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/unix/stream.c b/src/unix/stream.c index 83e2ddbe..02681582 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -593,7 +593,12 @@ static void uv__read(uv_stream_t* stream) { } } - stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP); + + if (stream->accepted_fd >= 0) { + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP); + } else { + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE); + } } } } From 81c4043c83f07dc8c365c94d8330c96c5e313d22 Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Thu, 29 Sep 2011 17:58:58 -0700 Subject: [PATCH 08/10] ipc on windows --- include/uv-private/uv-win.h | 23 ++- include/uv.h | 19 +- src/unix/core.c | 7 - src/win/internal.h | 63 +++--- src/win/pipe.c | 387 ++++++++++++++++++++++++++++-------- src/win/process.c | 44 ++-- src/win/stream.c | 31 ++- src/win/tcp.c | 170 +++++++++++++--- src/win/util.c | 27 +++ src/win/winsock.c | 78 ++------ src/win/winsock.h | 18 +- test/run-tests.c | 68 ++++++- test/test-ipc.c | 124 ++++++++++-- uv.gyp | 1 - 14 files changed, 766 insertions(+), 294 deletions(-) diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 656e85df..1be477e7 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -43,6 +43,11 @@ typedef struct uv_buf_t { char* base; } uv_buf_t; +typedef struct uv_duplicate_socket_info_s { + WSAPROTOCOL_INFOW socket_info; + struct uv_duplicate_socket_info_s* next; +} uv_duplicate_socket_info_t; + typedef int uv_file; RB_HEAD(uv_timer_tree_s, uv_timer_s); @@ -120,6 +125,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); UV_REQ_FIELDS \ SOCKET accept_socket; \ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ + HANDLE event_handle; \ + HANDLE wait_handle; \ struct uv_tcp_accept_s* next_pending; \ } uv_tcp_accept_t; @@ -140,10 +147,12 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define uv_tcp_server_fields \ uv_tcp_accept_t* accept_reqs; \ - uv_tcp_accept_t* pending_accepts; + uv_tcp_accept_t* pending_accepts; \ + LPFN_ACCEPTEX func_acceptex; #define uv_tcp_connection_fields \ - uv_buf_t read_buffer; + uv_buf_t read_buffer; \ + LPFN_CONNECTEX func_connectex; #define UV_TCP_PRIVATE_FIELDS \ SOCKET socket; \ @@ -164,11 +173,15 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_alloc_cb alloc_cb; #define uv_pipe_server_fields \ - uv_pipe_accept_t accept_reqs[4]; \ - uv_pipe_accept_t* pending_accepts; + uv_pipe_accept_t accept_reqs[4]; \ + uv_pipe_accept_t* pending_accepts; #define uv_pipe_connection_fields \ - uv_timer_t* eof_timer; + uv_timer_t* eof_timer; \ + uv_write_t ipc_header_write_req; \ + int ipc_pid; \ + uint64_t remaining_ipc_rawdata_bytes; \ + uv_duplicate_socket_info_t* pending_ipc_sockets; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/include/uv.h b/include/uv.h index 1358e4b9..8ea7ab0e 100644 --- a/include/uv.h +++ b/include/uv.h @@ -41,6 +41,12 @@ extern "C" { typedef intptr_t ssize_t; #endif +#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) +# include "uv-private/uv-unix.h" +#else +# include "uv-private/uv-win.h" +#endif + /* Expand this list if necessary. */ typedef enum { UV_UNKNOWN = -1, @@ -153,12 +159,6 @@ typedef struct uv_fs_s uv_fs_t; typedef struct uv_fs_event_s uv_fs_event_t; typedef struct uv_work_s uv_work_t; -#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) -# include "uv-private/uv-unix.h" -#else -# include "uv-private/uv-win.h" -#endif - /* * This function must be called before any other functions in libuv. @@ -392,13 +392,6 @@ int uv_read_stop(uv_stream_t*); */ int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read2_cb read_cb); -typedef enum { - UV_STDIN = 0, - UV_STDOUT, - UV_STDERR -} uv_std_type; - -uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type); /* * Write data to stream. Buffers are written in order. Example: diff --git a/src/unix/core.c b/src/unix/core.c index 719327a9..c834aaae 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -790,10 +790,3 @@ size_t uv__strlcpy(char* dst, const char* src, size_t size) { return src - org; } - - -uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) { - assert(0 && "implement me"); - return NULL; -} - diff --git a/src/win/internal.h b/src/win/internal.h index f8762145..e61aaefe 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -44,27 +44,30 @@ void uv_process_timers(uv_loop_t* loop); */ /* Private uv_handle flags */ -#define UV_HANDLE_CLOSING 0x0001 -#define UV_HANDLE_CLOSED 0x0002 -#define UV_HANDLE_BOUND 0x0004 -#define UV_HANDLE_LISTENING 0x0008 -#define UV_HANDLE_CONNECTION 0x0010 -#define UV_HANDLE_CONNECTED 0x0020 -#define UV_HANDLE_READING 0x0040 -#define UV_HANDLE_ACTIVE 0x0040 -#define UV_HANDLE_EOF 0x0080 -#define UV_HANDLE_SHUTTING 0x0100 -#define UV_HANDLE_SHUT 0x0200 -#define UV_HANDLE_ENDGAME_QUEUED 0x0400 -#define UV_HANDLE_BIND_ERROR 0x1000 -#define UV_HANDLE_IPV6 0x2000 -#define UV_HANDLE_PIPESERVER 0x4000 -#define UV_HANDLE_READ_PENDING 0x8000 -#define UV_HANDLE_GIVEN_OS_HANDLE 0x10000 -#define UV_HANDLE_UV_ALLOCED 0x20000 -#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000 -#define UV_HANDLE_ZERO_READ 0x80000 -#define UV_HANDLE_TTY_RAW 0x100000 +#define UV_HANDLE_CLOSING 0x0001 +#define UV_HANDLE_CLOSED 0x0002 +#define UV_HANDLE_BOUND 0x0004 +#define UV_HANDLE_LISTENING 0x0008 +#define UV_HANDLE_CONNECTION 0x0010 +#define UV_HANDLE_CONNECTED 0x0020 +#define UV_HANDLE_READING 0x0040 +#define UV_HANDLE_ACTIVE 0x0040 +#define UV_HANDLE_EOF 0x0080 +#define UV_HANDLE_SHUTTING 0x0100 +#define UV_HANDLE_SHUT 0x0200 +#define UV_HANDLE_ENDGAME_QUEUED 0x0400 +#define UV_HANDLE_BIND_ERROR 0x1000 +#define UV_HANDLE_IPV6 0x2000 +#define UV_HANDLE_PIPESERVER 0x4000 +#define UV_HANDLE_READ_PENDING 0x8000 +#define UV_HANDLE_UV_ALLOCED 0x10000 +#define UV_HANDLE_SYNC_BYPASS_IOCP 0x20000 +#define UV_HANDLE_ZERO_READ 0x40000 +#define UV_HANDLE_TTY_RAW 0x80000 +#define UV_HANDLE_USE_IPC_PROTOCOL 0x100000 +#define UV_HANDLE_EMULATE_IOCP 0x200000 +#define UV_HANDLE_DUPLICATED_SOCKET 0x400000 +#define UV_HANDLE_WINSOCK_EXT_INIT 0x800000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); @@ -97,8 +100,8 @@ uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped); void uv_insert_pending_req(uv_loop_t* loop, uv_req_t* req); void uv_process_reqs(uv_loop_t* loop); -#define POST_COMPLETION_FOR_REQ(loop, req) \ - if (!PostQueuedCompletionStatus((loop)->iocp, \ +#define POST_COMPLETION_FOR_REQ(loop, req) \ + if (!PostQueuedCompletionStatus((loop)->iocp, \ 0, \ 0, \ &((req)->overlapped))) { \ @@ -135,6 +138,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info); + /* * UDP @@ -149,19 +154,21 @@ void uv_udp_endgame(uv_loop_t* loop, uv_udp_t* handle); /* * Pipes */ -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle); int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, char* name, size_t nameSize); void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle); int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client); +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb); int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb); +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb); void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req); @@ -267,6 +274,10 @@ void uv_fs_event_close(uv_loop_t* loop, uv_fs_event_t* handle); void uv_fs_event_endgame(uv_loop_t* loop, uv_fs_event_t* handle); +/* Utils */ +int uv_parent_pid(); + + /* * Error handling */ diff --git a/src/win/pipe.c b/src/win/pipe.c index 4c855058..6473468f 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -20,6 +20,7 @@ */ #include +#include #include #include @@ -38,6 +39,22 @@ static const uv_buf_t uv_null_buf_ = { 0, NULL }; /* when the local ends wants to shut it down. */ static const int64_t eof_timeout = 50; /* ms */ +/* IPC protocol flags. */ +#define UV_IPC_RAW_DATA 0x0001 +#define UV_IPC_UV_STREAM 0x0002 + +/* IPC frame header. */ +typedef struct { + int flags; + uint64_t raw_data_length; +} uv_ipc_frame_header_t; + +/* IPC frame, which contains an imported TCP socket stream. */ +typedef struct { + uv_ipc_frame_header_t header; + WSAPROTOCOL_INFOW socket_info; +} uv_ipc_frame_uv_stream; + static void eof_timer_init(uv_pipe_t* pipe); static void eof_timer_start(uv_pipe_t* pipe); static void eof_timer_stop(uv_pipe_t* pipe); @@ -58,7 +75,13 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; handle->name = NULL; - handle->ipc = ipc; + handle->ipc_pid = 0; + handle->remaining_ipc_rawdata_bytes = 0; + handle->pending_ipc_sockets = NULL; + + if (ipc) { + handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL; + } loop->counters.pipe_init++; @@ -66,24 +89,6 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { } -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle) { - int err = uv_pipe_init(loop, handle); - - if (!err) { - /* - * At this point we don't know whether the pipe will be used as a client - * or a server. So, we assume that it will be a client until - * uv_listen is called. - */ - handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; - } - - return err; -} - - static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); handle->eof_timer = NULL; @@ -132,7 +137,6 @@ int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, uv_pipe_connection_init(handle); handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; err = 0; done: @@ -192,7 +196,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; - + uv_duplicate_socket_info_t* socket_info, *next_socket_info; if (handle->flags & UV_HANDLE_SHUTTING && !(handle->flags & UV_HANDLE_SHUT) && @@ -251,6 +255,15 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { assert(!(handle->flags & UV_HANDLE_CLOSED)); handle->flags |= UV_HANDLE_CLOSED; + if (handle->flags & UV_HANDLE_CONNECTION) { + next_socket_info = handle->pending_ipc_sockets; + while (next_socket_info) { + socket_info = next_socket_info; + next_socket_info = next_socket_info->next; + free(socket_info); + } + } + /* Remember the state of this flag because the close callback is */ /* allowed to clobber or free the handle's memory */ uv_alloced = handle->flags & UV_HANDLE_UV_ALLOCED; @@ -568,30 +581,50 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, } -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { + int r; uv_loop_t* loop = server->loop; - /* Find a connection instance that has been connected, but not yet */ - /* accepted. */ - uv_pipe_accept_t* req = server->pending_accepts; + uv_pipe_t* pipe_client; + uv_pipe_accept_t* req; + uv_duplicate_socket_info_t* pending_socket; - if (!req) { - /* No valid connections found, so we error out. */ - uv__set_sys_error(loop, WSAEWOULDBLOCK); - return -1; - } + if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + pending_socket = server->pending_ipc_sockets; + if (!pending_socket) { + /* No valid pending sockets. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } - /* Initialize the client handle and copy the pipeHandle to the client */ - uv_pipe_connection_init(client); - client->handle = req->pipeHandle; + server->pending_ipc_sockets = pending_socket->next; + r = uv_tcp_import((uv_tcp_t*)client, &pending_socket->socket_info); + free(pending_socket); + return r; + } else { + pipe_client = (uv_pipe_t*)client; - /* Prepare the req to pick up a new connection */ - server->pending_accepts = req->next_pending; - req->next_pending = NULL; - req->pipeHandle = INVALID_HANDLE_VALUE; + /* Find a connection instance that has been connected, but not yet */ + /* accepted. */ + req = server->pending_accepts; - if (!(server->flags & UV_HANDLE_CLOSING) && - !(server->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { - uv_pipe_queue_accept(loop, server, req, FALSE); + if (!req) { + /* No valid connections found, so we error out. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } + + /* Initialize the client handle and copy the pipeHandle to the client */ + uv_pipe_connection_init(pipe_client); + pipe_client->handle = req->pipeHandle; + + /* Prepare the req to pick up a new connection */ + server->pending_accepts = req->next_pending; + req->next_pending = NULL; + req->pipeHandle = INVALID_HANDLE_VALUE; + + if (!(server->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(loop, server, req, FALSE); + } } return 0; @@ -603,11 +636,8 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { uv_loop_t* loop = handle->loop; int i, errno; - uv_pipe_accept_t* req; - HANDLE pipeHandle; - if (!(handle->flags & UV_HANDLE_BOUND) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_BOUND)) { uv__set_artificial_error(loop, UV_EINVAL); return -1; } @@ -618,8 +648,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { return -1; } - if (!(handle->flags & UV_HANDLE_PIPESERVER) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_PIPESERVER)) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -627,30 +656,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - if (handle->flags & UV_HANDLE_GIVEN_OS_HANDLE) { - handle->flags |= UV_HANDLE_PIPESERVER; - pipeHandle = handle->handle; - assert(pipeHandle != INVALID_HANDLE_VALUE); - req = &handle->accept_reqs[0]; - uv_req_init(loop, (uv_req_t*) req); - req->pipeHandle = pipeHandle; - req->type = UV_ACCEPT; - req->data = handle; - req->next_pending = NULL; + /* First pipe handle should have already been created in uv_pipe_bind */ + assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - if (uv_set_pipe_handle(loop, handle, pipeHandle)) { - uv__set_sys_error(loop, GetLastError()); - return -1; - } - - uv_pipe_queue_accept(loop, handle, req, TRUE); - } else { - /* First pipe handle should have already been created in uv_pipe_bind */ - assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); - } + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); } return 0; @@ -694,8 +704,8 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { } -int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { uv_loop_t* loop = handle->loop; if (!(handle->flags & UV_HANDLE_CONNECTION)) { @@ -715,9 +725,10 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, handle->flags |= UV_HANDLE_READING; handle->read_cb = read_cb; + handle->read2_cb = read2_cb; handle->alloc_cb = alloc_cb; - /* If reading was stopped and then started again, there could stell be a */ + /* If reading was stopped and then started again, there could still be a */ /* read request pending. */ if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_pipe_queue_read(loop, handle); @@ -726,11 +737,33 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, } -int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, - uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { - int result; +int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL); +} - if (bufcnt != 1) { + +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb); +} + + +static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, + uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { + int result; + uv_tcp_t* tcp_send_handle; + uv_req_t* ipc_header_req; + DWORD written; + uv_ipc_frame_uv_stream ipc_frame; + + if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) { + uv__set_artificial_error(loop, UV_ENOTSUP); + return -1; + } + + if (send_handle && send_handle->type != UV_TCP) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -753,6 +786,73 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol. */ + if (send_handle) { + tcp_send_handle = (uv_tcp_t*)send_handle; + if (WSADuplicateSocketW(tcp_send_handle->socket, handle->ipc_pid, + &ipc_frame.socket_info)) { + uv__set_sys_error(loop, WSAGetLastError()); + return -1; + } + ipc_frame.header.flags |= UV_IPC_UV_STREAM; + } + + if (bufcnt == 1) { + ipc_frame.header.flags |= UV_IPC_RAW_DATA; + ipc_frame.header.raw_data_length = bufs[0].len; + } + + /* + * Use the provided req if we're only doing a single write. + * If we're doing multiple writes, use ipc_header_write_req to do + * the first write, and then use the provided req for the second write. + */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + ipc_header_req = (uv_req_t*)req; + } else { + ipc_header_req = (uv_req_t*)&handle->ipc_header_write_req; + /* Initialize the req if needed. */ + if (handle->ipc_header_write_req.type != UV_WRITE) { + uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); + handle->ipc_header_write_req.type = UV_WRITE; + handle->ipc_header_write_req.handle = (uv_stream_t*) handle; + handle->ipc_header_write_req.cb = NULL; + } + } + + /* Write the header or the whole frame. */ + memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped)); + + result = WriteFile(handle->handle, + &ipc_frame, + ipc_frame.header.flags & UV_IPC_UV_STREAM ? + sizeof(ipc_frame) : sizeof(ipc_frame.header), + &written, + &ipc_header_req->overlapped); + if (!result && GetLastError() != ERROR_IO_PENDING) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + req->queued_bytes = written; + handle->write_queue_size += req->queued_bytes; + } + + handle->reqs_pending++; + handle->write_reqs_pending++; + + /* If we don't have any raw data to write - we're done. */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + return 0; + } + } + result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, @@ -780,6 +880,23 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, } +int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb); +} + + +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { + if (!(handle->flags & UV_HANDLE_USE_IPC_PROTOCOL)) { + uv__set_artificial_error(loop, UV_EINVAL); + return -1; + } + + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb); +} + + static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_buf_t buf) { /* If there is an eof timer running, we don't need it any more, */ @@ -790,7 +907,11 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_read_stop((uv_stream_t*) handle); uv__set_artificial_error(loop, UV_EOF); - handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + } } @@ -803,7 +924,11 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_read_stop((uv_stream_t*) handle); uv__set_sys_error(loop, error); - handle->read_cb((uv_stream_t*)handle, -1, buf); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*)handle, -1, buf); + } } @@ -821,6 +946,8 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req) { DWORD bytes, avail; uv_buf_t buf; + uv_ipc_frame_uv_stream ipc_frame; + uv_duplicate_socket_info_t* pending_ipc_socket; assert(handle->type == UV_NAMED_PIPE); @@ -839,11 +966,11 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, /* Do non-blocking reads until the buffer is empty */ while (handle->flags & UV_HANDLE_READING) { if (!PeekNamedPipe(handle->handle, - NULL, - 0, - NULL, - &avail, - NULL)) { + NULL, + 0, + NULL, + &avail, + NULL)) { uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); break; } @@ -853,6 +980,63 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, break; } + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol to read the incoming data. */ + if (handle->remaining_ipc_rawdata_bytes == 0) { + /* We're reading a new frame. First, read the header. */ + assert(avail >= sizeof(ipc_frame.header)); + + if (!ReadFile(handle->handle, + &ipc_frame.header, + sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame.header)); + + if (ipc_frame.header.flags & UV_IPC_UV_STREAM) { + assert(avail - sizeof(ipc_frame.header) >= + sizeof(ipc_frame.socket_info)); + + /* Read the TCP socket info. */ + if (!ReadFile(handle->handle, + &ipc_frame.socket_info, + sizeof(ipc_frame) - sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); + + /* Insert a new pending socket entry. */ + pending_ipc_socket = + (uv_duplicate_socket_info_t*)malloc(sizeof(*pending_ipc_socket)); + if (!pending_ipc_socket) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + + pending_ipc_socket->socket_info = ipc_frame.socket_info; + pending_ipc_socket->next = handle->pending_ipc_sockets; + handle->pending_ipc_sockets = pending_ipc_socket; + } + + if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { + handle->remaining_ipc_rawdata_bytes = + ipc_frame.header.raw_data_length; + continue; + } + } else { + avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes); + } + } + buf = handle->alloc_cb((uv_handle_t*) handle, avail); assert(buf.len > 0); @@ -862,7 +1046,20 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, &bytes, NULL)) { /* Successful read */ - handle->read_cb((uv_stream_t*)handle, bytes, buf); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + assert(handle->remaining_ipc_rawdata_bytes >= bytes); + handle->remaining_ipc_rawdata_bytes = + handle->remaining_ipc_rawdata_bytes - bytes; + if (handle->read2_cb) { + handle->read2_cb(handle, bytes, buf, + handle->pending_ipc_sockets ? UV_TCP : UV_UNKNOWN_HANDLE); + } else if (handle->read_cb) { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + } else { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + /* Read again only if bytes == buf.len */ if (bytes <= buf.len) { break; @@ -928,8 +1125,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, CloseHandle(req->pipeHandle); req->pipeHandle = INVALID_HANDLE_VALUE; } - if (!(handle->flags & UV_HANDLE_CLOSING) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_CLOSING)) { uv_pipe_queue_accept(loop, handle, req, FALSE); } } @@ -1066,6 +1262,21 @@ static void eof_timer_close_cb(uv_handle_t* handle) { void uv_pipe_open(uv_pipe_t* pipe, uv_file file) { - assert(0 && "implement me"); -} + HANDLE os_handle; + + /* Special-case stdin with ipc. */ + if (file == 0 && pipe->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + os_handle = (HANDLE)_get_osfhandle(file); + if (os_handle == INVALID_HANDLE_VALUE || + uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { + return; + } + + uv_pipe_connection_init(pipe); + pipe->ipc_pid = uv_parent_pid(); + assert(pipe->ipc_pid != -1); + + pipe->handle = os_handle; + } +} diff --git a/src/win/process.c b/src/win/process.c index 4db04832..da72d55e 100644 --- a/src/win/process.c +++ b/src/win/process.c @@ -45,7 +45,7 @@ typedef struct env_var { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); \ } \ if (!uv_utf8_to_utf16(s, t, size / sizeof(wchar_t))) { \ - uv__set_sys_error(loop, GetLastError()); \ + uv__set_sys_error(loop, GetLastError()); \ err = -1; \ goto done; \ } @@ -739,7 +739,8 @@ void uv_process_close(uv_loop_t* loop, uv_process_t* handle) { static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, - HANDLE* child_pipe, DWORD server_access, DWORD child_access) { + HANDLE* child_pipe, DWORD server_access, DWORD child_access, + int overlapped) { int err; SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE }; char pipe_name[64]; @@ -767,7 +768,7 @@ static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, 0, &sa, OPEN_EXISTING, - 0, + overlapped ? FILE_FLAG_OVERLAPPED : 0, NULL); if (*child_pipe == INVALID_HANDLE_VALUE) { @@ -848,7 +849,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, wchar_t* path = NULL; int size; BOOL result; - wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, *env = NULL, *cwd = NULL; + wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, + *env = NULL, *cwd = NULL; HANDLE* child_stdio = process->child_stdio; STARTUPINFOW startup; PROCESS_INFORMATION info; @@ -904,12 +906,23 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, /* Create stdio pipes. */ if (options.stdin_stream) { - err = uv_create_stdio_pipe_pair( - loop, - options.stdin_stream, - &child_stdio[0], - PIPE_ACCESS_OUTBOUND, - GENERIC_READ | FILE_WRITE_ATTRIBUTES); + if (options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_DUPLEX, + GENERIC_READ | FILE_WRITE_ATTRIBUTES | GENERIC_WRITE, + 1); + } else { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_OUTBOUND, + GENERIC_READ | FILE_WRITE_ATTRIBUTES, + 0); + } } else { err = duplicate_std_handle(loop, STD_INPUT_HANDLE, &child_stdio[0]); } @@ -922,7 +935,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, loop, options.stdout_stream, &child_stdio[1], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_OUTPUT_HANDLE, &child_stdio[1]); } @@ -936,7 +950,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, options.stderr_stream, &child_stdio[2], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_ERROR_HANDLE, &child_stdio[2]); } @@ -969,6 +984,11 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, process->process_handle = info.hProcess; process->pid = info.dwProcessId; + if (options.stdin_stream && + options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + options.stdin_stream->ipc_pid = info.dwProcessId; + } + /* Setup notifications for when the child process exits. */ result = RegisterWaitForSingleObject(&process->wait_handle, process->process_handle, exit_wait_callback, (void*)process, INFINITE, diff --git a/src/win/stream.c b/src/win/stream.c index c38e06bb..f1211784 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -62,13 +62,11 @@ int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int uv_accept(uv_stream_t* server, uv_stream_t* client) { - assert(client->type == server->type); - switch (server->type) { case UV_TCP: return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); case UV_NAMED_PIPE: - return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); + return uv_pipe_accept((uv_pipe_t*)server, client); default: assert(0); return -1; @@ -92,6 +90,18 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, } +int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb); + default: + assert(0); + return -1; + } +} + + int uv_read_stop(uv_stream_t* handle) { if (handle->type == UV_TTY) { return uv_tty_read_stop((uv_tty_t*) handle); @@ -121,6 +131,21 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, } +int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { + uv_loop_t* loop = handle->loop; + + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_write2(loop, req, (uv_pipe_t*) handle, bufs, bufcnt, send_handle, cb); + default: + assert(0); + uv__set_sys_error(loop, WSAEINVAL); + return -1; + } +} + + int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { uv_loop_t* loop = handle->loop; diff --git a/src/win/tcp.c b/src/win/tcp.c index ee95aa11..a2f65cbf 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -47,7 +47,7 @@ static unsigned int active_tcp_streams = 0; static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, - SOCKET socket) { + SOCKET socket, int imported) { DWORD yes = 1; assert(handle->socket == INVALID_SOCKET); @@ -70,8 +70,12 @@ static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, loop->iocp, (ULONG_PTR)socket, 0) == NULL) { - uv__set_sys_error(loop, GetLastError()); - return -1; + if (imported) { + handle->flags |= UV_HANDLE_EMULATE_IOCP; + } else { + uv__set_sys_error(loop, GetLastError()); + return -1; + } } if (pSetFileCompletionNotificationModes) { @@ -109,6 +113,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { int status; int sys_error; + unsigned int i; + uv_tcp_accept_t* req; if (handle->flags & UV_HANDLE_CONNECTION && handle->flags & UV_HANDLE_SHUTTING && @@ -139,6 +145,20 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { handle->flags |= UV_HANDLE_CLOSED; if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) { + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + for (i = 0; i < uv_simultaneous_server_accepts; i++) { + req = &handle->accept_reqs[i]; + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; + } + } + } + free(handle->accept_reqs); handle->accept_reqs = NULL; } @@ -169,7 +189,7 @@ static int uv__bind(uv_tcp_t* handle, return -1; } - if (uv_tcp_set_socket(handle->loop, handle, sock) == -1) { + if (uv_tcp_set_socket(handle->loop, handle, sock, 0) == -1) { closesocket(sock); return -1; } @@ -218,24 +238,40 @@ int uv__tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) { } +static void CALLBACK post_completion(void* context, BOOLEAN timed_out) { + uv_tcp_accept_t* req; + uv_tcp_t* handle; + + req = (uv_tcp_accept_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->data; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { uv_loop_t* loop = handle->loop; BOOL success; DWORD bytes; SOCKET accept_socket; short family; - LPFN_ACCEPTEX pAcceptExFamily; assert(handle->flags & UV_HANDLE_LISTENING); assert(req->accept_socket == INVALID_SOCKET); /* choose family and extension function */ - if ((handle->flags & UV_HANDLE_IPV6) != 0) { + if (handle->flags & UV_HANDLE_IPV6) { family = AF_INET6; - pAcceptExFamily = pAcceptEx6; } else { family = AF_INET; - pAcceptExFamily = pAcceptEx; } /* Open a socket for the accepted connection. */ @@ -249,15 +285,18 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1); + } - success = pAcceptExFamily(handle->socket, - accept_socket, - (void*)req->accept_buffer, - 0, - sizeof(struct sockaddr_storage), - sizeof(struct sockaddr_storage), - &bytes, - &req->overlapped); + success = handle->func_acceptex(handle->socket, + accept_socket, + (void*)req->accept_buffer, + 0, + sizeof(struct sockaddr_storage), + sizeof(struct sockaddr_storage), + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -268,6 +307,15 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* The req will be processed with IOCP. */ req->accept_socket = accept_socket; handle->reqs_pending++; + if (handle->flags & UV_HANDLE_EMULATE_IOCP && + req->wait_handle == INVALID_HANDLE_VALUE && + !RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, (uv_req_t*)req); + return; + } } else { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, WSAGetLastError()); @@ -275,6 +323,11 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { handle->reqs_pending++; /* Destroy the preallocated client socket. */ closesocket(accept_socket); + /* Destroy the event handle */ + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + CloseHandle(req->overlapped.hEvent); + req->event_handle = NULL; + } } } @@ -357,6 +410,14 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; + } + if (listen(handle->socket, backlog) == SOCKET_ERROR) { uv__set_sys_error(loop, WSAGetLastError()); return -1; @@ -378,6 +439,17 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { req->type = UV_ACCEPT; req->accept_socket = INVALID_SOCKET; req->data = handle; + + req->wait_handle = INVALID_HANDLE_VALUE; + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } else { + req->event_handle = NULL; + } + uv_tcp_queue_accept(handle, req); } @@ -402,7 +474,7 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { return -1; } - if (uv_tcp_set_socket(client->loop, client, req->accept_socket) == -1) { + if (uv_tcp_set_socket(client->loop, client, req->accept_socket, 0) == -1) { closesocket(req->accept_socket); rv = -1; } else { @@ -476,19 +548,27 @@ int uv__tcp_connect(uv_connect_t* req, uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -529,19 +609,27 @@ int uv__tcp_connect6(uv_connect_t* req, uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0) return -1; + if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx6(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { handle->reqs_pending++; @@ -848,3 +936,23 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, DECREASE_PENDING_REQ_COUNT(handle); } + + +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { + SOCKET socket = WSASocketW(AF_INET, + SOCK_STREAM, + IPPROTO_IP, + socket_protocol_info, + 0, + WSA_FLAG_OVERLAPPED); + + if (socket == INVALID_SOCKET) { + uv__set_sys_error(tcp->loop, WSAGetLastError()); + return -1; + } + + tcp->flags |= UV_HANDLE_BOUND; + tcp->flags |= UV_HANDLE_DUPLICATED_SOCKET; + + return uv_tcp_set_socket(tcp->loop, tcp, socket, 1); +} diff --git a/src/win/util.c b/src/win/util.c index cb2d4438..cc6f93cf 100644 --- a/src/win/util.c +++ b/src/win/util.c @@ -25,6 +25,7 @@ #include "uv.h" #include "internal.h" +#include "Tlhelp32.h" int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size, @@ -95,11 +96,13 @@ done: return retVal; } + void uv_loadavg(double avg[3]) { /* Can't be implemented */ avg[0] = avg[1] = avg[2] = 0; } + double uv_get_free_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -112,6 +115,7 @@ double uv_get_free_memory(void) { return (double)memory_status.ullAvailPhys; } + double uv_get_total_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -123,3 +127,26 @@ double uv_get_total_memory(void) { return (double)memory_status.ullTotalPhys; } + + +int uv_parent_pid() { + int parent_pid = -1; + HANDLE handle; + PROCESSENTRY32 pe; + int current_pid = GetCurrentProcessId(); + + pe.dwSize = sizeof(PROCESSENTRY32); + handle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); + + if (Process32First(handle, &pe)) { + do { + if (pe.th32ProcessID == current_pid) { + parent_pid = pe.th32ParentProcessID; + break; + } + } while( Process32Next(handle, &pe)); + } + + CloseHandle(handle); + return parent_pid; +} diff --git a/src/win/winsock.c b/src/win/winsock.c index 1f56b3d7..e37a60a9 100644 --- a/src/win/winsock.c +++ b/src/win/winsock.c @@ -25,21 +25,6 @@ #include "../uv-common.h" #include "internal.h" - -/* Winsock extension functions (ipv4) */ -LPFN_CONNECTEX pConnectEx; -LPFN_ACCEPTEX pAcceptEx; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -LPFN_DISCONNECTEX pDisconnectEx; -LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -LPFN_CONNECTEX pConnectEx6; -LPFN_ACCEPTEX pAcceptEx6; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -LPFN_DISCONNECTEX pDisconnectEx6; -LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ int uv_allow_ipv6; @@ -74,6 +59,18 @@ static BOOL uv_get_extension_function(SOCKET socket, GUID guid, } +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target) { + const GUID wsaid_acceptex = WSAID_ACCEPTEX; + return uv_get_extension_function(socket, wsaid_acceptex, (void**)target); +} + + +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target) { + const GUID wsaid_connectex = WSAID_CONNECTEX; + return uv_get_extension_function(socket, wsaid_connectex, (void**)target); +} + + void uv_winsock_init() { const GUID wsaid_connectex = WSAID_CONNECTEX; const GUID wsaid_acceptex = WSAID_ACCEPTEX; @@ -83,7 +80,6 @@ void uv_winsock_init() { WSADATA wsa_data; int errorno; - SOCKET dummy; SOCKET dummy6; /* Initialize winsock */ @@ -96,58 +92,10 @@ void uv_winsock_init() { uv_addr_ip4_any_ = uv_ip4_addr("0.0.0.0", 0); uv_addr_ip6_any_ = uv_ip6_addr("::", 0); - /* Retrieve the needed winsock extension function pointers. */ - dummy = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - if (dummy == INVALID_SOCKET) { - uv_fatal_error(WSAGetLastError(), "socket"); - } - - if (!uv_get_extension_function(dummy, - wsaid_connectex, - (void**)&pConnectEx) || - !uv_get_extension_function(dummy, - wsaid_acceptex, - (void**)&pAcceptEx) || - !uv_get_extension_function(dummy, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs) || - !uv_get_extension_function(dummy, - wsaid_disconnectex, - (void**)&pDisconnectEx) || - !uv_get_extension_function(dummy, - wsaid_transmitfile, - (void**)&pTransmitFile)) { - uv_fatal_error(WSAGetLastError(), - "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)"); - } - - if (closesocket(dummy) == SOCKET_ERROR) { - uv_fatal_error(WSAGetLastError(), "closesocket"); - } - - /* optional IPv6 versions of winsock extension functions */ + /* Detect IPV6 support */ dummy6 = socket(AF_INET6, SOCK_STREAM, IPPROTO_IP); if (dummy6 != INVALID_SOCKET) { uv_allow_ipv6 = TRUE; - - if (!uv_get_extension_function(dummy6, - wsaid_connectex, - (void**)&pConnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_acceptex, - (void**)&pAcceptEx6) || - !uv_get_extension_function(dummy6, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs6) || - !uv_get_extension_function(dummy6, - wsaid_disconnectex, - (void**)&pDisconnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_transmitfile, - (void**)&pTransmitFile6)) { - uv_allow_ipv6 = FALSE; - } - if (closesocket(dummy6) == SOCKET_ERROR) { uv_fatal_error(WSAGetLastError(), "closesocket"); } diff --git a/src/win/winsock.h b/src/win/winsock.h index 2c9fb92d..f879cc65 100644 --- a/src/win/winsock.h +++ b/src/win/winsock.h @@ -109,24 +109,12 @@ #define IPV6_V6ONLY 27 #endif - -/* Winsock extension functions (ipv4) */ -extern LPFN_CONNECTEX pConnectEx; -extern LPFN_ACCEPTEX pAcceptEx; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -extern LPFN_DISCONNECTEX pDisconnectEx; -extern LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -extern LPFN_CONNECTEX pConnectEx6; -extern LPFN_ACCEPTEX pAcceptEx6; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -extern LPFN_DISCONNECTEX pDisconnectEx6; -extern LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ extern int uv_allow_ipv6; +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target); +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target); + /* Ip address used to bind to any port at any interface */ extern struct sockaddr_in uv_addr_ip4_any_; extern struct sockaddr_in6 uv_addr_ip6_any_; diff --git a/test/run-tests.c b/test/run-tests.c index f80dfbcb..fa7b8b8f 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -22,9 +22,9 @@ #include #include +#include "uv.h" #include "runner.h" #include "task.h" -#include "uv.h" /* Actual tests and helpers are defined in test-list.h */ #include "test-list.h" @@ -49,10 +49,61 @@ int main(int argc, char **argv) { } -static uv_tcp_t server; +static uv_pipe_t channel; +static uv_tcp_t tcp_server; +static uv_write_t conn_notify_req; +static int close_cb_called; +static int connection_accepted; + + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + + +static void close_conn_cb(uv_handle_t* handle) { + free(handle); + close_cb_called++; +} + + +void conn_notify_write_cb(uv_write_t* req, int status) { + uv_close((uv_handle_t*)&tcp_server, close_cb); + uv_close((uv_handle_t*)&channel, close_cb); +} static void ipc_on_connection(uv_stream_t* server, int status) { + int r; + uv_buf_t buf; + uv_tcp_t* conn; + + if (!connection_accepted) { + /* + * Accept the connection and close it. Also let the other + * side know. + */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_conn_cb); + + buf = uv_buf_init("accepted_connection\n", 20); + r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, + NULL, conn_notify_write_cb); + ASSERT(r == 0); + + connection_accepted = 1; + } } @@ -63,7 +114,7 @@ static int ipc_helper() { * data is transfered over the channel. XXX edit this comment after handle * transfer is added. */ - uv_pipe_t channel; + uv_write_t write_req; int r; uv_buf_t buf; @@ -73,23 +124,26 @@ static int ipc_helper() { uv_pipe_open(&channel, 0); - r = uv_tcp_init(uv_default_loop(), &server); + r = uv_tcp_init(uv_default_loop(), &tcp_server); ASSERT(r == 0); - r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT)); ASSERT(r == 0); - r = uv_listen((uv_stream_t*)&server, 12, ipc_on_connection); + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); ASSERT(r == 0); buf = uv_buf_init("hello\n", 6); r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, - (uv_stream_t*)&server, NULL); + (uv_stream_t*)&tcp_server, NULL); ASSERT(r == 0); r = uv_run(uv_default_loop()); ASSERT(r == 0); + ASSERT(connection_accepted == 1); + ASSERT(close_cb_called == 3); + return 0; } diff --git a/test/test-ipc.c b/test/test-ipc.c index ed263c10..0024cdee 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -33,12 +33,46 @@ static uv_tcp_t tcp_server; static int exit_cb_called; static int read2_cb_called; +static int local_conn_accepted; +static int remote_conn_accepted; +static int tcp_server_listening; static uv_write_t write_req; +typedef struct { + uv_connect_t conn_req; + uv_tcp_t conn; +} tcp_conn; + +#define CONN_COUNT 100 + + +static void close_server_conn_cb(uv_handle_t* handle) { + free(handle); +} + + static void ipc_on_connection(uv_stream_t* server, int status) { - ASSERT(status == 0); - ASSERT((uv_stream_t*)&tcp_server == server); + uv_tcp_t* conn; + int r; + + if (!local_conn_accepted) { + /* Accept the connection and close it. Also and close the server. */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_server_conn_cb); + uv_close((uv_handle_t*)server, NULL); + local_conn_accepted = 1; + } } @@ -55,6 +89,39 @@ static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { } +static void close_client_conn_cb(uv_handle_t* handle) { + tcp_conn* p = (tcp_conn*)handle->data; + free(p); +} + + +static void connect_cb(uv_connect_t* req, int status) { + uv_close((uv_handle_t*)req->handle, close_client_conn_cb); +} + + +static void make_many_connections() { + tcp_conn* conn; + struct sockaddr_in addr; + int r, i; + + for (i = 0; i < CONN_COUNT; i++) { + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(uv_default_loop(), &conn->conn); + ASSERT(r == 0); + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb); + ASSERT(r == 0); + + conn->conn.data = conn; + } +} + + static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, uv_handle_type pending) { int r; @@ -78,27 +145,40 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, abort(); } - ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); - read2_cb_called++; - - /* Accept the pending TCP server, and start listening on it. */ - ASSERT(pending == UV_TCP); - r = uv_tcp_init(uv_default_loop(), &tcp_server); - ASSERT(r == 0); - - r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); - ASSERT(r == 0); - - r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); - ASSERT(r == 0); - - /* Make sure that the expected data is correctly multiplexed. */ - ASSERT(memcmp("hello\n", buf.base, nread) == 0); fprintf(stderr, "got %d bytes\n", (int)nread); - outbuf = uv_buf_init("world\n", 6); - r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); - ASSERT(r == 0); + if (!tcp_server_listening) { + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); + read2_cb_called++; + + /* Accept the pending TCP server, and start listening on it. */ + ASSERT(pending == UV_TCP); + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + tcp_server_listening = 1; + + /* Make sure that the expected data is correctly multiplexed. */ + ASSERT(memcmp("hello\n", buf.base, nread) == 0); + + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); + ASSERT(r == 0); + + /* Create a bunch of connections to get both servers to accept. */ + make_many_connections(); + } else if (memcmp("accepted_connection\n", buf.base, nread) == 0) { + /* Remote server has accepted a connection. Close the channel. */ + ASSERT(pending == UV_UNKNOWN_HANDLE); + remote_conn_accepted = 1; + uv_close((uv_handle_t*)&channel, NULL); + } free(buf.base); } @@ -133,6 +213,8 @@ TEST_IMPL(ipc) { r = uv_run(uv_default_loop()); ASSERT(r == 0); + ASSERT(local_conn_accepted == 1); + ASSERT(remote_conn_accepted == 1); ASSERT(read2_cb_called == 1); ASSERT(exit_cb_called == 1); return 0; diff --git a/uv.gyp b/uv.gyp index 0db92a93..bd1c9d9a 100644 --- a/uv.gyp +++ b/uv.gyp @@ -117,7 +117,6 @@ 'src/win/pipe.c', 'src/win/process.c', 'src/win/req.c', - 'src/win/stdio.c', 'src/win/stream.c', 'src/win/tcp.c', 'src/win/tty.c', From 90e88aabf6cac0f79538a1bf45c0a081b950522e Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Wed, 5 Oct 2011 02:35:38 -0700 Subject: [PATCH 09/10] remove stdio.c --- src/win/stdio.c | 75 ------------------------------------------------- 1 file changed, 75 deletions(-) delete mode 100644 src/win/stdio.c diff --git a/src/win/stdio.c b/src/win/stdio.c deleted file mode 100644 index b65e7fb5..00000000 --- a/src/win/stdio.c +++ /dev/null @@ -1,75 +0,0 @@ -/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to - * deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or - * sell copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -#include -#include - -#include "uv.h" -#include "../uv-common.h" -#include "internal.h" - - -static uv_pipe_t* uv_make_pipe_for_std_handle(uv_loop_t* loop, HANDLE handle) { - uv_pipe_t* pipe = NULL; - - pipe = (uv_pipe_t*)malloc(sizeof(uv_pipe_t)); - if (!pipe) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - - if (uv_pipe_init_with_handle(loop, pipe, handle)) { - free(pipe); - return NULL; - } - - pipe->flags |= UV_HANDLE_UV_ALLOCED; - return pipe; -} - - -uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) { - HANDLE handle; - - switch (type) { - case UV_STDIN: - handle = GetStdHandle(STD_INPUT_HANDLE); - if (handle == INVALID_HANDLE_VALUE) { - return NULL; - } - - /* Assume only named pipes for now. */ - return (uv_stream_t*)uv_make_pipe_for_std_handle(loop, handle); - break; - - case UV_STDOUT: - return NULL; - break; - - case UV_STDERR: - return NULL; - break; - - default: - assert(0); - uv__set_artificial_error(loop, UV_EINVAL); - return NULL; - } -} From 34f719d7a529c7e4a8c4d65a3e8fd0c0477c2309 Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Thu, 6 Oct 2011 00:58:25 -0700 Subject: [PATCH 10/10] windows ipc fixes --- include/uv-private/uv-win.h | 9 +--- src/win/internal.h | 2 - src/win/pipe.c | 89 ++++++++++++++++++++++--------------- src/win/tcp.c | 12 +++-- 4 files changed, 59 insertions(+), 53 deletions(-) diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 1be477e7..7ede6882 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -43,11 +43,6 @@ typedef struct uv_buf_t { char* base; } uv_buf_t; -typedef struct uv_duplicate_socket_info_s { - WSAPROTOCOL_INFOW socket_info; - struct uv_duplicate_socket_info_s* next; -} uv_duplicate_socket_info_t; - typedef int uv_file; RB_HEAD(uv_timer_tree_s, uv_timer_s); @@ -103,7 +98,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); struct uv_req_s* next_req; #define UV_WRITE_PRIVATE_FIELDS \ - /* empty */ + int ipc_header; #define UV_CONNECT_PRIVATE_FIELDS \ /* empty */ @@ -181,7 +176,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_write_t ipc_header_write_req; \ int ipc_pid; \ uint64_t remaining_ipc_rawdata_bytes; \ - uv_duplicate_socket_info_t* pending_ipc_sockets; + WSAPROTOCOL_INFOW* pending_socket_info; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/src/win/internal.h b/src/win/internal.h index e61aaefe..7753e70b 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -66,8 +66,6 @@ void uv_process_timers(uv_loop_t* loop); #define UV_HANDLE_TTY_RAW 0x80000 #define UV_HANDLE_USE_IPC_PROTOCOL 0x100000 #define UV_HANDLE_EMULATE_IOCP 0x200000 -#define UV_HANDLE_DUPLICATED_SOCKET 0x400000 -#define UV_HANDLE_WINSOCK_EXT_INIT 0x800000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); diff --git a/src/win/pipe.c b/src/win/pipe.c index 6473468f..34ff0245 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -77,7 +77,9 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->name = NULL; handle->ipc_pid = 0; handle->remaining_ipc_rawdata_bytes = 0; - handle->pending_ipc_sockets = NULL; + handle->pending_socket_info = NULL; + + uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); if (ipc) { handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL; @@ -196,7 +198,6 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; - uv_duplicate_socket_info_t* socket_info, *next_socket_info; if (handle->flags & UV_HANDLE_SHUTTING && !(handle->flags & UV_HANDLE_SHUT) && @@ -256,11 +257,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { handle->flags |= UV_HANDLE_CLOSED; if (handle->flags & UV_HANDLE_CONNECTION) { - next_socket_info = handle->pending_ipc_sockets; - while (next_socket_info) { - socket_info = next_socket_info; - next_socket_info = next_socket_info->next; - free(socket_info); + if (handle->pending_socket_info) { + free(handle->pending_socket_info); + handle->pending_socket_info = NULL; } } @@ -582,24 +581,18 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { - int r; uv_loop_t* loop = server->loop; uv_pipe_t* pipe_client; uv_pipe_accept_t* req; - uv_duplicate_socket_info_t* pending_socket; if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) { - pending_socket = server->pending_ipc_sockets; - if (!pending_socket) { + if (!server->pending_socket_info) { /* No valid pending sockets. */ uv__set_sys_error(loop, WSAEWOULDBLOCK); return -1; } - server->pending_ipc_sockets = pending_socket->next; - r = uv_tcp_import((uv_tcp_t*)client, &pending_socket->socket_info); - free(pending_socket); - return r; + return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info); } else { pipe_client = (uv_pipe_t*)client; @@ -754,7 +747,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, uv_stream_t* send_handle, uv_write_cb cb) { int result; uv_tcp_t* tcp_send_handle; - uv_req_t* ipc_header_req; + uv_write_t* ipc_header_req; DWORD written; uv_ipc_frame_uv_stream ipc_frame; @@ -784,6 +777,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, req->type = UV_WRITE; req->handle = (uv_stream_t*) handle; req->cb = cb; + req->ipc_header = 0; memset(&req->overlapped, 0, sizeof(req->overlapped)); if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { @@ -809,16 +803,26 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, * the first write, and then use the provided req for the second write. */ if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { - ipc_header_req = (uv_req_t*)req; + ipc_header_req = req; } else { - ipc_header_req = (uv_req_t*)&handle->ipc_header_write_req; - /* Initialize the req if needed. */ + /* + * Try to use the preallocated write req if it's available. + * Otherwise allocate a new one. + */ if (handle->ipc_header_write_req.type != UV_WRITE) { - uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); - handle->ipc_header_write_req.type = UV_WRITE; - handle->ipc_header_write_req.handle = (uv_stream_t*) handle; - handle->ipc_header_write_req.cb = NULL; + ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req; + } else { + ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t)); + if (!handle->accept_reqs) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } } + + uv_req_init(loop, (uv_req_t*) ipc_header_req); + ipc_header_req->type = UV_WRITE; + ipc_header_req->handle = (uv_stream_t*) handle; + ipc_header_req->cb = NULL; + ipc_header_req->ipc_header = 1; } /* Write the header or the whole frame. */ @@ -947,7 +951,6 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, DWORD bytes, avail; uv_buf_t buf; uv_ipc_frame_uv_stream ipc_frame; - uv_duplicate_socket_info_t* pending_ipc_socket; assert(handle->type == UV_NAMED_PIPE); @@ -1015,16 +1018,15 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); - /* Insert a new pending socket entry. */ - pending_ipc_socket = - (uv_duplicate_socket_info_t*)malloc(sizeof(*pending_ipc_socket)); - if (!pending_ipc_socket) { + /* Store the pending socket info. */ + assert(!handle->pending_socket_info); + handle->pending_socket_info = + (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info))); + if (!handle->pending_socket_info) { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); } - pending_ipc_socket->socket_info = ipc_frame.socket_info; - pending_ipc_socket->next = handle->pending_ipc_sockets; - handle->pending_ipc_sockets = pending_ipc_socket; + *(handle->pending_socket_info) = ipc_frame.socket_info; } if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { @@ -1052,10 +1054,15 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, handle->remaining_ipc_rawdata_bytes - bytes; if (handle->read2_cb) { handle->read2_cb(handle, bytes, buf, - handle->pending_ipc_sockets ? UV_TCP : UV_UNKNOWN_HANDLE); + handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE); } else if (handle->read_cb) { handle->read_cb((uv_stream_t*)handle, bytes, buf); } + + if (handle->pending_socket_info) { + free(handle->pending_socket_info); + handle->pending_socket_info = NULL; + } } else { handle->read_cb((uv_stream_t*)handle, bytes, buf); } @@ -1087,12 +1094,20 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, handle->write_queue_size -= req->queued_bytes; - if (req->cb) { - if (!REQ_SUCCESS(req)) { - uv__set_sys_error(loop, GET_REQ_ERROR(req)); - ((uv_write_cb)req->cb)(req, -1); + if (req->ipc_header) { + if (req == &handle->ipc_header_write_req) { + req->type = UV_UNKNOWN_REQ; } else { - ((uv_write_cb)req->cb)(req, 0); + free(req); + } + } else { + if (req->cb) { + if (!REQ_SUCCESS(req)) { + uv__set_sys_error(loop, GET_REQ_ERROR(req)); + ((uv_write_cb)req->cb)(req, -1); + } else { + ((uv_write_cb)req->cb)(req, 0); + } } } diff --git a/src/win/tcp.c b/src/win/tcp.c index a2f65cbf..897ea5e9 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -103,6 +103,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { handle->socket = INVALID_SOCKET; handle->type = UV_TCP; handle->reqs_pending = 0; + handle->func_acceptex = NULL; + handle->func_connectex = NULL; loop->counters.tcp_init++; @@ -410,12 +412,11 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; - if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if (!handle->func_acceptex) { if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) { uv__set_sys_error(loop, WSAEAFNOSUPPORT); return -1; } - handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; } if (listen(handle->socket, backlog) == SOCKET_ERROR) { @@ -548,12 +549,11 @@ int uv__tcp_connect(uv_connect_t* req, uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; - if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if (!handle->func_connectex) { if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { uv__set_sys_error(loop, WSAEAFNOSUPPORT); return -1; } - handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; } uv_req_init(loop, (uv_req_t*) req); @@ -609,12 +609,11 @@ int uv__tcp_connect6(uv_connect_t* req, uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0) return -1; - if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if (!handle->func_connectex) { if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { uv__set_sys_error(loop, WSAEAFNOSUPPORT); return -1; } - handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; } uv_req_init(loop, (uv_req_t*) req); @@ -952,7 +951,6 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { } tcp->flags |= UV_HANDLE_BOUND; - tcp->flags |= UV_HANDLE_DUPLICATED_SOCKET; return uv_tcp_set_socket(tcp->loop, tcp, socket, 1); }