diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 24ef37cb..f1999292 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -192,6 +192,11 @@ typedef void* uv_lib_t; struct termios orig_termios; \ int mode; +#define UV_STREAM_INFO_PRIVATE_FIELDS \ + union { \ + int fd; \ + }; + /* UV_FS_EVENT_PRIVATE_FIELDS */ #if defined(__linux__) diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 5a6a949e..626eb6db 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -450,6 +450,11 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); wchar_t* dirw; \ char* buffer; +#define UV_STREAM_INFO_PRIVATE_FIELDS \ + union { \ + WSAPROTOCOL_INFOW socket_info; \ + }; + int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size, char* utf8Buffer, size_t utf8Size); int uv_utf8_to_utf16(const char* utf8Buffer, wchar_t* utf16Buffer, diff --git a/include/uv.h b/include/uv.h index cedf5293..0fbd4d68 100644 --- a/include/uv.h +++ b/include/uv.h @@ -180,6 +180,7 @@ typedef struct uv_process_s uv_process_t; typedef struct uv_counters_s uv_counters_t; typedef struct uv_cpu_info_s uv_cpu_info_t; typedef struct uv_interface_address_s uv_interface_address_t; +typedef struct uv_stream_info_s uv_stream_info_t; /* Request types */ typedef struct uv_req_s uv_req_t; typedef struct uv_shutdown_s uv_shutdown_t; @@ -530,6 +531,28 @@ UV_EXTERN int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name, UV_EXTERN int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name, int* namelen); +/* + * uv_stream_info_t is used to store exported stream (using uv_export), + * which can be imported into a different event-loop within the same process + * (using uv_import). + */ +struct uv_stream_info_s { + uv_handle_type type; + UV_STREAM_INFO_PRIVATE_FIELDS +}; + +/* + * Exports uv_stream_t as uv_stream_info_t value, which could + * be used to initialize shared streams within the same process. + */ +UV_EXTERN int uv_export(uv_stream_t* stream, uv_stream_info_t* info); + +/* + * Imports uv_stream_info_t value into uv_stream_t to initialize + * shared stream. + */ +UV_EXTERN int uv_import(uv_stream_t* stream, uv_stream_info_t* info); + /* * uv_tcp_connect, uv_tcp_connect6 * These functions establish IPv4 and IPv6 TCP connections. Provide an diff --git a/src/unix/stream.c b/src/unix/stream.c index e0689fbc..e831b24b 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -966,3 +966,13 @@ int uv_read_stop(uv_stream_t* stream) { } +int uv_export(uv_stream_t* stream, uv_stream_info_t* info) { + /* Implement me */ + return uv__new_artificial_error(UV_ENOSYS); +} + + +int uv_import(uv_stream_t* stream, uv_stream_info_t* info) { + /* Implement me */ + return uv__new_artificial_error(UV_ENOSYS); +} diff --git a/src/win/internal.h b/src/win/internal.h index 0dc551db..9435bb28 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -143,11 +143,14 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); -int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info); +int uv__tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info); int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid, LPWSAPROTOCOL_INFOW protocol_info); +int uv_tcp_export(uv_tcp_t* tcp, uv_stream_info_t* info); +int uv_tcp_import(uv_tcp_t* tcp, uv_stream_info_t* info); + /* * UDP diff --git a/src/win/pipe.c b/src/win/pipe.c index 5c20fe48..60f137e9 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -649,7 +649,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { return -1; } - return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info); + return uv__tcp_import((uv_tcp_t*)client, server->pending_socket_info); } else { pipe_client = (uv_pipe_t*)client; diff --git a/src/win/stream.c b/src/win/stream.c index c2354eec..7faa0a2b 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -186,3 +186,27 @@ size_t uv_count_bufs(uv_buf_t bufs[], int count) { return bytes; } + + +int uv_export(uv_stream_t* stream, uv_stream_info_t* info) { + switch (stream->type) { + case UV_TCP: + return uv_tcp_export((uv_tcp_t*)stream, info); + default: + assert(0); + uv__set_sys_error(stream->loop, WSAEINVAL); + return -1; + } +} + + +int uv_import(uv_stream_t* stream, uv_stream_info_t* info) { + switch (stream->type) { + case UV_TCP: + return uv_tcp_import((uv_tcp_t*)stream, info); + default: + assert(0); + uv__set_sys_error(stream->loop, WSAEINVAL); + return -1; + } +} \ No newline at end of file diff --git a/src/win/tcp.c b/src/win/tcp.c index f810913f..590d0792 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -1019,7 +1019,7 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, } -int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { +int uv__tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { SOCKET socket = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_IP, @@ -1140,4 +1140,25 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) { } return 0; -} \ No newline at end of file +} + + +int uv_tcp_export(uv_tcp_t* tcp, uv_stream_info_t* info) { + if (uv_tcp_duplicate_socket(tcp, GetCurrentProcessId(), + &info->socket_info) == -1) { + return -1; + } + + info->type = UV_TCP; + return 0; +} + + +int uv_tcp_import(uv_tcp_t* tcp, uv_stream_info_t* info) { + if (info->type != UV_TCP) { + uv__set_sys_error(tcp->loop, WSAEINVAL); + return -1; + } + + return uv__tcp_import(tcp, &info->socket_info); +} diff --git a/test/test-ipc-threads.c b/test/test-ipc-threads.c new file mode 100644 index 00000000..209b1507 --- /dev/null +++ b/test/test-ipc-threads.c @@ -0,0 +1,224 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "runner.h" +#include "task.h" + +#include +#include + +typedef struct { + uv_loop_t* loop; + uv_thread_t thread; + uv_async_t recv_channel; + uv_async_t send_channel; + uv_tcp_t server; + uv_tcp_t conn; + int connection_accepted; + int close_cb_called; +} worker_t; + +static worker_t parent, child; + +static volatile uv_stream_info_t dup_stream; + +typedef struct { + uv_connect_t conn_req; + uv_tcp_t conn; +} tcp_conn; + +#define CONN_COUNT 100 + +static void close_cb(uv_handle_t* handle) { + worker_t* worker = (worker_t*)handle->data; + ASSERT(worker); + worker->close_cb_called++; +} + + +static void on_connection(uv_stream_t* server, int status) { + int r; + worker_t* worker = CONTAINING_RECORD(server, worker_t, server); + + if (!worker->connection_accepted) { + /* + * Accept the connection and close it. + */ + ASSERT(status == 0); + + r = uv_tcp_init(server->loop, &worker->conn); + ASSERT(r == 0); + + worker->conn.data = worker; + + r = uv_accept(server, (uv_stream_t*)&worker->conn); + ASSERT(r == 0); + + worker->connection_accepted = 1; + + uv_close((uv_handle_t*)&worker->conn, close_cb); + uv_close((uv_handle_t*)&worker->recv_channel, close_cb); + uv_close((uv_handle_t*)server, close_cb); + } +} + + +static void close_client_conn_cb(uv_handle_t* handle) { + tcp_conn* p = (tcp_conn*)handle->data; + free(p); +} + + +static void connect_cb(uv_connect_t* req, int status) { + uv_close((uv_handle_t*)req->handle, close_client_conn_cb); +} + + +static void make_many_connections() { + tcp_conn* conn; + struct sockaddr_in addr; + int r, i; + + for (i = 0; i < CONN_COUNT; i++) { + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(uv_default_loop(), &conn->conn); + ASSERT(r == 0); + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb); + ASSERT(r == 0); + + conn->conn.data = conn; + } +} + + +void on_parent_msg(uv_async_t* handle, int status) { + int r; + + ASSERT(dup_stream.type == UV_TCP); + + /* Import the shared TCP server, and start listening on it. */ + r = uv_tcp_init(parent.loop, &parent.server); + ASSERT(r == 0); + + parent.server.data = &parent; + + r = uv_import((uv_stream_t*)&parent.server, + (uv_stream_info_t*)&dup_stream); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&parent.server, 12, on_connection); + ASSERT(r == 0); + + /* Create a bunch of connections to get both servers to accept. */ + make_many_connections(); +} + + +void on_child_msg(uv_async_t* handle, int status) { + ASSERT(!"no"); +} + + +static void child_thread_entry(void* arg) { + int r; + int listen_after_write = (int)arg; + + r = uv_tcp_init(child.loop, &child.server); + ASSERT(r == 0); + + child.server.data = &child; + + r = uv_tcp_bind(&child.server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + ASSERT(r == 0); + + if (!listen_after_write) { + r = uv_listen((uv_stream_t*)&child.server, 12, on_connection); + ASSERT(r == 0); + } + + r = uv_export((uv_stream_t*)&child.server, + (uv_stream_info_t*)&dup_stream); + ASSERT(r == 0); + + r = uv_async_send(&child.send_channel); + ASSERT(r == 0); + + if (listen_after_write) { + r = uv_listen((uv_stream_t*)&child.server, 12, on_connection); + ASSERT(r == 0); + } + + r = uv_run(child.loop); + ASSERT(r == 0); + + ASSERT(child.connection_accepted == 1); + ASSERT(child.close_cb_called == 3); +} + + +static void run_ipc_threads_test(int listen_after_write) { + int r; + + parent.loop = uv_default_loop(); + child.loop = uv_loop_new(); + ASSERT(child.loop); + + r = uv_async_init(parent.loop, &parent.recv_channel, on_parent_msg); + ASSERT(r == 0); + parent.recv_channel.data = &parent; + + r = uv_async_init(child.loop, &parent.send_channel, on_child_msg); + ASSERT(r == 0); + parent.send_channel.data = &child; + + child.send_channel = parent.recv_channel; + child.recv_channel = parent.send_channel; + + r = uv_thread_create(&child.thread, child_thread_entry, (void*)listen_after_write); + ASSERT(r == 0); + + r = uv_run(parent.loop); + ASSERT(r == 0); + + ASSERT(parent.connection_accepted == 1); + ASSERT(parent.close_cb_called == 3); + + r = uv_thread_join(&child.thread); + ASSERT(r == 0); +} + + +TEST_IMPL(ipc_threads_listen_after_write) { + run_ipc_threads_test(1); + return 0; +} + + +TEST_IMPL(ipc_threads_listen_before_write) { + run_ipc_threads_test(0); + return 0; +} diff --git a/test/test-list.h b/test/test-list.h index 31dcb0a6..bc13dd0f 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -24,6 +24,8 @@ TEST_DECLARE (tty) TEST_DECLARE (stdio_over_pipes) TEST_DECLARE (ipc_listen_before_write) TEST_DECLARE (ipc_listen_after_write) +TEST_DECLARE (ipc_threads_listen_after_write) +TEST_DECLARE (ipc_threads_listen_before_write) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) TEST_DECLARE (pipe_ping_pong) @@ -161,6 +163,8 @@ TASK_LIST_START TEST_ENTRY (stdio_over_pipes) TEST_ENTRY (ipc_listen_before_write) TEST_ENTRY (ipc_listen_after_write) + TEST_ENTRY (ipc_threads_listen_after_write) + TEST_ENTRY (ipc_threads_listen_before_write) TEST_ENTRY (tcp_ping_pong) TEST_HELPER (tcp_ping_pong, tcp4_echo_server) diff --git a/uv.gyp b/uv.gyp index 7a88cfa1..3ed08433 100644 --- a/uv.gyp +++ b/uv.gyp @@ -301,6 +301,7 @@ 'test/test-hrtime.c', 'test/test-idle.c', 'test/test-ipc.c', + 'test/test-ipc-threads.c', 'test/test-list.h', 'test/test-loop-handles.c', 'test/test-multiple-listen.c',