From 81c4043c83f07dc8c365c94d8330c96c5e313d22 Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Thu, 29 Sep 2011 17:58:58 -0700 Subject: [PATCH] ipc on windows --- include/uv-private/uv-win.h | 23 ++- include/uv.h | 19 +- src/unix/core.c | 7 - src/win/internal.h | 63 +++--- src/win/pipe.c | 387 ++++++++++++++++++++++++++++-------- src/win/process.c | 44 ++-- src/win/stream.c | 31 ++- src/win/tcp.c | 170 +++++++++++++--- src/win/util.c | 27 +++ src/win/winsock.c | 78 ++------ src/win/winsock.h | 18 +- test/run-tests.c | 68 ++++++- test/test-ipc.c | 124 ++++++++++-- uv.gyp | 1 - 14 files changed, 766 insertions(+), 294 deletions(-) diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 656e85df..1be477e7 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -43,6 +43,11 @@ typedef struct uv_buf_t { char* base; } uv_buf_t; +typedef struct uv_duplicate_socket_info_s { + WSAPROTOCOL_INFOW socket_info; + struct uv_duplicate_socket_info_s* next; +} uv_duplicate_socket_info_t; + typedef int uv_file; RB_HEAD(uv_timer_tree_s, uv_timer_s); @@ -120,6 +125,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); UV_REQ_FIELDS \ SOCKET accept_socket; \ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ + HANDLE event_handle; \ + HANDLE wait_handle; \ struct uv_tcp_accept_s* next_pending; \ } uv_tcp_accept_t; @@ -140,10 +147,12 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define uv_tcp_server_fields \ uv_tcp_accept_t* accept_reqs; \ - uv_tcp_accept_t* pending_accepts; + uv_tcp_accept_t* pending_accepts; \ + LPFN_ACCEPTEX func_acceptex; #define uv_tcp_connection_fields \ - uv_buf_t read_buffer; + uv_buf_t read_buffer; \ + LPFN_CONNECTEX func_connectex; #define UV_TCP_PRIVATE_FIELDS \ SOCKET socket; \ @@ -164,11 +173,15 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_alloc_cb alloc_cb; #define uv_pipe_server_fields \ - uv_pipe_accept_t accept_reqs[4]; \ - uv_pipe_accept_t* pending_accepts; + uv_pipe_accept_t accept_reqs[4]; \ + uv_pipe_accept_t* pending_accepts; #define uv_pipe_connection_fields \ - uv_timer_t* eof_timer; + uv_timer_t* eof_timer; \ + uv_write_t ipc_header_write_req; \ + int ipc_pid; \ + uint64_t remaining_ipc_rawdata_bytes; \ + uv_duplicate_socket_info_t* pending_ipc_sockets; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/include/uv.h b/include/uv.h index 1358e4b9..8ea7ab0e 100644 --- a/include/uv.h +++ b/include/uv.h @@ -41,6 +41,12 @@ extern "C" { typedef intptr_t ssize_t; #endif +#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) +# include "uv-private/uv-unix.h" +#else +# include "uv-private/uv-win.h" +#endif + /* Expand this list if necessary. */ typedef enum { UV_UNKNOWN = -1, @@ -153,12 +159,6 @@ typedef struct uv_fs_s uv_fs_t; typedef struct uv_fs_event_s uv_fs_event_t; typedef struct uv_work_s uv_work_t; -#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) -# include "uv-private/uv-unix.h" -#else -# include "uv-private/uv-win.h" -#endif - /* * This function must be called before any other functions in libuv. @@ -392,13 +392,6 @@ int uv_read_stop(uv_stream_t*); */ int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read2_cb read_cb); -typedef enum { - UV_STDIN = 0, - UV_STDOUT, - UV_STDERR -} uv_std_type; - -uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type); /* * Write data to stream. Buffers are written in order. Example: diff --git a/src/unix/core.c b/src/unix/core.c index 719327a9..c834aaae 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -790,10 +790,3 @@ size_t uv__strlcpy(char* dst, const char* src, size_t size) { return src - org; } - - -uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) { - assert(0 && "implement me"); - return NULL; -} - diff --git a/src/win/internal.h b/src/win/internal.h index f8762145..e61aaefe 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -44,27 +44,30 @@ void uv_process_timers(uv_loop_t* loop); */ /* Private uv_handle flags */ -#define UV_HANDLE_CLOSING 0x0001 -#define UV_HANDLE_CLOSED 0x0002 -#define UV_HANDLE_BOUND 0x0004 -#define UV_HANDLE_LISTENING 0x0008 -#define UV_HANDLE_CONNECTION 0x0010 -#define UV_HANDLE_CONNECTED 0x0020 -#define UV_HANDLE_READING 0x0040 -#define UV_HANDLE_ACTIVE 0x0040 -#define UV_HANDLE_EOF 0x0080 -#define UV_HANDLE_SHUTTING 0x0100 -#define UV_HANDLE_SHUT 0x0200 -#define UV_HANDLE_ENDGAME_QUEUED 0x0400 -#define UV_HANDLE_BIND_ERROR 0x1000 -#define UV_HANDLE_IPV6 0x2000 -#define UV_HANDLE_PIPESERVER 0x4000 -#define UV_HANDLE_READ_PENDING 0x8000 -#define UV_HANDLE_GIVEN_OS_HANDLE 0x10000 -#define UV_HANDLE_UV_ALLOCED 0x20000 -#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000 -#define UV_HANDLE_ZERO_READ 0x80000 -#define UV_HANDLE_TTY_RAW 0x100000 +#define UV_HANDLE_CLOSING 0x0001 +#define UV_HANDLE_CLOSED 0x0002 +#define UV_HANDLE_BOUND 0x0004 +#define UV_HANDLE_LISTENING 0x0008 +#define UV_HANDLE_CONNECTION 0x0010 +#define UV_HANDLE_CONNECTED 0x0020 +#define UV_HANDLE_READING 0x0040 +#define UV_HANDLE_ACTIVE 0x0040 +#define UV_HANDLE_EOF 0x0080 +#define UV_HANDLE_SHUTTING 0x0100 +#define UV_HANDLE_SHUT 0x0200 +#define UV_HANDLE_ENDGAME_QUEUED 0x0400 +#define UV_HANDLE_BIND_ERROR 0x1000 +#define UV_HANDLE_IPV6 0x2000 +#define UV_HANDLE_PIPESERVER 0x4000 +#define UV_HANDLE_READ_PENDING 0x8000 +#define UV_HANDLE_UV_ALLOCED 0x10000 +#define UV_HANDLE_SYNC_BYPASS_IOCP 0x20000 +#define UV_HANDLE_ZERO_READ 0x40000 +#define UV_HANDLE_TTY_RAW 0x80000 +#define UV_HANDLE_USE_IPC_PROTOCOL 0x100000 +#define UV_HANDLE_EMULATE_IOCP 0x200000 +#define UV_HANDLE_DUPLICATED_SOCKET 0x400000 +#define UV_HANDLE_WINSOCK_EXT_INIT 0x800000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); @@ -97,8 +100,8 @@ uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped); void uv_insert_pending_req(uv_loop_t* loop, uv_req_t* req); void uv_process_reqs(uv_loop_t* loop); -#define POST_COMPLETION_FOR_REQ(loop, req) \ - if (!PostQueuedCompletionStatus((loop)->iocp, \ +#define POST_COMPLETION_FOR_REQ(loop, req) \ + if (!PostQueuedCompletionStatus((loop)->iocp, \ 0, \ 0, \ &((req)->overlapped))) { \ @@ -135,6 +138,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info); + /* * UDP @@ -149,19 +154,21 @@ void uv_udp_endgame(uv_loop_t* loop, uv_udp_t* handle); /* * Pipes */ -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle); int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, char* name, size_t nameSize); void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle); int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client); +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb); int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb); +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb); void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req); @@ -267,6 +274,10 @@ void uv_fs_event_close(uv_loop_t* loop, uv_fs_event_t* handle); void uv_fs_event_endgame(uv_loop_t* loop, uv_fs_event_t* handle); +/* Utils */ +int uv_parent_pid(); + + /* * Error handling */ diff --git a/src/win/pipe.c b/src/win/pipe.c index 4c855058..6473468f 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -20,6 +20,7 @@ */ #include +#include #include #include @@ -38,6 +39,22 @@ static const uv_buf_t uv_null_buf_ = { 0, NULL }; /* when the local ends wants to shut it down. */ static const int64_t eof_timeout = 50; /* ms */ +/* IPC protocol flags. */ +#define UV_IPC_RAW_DATA 0x0001 +#define UV_IPC_UV_STREAM 0x0002 + +/* IPC frame header. */ +typedef struct { + int flags; + uint64_t raw_data_length; +} uv_ipc_frame_header_t; + +/* IPC frame, which contains an imported TCP socket stream. */ +typedef struct { + uv_ipc_frame_header_t header; + WSAPROTOCOL_INFOW socket_info; +} uv_ipc_frame_uv_stream; + static void eof_timer_init(uv_pipe_t* pipe); static void eof_timer_start(uv_pipe_t* pipe); static void eof_timer_stop(uv_pipe_t* pipe); @@ -58,7 +75,13 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; handle->name = NULL; - handle->ipc = ipc; + handle->ipc_pid = 0; + handle->remaining_ipc_rawdata_bytes = 0; + handle->pending_ipc_sockets = NULL; + + if (ipc) { + handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL; + } loop->counters.pipe_init++; @@ -66,24 +89,6 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { } -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle) { - int err = uv_pipe_init(loop, handle); - - if (!err) { - /* - * At this point we don't know whether the pipe will be used as a client - * or a server. So, we assume that it will be a client until - * uv_listen is called. - */ - handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; - } - - return err; -} - - static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); handle->eof_timer = NULL; @@ -132,7 +137,6 @@ int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, uv_pipe_connection_init(handle); handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; err = 0; done: @@ -192,7 +196,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; - + uv_duplicate_socket_info_t* socket_info, *next_socket_info; if (handle->flags & UV_HANDLE_SHUTTING && !(handle->flags & UV_HANDLE_SHUT) && @@ -251,6 +255,15 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { assert(!(handle->flags & UV_HANDLE_CLOSED)); handle->flags |= UV_HANDLE_CLOSED; + if (handle->flags & UV_HANDLE_CONNECTION) { + next_socket_info = handle->pending_ipc_sockets; + while (next_socket_info) { + socket_info = next_socket_info; + next_socket_info = next_socket_info->next; + free(socket_info); + } + } + /* Remember the state of this flag because the close callback is */ /* allowed to clobber or free the handle's memory */ uv_alloced = handle->flags & UV_HANDLE_UV_ALLOCED; @@ -568,30 +581,50 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, } -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { + int r; uv_loop_t* loop = server->loop; - /* Find a connection instance that has been connected, but not yet */ - /* accepted. */ - uv_pipe_accept_t* req = server->pending_accepts; + uv_pipe_t* pipe_client; + uv_pipe_accept_t* req; + uv_duplicate_socket_info_t* pending_socket; - if (!req) { - /* No valid connections found, so we error out. */ - uv__set_sys_error(loop, WSAEWOULDBLOCK); - return -1; - } + if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + pending_socket = server->pending_ipc_sockets; + if (!pending_socket) { + /* No valid pending sockets. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } - /* Initialize the client handle and copy the pipeHandle to the client */ - uv_pipe_connection_init(client); - client->handle = req->pipeHandle; + server->pending_ipc_sockets = pending_socket->next; + r = uv_tcp_import((uv_tcp_t*)client, &pending_socket->socket_info); + free(pending_socket); + return r; + } else { + pipe_client = (uv_pipe_t*)client; - /* Prepare the req to pick up a new connection */ - server->pending_accepts = req->next_pending; - req->next_pending = NULL; - req->pipeHandle = INVALID_HANDLE_VALUE; + /* Find a connection instance that has been connected, but not yet */ + /* accepted. */ + req = server->pending_accepts; - if (!(server->flags & UV_HANDLE_CLOSING) && - !(server->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { - uv_pipe_queue_accept(loop, server, req, FALSE); + if (!req) { + /* No valid connections found, so we error out. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } + + /* Initialize the client handle and copy the pipeHandle to the client */ + uv_pipe_connection_init(pipe_client); + pipe_client->handle = req->pipeHandle; + + /* Prepare the req to pick up a new connection */ + server->pending_accepts = req->next_pending; + req->next_pending = NULL; + req->pipeHandle = INVALID_HANDLE_VALUE; + + if (!(server->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(loop, server, req, FALSE); + } } return 0; @@ -603,11 +636,8 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { uv_loop_t* loop = handle->loop; int i, errno; - uv_pipe_accept_t* req; - HANDLE pipeHandle; - if (!(handle->flags & UV_HANDLE_BOUND) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_BOUND)) { uv__set_artificial_error(loop, UV_EINVAL); return -1; } @@ -618,8 +648,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { return -1; } - if (!(handle->flags & UV_HANDLE_PIPESERVER) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_PIPESERVER)) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -627,30 +656,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - if (handle->flags & UV_HANDLE_GIVEN_OS_HANDLE) { - handle->flags |= UV_HANDLE_PIPESERVER; - pipeHandle = handle->handle; - assert(pipeHandle != INVALID_HANDLE_VALUE); - req = &handle->accept_reqs[0]; - uv_req_init(loop, (uv_req_t*) req); - req->pipeHandle = pipeHandle; - req->type = UV_ACCEPT; - req->data = handle; - req->next_pending = NULL; + /* First pipe handle should have already been created in uv_pipe_bind */ + assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - if (uv_set_pipe_handle(loop, handle, pipeHandle)) { - uv__set_sys_error(loop, GetLastError()); - return -1; - } - - uv_pipe_queue_accept(loop, handle, req, TRUE); - } else { - /* First pipe handle should have already been created in uv_pipe_bind */ - assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); - } + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); } return 0; @@ -694,8 +704,8 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { } -int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { uv_loop_t* loop = handle->loop; if (!(handle->flags & UV_HANDLE_CONNECTION)) { @@ -715,9 +725,10 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, handle->flags |= UV_HANDLE_READING; handle->read_cb = read_cb; + handle->read2_cb = read2_cb; handle->alloc_cb = alloc_cb; - /* If reading was stopped and then started again, there could stell be a */ + /* If reading was stopped and then started again, there could still be a */ /* read request pending. */ if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_pipe_queue_read(loop, handle); @@ -726,11 +737,33 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, } -int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, - uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { - int result; +int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL); +} - if (bufcnt != 1) { + +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb); +} + + +static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, + uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { + int result; + uv_tcp_t* tcp_send_handle; + uv_req_t* ipc_header_req; + DWORD written; + uv_ipc_frame_uv_stream ipc_frame; + + if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) { + uv__set_artificial_error(loop, UV_ENOTSUP); + return -1; + } + + if (send_handle && send_handle->type != UV_TCP) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -753,6 +786,73 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol. */ + if (send_handle) { + tcp_send_handle = (uv_tcp_t*)send_handle; + if (WSADuplicateSocketW(tcp_send_handle->socket, handle->ipc_pid, + &ipc_frame.socket_info)) { + uv__set_sys_error(loop, WSAGetLastError()); + return -1; + } + ipc_frame.header.flags |= UV_IPC_UV_STREAM; + } + + if (bufcnt == 1) { + ipc_frame.header.flags |= UV_IPC_RAW_DATA; + ipc_frame.header.raw_data_length = bufs[0].len; + } + + /* + * Use the provided req if we're only doing a single write. + * If we're doing multiple writes, use ipc_header_write_req to do + * the first write, and then use the provided req for the second write. + */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + ipc_header_req = (uv_req_t*)req; + } else { + ipc_header_req = (uv_req_t*)&handle->ipc_header_write_req; + /* Initialize the req if needed. */ + if (handle->ipc_header_write_req.type != UV_WRITE) { + uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); + handle->ipc_header_write_req.type = UV_WRITE; + handle->ipc_header_write_req.handle = (uv_stream_t*) handle; + handle->ipc_header_write_req.cb = NULL; + } + } + + /* Write the header or the whole frame. */ + memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped)); + + result = WriteFile(handle->handle, + &ipc_frame, + ipc_frame.header.flags & UV_IPC_UV_STREAM ? + sizeof(ipc_frame) : sizeof(ipc_frame.header), + &written, + &ipc_header_req->overlapped); + if (!result && GetLastError() != ERROR_IO_PENDING) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + req->queued_bytes = written; + handle->write_queue_size += req->queued_bytes; + } + + handle->reqs_pending++; + handle->write_reqs_pending++; + + /* If we don't have any raw data to write - we're done. */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + return 0; + } + } + result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, @@ -780,6 +880,23 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, } +int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb); +} + + +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { + if (!(handle->flags & UV_HANDLE_USE_IPC_PROTOCOL)) { + uv__set_artificial_error(loop, UV_EINVAL); + return -1; + } + + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb); +} + + static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_buf_t buf) { /* If there is an eof timer running, we don't need it any more, */ @@ -790,7 +907,11 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_read_stop((uv_stream_t*) handle); uv__set_artificial_error(loop, UV_EOF); - handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + } } @@ -803,7 +924,11 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_read_stop((uv_stream_t*) handle); uv__set_sys_error(loop, error); - handle->read_cb((uv_stream_t*)handle, -1, buf); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*)handle, -1, buf); + } } @@ -821,6 +946,8 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req) { DWORD bytes, avail; uv_buf_t buf; + uv_ipc_frame_uv_stream ipc_frame; + uv_duplicate_socket_info_t* pending_ipc_socket; assert(handle->type == UV_NAMED_PIPE); @@ -839,11 +966,11 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, /* Do non-blocking reads until the buffer is empty */ while (handle->flags & UV_HANDLE_READING) { if (!PeekNamedPipe(handle->handle, - NULL, - 0, - NULL, - &avail, - NULL)) { + NULL, + 0, + NULL, + &avail, + NULL)) { uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); break; } @@ -853,6 +980,63 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, break; } + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol to read the incoming data. */ + if (handle->remaining_ipc_rawdata_bytes == 0) { + /* We're reading a new frame. First, read the header. */ + assert(avail >= sizeof(ipc_frame.header)); + + if (!ReadFile(handle->handle, + &ipc_frame.header, + sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame.header)); + + if (ipc_frame.header.flags & UV_IPC_UV_STREAM) { + assert(avail - sizeof(ipc_frame.header) >= + sizeof(ipc_frame.socket_info)); + + /* Read the TCP socket info. */ + if (!ReadFile(handle->handle, + &ipc_frame.socket_info, + sizeof(ipc_frame) - sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); + + /* Insert a new pending socket entry. */ + pending_ipc_socket = + (uv_duplicate_socket_info_t*)malloc(sizeof(*pending_ipc_socket)); + if (!pending_ipc_socket) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + + pending_ipc_socket->socket_info = ipc_frame.socket_info; + pending_ipc_socket->next = handle->pending_ipc_sockets; + handle->pending_ipc_sockets = pending_ipc_socket; + } + + if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { + handle->remaining_ipc_rawdata_bytes = + ipc_frame.header.raw_data_length; + continue; + } + } else { + avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes); + } + } + buf = handle->alloc_cb((uv_handle_t*) handle, avail); assert(buf.len > 0); @@ -862,7 +1046,20 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, &bytes, NULL)) { /* Successful read */ - handle->read_cb((uv_stream_t*)handle, bytes, buf); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + assert(handle->remaining_ipc_rawdata_bytes >= bytes); + handle->remaining_ipc_rawdata_bytes = + handle->remaining_ipc_rawdata_bytes - bytes; + if (handle->read2_cb) { + handle->read2_cb(handle, bytes, buf, + handle->pending_ipc_sockets ? UV_TCP : UV_UNKNOWN_HANDLE); + } else if (handle->read_cb) { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + } else { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + /* Read again only if bytes == buf.len */ if (bytes <= buf.len) { break; @@ -928,8 +1125,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, CloseHandle(req->pipeHandle); req->pipeHandle = INVALID_HANDLE_VALUE; } - if (!(handle->flags & UV_HANDLE_CLOSING) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_CLOSING)) { uv_pipe_queue_accept(loop, handle, req, FALSE); } } @@ -1066,6 +1262,21 @@ static void eof_timer_close_cb(uv_handle_t* handle) { void uv_pipe_open(uv_pipe_t* pipe, uv_file file) { - assert(0 && "implement me"); -} + HANDLE os_handle; + + /* Special-case stdin with ipc. */ + if (file == 0 && pipe->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + os_handle = (HANDLE)_get_osfhandle(file); + if (os_handle == INVALID_HANDLE_VALUE || + uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { + return; + } + + uv_pipe_connection_init(pipe); + pipe->ipc_pid = uv_parent_pid(); + assert(pipe->ipc_pid != -1); + + pipe->handle = os_handle; + } +} diff --git a/src/win/process.c b/src/win/process.c index 4db04832..da72d55e 100644 --- a/src/win/process.c +++ b/src/win/process.c @@ -45,7 +45,7 @@ typedef struct env_var { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); \ } \ if (!uv_utf8_to_utf16(s, t, size / sizeof(wchar_t))) { \ - uv__set_sys_error(loop, GetLastError()); \ + uv__set_sys_error(loop, GetLastError()); \ err = -1; \ goto done; \ } @@ -739,7 +739,8 @@ void uv_process_close(uv_loop_t* loop, uv_process_t* handle) { static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, - HANDLE* child_pipe, DWORD server_access, DWORD child_access) { + HANDLE* child_pipe, DWORD server_access, DWORD child_access, + int overlapped) { int err; SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE }; char pipe_name[64]; @@ -767,7 +768,7 @@ static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, 0, &sa, OPEN_EXISTING, - 0, + overlapped ? FILE_FLAG_OVERLAPPED : 0, NULL); if (*child_pipe == INVALID_HANDLE_VALUE) { @@ -848,7 +849,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, wchar_t* path = NULL; int size; BOOL result; - wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, *env = NULL, *cwd = NULL; + wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, + *env = NULL, *cwd = NULL; HANDLE* child_stdio = process->child_stdio; STARTUPINFOW startup; PROCESS_INFORMATION info; @@ -904,12 +906,23 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, /* Create stdio pipes. */ if (options.stdin_stream) { - err = uv_create_stdio_pipe_pair( - loop, - options.stdin_stream, - &child_stdio[0], - PIPE_ACCESS_OUTBOUND, - GENERIC_READ | FILE_WRITE_ATTRIBUTES); + if (options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_DUPLEX, + GENERIC_READ | FILE_WRITE_ATTRIBUTES | GENERIC_WRITE, + 1); + } else { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_OUTBOUND, + GENERIC_READ | FILE_WRITE_ATTRIBUTES, + 0); + } } else { err = duplicate_std_handle(loop, STD_INPUT_HANDLE, &child_stdio[0]); } @@ -922,7 +935,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, loop, options.stdout_stream, &child_stdio[1], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_OUTPUT_HANDLE, &child_stdio[1]); } @@ -936,7 +950,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, options.stderr_stream, &child_stdio[2], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_ERROR_HANDLE, &child_stdio[2]); } @@ -969,6 +984,11 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, process->process_handle = info.hProcess; process->pid = info.dwProcessId; + if (options.stdin_stream && + options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + options.stdin_stream->ipc_pid = info.dwProcessId; + } + /* Setup notifications for when the child process exits. */ result = RegisterWaitForSingleObject(&process->wait_handle, process->process_handle, exit_wait_callback, (void*)process, INFINITE, diff --git a/src/win/stream.c b/src/win/stream.c index c38e06bb..f1211784 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -62,13 +62,11 @@ int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int uv_accept(uv_stream_t* server, uv_stream_t* client) { - assert(client->type == server->type); - switch (server->type) { case UV_TCP: return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); case UV_NAMED_PIPE: - return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); + return uv_pipe_accept((uv_pipe_t*)server, client); default: assert(0); return -1; @@ -92,6 +90,18 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, } +int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb); + default: + assert(0); + return -1; + } +} + + int uv_read_stop(uv_stream_t* handle) { if (handle->type == UV_TTY) { return uv_tty_read_stop((uv_tty_t*) handle); @@ -121,6 +131,21 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, } +int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { + uv_loop_t* loop = handle->loop; + + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_write2(loop, req, (uv_pipe_t*) handle, bufs, bufcnt, send_handle, cb); + default: + assert(0); + uv__set_sys_error(loop, WSAEINVAL); + return -1; + } +} + + int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { uv_loop_t* loop = handle->loop; diff --git a/src/win/tcp.c b/src/win/tcp.c index ee95aa11..a2f65cbf 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -47,7 +47,7 @@ static unsigned int active_tcp_streams = 0; static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, - SOCKET socket) { + SOCKET socket, int imported) { DWORD yes = 1; assert(handle->socket == INVALID_SOCKET); @@ -70,8 +70,12 @@ static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, loop->iocp, (ULONG_PTR)socket, 0) == NULL) { - uv__set_sys_error(loop, GetLastError()); - return -1; + if (imported) { + handle->flags |= UV_HANDLE_EMULATE_IOCP; + } else { + uv__set_sys_error(loop, GetLastError()); + return -1; + } } if (pSetFileCompletionNotificationModes) { @@ -109,6 +113,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { int status; int sys_error; + unsigned int i; + uv_tcp_accept_t* req; if (handle->flags & UV_HANDLE_CONNECTION && handle->flags & UV_HANDLE_SHUTTING && @@ -139,6 +145,20 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { handle->flags |= UV_HANDLE_CLOSED; if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) { + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + for (i = 0; i < uv_simultaneous_server_accepts; i++) { + req = &handle->accept_reqs[i]; + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; + } + } + } + free(handle->accept_reqs); handle->accept_reqs = NULL; } @@ -169,7 +189,7 @@ static int uv__bind(uv_tcp_t* handle, return -1; } - if (uv_tcp_set_socket(handle->loop, handle, sock) == -1) { + if (uv_tcp_set_socket(handle->loop, handle, sock, 0) == -1) { closesocket(sock); return -1; } @@ -218,24 +238,40 @@ int uv__tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) { } +static void CALLBACK post_completion(void* context, BOOLEAN timed_out) { + uv_tcp_accept_t* req; + uv_tcp_t* handle; + + req = (uv_tcp_accept_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->data; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { uv_loop_t* loop = handle->loop; BOOL success; DWORD bytes; SOCKET accept_socket; short family; - LPFN_ACCEPTEX pAcceptExFamily; assert(handle->flags & UV_HANDLE_LISTENING); assert(req->accept_socket == INVALID_SOCKET); /* choose family and extension function */ - if ((handle->flags & UV_HANDLE_IPV6) != 0) { + if (handle->flags & UV_HANDLE_IPV6) { family = AF_INET6; - pAcceptExFamily = pAcceptEx6; } else { family = AF_INET; - pAcceptExFamily = pAcceptEx; } /* Open a socket for the accepted connection. */ @@ -249,15 +285,18 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1); + } - success = pAcceptExFamily(handle->socket, - accept_socket, - (void*)req->accept_buffer, - 0, - sizeof(struct sockaddr_storage), - sizeof(struct sockaddr_storage), - &bytes, - &req->overlapped); + success = handle->func_acceptex(handle->socket, + accept_socket, + (void*)req->accept_buffer, + 0, + sizeof(struct sockaddr_storage), + sizeof(struct sockaddr_storage), + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -268,6 +307,15 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* The req will be processed with IOCP. */ req->accept_socket = accept_socket; handle->reqs_pending++; + if (handle->flags & UV_HANDLE_EMULATE_IOCP && + req->wait_handle == INVALID_HANDLE_VALUE && + !RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, (uv_req_t*)req); + return; + } } else { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, WSAGetLastError()); @@ -275,6 +323,11 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { handle->reqs_pending++; /* Destroy the preallocated client socket. */ closesocket(accept_socket); + /* Destroy the event handle */ + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + CloseHandle(req->overlapped.hEvent); + req->event_handle = NULL; + } } } @@ -357,6 +410,14 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; + } + if (listen(handle->socket, backlog) == SOCKET_ERROR) { uv__set_sys_error(loop, WSAGetLastError()); return -1; @@ -378,6 +439,17 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { req->type = UV_ACCEPT; req->accept_socket = INVALID_SOCKET; req->data = handle; + + req->wait_handle = INVALID_HANDLE_VALUE; + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } else { + req->event_handle = NULL; + } + uv_tcp_queue_accept(handle, req); } @@ -402,7 +474,7 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { return -1; } - if (uv_tcp_set_socket(client->loop, client, req->accept_socket) == -1) { + if (uv_tcp_set_socket(client->loop, client, req->accept_socket, 0) == -1) { closesocket(req->accept_socket); rv = -1; } else { @@ -476,19 +548,27 @@ int uv__tcp_connect(uv_connect_t* req, uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -529,19 +609,27 @@ int uv__tcp_connect6(uv_connect_t* req, uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0) return -1; + if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT; + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx6(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { handle->reqs_pending++; @@ -848,3 +936,23 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, DECREASE_PENDING_REQ_COUNT(handle); } + + +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { + SOCKET socket = WSASocketW(AF_INET, + SOCK_STREAM, + IPPROTO_IP, + socket_protocol_info, + 0, + WSA_FLAG_OVERLAPPED); + + if (socket == INVALID_SOCKET) { + uv__set_sys_error(tcp->loop, WSAGetLastError()); + return -1; + } + + tcp->flags |= UV_HANDLE_BOUND; + tcp->flags |= UV_HANDLE_DUPLICATED_SOCKET; + + return uv_tcp_set_socket(tcp->loop, tcp, socket, 1); +} diff --git a/src/win/util.c b/src/win/util.c index cb2d4438..cc6f93cf 100644 --- a/src/win/util.c +++ b/src/win/util.c @@ -25,6 +25,7 @@ #include "uv.h" #include "internal.h" +#include "Tlhelp32.h" int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size, @@ -95,11 +96,13 @@ done: return retVal; } + void uv_loadavg(double avg[3]) { /* Can't be implemented */ avg[0] = avg[1] = avg[2] = 0; } + double uv_get_free_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -112,6 +115,7 @@ double uv_get_free_memory(void) { return (double)memory_status.ullAvailPhys; } + double uv_get_total_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -123,3 +127,26 @@ double uv_get_total_memory(void) { return (double)memory_status.ullTotalPhys; } + + +int uv_parent_pid() { + int parent_pid = -1; + HANDLE handle; + PROCESSENTRY32 pe; + int current_pid = GetCurrentProcessId(); + + pe.dwSize = sizeof(PROCESSENTRY32); + handle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); + + if (Process32First(handle, &pe)) { + do { + if (pe.th32ProcessID == current_pid) { + parent_pid = pe.th32ParentProcessID; + break; + } + } while( Process32Next(handle, &pe)); + } + + CloseHandle(handle); + return parent_pid; +} diff --git a/src/win/winsock.c b/src/win/winsock.c index 1f56b3d7..e37a60a9 100644 --- a/src/win/winsock.c +++ b/src/win/winsock.c @@ -25,21 +25,6 @@ #include "../uv-common.h" #include "internal.h" - -/* Winsock extension functions (ipv4) */ -LPFN_CONNECTEX pConnectEx; -LPFN_ACCEPTEX pAcceptEx; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -LPFN_DISCONNECTEX pDisconnectEx; -LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -LPFN_CONNECTEX pConnectEx6; -LPFN_ACCEPTEX pAcceptEx6; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -LPFN_DISCONNECTEX pDisconnectEx6; -LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ int uv_allow_ipv6; @@ -74,6 +59,18 @@ static BOOL uv_get_extension_function(SOCKET socket, GUID guid, } +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target) { + const GUID wsaid_acceptex = WSAID_ACCEPTEX; + return uv_get_extension_function(socket, wsaid_acceptex, (void**)target); +} + + +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target) { + const GUID wsaid_connectex = WSAID_CONNECTEX; + return uv_get_extension_function(socket, wsaid_connectex, (void**)target); +} + + void uv_winsock_init() { const GUID wsaid_connectex = WSAID_CONNECTEX; const GUID wsaid_acceptex = WSAID_ACCEPTEX; @@ -83,7 +80,6 @@ void uv_winsock_init() { WSADATA wsa_data; int errorno; - SOCKET dummy; SOCKET dummy6; /* Initialize winsock */ @@ -96,58 +92,10 @@ void uv_winsock_init() { uv_addr_ip4_any_ = uv_ip4_addr("0.0.0.0", 0); uv_addr_ip6_any_ = uv_ip6_addr("::", 0); - /* Retrieve the needed winsock extension function pointers. */ - dummy = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - if (dummy == INVALID_SOCKET) { - uv_fatal_error(WSAGetLastError(), "socket"); - } - - if (!uv_get_extension_function(dummy, - wsaid_connectex, - (void**)&pConnectEx) || - !uv_get_extension_function(dummy, - wsaid_acceptex, - (void**)&pAcceptEx) || - !uv_get_extension_function(dummy, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs) || - !uv_get_extension_function(dummy, - wsaid_disconnectex, - (void**)&pDisconnectEx) || - !uv_get_extension_function(dummy, - wsaid_transmitfile, - (void**)&pTransmitFile)) { - uv_fatal_error(WSAGetLastError(), - "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)"); - } - - if (closesocket(dummy) == SOCKET_ERROR) { - uv_fatal_error(WSAGetLastError(), "closesocket"); - } - - /* optional IPv6 versions of winsock extension functions */ + /* Detect IPV6 support */ dummy6 = socket(AF_INET6, SOCK_STREAM, IPPROTO_IP); if (dummy6 != INVALID_SOCKET) { uv_allow_ipv6 = TRUE; - - if (!uv_get_extension_function(dummy6, - wsaid_connectex, - (void**)&pConnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_acceptex, - (void**)&pAcceptEx6) || - !uv_get_extension_function(dummy6, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs6) || - !uv_get_extension_function(dummy6, - wsaid_disconnectex, - (void**)&pDisconnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_transmitfile, - (void**)&pTransmitFile6)) { - uv_allow_ipv6 = FALSE; - } - if (closesocket(dummy6) == SOCKET_ERROR) { uv_fatal_error(WSAGetLastError(), "closesocket"); } diff --git a/src/win/winsock.h b/src/win/winsock.h index 2c9fb92d..f879cc65 100644 --- a/src/win/winsock.h +++ b/src/win/winsock.h @@ -109,24 +109,12 @@ #define IPV6_V6ONLY 27 #endif - -/* Winsock extension functions (ipv4) */ -extern LPFN_CONNECTEX pConnectEx; -extern LPFN_ACCEPTEX pAcceptEx; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -extern LPFN_DISCONNECTEX pDisconnectEx; -extern LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -extern LPFN_CONNECTEX pConnectEx6; -extern LPFN_ACCEPTEX pAcceptEx6; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -extern LPFN_DISCONNECTEX pDisconnectEx6; -extern LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ extern int uv_allow_ipv6; +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target); +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target); + /* Ip address used to bind to any port at any interface */ extern struct sockaddr_in uv_addr_ip4_any_; extern struct sockaddr_in6 uv_addr_ip6_any_; diff --git a/test/run-tests.c b/test/run-tests.c index f80dfbcb..fa7b8b8f 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -22,9 +22,9 @@ #include #include +#include "uv.h" #include "runner.h" #include "task.h" -#include "uv.h" /* Actual tests and helpers are defined in test-list.h */ #include "test-list.h" @@ -49,10 +49,61 @@ int main(int argc, char **argv) { } -static uv_tcp_t server; +static uv_pipe_t channel; +static uv_tcp_t tcp_server; +static uv_write_t conn_notify_req; +static int close_cb_called; +static int connection_accepted; + + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + + +static void close_conn_cb(uv_handle_t* handle) { + free(handle); + close_cb_called++; +} + + +void conn_notify_write_cb(uv_write_t* req, int status) { + uv_close((uv_handle_t*)&tcp_server, close_cb); + uv_close((uv_handle_t*)&channel, close_cb); +} static void ipc_on_connection(uv_stream_t* server, int status) { + int r; + uv_buf_t buf; + uv_tcp_t* conn; + + if (!connection_accepted) { + /* + * Accept the connection and close it. Also let the other + * side know. + */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_conn_cb); + + buf = uv_buf_init("accepted_connection\n", 20); + r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, + NULL, conn_notify_write_cb); + ASSERT(r == 0); + + connection_accepted = 1; + } } @@ -63,7 +114,7 @@ static int ipc_helper() { * data is transfered over the channel. XXX edit this comment after handle * transfer is added. */ - uv_pipe_t channel; + uv_write_t write_req; int r; uv_buf_t buf; @@ -73,23 +124,26 @@ static int ipc_helper() { uv_pipe_open(&channel, 0); - r = uv_tcp_init(uv_default_loop(), &server); + r = uv_tcp_init(uv_default_loop(), &tcp_server); ASSERT(r == 0); - r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT)); ASSERT(r == 0); - r = uv_listen((uv_stream_t*)&server, 12, ipc_on_connection); + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); ASSERT(r == 0); buf = uv_buf_init("hello\n", 6); r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, - (uv_stream_t*)&server, NULL); + (uv_stream_t*)&tcp_server, NULL); ASSERT(r == 0); r = uv_run(uv_default_loop()); ASSERT(r == 0); + ASSERT(connection_accepted == 1); + ASSERT(close_cb_called == 3); + return 0; } diff --git a/test/test-ipc.c b/test/test-ipc.c index ed263c10..0024cdee 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -33,12 +33,46 @@ static uv_tcp_t tcp_server; static int exit_cb_called; static int read2_cb_called; +static int local_conn_accepted; +static int remote_conn_accepted; +static int tcp_server_listening; static uv_write_t write_req; +typedef struct { + uv_connect_t conn_req; + uv_tcp_t conn; +} tcp_conn; + +#define CONN_COUNT 100 + + +static void close_server_conn_cb(uv_handle_t* handle) { + free(handle); +} + + static void ipc_on_connection(uv_stream_t* server, int status) { - ASSERT(status == 0); - ASSERT((uv_stream_t*)&tcp_server == server); + uv_tcp_t* conn; + int r; + + if (!local_conn_accepted) { + /* Accept the connection and close it. Also and close the server. */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_server_conn_cb); + uv_close((uv_handle_t*)server, NULL); + local_conn_accepted = 1; + } } @@ -55,6 +89,39 @@ static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { } +static void close_client_conn_cb(uv_handle_t* handle) { + tcp_conn* p = (tcp_conn*)handle->data; + free(p); +} + + +static void connect_cb(uv_connect_t* req, int status) { + uv_close((uv_handle_t*)req->handle, close_client_conn_cb); +} + + +static void make_many_connections() { + tcp_conn* conn; + struct sockaddr_in addr; + int r, i; + + for (i = 0; i < CONN_COUNT; i++) { + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(uv_default_loop(), &conn->conn); + ASSERT(r == 0); + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb); + ASSERT(r == 0); + + conn->conn.data = conn; + } +} + + static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, uv_handle_type pending) { int r; @@ -78,27 +145,40 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, abort(); } - ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); - read2_cb_called++; - - /* Accept the pending TCP server, and start listening on it. */ - ASSERT(pending == UV_TCP); - r = uv_tcp_init(uv_default_loop(), &tcp_server); - ASSERT(r == 0); - - r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); - ASSERT(r == 0); - - r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); - ASSERT(r == 0); - - /* Make sure that the expected data is correctly multiplexed. */ - ASSERT(memcmp("hello\n", buf.base, nread) == 0); fprintf(stderr, "got %d bytes\n", (int)nread); - outbuf = uv_buf_init("world\n", 6); - r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); - ASSERT(r == 0); + if (!tcp_server_listening) { + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); + read2_cb_called++; + + /* Accept the pending TCP server, and start listening on it. */ + ASSERT(pending == UV_TCP); + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + tcp_server_listening = 1; + + /* Make sure that the expected data is correctly multiplexed. */ + ASSERT(memcmp("hello\n", buf.base, nread) == 0); + + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); + ASSERT(r == 0); + + /* Create a bunch of connections to get both servers to accept. */ + make_many_connections(); + } else if (memcmp("accepted_connection\n", buf.base, nread) == 0) { + /* Remote server has accepted a connection. Close the channel. */ + ASSERT(pending == UV_UNKNOWN_HANDLE); + remote_conn_accepted = 1; + uv_close((uv_handle_t*)&channel, NULL); + } free(buf.base); } @@ -133,6 +213,8 @@ TEST_IMPL(ipc) { r = uv_run(uv_default_loop()); ASSERT(r == 0); + ASSERT(local_conn_accepted == 1); + ASSERT(remote_conn_accepted == 1); ASSERT(read2_cb_called == 1); ASSERT(exit_cb_called == 1); return 0; diff --git a/uv.gyp b/uv.gyp index 0db92a93..bd1c9d9a 100644 --- a/uv.gyp +++ b/uv.gyp @@ -117,7 +117,6 @@ 'src/win/pipe.c', 'src/win/process.c', 'src/win/req.c', - 'src/win/stdio.c', 'src/win/stream.c', 'src/win/tcp.c', 'src/win/tty.c',