diff --git a/include/uv-win.h b/include/uv-win.h index e48b8a9a..750761c0 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -368,11 +368,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); } u; \ struct uv_req_s* next_req; -#define UV_WRITE_PRIVATE_FIELDS \ - int coalesced; /* This ABI change will be un-done in a later commit. */ \ - int ipc_header; \ - uv_buf_t write_buffer; \ - HANDLE event_handle; \ +#define UV_WRITE_PRIVATE_FIELDS \ + int coalesced; \ + uv_buf_t write_buffer; \ + HANDLE event_handle; \ HANDLE wait_handle; #define UV_CONNECT_PRIVATE_FIELDS \ @@ -460,13 +459,14 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define uv_pipe_connection_fields \ uv_timer_t* eof_timer; \ - uv_write_t ipc_header_write_req; \ - int ipc_pid; \ - uint64_t remaining_ipc_rawdata_bytes; \ - struct { \ - void* queue[2]; \ - int queue_len; \ - } pending_ipc_info; \ + uv_write_t dummy; /* TODO: retained for ABI compat; remove this in v2.x. */ \ + DWORD ipc_remote_pid; \ + union { \ + uint32_t payload_remaining; \ + uint64_t dummy; /* TODO: retained for ABI compat; remove this in v2.x. */ \ + } ipc_data_frame; \ + void* ipc_xfer_queue[2]; \ + int ipc_xfer_queue_length; \ uv_write_t* non_overlapped_writes_tail; \ CRITICAL_SECTION readfile_thread_lock; \ volatile HANDLE readfile_thread_handle; diff --git a/src/win/internal.h b/src/win/internal.h index 0b185f55..997ec931 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -126,8 +126,9 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled; typedef struct { WSAPROTOCOL_INFOW socket_info; - int delayed_error; -} uv__ipc_socket_info_ex; + uint32_t delayed_error; + uint32_t flags; /* Either zero or UV_HANDLE_CONNECTION. */ +} uv__ipc_socket_xfer_info_t; int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb); int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client); @@ -149,11 +150,10 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp); void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); -int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex, - int tcp_connection); - -int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid, - LPWSAPROTOCOL_INFOW protocol_info); +int uv__tcp_xfer_export(uv_tcp_t* handle, + int pid, + uv__ipc_socket_xfer_info_t* xfer_info); +int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info); /* @@ -178,11 +178,13 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); void uv__pipe_read_stop(uv_pipe_t* handle); -int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, - const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb); -int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, - const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t* send_handle, - uv_write_cb cb); +int uv__pipe_write(uv_loop_t* loop, + uv_write_t* req, + uv_pipe_t* handle, + const uv_buf_t bufs[], + size_t nbufs, + uv_stream_t* send_handle, + uv_write_cb cb); void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req); @@ -329,7 +331,6 @@ void uv__fs_poll_endgame(uv_loop_t* loop, uv_fs_poll_t* handle); void uv__util_init(void); uint64_t uv__hrtime(double scale); -int uv_current_pid(void); __declspec(noreturn) void uv_fatal_error(const int errorno, const char* syscall); int uv__getpwuid_r(uv_passwd_t* pwd); int uv__convert_utf16_to_utf8(const WCHAR* utf16, int utf16len, char** utf8); diff --git a/src/win/pipe.c b/src/win/pipe.c index 0ef18773..22850931 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -21,9 +21,10 @@ #include #include -#include +#include #include #include +#include #include "uv.h" #include "internal.h" @@ -34,18 +35,6 @@ #include #include -typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t; - -struct uv__ipc_queue_item_s { - /* - * NOTE: It is important for socket_info_ex to be the first field, - * because we will we assigning it to the pending_ipc_info.socket_info - */ - uv__ipc_socket_info_ex socket_info_ex; - QUEUE member; - int tcp_connection; -}; - /* A zero-size buffer for use by uv_pipe_read */ static char uv_zero_[] = ""; @@ -62,22 +51,20 @@ static const int default_pending_pipe_instances = 4; static char pipe_prefix[] = "\\\\?\\pipe"; static const int pipe_prefix_len = sizeof(pipe_prefix) - 1; -/* IPC protocol flags. */ -#define UV_IPC_RAW_DATA 0x0001 -#define UV_IPC_TCP_SERVER 0x0002 -#define UV_IPC_TCP_CONNECTION 0x0004 +/* IPC incoming xfer queue item. */ +typedef struct { + uv__ipc_socket_xfer_info_t xfer_info; + QUEUE member; +} uv__ipc_xfer_queue_item_t; + +/* IPC frame types. */ +enum { UV__IPC_DATA_FRAME = 0, UV__IPC_XFER_FRAME = 1 }; /* IPC frame header. */ typedef struct { - int flags; - uint64_t raw_data_length; -} uv_ipc_frame_header_t; - -/* IPC frame, which contains an imported TCP socket stream. */ -typedef struct { - uv_ipc_frame_header_t header; - uv__ipc_socket_info_ex socket_info_ex; -} uv_ipc_frame_uv_stream; + uint32_t type; + uint32_t payload_length; +} uv__ipc_frame_header_t; /* Coalesced write request. */ typedef struct { @@ -85,6 +72,7 @@ typedef struct { uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */ } uv__coalesced_write_t; + static void eof_timer_init(uv_pipe_t* pipe); static void eof_timer_start(uv_pipe_t* pipe); static void eof_timer_stop(uv_pipe_t* pipe); @@ -104,15 +92,13 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; handle->name = NULL; - handle->pipe.conn.ipc_pid = 0; - handle->pipe.conn.remaining_ipc_rawdata_bytes = 0; - QUEUE_INIT(&handle->pipe.conn.pending_ipc_info.queue); - handle->pipe.conn.pending_ipc_info.queue_len = 0; + handle->pipe.conn.ipc_remote_pid = 0; + handle->pipe.conn.ipc_data_frame.payload_remaining = 0; + QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue); + handle->pipe.conn.ipc_xfer_queue_length = 0; handle->ipc = ipc; handle->pipe.conn.non_overlapped_writes_tail = NULL; - UV_REQ_INIT(&handle->pipe.conn.ipc_header_write_req, UV_UNKNOWN_REQ); - return 0; } @@ -351,7 +337,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; - uv__ipc_queue_item_t* item; + uv__ipc_xfer_queue_item_t* xfer_queue_item; if ((handle->flags & UV_HANDLE_CONNECTION) && handle->stream.conn.shutdown_req != NULL && @@ -428,27 +414,27 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { if (handle->flags & UV_HANDLE_CONNECTION) { /* Free pending sockets */ - while (!QUEUE_EMPTY(&handle->pipe.conn.pending_ipc_info.queue)) { + while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) { QUEUE* q; SOCKET socket; - q = QUEUE_HEAD(&handle->pipe.conn.pending_ipc_info.queue); + q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue); QUEUE_REMOVE(q); - item = QUEUE_DATA(q, uv__ipc_queue_item_t, member); + xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); /* Materialize socket and close it */ socket = WSASocketW(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, - &item->socket_info_ex.socket_info, + &xfer_queue_item->xfer_info.socket_info, 0, WSA_FLAG_OVERLAPPED); - uv__free(item); + uv__free(xfer_queue_item); if (socket != INVALID_SOCKET) closesocket(socket); } - handle->pipe.conn.pending_ipc_info.queue_len = 0; + handle->pipe.conn.ipc_xfer_queue_length = 0; if (handle->flags & UV_HANDLE_EMULATE_IOCP) { if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { @@ -879,23 +865,21 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { uv_pipe_t* pipe_client; uv_pipe_accept_t* req; QUEUE* q; - uv__ipc_queue_item_t* item; + uv__ipc_xfer_queue_item_t* item; int err; if (server->ipc) { - if (QUEUE_EMPTY(&server->pipe.conn.pending_ipc_info.queue)) { + if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) { /* No valid pending sockets. */ return WSAEWOULDBLOCK; } - q = QUEUE_HEAD(&server->pipe.conn.pending_ipc_info.queue); + q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue); QUEUE_REMOVE(q); - server->pipe.conn.pending_ipc_info.queue_len--; - item = QUEUE_DATA(q, uv__ipc_queue_item_t, member); + server->pipe.conn.ipc_xfer_queue_length--; + item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); - err = uv_tcp_import((uv_tcp_t*)client, - &item->socket_info_ex, - item->tcp_connection); + err = uv__tcp_xfer_import((uv_tcp_t*) client, &item->xfer_info); if (err != 0) return err; @@ -1242,6 +1226,7 @@ static void uv_queue_non_overlapped_write(uv_pipe_t* handle) { } } + static int uv__build_coalesced_write_req(uv_write_t* user_req, const uv_buf_t bufs[], size_t nbufs, @@ -1302,26 +1287,17 @@ static int uv__build_coalesced_write_req(uv_write_t* user_req, } -static int uv_pipe_write_impl(uv_loop_t* loop, - uv_write_t* req, - uv_pipe_t* handle, - const uv_buf_t bufs[], - unsigned int nbufs, - uv_stream_t* send_handle, - uv_write_cb cb) { +static int uv__pipe_write_data(uv_loop_t* loop, + uv_write_t* req, + uv_pipe_t* handle, + const uv_buf_t bufs[], + size_t nbufs, + uv_stream_t* send_handle, + uv_write_cb cb, + bool copy_always) { int err; int result; uv_buf_t write_buf; - uv_tcp_t* tcp_send_handle; - uv_write_t* ipc_header_req = NULL; - uv_ipc_frame_uv_stream ipc_frame; - - /* Only TCP handles are supported for sharing. */ - if (send_handle && ((send_handle->type != UV_TCP) || - (!(send_handle->flags & UV_HANDLE_BOUND) && - !(send_handle->flags & UV_HANDLE_CONNECTION)))) { - return ERROR_NOT_SUPPORTED; - } assert(handle->handle != INVALID_HANDLE_VALUE); @@ -1331,7 +1307,6 @@ static int uv_pipe_write_impl(uv_loop_t* loop, req->cb = cb; /* Private fields. */ req->coalesced = 0; - req->ipc_header = 0; req->event_handle = NULL; req->wait_handle = INVALID_HANDLE_VALUE; memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); @@ -1340,7 +1315,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, if (nbufs == 0) { /* Write empty buffer. */ write_buf = uv_null_buf_; - } else if (nbufs == 1) { + } else if (nbufs == 1 && !copy_always) { /* Write directly from bufs[0]. */ write_buf = bufs[0]; } else { @@ -1351,112 +1326,6 @@ static int uv_pipe_write_impl(uv_loop_t* loop, return err; } - if (handle->ipc) { - assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); - ipc_frame.header.flags = 0; - - /* Use the IPC framing protocol. */ - if (send_handle) { - tcp_send_handle = (uv_tcp_t*)send_handle; - - if (handle->pipe.conn.ipc_pid == 0) { - handle->pipe.conn.ipc_pid = uv_current_pid(); - } - - err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid, - &ipc_frame.socket_info_ex.socket_info); - if (err) { - return err; - } - - ipc_frame.socket_info_ex.delayed_error = tcp_send_handle->delayed_error; - - ipc_frame.header.flags |= UV_IPC_TCP_SERVER; - - if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) { - ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION; - } - } - - if (nbufs > 0) { - ipc_frame.header.flags |= UV_IPC_RAW_DATA; - ipc_frame.header.raw_data_length = write_buf.len; - } - - /* - * Use the provided req if we're only doing a single write. - * If we're doing multiple writes, use ipc_header_write_req to do - * the first write, and then use the provided req for the second write. - */ - if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { - ipc_header_req = req; - } else { - /* - * Try to use the preallocated write req if it's available. - * Otherwise allocate a new one. - */ - if (handle->pipe.conn.ipc_header_write_req.type != UV_WRITE) { - ipc_header_req = (uv_write_t*)&handle->pipe.conn.ipc_header_write_req; - } else { - ipc_header_req = (uv_write_t*)uv__malloc(sizeof(uv_write_t)); - if (!ipc_header_req) { - uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); - } - } - - UV_REQ_INIT(ipc_header_req, UV_WRITE); - ipc_header_req->handle = (uv_stream_t*) handle; - ipc_header_req->cb = NULL; - ipc_header_req->coalesced = 0; - ipc_header_req->ipc_header = 1; - } - - /* Write the header or the whole frame. */ - memset(&ipc_header_req->u.io.overlapped, 0, - sizeof(ipc_header_req->u.io.overlapped)); - - /* Using overlapped IO, but wait for completion before returning. - This write is blocking because ipc_frame is on stack. */ - ipc_header_req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL); - if (!ipc_header_req->u.io.overlapped.hEvent) { - uv_fatal_error(GetLastError(), "CreateEvent"); - } - - result = WriteFile(handle->handle, - &ipc_frame, - ipc_frame.header.flags & UV_IPC_TCP_SERVER ? - sizeof(ipc_frame) : sizeof(ipc_frame.header), - NULL, - &ipc_header_req->u.io.overlapped); - if (!result && GetLastError() != ERROR_IO_PENDING) { - err = GetLastError(); - CloseHandle(ipc_header_req->u.io.overlapped.hEvent); - return err; - } - - if (!result) { - /* Request not completed immediately. Wait for it.*/ - if (WaitForSingleObject(ipc_header_req->u.io.overlapped.hEvent, INFINITE) != - WAIT_OBJECT_0) { - err = GetLastError(); - CloseHandle(ipc_header_req->u.io.overlapped.hEvent); - return err; - } - } - ipc_header_req->u.io.queued_bytes = 0; - CloseHandle(ipc_header_req->u.io.overlapped.hEvent); - ipc_header_req->u.io.overlapped.hEvent = NULL; - - REGISTER_HANDLE_REQ(loop, handle, ipc_header_req); - handle->reqs_pending++; - handle->stream.conn.write_reqs_pending++; - - /* If we don't have any raw data to write - we're done. */ - if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { - return 0; - } - } - if ((handle->flags & (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) == (UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) { @@ -1567,28 +1436,133 @@ static int uv_pipe_write_impl(uv_loop_t* loop, } -int uv_pipe_write(uv_loop_t* loop, - uv_write_t* req, - uv_pipe_t* handle, - const uv_buf_t bufs[], - unsigned int nbufs, - uv_write_cb cb) { - return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb); +static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) { + DWORD* pid = &handle->pipe.conn.ipc_remote_pid; + + /* If the both ends of the IPC pipe are owned by the same process, + * the remote end pid may not yet be set. If so, do it here. + * TODO: this is weird; it'd probably better to use a handshake. */ + if (*pid == 0) + *pid = GetCurrentProcessId(); + + return *pid; } -int uv_pipe_write2(uv_loop_t* loop, +int uv__pipe_write_ipc(uv_loop_t* loop, + uv_write_t* req, + uv_pipe_t* handle, + const uv_buf_t data_bufs[], + size_t data_buf_count, + uv_stream_t* send_handle, + uv_write_cb cb) { + uv_buf_t stack_bufs[6]; + uv_buf_t* bufs; + size_t buf_count, buf_index; + uv__ipc_frame_header_t xfer_frame_header; + uv__ipc_socket_xfer_info_t xfer_info; + uv__ipc_frame_header_t data_frame_header; + size_t data_length; + size_t i; + int err; + + /* Compute the combined size of data buffers. */ + data_length = 0; + for (i = 0; i < data_buf_count; i++) + data_length += data_bufs[i].len; + if (data_length > UINT32_MAX) + return WSAENOBUFS; /* Maps to UV_ENOBUFS. */ + + /* Prepare xfer frame payload. */ + if (send_handle) { + uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle; + + /* Verify that `send_handle` it is indeed a tcp handle. */ + if (send_tcp_handle->type != UV_TCP) + return ERROR_NOT_SUPPORTED; + + /* Export the tcp handle. */ + err = uv__tcp_xfer_export( + send_tcp_handle, uv__pipe_get_ipc_remote_pid(handle), &xfer_info); + if (err != 0) + return err; + } + + /* Compute the number of uv_buf_t's required. */ + buf_count = 0; + if (send_handle != NULL) { + buf_count += 2; /* One for the frame header, one for the payload. */ + } + if (data_buf_count > 0) { + buf_count += 1 + data_buf_count; /* One extra for the frame header. */ + } + + /* Use the on-stack buffer array if it is big enough; otherwise allocate + * space for it on the heap. */ + if (buf_count < ARRAY_SIZE(stack_bufs)) { + /* Use on-stack buffer array. */ + bufs = stack_bufs; + } else { + /* Use heap-allocated buffer array. */ + bufs = uv__calloc(buf_count, sizeof(uv_buf_t)); + if (bufs == NULL) + return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */ + } + buf_index = 0; + + if (send_handle != NULL) { + /* Add xfer frame header. */ + xfer_frame_header.type = UV__IPC_XFER_FRAME; + xfer_frame_header.payload_length = sizeof xfer_info; + bufs[buf_index++] = + uv_buf_init((char*) &xfer_frame_header, sizeof xfer_frame_header); + + /* Add xfer frame payload. */ + bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info); + } + + if (data_length > 0) { + /* Add data frame header. */ + data_frame_header.type = UV__IPC_DATA_FRAME; + data_frame_header.payload_length = (uint32_t) data_length; + bufs[buf_index++] = + uv_buf_init((char*) &data_frame_header, sizeof data_frame_header); + + /* Add data buffers. */ + for (i = 0; i < data_buf_count; i++) + bufs[buf_index++] = data_bufs[i]; + } + + /* Write buffers. We set the `always_copy` flag, so it is not a problem that + * some of the written data lives on the stack. */ + err = uv__pipe_write_data( + loop, req, handle, bufs, buf_count, send_handle, cb, true); + + /* If we had to heap-allocate the bufs array, free it now. */ + if (bufs != stack_bufs) { + uv__free(bufs); + } + + return err; +} + + +int uv__pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, const uv_buf_t bufs[], - unsigned int nbufs, + size_t nbufs, uv_stream_t* send_handle, uv_write_cb cb) { - if (!handle->ipc) { - return WSAEINVAL; + if (handle->ipc) { + /* IPC pipe write: use framing protocol. */ + return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb); + } else { + /* Non-IPC pipe write: put data on the wire directly. */ + assert(send_handle == NULL); + return uv__pipe_write_data( + loop, req, handle, bufs, nbufs, NULL, cb, false); } - - return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb); } @@ -1627,147 +1601,189 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, } -void uv__pipe_insert_pending_socket(uv_pipe_t* handle, - uv__ipc_socket_info_ex* info, - int tcp_connection) { - uv__ipc_queue_item_t* item; +static void uv__pipe_queue_ipc_xfer_info( + uv_pipe_t* handle, uv__ipc_socket_xfer_info_t* xfer_info) { + uv__ipc_xfer_queue_item_t* item; - item = (uv__ipc_queue_item_t*) uv__malloc(sizeof(*item)); + item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item)); if (item == NULL) uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); - memcpy(&item->socket_info_ex, info, sizeof(item->socket_info_ex)); - item->tcp_connection = tcp_connection; - QUEUE_INSERT_TAIL(&handle->pipe.conn.pending_ipc_info.queue, &item->member); - handle->pipe.conn.pending_ipc_info.queue_len++; + memcpy(&item->xfer_info, xfer_info, sizeof(item->xfer_info)); + QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member); + handle->pipe.conn.ipc_xfer_queue_length++; } -void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, - uv_req_t* req) { - DWORD bytes, avail; - uv_buf_t buf; - uv_ipc_frame_uv_stream ipc_frame; +/* Read an exact number of bytes from a pipe. If an error or end-of-file is + * encountered before the requested number of bytes are read, an error is + * returned. */ +static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) { + DWORD bytes_read, bytes_read_now; + bytes_read = 0; + while (bytes_read < count) { + if (!ReadFile(h, + (char*) buffer + bytes_read, + count - bytes_read, + &bytes_read_now, + NULL)) { + return GetLastError(); + } + + bytes_read += bytes_read_now; + } + + assert(bytes_read == count); + return 0; +} + + +static DWORD uv__pipe_read_data(uv_loop_t* loop, + uv_pipe_t* handle, + DWORD suggested_bytes, + DWORD max_bytes) { + DWORD bytes_read; + uv_buf_t buf; + + /* Ask the user for a buffer to read data into. */ + buf = uv_buf_init(NULL, 0); + handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf); + if (buf.base == NULL || buf.len == 0) { + handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf); + return 0; /* Break out of read loop. */ + } + + /* Ensure we read at most the smaller of: + * (a) the length of the user-allocated buffer. + * (b) the maximum data length as specified by the `max_bytes` argument. + */ + max_bytes = min(buf.len, max_bytes); + + /* Read into the user buffer. */ + if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf); + return 0; /* Break out of read loop. */ + } + + /* Call the read callback. */ + handle->read_cb((uv_stream_t*) handle, bytes_read, &buf); + + return bytes_read; +} + + +static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { + DWORD* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining; + int err; + + if (*data_remaining > 0) { + /* Read data frame payload. */ + DWORD bytes_read = + uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining); + *data_remaining -= bytes_read; + return bytes_read; + + } else { + /* Start of a new IPC frame. */ + uv__ipc_frame_header_t frame_header; + uv__ipc_socket_xfer_info_t xfer_info; + + /* Read the IPC frame header. */ + err = uv__pipe_read_exactly( + handle->handle, &frame_header, sizeof frame_header); + if (err) + goto error; + + if (frame_header.type == UV__IPC_DATA_FRAME) { + /* Data frame: capture payload length. Actual data will be read in + * subsequent call to uv__pipe_read_ipc(). */ + *data_remaining = frame_header.payload_length; + + /* Return number of bytes read. */ + return sizeof frame_header; + + } else if (frame_header.type == UV__IPC_XFER_FRAME) { + /* Xfer frame: read the payload. */ + assert(frame_header.payload_length == sizeof xfer_info); + err = + uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info); + if (err) + goto error; + + /* Store the pending socket info. */ + uv__pipe_queue_ipc_xfer_info(handle, &xfer_info); + + /* Return number of bytes read. */ + return sizeof frame_header + sizeof xfer_info; + } + + /* Invalid frame. */ + err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */ + } + +error: + uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); + return 0; /* Break out of read loop. */ +} + + +void uv_process_pipe_read_req(uv_loop_t* loop, + uv_pipe_t* handle, + uv_req_t* req) { assert(handle->type == UV_NAMED_PIPE); handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING); + DECREASE_PENDING_REQ_COUNT(handle); eof_timer_stop(handle); + /* At this point, we're done with bookkeeping. If the user has stopped + * reading the pipe in the meantime, there is nothing left to do, since there + * is no callback that we can call. */ + if (!(handle->flags & UV_HANDLE_READING)) + return; + if (!REQ_SUCCESS(req)) { - /* An error occurred doing the zero-read. - * Note that if the read was cancelled by uv__pipe_interrupt_read(), the - * request may indicate an ERROR_OPERATION_ABORTED error. This error isn't - * relevant to the user; we'll restart the read by queueing a new read - * request below. */ - if (handle->flags & UV_HANDLE_READING && - GET_REQ_ERROR(req) != ERROR_OPERATION_ABORTED) { - uv_pipe_read_error_or_eof(loop, - handle, - GET_REQ_ERROR(req), - uv_null_buf_); - } + /* An error occurred doing the zero-read. */ + DWORD err = GET_REQ_ERROR(req); + + /* If the read was cancelled by uv__pipe_interrupt_read(), the request may + * indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to + * the user; we'll start a new zero-read at the end of this function. */ + if (err != ERROR_OPERATION_ABORTED) + uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); + } else { - /* Do non-blocking reads until the buffer is empty */ - while (handle->flags & UV_HANDLE_READING) { - if (!PeekNamedPipe(handle->handle, - NULL, - 0, - NULL, - &avail, - NULL)) { - uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); + /* The zero-read completed without error, indicating there is data + * available in the kernel buffer. */ + DWORD avail; + + /* Get the number of bytes available. */ + avail = 0; + if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL)) + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); + + /* Read until we've either read all the bytes available, or the 'reading' + * flag is cleared. */ + while (avail > 0 && handle->flags & UV_HANDLE_READING) { + /* Depending on the type of pipe, read either IPC frames or raw data. */ + DWORD bytes_read = + handle->ipc ? uv__pipe_read_ipc(loop, handle) + : uv__pipe_read_data(loop, handle, avail, (DWORD) -1); + + /* If no bytes were read, treat this as an indication that an error + * occurred, and break out of the read loop. */ + if (bytes_read == 0) break; - } - if (avail == 0) { - /* There is nothing to read after all. */ + /* It is possible that more bytes were read than we thought were + * available. To prevent `avail` from underflowing, break out of the loop + * if this is the case. */ + if (bytes_read > avail) break; - } - if (handle->ipc) { - /* Use the IPC framing protocol to read the incoming data. */ - if (handle->pipe.conn.remaining_ipc_rawdata_bytes == 0) { - /* We're reading a new frame. First, read the header. */ - assert(avail >= sizeof(ipc_frame.header)); - - if (!ReadFile(handle->handle, - &ipc_frame.header, - sizeof(ipc_frame.header), - &bytes, - NULL)) { - uv_pipe_read_error_or_eof(loop, handle, GetLastError(), - uv_null_buf_); - break; - } - - assert(bytes == sizeof(ipc_frame.header)); - assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA | - UV_IPC_TCP_CONNECTION)); - - if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) { - assert(avail - sizeof(ipc_frame.header) >= - sizeof(ipc_frame.socket_info_ex)); - - /* Read the TCP socket info. */ - if (!ReadFile(handle->handle, - &ipc_frame.socket_info_ex, - sizeof(ipc_frame) - sizeof(ipc_frame.header), - &bytes, - NULL)) { - uv_pipe_read_error_or_eof(loop, handle, GetLastError(), - uv_null_buf_); - break; - } - - assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); - - /* Store the pending socket info. */ - uv__pipe_insert_pending_socket( - handle, - &ipc_frame.socket_info_ex, - ipc_frame.header.flags & UV_IPC_TCP_CONNECTION); - } - - if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { - handle->pipe.conn.remaining_ipc_rawdata_bytes = - ipc_frame.header.raw_data_length; - continue; - } - } else { - avail = min(avail, (DWORD)handle->pipe.conn.remaining_ipc_rawdata_bytes); - } - } - - buf = uv_buf_init(NULL, 0); - handle->alloc_cb((uv_handle_t*) handle, avail, &buf); - if (buf.base == NULL || buf.len == 0) { - handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf); - break; - } - assert(buf.base != NULL); - - if (ReadFile(handle->handle, - buf.base, - min(buf.len, avail), - &bytes, - NULL)) { - /* Successful read */ - if (handle->ipc) { - assert(handle->pipe.conn.remaining_ipc_rawdata_bytes >= bytes); - handle->pipe.conn.remaining_ipc_rawdata_bytes = - handle->pipe.conn.remaining_ipc_rawdata_bytes - bytes; - } - handle->read_cb((uv_stream_t*)handle, bytes, &buf); - - /* Read again only if bytes == buf.len */ - if (bytes <= buf.len) { - break; - } - } else { - uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf); - break; - } + /* Recompute the number of bytes available. */ + avail -= bytes_read; } } @@ -1776,8 +1792,6 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, !(handle->flags & UV_HANDLE_READ_PENDING)) { uv_pipe_queue_read(loop, handle); } - - DECREASE_PENDING_REQ_COUNT(handle); } @@ -1803,27 +1817,19 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, } } - if (req->ipc_header) { - if (req == &handle->pipe.conn.ipc_header_write_req) { - req->type = UV_UNKNOWN_REQ; - } else { - uv__free(req); - } - } else { - err = GET_REQ_ERROR(req); + err = GET_REQ_ERROR(req); - /* If this was a coalesced write, extract pointer to the user_provided - * uv_write_t structure so we can pass the expected pointer to the - * callback, then free the heap-allocated write req. */ - if (req->coalesced) { - uv__coalesced_write_t* coalesced_write = - container_of(req, uv__coalesced_write_t, req); - req = coalesced_write->user_req; - uv__free(coalesced_write); - } - if (req->cb) { - req->cb(req, uv_translate_sys_error(err)); - } + /* If this was a coalesced write, extract pointer to the user_provided + * uv_write_t structure so we can pass the expected pointer to the callback, + * then free the heap-allocated write req. */ + if (req->coalesced) { + uv__coalesced_write_t* coalesced_write = + container_of(req, uv__coalesced_write_t, req); + req = coalesced_write->user_req; + uv__free(coalesced_write); + } + if (req->cb) { + req->cb(req, uv_translate_sys_error(err)); } handle->stream.conn.write_reqs_pending--; @@ -2080,8 +2086,8 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) { if (pipe->ipc) { assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); - pipe->pipe.conn.ipc_pid = uv_os_getppid(); - assert(pipe->pipe.conn.ipc_pid != -1); + pipe->pipe.conn.ipc_remote_pid = uv_os_getppid(); + assert(pipe->pipe.conn.ipc_remote_pid != -1); } return 0; } @@ -2212,7 +2218,7 @@ cleanup: int uv_pipe_pending_count(uv_pipe_t* handle) { if (!handle->ipc) return 0; - return handle->pipe.conn.pending_ipc_info.queue_len; + return handle->pipe.conn.ipc_xfer_queue_length; } @@ -2245,7 +2251,7 @@ int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) { uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) { if (!handle->ipc) return UV_UNKNOWN_HANDLE; - if (handle->pipe.conn.pending_ipc_info.queue_len == 0) + if (handle->pipe.conn.ipc_xfer_queue_length == 0) return UV_UNKNOWN_HANDLE; else return UV_TCP; diff --git a/src/win/process.c b/src/win/process.c index 6b240760..08910088 100644 --- a/src/win/process.c +++ b/src/win/process.c @@ -1142,7 +1142,8 @@ int uv_spawn(uv_loop_t* loop, if (fdopt->flags & UV_CREATE_PIPE && fdopt->data.stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) fdopt->data.stream)->ipc) { - ((uv_pipe_t*) fdopt->data.stream)->pipe.conn.ipc_pid = info.dwProcessId; + ((uv_pipe_t*) fdopt->data.stream)->pipe.conn.ipc_remote_pid = + info.dwProcessId; } } diff --git a/src/win/stream.c b/src/win/stream.c index 6539dc9a..3273a03c 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -134,7 +134,8 @@ int uv_write(uv_write_t* req, err = uv_tcp_write(loop, req, (uv_tcp_t*) handle, bufs, nbufs, cb); break; case UV_NAMED_PIPE: - err = uv_pipe_write(loop, req, (uv_pipe_t*) handle, bufs, nbufs, cb); + err = uv__pipe_write( + loop, req, (uv_pipe_t*) handle, bufs, nbufs, NULL, cb); break; case UV_TTY: err = uv_tty_write(loop, req, (uv_tty_t*) handle, bufs, nbufs, cb); @@ -156,25 +157,18 @@ int uv_write2(uv_write_t* req, uv_loop_t* loop = handle->loop; int err; - if (!(handle->flags & UV_HANDLE_WRITABLE)) { + if (send_handle == NULL) { + return uv_write(req, handle, bufs, nbufs, cb); + } + + if (handle->type != UV_NAMED_PIPE || !((uv_pipe_t*) handle)->ipc) { + return UV_EINVAL; + } else if (!(handle->flags & UV_HANDLE_WRITABLE)) { return UV_EPIPE; } - err = ERROR_INVALID_PARAMETER; - switch (handle->type) { - case UV_NAMED_PIPE: - err = uv_pipe_write2(loop, - req, - (uv_pipe_t*) handle, - bufs, - nbufs, - send_handle, - cb); - break; - default: - assert(0); - } - + err = uv__pipe_write( + loop, req, (uv_pipe_t*) handle, bufs, nbufs, send_handle, cb); return uv_translate_sys_error(err); } diff --git a/src/win/tcp.c b/src/win/tcp.c index 2fe4c5cb..76286cfc 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -1192,13 +1192,46 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, } -int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex, - int tcp_connection) { +int uv__tcp_xfer_export(uv_tcp_t* handle, + int target_pid, + uv__ipc_socket_xfer_info_t* xfer_info) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { + /* We're about to share the socket with another process. Because this is a + * listening socket, we assume that the other process will be accepting + * connections on it. Thus, before sharing the socket with another process, + * we call listen here in the parent process. */ + if (!(handle->flags & UV_HANDLE_LISTENING)) { + if (!(handle->flags & UV_HANDLE_BOUND)) { + return ERROR_NOT_SUPPORTED; + } + if (handle->delayed_error == 0 && + listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) { + handle->delayed_error = WSAGetLastError(); + } + } + } + + if (WSADuplicateSocketW( + handle->socket, target_pid, &xfer_info->socket_info)) { + return WSAGetLastError(); + } + xfer_info->delayed_error = handle->delayed_error; + xfer_info->flags = handle->flags & UV_HANDLE_CONNECTION; + + /* Mark the local copy of the handle as 'shared' so we behave in a way that's + * friendly to the process(es) that we share the socket with. */ + handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET; + + return 0; +} + + +int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) { int err; SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, FROM_PROTOCOL_INFO, - &socket_info_ex->socket_info, + &xfer_info->socket_info, 0, WSA_FLAG_OVERLAPPED); @@ -1206,26 +1239,21 @@ int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex, return WSAGetLastError(); } - err = uv_tcp_set_socket(tcp->loop, - tcp, - socket, - socket_info_ex->socket_info.iAddressFamily, - 1); + err = uv_tcp_set_socket( + tcp->loop, tcp, socket, xfer_info->socket_info.iAddressFamily, 1); if (err) { closesocket(socket); return err; } - if (tcp_connection) { + tcp->delayed_error = xfer_info->delayed_error; + tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET; + + if (xfer_info->flags & UV_HANDLE_CONNECTION) { uv_connection_init((uv_stream_t*)tcp); tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; } - tcp->flags |= UV_HANDLE_BOUND; - tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET; - - tcp->delayed_error = socket_info_ex->delayed_error; - tcp->loop->active_tcp_streams++; return 0; } @@ -1271,39 +1299,6 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) { } -int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid, - LPWSAPROTOCOL_INFOW protocol_info) { - if (!(handle->flags & UV_HANDLE_CONNECTION)) { - /* - * We're about to share the socket with another process. Because - * this is a listening socket, we assume that the other process will - * be accepting connections on it. So, before sharing the socket - * with another process, we call listen here in the parent process. - */ - - if (!(handle->flags & UV_HANDLE_LISTENING)) { - if (!(handle->flags & UV_HANDLE_BOUND)) { - return ERROR_INVALID_PARAMETER; - } - - if (!(handle->delayed_error)) { - if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) { - handle->delayed_error = WSAGetLastError(); - } - } - } - } - - if (WSADuplicateSocketW(handle->socket, pid, protocol_info)) { - return WSAGetLastError(); - } - - handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET; - - return 0; -} - - int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) { if (handle->flags & UV_HANDLE_CONNECTION) { return UV_EINVAL; diff --git a/src/win/util.c b/src/win/util.c index cce7274e..3e86ff15 100644 --- a/src/win/util.c +++ b/src/win/util.c @@ -74,10 +74,6 @@ static char *process_title; static CRITICAL_SECTION process_title_lock; -/* Cached copy of the process id, written once. */ -static DWORD current_pid = 0; - - /* Interval (in seconds) of the high-resolution clock. */ static double hrtime_interval_ = 0; @@ -359,14 +355,6 @@ uv_pid_t uv_os_getppid(void) { } -int uv_current_pid(void) { - if (current_pid == 0) { - current_pid = GetCurrentProcessId(); - } - return current_pid; -} - - char** uv_setup_args(int argc, char** argv) { return argv; }