win,pipe: restore compatibility with the old IPC framing protocol
Fixes: https://github.com/libuv/libuv/issues/1922 Refs: https://github.com/nodejs/node/issues/21671 PR-URL: https://github.com/libuv/libuv/pull/1923 Reviewed-By: Bartosz Sosnowski <bartosz@janeasystems.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
parent
31a06f25e1
commit
27ba662811
@ -61,10 +61,15 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
|
||||
* TCP
|
||||
*/
|
||||
|
||||
typedef enum {
|
||||
UV__IPC_SOCKET_XFER_NONE = 0,
|
||||
UV__IPC_SOCKET_XFER_TCP_CONNECTION,
|
||||
UV__IPC_SOCKET_XFER_TCP_SERVER
|
||||
} uv__ipc_socket_xfer_type_t;
|
||||
|
||||
typedef struct {
|
||||
WSAPROTOCOL_INFOW socket_info;
|
||||
uint32_t delayed_error;
|
||||
uint32_t flags; /* Either zero or UV_HANDLE_CONNECTION. */
|
||||
} uv__ipc_socket_xfer_info_t;
|
||||
|
||||
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
|
||||
@ -89,8 +94,11 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);
|
||||
|
||||
int uv__tcp_xfer_export(uv_tcp_t* handle,
|
||||
int pid,
|
||||
uv__ipc_socket_xfer_type_t* xfer_type,
|
||||
uv__ipc_socket_xfer_info_t* xfer_info);
|
||||
int uv__tcp_xfer_import(uv_tcp_t* tcp,
|
||||
uv__ipc_socket_xfer_type_t xfer_type,
|
||||
uv__ipc_socket_xfer_info_t* xfer_info);
|
||||
int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info);
|
||||
|
||||
|
||||
/*
|
||||
|
||||
176
src/win/pipe.c
176
src/win/pipe.c
@ -25,11 +25,12 @@
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "uv.h"
|
||||
#include "internal.h"
|
||||
#include "handle-inl.h"
|
||||
#include "stream-inl.h"
|
||||
#include "internal.h"
|
||||
#include "req-inl.h"
|
||||
#include "stream-inl.h"
|
||||
#include "uv-common.h"
|
||||
#include "uv.h"
|
||||
|
||||
#include <aclapi.h>
|
||||
#include <accctrl.h>
|
||||
@ -52,19 +53,36 @@ static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
|
||||
|
||||
/* IPC incoming xfer queue item. */
|
||||
typedef struct {
|
||||
uv__ipc_socket_xfer_type_t xfer_type;
|
||||
uv__ipc_socket_xfer_info_t xfer_info;
|
||||
QUEUE member;
|
||||
} uv__ipc_xfer_queue_item_t;
|
||||
|
||||
/* IPC frame types. */
|
||||
enum { UV__IPC_DATA_FRAME = 0, UV__IPC_XFER_FRAME = 1 };
|
||||
/* IPC frame header flags. */
|
||||
/* clang-format off */
|
||||
enum {
|
||||
UV__IPC_FRAME_HAS_DATA = 0x01,
|
||||
UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
|
||||
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
|
||||
/* These are combinations of the flags above. */
|
||||
UV__IPC_FRAME_XFER_FLAGS = 0x06,
|
||||
UV__IPC_FRAME_VALID_FLAGS = 0x07
|
||||
};
|
||||
/* clang-format on */
|
||||
|
||||
/* IPC frame header. */
|
||||
typedef struct {
|
||||
uint32_t type;
|
||||
uint32_t payload_length;
|
||||
uint32_t flags;
|
||||
uint32_t reserved1; /* Ignored. */
|
||||
uint32_t data_length; /* Must be zero if there is no data. */
|
||||
uint32_t reserved2; /* Must be zero. */
|
||||
} uv__ipc_frame_header_t;
|
||||
|
||||
/* To implement the IPC protocol correctly, these structures must have exactly
|
||||
* the right size. */
|
||||
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
|
||||
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
|
||||
|
||||
/* Coalesced write request. */
|
||||
typedef struct {
|
||||
uv_write_t req; /* Internal heap-allocated write request. */
|
||||
@ -878,7 +896,8 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
|
||||
server->pipe.conn.ipc_xfer_queue_length--;
|
||||
item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
|
||||
|
||||
err = uv__tcp_xfer_import((uv_tcp_t*) client, &item->xfer_info);
|
||||
err = uv__tcp_xfer_import(
|
||||
(uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
|
||||
if (err != 0)
|
||||
return err;
|
||||
|
||||
@ -1458,10 +1477,10 @@ int uv__pipe_write_ipc(uv_loop_t* loop,
|
||||
uv_buf_t stack_bufs[6];
|
||||
uv_buf_t* bufs;
|
||||
size_t buf_count, buf_index;
|
||||
uv__ipc_frame_header_t xfer_frame_header;
|
||||
uv__ipc_frame_header_t frame_header;
|
||||
uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
|
||||
uv__ipc_socket_xfer_info_t xfer_info;
|
||||
uv__ipc_frame_header_t data_frame_header;
|
||||
size_t data_length;
|
||||
uint64_t data_length;
|
||||
size_t i;
|
||||
int err;
|
||||
|
||||
@ -1472,8 +1491,8 @@ int uv__pipe_write_ipc(uv_loop_t* loop,
|
||||
if (data_length > UINT32_MAX)
|
||||
return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
|
||||
|
||||
/* Prepare xfer frame payload. */
|
||||
if (send_handle) {
|
||||
/* Prepare the frame's socket xfer payload. */
|
||||
if (send_handle != NULL) {
|
||||
uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
|
||||
|
||||
/* Verify that `send_handle` it is indeed a tcp handle. */
|
||||
@ -1481,20 +1500,18 @@ int uv__pipe_write_ipc(uv_loop_t* loop,
|
||||
return ERROR_NOT_SUPPORTED;
|
||||
|
||||
/* Export the tcp handle. */
|
||||
err = uv__tcp_xfer_export(
|
||||
send_tcp_handle, uv__pipe_get_ipc_remote_pid(handle), &xfer_info);
|
||||
err = uv__tcp_xfer_export(send_tcp_handle,
|
||||
uv__pipe_get_ipc_remote_pid(handle),
|
||||
&xfer_type,
|
||||
&xfer_info);
|
||||
if (err != 0)
|
||||
return err;
|
||||
}
|
||||
|
||||
/* Compute the number of uv_buf_t's required. */
|
||||
buf_count = 0;
|
||||
if (send_handle != NULL) {
|
||||
buf_count += 2; /* One for the frame header, one for the payload. */
|
||||
}
|
||||
if (data_buf_count > 0) {
|
||||
buf_count += 1 + data_buf_count; /* One extra for the frame header. */
|
||||
}
|
||||
buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
|
||||
if (send_handle != NULL)
|
||||
buf_count += 1; /* One extra for the socket xfer information. */
|
||||
|
||||
/* Use the on-stack buffer array if it is big enough; otherwise allocate
|
||||
* space for it on the heap. */
|
||||
@ -1509,25 +1526,32 @@ int uv__pipe_write_ipc(uv_loop_t* loop,
|
||||
}
|
||||
buf_index = 0;
|
||||
|
||||
if (send_handle != NULL) {
|
||||
/* Add xfer frame header. */
|
||||
xfer_frame_header.type = UV__IPC_XFER_FRAME;
|
||||
xfer_frame_header.payload_length = sizeof xfer_info;
|
||||
bufs[buf_index++] =
|
||||
uv_buf_init((char*) &xfer_frame_header, sizeof xfer_frame_header);
|
||||
/* Initialize frame header and add it to the buffers list. */
|
||||
memset(&frame_header, 0, sizeof frame_header);
|
||||
bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
|
||||
|
||||
/* Add xfer frame payload. */
|
||||
if (send_handle != NULL) {
|
||||
/* Add frame header flags. */
|
||||
switch (xfer_type) {
|
||||
case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
|
||||
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
|
||||
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
|
||||
break;
|
||||
case UV__IPC_SOCKET_XFER_TCP_SERVER:
|
||||
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
|
||||
break;
|
||||
default:
|
||||
assert(0); // Unreachable.
|
||||
}
|
||||
/* Add xfer info buffer. */
|
||||
bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
|
||||
}
|
||||
|
||||
if (data_length > 0) {
|
||||
/* Add data frame header. */
|
||||
data_frame_header.type = UV__IPC_DATA_FRAME;
|
||||
data_frame_header.payload_length = (uint32_t) data_length;
|
||||
bufs[buf_index++] =
|
||||
uv_buf_init((char*) &data_frame_header, sizeof data_frame_header);
|
||||
|
||||
/* Add data buffers. */
|
||||
/* Update frame header. */
|
||||
frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
|
||||
frame_header.data_length = (uint32_t) data_length;
|
||||
/* Add data buffers to buffers list. */
|
||||
for (i = 0; i < data_buf_count; i++)
|
||||
bufs[buf_index++] = data_bufs[i];
|
||||
}
|
||||
@ -1601,14 +1625,18 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
|
||||
|
||||
static void uv__pipe_queue_ipc_xfer_info(
|
||||
uv_pipe_t* handle, uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
uv_pipe_t* handle,
|
||||
uv__ipc_socket_xfer_type_t xfer_type,
|
||||
uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
uv__ipc_xfer_queue_item_t* item;
|
||||
|
||||
item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
|
||||
if (item == NULL)
|
||||
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
||||
|
||||
memcpy(&item->xfer_info, xfer_info, sizeof(item->xfer_info));
|
||||
item->xfer_type = xfer_type;
|
||||
item->xfer_info = *xfer_info;
|
||||
|
||||
QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
|
||||
handle->pipe.conn.ipc_xfer_queue_length++;
|
||||
}
|
||||
@ -1678,7 +1706,7 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
int err;
|
||||
|
||||
if (*data_remaining > 0) {
|
||||
/* Read data frame payload. */
|
||||
/* Read frame data payload. */
|
||||
DWORD bytes_read =
|
||||
uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
|
||||
*data_remaining -= bytes_read;
|
||||
@ -1687,6 +1715,8 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
} else {
|
||||
/* Start of a new IPC frame. */
|
||||
uv__ipc_frame_header_t frame_header;
|
||||
uint32_t xfer_flags;
|
||||
uv__ipc_socket_xfer_type_t xfer_type;
|
||||
uv__ipc_socket_xfer_info_t xfer_info;
|
||||
|
||||
/* Read the IPC frame header. */
|
||||
@ -1695,33 +1725,57 @@ static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
if (err)
|
||||
goto error;
|
||||
|
||||
if (frame_header.type == UV__IPC_DATA_FRAME) {
|
||||
/* Data frame: capture payload length. Actual data will be read in
|
||||
* subsequent call to uv__pipe_read_ipc(). */
|
||||
*data_remaining = frame_header.payload_length;
|
||||
/* Validate that flags are valid. */
|
||||
if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
|
||||
goto invalid;
|
||||
/* Validate that reserved2 is zero. */
|
||||
if (frame_header.reserved2 != 0)
|
||||
goto invalid;
|
||||
|
||||
/* Return number of bytes read. */
|
||||
return sizeof frame_header;
|
||||
|
||||
} else if (frame_header.type == UV__IPC_XFER_FRAME) {
|
||||
/* Xfer frame: read the payload. */
|
||||
assert(frame_header.payload_length == sizeof xfer_info);
|
||||
err =
|
||||
uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
|
||||
if (err)
|
||||
goto error;
|
||||
|
||||
/* Store the pending socket info. */
|
||||
uv__pipe_queue_ipc_xfer_info(handle, &xfer_info);
|
||||
|
||||
/* Return number of bytes read. */
|
||||
return sizeof frame_header + sizeof xfer_info;
|
||||
/* Parse xfer flags. */
|
||||
xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
|
||||
if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
|
||||
/* Socket coming -- determine the type. */
|
||||
xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
|
||||
? UV__IPC_SOCKET_XFER_TCP_CONNECTION
|
||||
: UV__IPC_SOCKET_XFER_TCP_SERVER;
|
||||
} else if (xfer_flags == 0) {
|
||||
/* No socket. */
|
||||
xfer_type = UV__IPC_SOCKET_XFER_NONE;
|
||||
} else {
|
||||
/* Invalid flags. */
|
||||
goto invalid;
|
||||
}
|
||||
|
||||
/* Invalid frame. */
|
||||
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
|
||||
/* Parse data frame information. */
|
||||
if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
|
||||
*data_remaining = frame_header.data_length;
|
||||
} else if (frame_header.data_length != 0) {
|
||||
/* Data length greater than zero but data flag not set -- invalid. */
|
||||
goto invalid;
|
||||
}
|
||||
|
||||
/* If no socket xfer info follows, return here. Data will be read in a
|
||||
* subsequent invocation of uv__pipe_read_ipc(). */
|
||||
if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
|
||||
return sizeof frame_header; /* Number of bytes read. */
|
||||
|
||||
/* Read transferred socket information. */
|
||||
err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
|
||||
if (err)
|
||||
goto error;
|
||||
|
||||
/* Store the pending socket info. */
|
||||
uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
|
||||
|
||||
/* Return number of bytes read. */
|
||||
return sizeof frame_header + sizeof xfer_info;
|
||||
}
|
||||
|
||||
invalid:
|
||||
/* Invalid frame. */
|
||||
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
|
||||
|
||||
error:
|
||||
uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
|
||||
return 0; /* Break out of read loop. */
|
||||
|
||||
@ -1191,8 +1191,12 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
|
||||
int uv__tcp_xfer_export(uv_tcp_t* handle,
|
||||
int target_pid,
|
||||
uv__ipc_socket_xfer_type_t* xfer_type,
|
||||
uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
|
||||
if (handle->flags & UV_HANDLE_CONNECTION) {
|
||||
*xfer_type = UV__IPC_SOCKET_XFER_TCP_CONNECTION;
|
||||
} else {
|
||||
*xfer_type = UV__IPC_SOCKET_XFER_TCP_SERVER;
|
||||
/* We're about to share the socket with another process. Because this is a
|
||||
* listening socket, we assume that the other process will be accepting
|
||||
* connections on it. Thus, before sharing the socket with another process,
|
||||
@ -1208,12 +1212,9 @@ int uv__tcp_xfer_export(uv_tcp_t* handle,
|
||||
}
|
||||
}
|
||||
|
||||
if (WSADuplicateSocketW(
|
||||
handle->socket, target_pid, &xfer_info->socket_info)) {
|
||||
if (WSADuplicateSocketW(handle->socket, target_pid, &xfer_info->socket_info))
|
||||
return WSAGetLastError();
|
||||
}
|
||||
xfer_info->delayed_error = handle->delayed_error;
|
||||
xfer_info->flags = handle->flags & UV_HANDLE_CONNECTION;
|
||||
|
||||
/* Mark the local copy of the handle as 'shared' so we behave in a way that's
|
||||
* friendly to the process(es) that we share the socket with. */
|
||||
@ -1223,14 +1224,21 @@ int uv__tcp_xfer_export(uv_tcp_t* handle,
|
||||
}
|
||||
|
||||
|
||||
int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
int uv__tcp_xfer_import(uv_tcp_t* tcp,
|
||||
uv__ipc_socket_xfer_type_t xfer_type,
|
||||
uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
int err;
|
||||
SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
&xfer_info->socket_info,
|
||||
0,
|
||||
WSA_FLAG_OVERLAPPED);
|
||||
SOCKET socket;
|
||||
|
||||
assert(xfer_type == UV__IPC_SOCKET_XFER_TCP_SERVER ||
|
||||
xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION);
|
||||
|
||||
socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
&xfer_info->socket_info,
|
||||
0,
|
||||
WSA_FLAG_OVERLAPPED);
|
||||
|
||||
if (socket == INVALID_SOCKET) {
|
||||
return WSAGetLastError();
|
||||
@ -1246,7 +1254,7 @@ int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
tcp->delayed_error = xfer_info->delayed_error;
|
||||
tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
if (xfer_info->flags & UV_HANDLE_CONNECTION) {
|
||||
if (xfer_type == UV__IPC_SOCKET_XFER_TCP_CONNECTION) {
|
||||
uv_connection_init((uv_stream_t*)tcp);
|
||||
tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user