From 27ba66281199bdcade823677af8dedc161152fb6 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Fri, 13 Jul 2018 00:06:05 +0200 Subject: [PATCH] win,pipe: restore compatibility with the old IPC framing protocol Fixes: https://github.com/libuv/libuv/issues/1922 Refs: https://github.com/nodejs/node/issues/21671 PR-URL: https://github.com/libuv/libuv/pull/1923 Reviewed-By: Bartosz Sosnowski Reviewed-By: Colin Ihrig --- src/win/internal.h | 12 +++- src/win/pipe.c | 176 +++++++++++++++++++++++++++++---------------- src/win/tcp.c | 34 +++++---- 3 files changed, 146 insertions(+), 76 deletions(-) diff --git a/src/win/internal.h b/src/win/internal.h index b37b4c0c..634b9f77 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -61,10 +61,15 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled; * TCP */ +typedef enum { + UV__IPC_SOCKET_XFER_NONE = 0, + UV__IPC_SOCKET_XFER_TCP_CONNECTION, + UV__IPC_SOCKET_XFER_TCP_SERVER +} uv__ipc_socket_xfer_type_t; + typedef struct { WSAPROTOCOL_INFOW socket_info; uint32_t delayed_error; - uint32_t flags; /* Either zero or UV_HANDLE_CONNECTION. */ } uv__ipc_socket_xfer_info_t; int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb); @@ -89,8 +94,11 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); int uv__tcp_xfer_export(uv_tcp_t* handle, int pid, + uv__ipc_socket_xfer_type_t* xfer_type, + uv__ipc_socket_xfer_info_t* xfer_info); +int uv__tcp_xfer_import(uv_tcp_t* tcp, + uv__ipc_socket_xfer_type_t xfer_type, uv__ipc_socket_xfer_info_t* xfer_info); -int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info); /* diff --git a/src/win/pipe.c b/src/win/pipe.c index 42380f65..382290e6 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -25,11 +25,12 @@ #include #include -#include "uv.h" -#include "internal.h" #include "handle-inl.h" -#include "stream-inl.h" +#include "internal.h" #include "req-inl.h" +#include "stream-inl.h" +#include "uv-common.h" +#include "uv.h" #include #include @@ -52,19 +53,36 @@ static const int pipe_prefix_len = sizeof(pipe_prefix) - 1; /* IPC incoming xfer queue item. */ typedef struct { + uv__ipc_socket_xfer_type_t xfer_type; uv__ipc_socket_xfer_info_t xfer_info; QUEUE member; } uv__ipc_xfer_queue_item_t; -/* IPC frame types. */ -enum { UV__IPC_DATA_FRAME = 0, UV__IPC_XFER_FRAME = 1 }; +/* IPC frame header flags. */ +/* clang-format off */ +enum { + UV__IPC_FRAME_HAS_DATA = 0x01, + UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02, + UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04, + /* These are combinations of the flags above. */ + UV__IPC_FRAME_XFER_FLAGS = 0x06, + UV__IPC_FRAME_VALID_FLAGS = 0x07 +}; +/* clang-format on */ /* IPC frame header. */ typedef struct { - uint32_t type; - uint32_t payload_length; + uint32_t flags; + uint32_t reserved1; /* Ignored. */ + uint32_t data_length; /* Must be zero if there is no data. */ + uint32_t reserved2; /* Must be zero. */ } uv__ipc_frame_header_t; +/* To implement the IPC protocol correctly, these structures must have exactly + * the right size. */ +STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16); +STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632); + /* Coalesced write request. */ typedef struct { uv_write_t req; /* Internal heap-allocated write request. */ @@ -878,7 +896,8 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { server->pipe.conn.ipc_xfer_queue_length--; item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); - err = uv__tcp_xfer_import((uv_tcp_t*) client, &item->xfer_info); + err = uv__tcp_xfer_import( + (uv_tcp_t*) client, item->xfer_type, &item->xfer_info); if (err != 0) return err; @@ -1458,10 +1477,10 @@ int uv__pipe_write_ipc(uv_loop_t* loop, uv_buf_t stack_bufs[6]; uv_buf_t* bufs; size_t buf_count, buf_index; - uv__ipc_frame_header_t xfer_frame_header; + uv__ipc_frame_header_t frame_header; + uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE; uv__ipc_socket_xfer_info_t xfer_info; - uv__ipc_frame_header_t data_frame_header; - size_t data_length; + uint64_t data_length; size_t i; int err; @@ -1472,8 +1491,8 @@ int uv__pipe_write_ipc(uv_loop_t* loop, if (data_length > UINT32_MAX) return WSAENOBUFS; /* Maps to UV_ENOBUFS. */ - /* Prepare xfer frame payload. */ - if (send_handle) { + /* Prepare the frame's socket xfer payload. */ + if (send_handle != NULL) { uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle; /* Verify that `send_handle` it is indeed a tcp handle. */ @@ -1481,20 +1500,18 @@ int uv__pipe_write_ipc(uv_loop_t* loop, return ERROR_NOT_SUPPORTED; /* Export the tcp handle. */ - err = uv__tcp_xfer_export( - send_tcp_handle, uv__pipe_get_ipc_remote_pid(handle), &xfer_info); + err = uv__tcp_xfer_export(send_tcp_handle, + uv__pipe_get_ipc_remote_pid(handle), + &xfer_type, + &xfer_info); if (err != 0) return err; } /* Compute the number of uv_buf_t's required. */ - buf_count = 0; - if (send_handle != NULL) { - buf_count += 2; /* One for the frame header, one for the payload. */ - } - if (data_buf_count > 0) { - buf_count += 1 + data_buf_count; /* One extra for the frame header. */ - } + buf_count = 1 + data_buf_count; /* Frame header and data buffers. */ + if (send_handle != NULL) + buf_count += 1; /* One extra for the socket xfer information. */ /* Use the on-stack buffer array if it is big enough; otherwise allocate * space for it on the heap. */ @@ -1509,25 +1526,32 @@ int uv__pipe_write_ipc(uv_loop_t* loop, } buf_index = 0; - if (send_handle != NULL) { - /* Add xfer frame header. */ - xfer_frame_header.type = UV__IPC_XFER_FRAME; - xfer_frame_header.payload_length = sizeof xfer_info; - bufs[buf_index++] = - uv_buf_init((char*) &xfer_frame_header, sizeof xfer_frame_header); + /* Initialize frame header and add it to the buffers list. */ + memset(&frame_header, 0, sizeof frame_header); + bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header); - /* Add xfer frame payload. */ + if (send_handle != NULL) { + /* Add frame header flags. */ + switch (xfer_type) { + case UV__IPC_SOCKET_XFER_TCP_CONNECTION: + frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER | + UV__IPC_FRAME_XFER_IS_TCP_CONNECTION; + break; + case UV__IPC_SOCKET_XFER_TCP_SERVER: + frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER; + break; + default: + assert(0); // Unreachable. + } + /* Add xfer info buffer. */ bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info); } if (data_length > 0) { - /* Add data frame header. */ - data_frame_header.type = UV__IPC_DATA_FRAME; - data_frame_header.payload_length = (uint32_t) data_length; - bufs[buf_index++] = - uv_buf_init((char*) &data_frame_header, sizeof data_frame_header); - - /* Add data buffers. */ + /* Update frame header. */ + frame_header.flags |= UV__IPC_FRAME_HAS_DATA; + frame_header.data_length = (uint32_t) data_length; + /* Add data buffers to buffers list. */ for (i = 0; i < data_buf_count; i++) bufs[buf_index++] = data_bufs[i]; } @@ -1601,14 +1625,18 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, static void uv__pipe_queue_ipc_xfer_info( - uv_pipe_t* handle, uv__ipc_socket_xfer_info_t* xfer_info) { + uv_pipe_t* handle, + uv__ipc_socket_xfer_type_t xfer_type, + uv__ipc_socket_xfer_info_t* xfer_info) { uv__ipc_xfer_queue_item_t* item; item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item)); if (item == NULL) uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc"); - memcpy(&item->xfer_info, xfer_info, sizeof(item->xfer_info)); + item->xfer_type = xfer_type; + item->xfer_info = *xfer_info; + QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member); handle->pipe.conn.ipc_xfer_queue_length++; } @@ -1678,7 +1706,7 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { int err; if (*data_remaining > 0) { - /* Read data frame payload. */ + /* Read frame data payload. */ DWORD bytes_read = uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining); *data_remaining -= bytes_read; @@ -1687,6 +1715,8 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { } else { /* Start of a new IPC frame. */ uv__ipc_frame_header_t frame_header; + uint32_t xfer_flags; + uv__ipc_socket_xfer_type_t xfer_type; uv__ipc_socket_xfer_info_t xfer_info; /* Read the IPC frame header. */ @@ -1695,33 +1725,57 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) { if (err) goto error; - if (frame_header.type == UV__IPC_DATA_FRAME) { - /* Data frame: capture payload length. Actual data will be read in - * subsequent call to uv__pipe_read_ipc(). */ - *data_remaining = frame_header.payload_length; + /* Validate that flags are valid. */ + if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0) + goto invalid; + /* Validate that reserved2 is zero. */ + if (frame_header.reserved2 != 0) + goto invalid; - /* Return number of bytes read. */ - return sizeof frame_header; - - } else if (frame_header.type == UV__IPC_XFER_FRAME) { - /* Xfer frame: read the payload. */ - assert(frame_header.payload_length == sizeof xfer_info); - err = - uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info); - if (err) - goto error; - - /* Store the pending socket info. */ - uv__pipe_queue_ipc_xfer_info(handle, &xfer_info); - - /* Return number of bytes read. */ - return sizeof frame_header + sizeof xfer_info; + /* Parse xfer flags. */ + xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS; + if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) { + /* Socket coming -- determine the type. */ + xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION + ? UV__IPC_SOCKET_XFER_TCP_CONNECTION + : UV__IPC_SOCKET_XFER_TCP_SERVER; + } else if (xfer_flags == 0) { + /* No socket. */ + xfer_type = UV__IPC_SOCKET_XFER_NONE; + } else { + /* Invalid flags. */ + goto invalid; } - /* Invalid frame. */ - err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */ + /* Parse data frame information. */ + if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) { + *data_remaining = frame_header.data_length; + } else if (frame_header.data_length != 0) { + /* Data length greater than zero but data flag not set -- invalid. */ + goto invalid; + } + + /* If no socket xfer info follows, return here. Data will be read in a + * subsequent invocation of uv__pipe_read_ipc(). */ + if (xfer_type == UV__IPC_SOCKET_XFER_NONE) + return sizeof frame_header; /* Number of bytes read. */ + + /* Read transferred socket information. */ + err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info); + if (err) + goto error; + + /* Store the pending socket info. */ + uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info); + + /* Return number of bytes read. */ + return sizeof frame_header + sizeof xfer_info; } +invalid: + /* Invalid frame. */ + err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */ + error: uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_); return 0; /* Break out of read loop. */ diff --git a/src/win/tcp.c b/src/win/tcp.c index a97ab2a5..8b6f0a5c 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -1191,8 +1191,12 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, int uv__tcp_xfer_export(uv_tcp_t* handle, int target_pid, + uv__ipc_socket_xfer_type_t* xfer_type, uv__ipc_socket_xfer_info_t* xfer_info) { - if (!(handle->flags & UV_HANDLE_CONNECTION)) { + if (handle->flags & UV_HANDLE_CONNECTION) { + *xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION; + } else { + *xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER; /* We're about to share the socket with another process. Because this is a * listening socket, we assume that the other process will be accepting * connections on it. Thus, before sharing the socket with another process, @@ -1208,12 +1212,9 @@ int uv__tcp_xfer_export(uv_tcp_t* handle, } } - if (WSADuplicateSocketW( - handle->socket, target_pid, &xfer_info->socket_info)) { + if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info)) return WSAGetLastError(); - } xfer_info->delayed_error = handle->delayed_error; - xfer_info->flags = handle->flags & UV_HANDLE_CONNECTION; /* Mark the local copy of the handle as 'shared' so we behave in a way that's * friendly to the process(es) that we share the socket with. */ @@ -1223,14 +1224,21 @@ int uv__tcp_xfer_export(uv_tcp_t* handle, } -int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) { +int uv__tcp_xfer_import(uv_tcp_t* tcp, + uv__ipc_socket_xfer_type_t xfer_type, + uv__ipc_socket_xfer_info_t* xfer_info) { int err; - SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO, - FROM_PROTOCOL_INFO, - FROM_PROTOCOL_INFO, - &xfer_info->socket_info, - 0, - WSA_FLAG_OVERLAPPED); + SOCKET socket; + + assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER || + xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION); + + socket = WSASocketW(FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + &xfer_info->socket_info, + 0, + WSA_FLAG_OVERLAPPED); if (socket == INVALID_SOCKET) { return WSAGetLastError(); @@ -1246,7 +1254,7 @@ int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) { tcp->delayed_error = xfer_info->delayed_error; tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET; - if (xfer_info->flags & UV_HANDLE_CONNECTION) { + if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) { uv_connection_init((uv_stream_t*)tcp); tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; }