win,pipe: fix IPC pipe deadlock
This fixes a bug where IPC pipe communication would deadlock when both ends of the pipe are written to simultaneously, and the kernel pipe buffer has already been filled up by earlier writes. The root cause of the deadlock is that, while writes to an IPC pipe are generally asynchronous, the IPC frame header is written synchronously. So when both ends of the pipe are sending a frame header at the same time, neither will read data off the pipe, causing both header writes to block indefinitely. Additionally, this patch somewhat reduces the spaghetti level in win/pipe.c. Fixes: https://github.com/libuv/libuv/issues/1099 Refs: https://github.com/nodejs/node/issues/7657 Refs: https://github.com/electron/electron/issues/10107 Refs: https://github.com/parcel-bundler/parcel/issues/637 Refs: https://github.com/parcel-bundler/parcel/issues/900 Refs: https://github.com/parcel-bundler/parcel/issues/1137 PR-URL: https://github.com/libuv/libuv/pull/1843 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Bartosz Sosnowski <bartosz@janeasystems.com> Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com>
This commit is contained in:
parent
421d7571a3
commit
4e53af9120
@ -368,11 +368,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
|
||||
} u; \
|
||||
struct uv_req_s* next_req;
|
||||
|
||||
#define UV_WRITE_PRIVATE_FIELDS \
|
||||
int coalesced; /* This ABI change will be un-done in a later commit. */ \
|
||||
int ipc_header; \
|
||||
uv_buf_t write_buffer; \
|
||||
HANDLE event_handle; \
|
||||
#define UV_WRITE_PRIVATE_FIELDS \
|
||||
int coalesced; \
|
||||
uv_buf_t write_buffer; \
|
||||
HANDLE event_handle; \
|
||||
HANDLE wait_handle;
|
||||
|
||||
#define UV_CONNECT_PRIVATE_FIELDS \
|
||||
@ -460,13 +459,14 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
|
||||
|
||||
#define uv_pipe_connection_fields \
|
||||
uv_timer_t* eof_timer; \
|
||||
uv_write_t ipc_header_write_req; \
|
||||
int ipc_pid; \
|
||||
uint64_t remaining_ipc_rawdata_bytes; \
|
||||
struct { \
|
||||
void* queue[2]; \
|
||||
int queue_len; \
|
||||
} pending_ipc_info; \
|
||||
uv_write_t dummy; /* TODO: retained for ABI compat; remove this in v2.x. */ \
|
||||
DWORD ipc_remote_pid; \
|
||||
union { \
|
||||
uint32_t payload_remaining; \
|
||||
uint64_t dummy; /* TODO: retained for ABI compat; remove this in v2.x. */ \
|
||||
} ipc_data_frame; \
|
||||
void* ipc_xfer_queue[2]; \
|
||||
int ipc_xfer_queue_length; \
|
||||
uv_write_t* non_overlapped_writes_tail; \
|
||||
CRITICAL_SECTION readfile_thread_lock; \
|
||||
volatile HANDLE readfile_thread_handle;
|
||||
|
||||
@ -126,8 +126,9 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled;
|
||||
|
||||
typedef struct {
|
||||
WSAPROTOCOL_INFOW socket_info;
|
||||
int delayed_error;
|
||||
} uv__ipc_socket_info_ex;
|
||||
uint32_t delayed_error;
|
||||
uint32_t flags; /* Either zero or UV_HANDLE_CONNECTION. */
|
||||
} uv__ipc_socket_xfer_info_t;
|
||||
|
||||
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
|
||||
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client);
|
||||
@ -149,11 +150,10 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp);
|
||||
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);
|
||||
|
||||
int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex,
|
||||
int tcp_connection);
|
||||
|
||||
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
LPWSAPROTOCOL_INFOW protocol_info);
|
||||
int uv__tcp_xfer_export(uv_tcp_t* handle,
|
||||
int pid,
|
||||
uv__ipc_socket_xfer_info_t* xfer_info);
|
||||
int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info);
|
||||
|
||||
|
||||
/*
|
||||
@ -178,11 +178,13 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client);
|
||||
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
||||
uv_read_cb read_cb);
|
||||
void uv__pipe_read_stop(uv_pipe_t* handle);
|
||||
int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
|
||||
const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);
|
||||
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
|
||||
const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t* send_handle,
|
||||
uv_write_cb cb);
|
||||
int uv__pipe_write(uv_loop_t* loop,
|
||||
uv_write_t* req,
|
||||
uv_pipe_t* handle,
|
||||
const uv_buf_t bufs[],
|
||||
size_t nbufs,
|
||||
uv_stream_t* send_handle,
|
||||
uv_write_cb cb);
|
||||
|
||||
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
uv_req_t* req);
|
||||
@ -329,7 +331,6 @@ void uv__fs_poll_endgame(uv_loop_t* loop, uv_fs_poll_t* handle);
|
||||
void uv__util_init(void);
|
||||
|
||||
uint64_t uv__hrtime(double scale);
|
||||
int uv_current_pid(void);
|
||||
__declspec(noreturn) void uv_fatal_error(const int errorno, const char* syscall);
|
||||
int uv__getpwuid_r(uv_passwd_t* pwd);
|
||||
int uv__convert_utf16_to_utf8(const WCHAR* utf16, int utf16len, char** utf8);
|
||||
|
||||
672
src/win/pipe.c
672
src/win/pipe.c
@ -21,9 +21,10 @@
|
||||
|
||||
#include <assert.h>
|
||||
#include <io.h>
|
||||
#include <string.h>
|
||||
#include <stdbool.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
#include "uv.h"
|
||||
#include "internal.h"
|
||||
@ -34,18 +35,6 @@
|
||||
#include <aclapi.h>
|
||||
#include <accctrl.h>
|
||||
|
||||
typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t;
|
||||
|
||||
struct uv__ipc_queue_item_s {
|
||||
/*
|
||||
* NOTE: It is important for socket_info_ex to be the first field,
|
||||
* because we will we assigning it to the pending_ipc_info.socket_info
|
||||
*/
|
||||
uv__ipc_socket_info_ex socket_info_ex;
|
||||
QUEUE member;
|
||||
int tcp_connection;
|
||||
};
|
||||
|
||||
/* A zero-size buffer for use by uv_pipe_read */
|
||||
static char uv_zero_[] = "";
|
||||
|
||||
@ -62,22 +51,20 @@ static const int default_pending_pipe_instances = 4;
|
||||
static char pipe_prefix[] = "\\\\?\\pipe";
|
||||
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
|
||||
|
||||
/* IPC protocol flags. */
|
||||
#define UV_IPC_RAW_DATA 0x0001
|
||||
#define UV_IPC_TCP_SERVER 0x0002
|
||||
#define UV_IPC_TCP_CONNECTION 0x0004
|
||||
/* IPC incoming xfer queue item. */
|
||||
typedef struct {
|
||||
uv__ipc_socket_xfer_info_t xfer_info;
|
||||
QUEUE member;
|
||||
} uv__ipc_xfer_queue_item_t;
|
||||
|
||||
/* IPC frame types. */
|
||||
enum { UV__IPC_DATA_FRAME = 0, UV__IPC_XFER_FRAME = 1 };
|
||||
|
||||
/* IPC frame header. */
|
||||
typedef struct {
|
||||
int flags;
|
||||
uint64_t raw_data_length;
|
||||
} uv_ipc_frame_header_t;
|
||||
|
||||
/* IPC frame, which contains an imported TCP socket stream. */
|
||||
typedef struct {
|
||||
uv_ipc_frame_header_t header;
|
||||
uv__ipc_socket_info_ex socket_info_ex;
|
||||
} uv_ipc_frame_uv_stream;
|
||||
uint32_t type;
|
||||
uint32_t payload_length;
|
||||
} uv__ipc_frame_header_t;
|
||||
|
||||
/* Coalesced write request. */
|
||||
typedef struct {
|
||||
@ -85,6 +72,7 @@ typedef struct {
|
||||
uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
|
||||
} uv__coalesced_write_t;
|
||||
|
||||
|
||||
static void eof_timer_init(uv_pipe_t* pipe);
|
||||
static void eof_timer_start(uv_pipe_t* pipe);
|
||||
static void eof_timer_stop(uv_pipe_t* pipe);
|
||||
@ -104,15 +92,13 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
|
||||
handle->reqs_pending = 0;
|
||||
handle->handle = INVALID_HANDLE_VALUE;
|
||||
handle->name = NULL;
|
||||
handle->pipe.conn.ipc_pid = 0;
|
||||
handle->pipe.conn.remaining_ipc_rawdata_bytes = 0;
|
||||
QUEUE_INIT(&handle->pipe.conn.pending_ipc_info.queue);
|
||||
handle->pipe.conn.pending_ipc_info.queue_len = 0;
|
||||
handle->pipe.conn.ipc_remote_pid = 0;
|
||||
handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
|
||||
QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
|
||||
handle->pipe.conn.ipc_xfer_queue_length = 0;
|
||||
handle->ipc = ipc;
|
||||
handle->pipe.conn.non_overlapped_writes_tail = NULL;
|
||||
|
||||
UV_REQ_INIT(&handle->pipe.conn.ipc_header_write_req, UV_UNKNOWN_REQ);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -351,7 +337,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
NTSTATUS nt_status;
|
||||
IO_STATUS_BLOCK io_status;
|
||||
FILE_PIPE_LOCAL_INFORMATION pipe_info;
|
||||
uv__ipc_queue_item_t* item;
|
||||
uv__ipc_xfer_queue_item_t* xfer_queue_item;
|
||||
|
||||
if ((handle->flags & UV_HANDLE_CONNECTION) &&
|
||||
handle->stream.conn.shutdown_req != NULL &&
|
||||
@ -428,27 +414,27 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
|
||||
if (handle->flags & UV_HANDLE_CONNECTION) {
|
||||
/* Free pending sockets */
|
||||
while (!QUEUE_EMPTY(&handle->pipe.conn.pending_ipc_info.queue)) {
|
||||
while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
|
||||
QUEUE* q;
|
||||
SOCKET socket;
|
||||
|
||||
q = QUEUE_HEAD(&handle->pipe.conn.pending_ipc_info.queue);
|
||||
q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
|
||||
QUEUE_REMOVE(q);
|
||||
item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
|
||||
xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
|
||||
|
||||
/* Materialize socket and close it */
|
||||
socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
&item->socket_info_ex.socket_info,
|
||||
&xfer_queue_item->xfer_info.socket_info,
|
||||
0,
|
||||
WSA_FLAG_OVERLAPPED);
|
||||
uv__free(item);
|
||||
uv__free(xfer_queue_item);
|
||||
|
||||
if (socket != INVALID_SOCKET)
|
||||
closesocket(socket);
|
||||
}
|
||||
handle->pipe.conn.pending_ipc_info.queue_len = 0;
|
||||
handle->pipe.conn.ipc_xfer_queue_length = 0;
|
||||
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
|
||||
@ -879,23 +865,21 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
|
||||
uv_pipe_t* pipe_client;
|
||||
uv_pipe_accept_t* req;
|
||||
QUEUE* q;
|
||||
uv__ipc_queue_item_t* item;
|
||||
uv__ipc_xfer_queue_item_t* item;
|
||||
int err;
|
||||
|
||||
if (server->ipc) {
|
||||
if (QUEUE_EMPTY(&server->pipe.conn.pending_ipc_info.queue)) {
|
||||
if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
|
||||
/* No valid pending sockets. */
|
||||
return WSAEWOULDBLOCK;
|
||||
}
|
||||
|
||||
q = QUEUE_HEAD(&server->pipe.conn.pending_ipc_info.queue);
|
||||
q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
|
||||
QUEUE_REMOVE(q);
|
||||
server->pipe.conn.pending_ipc_info.queue_len--;
|
||||
item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
|
||||
server->pipe.conn.ipc_xfer_queue_length--;
|
||||
item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
|
||||
|
||||
err = uv_tcp_import((uv_tcp_t*)client,
|
||||
&item->socket_info_ex,
|
||||
item->tcp_connection);
|
||||
err = uv__tcp_xfer_import((uv_tcp_t*) client, &item->xfer_info);
|
||||
if (err != 0)
|
||||
return err;
|
||||
|
||||
@ -1242,6 +1226,7 @@ static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static int uv__build_coalesced_write_req(uv_write_t* user_req,
|
||||
const uv_buf_t bufs[],
|
||||
size_t nbufs,
|
||||
@ -1302,26 +1287,17 @@ static int uv__build_coalesced_write_req(uv_write_t* user_req,
|
||||
}
|
||||
|
||||
|
||||
static int uv_pipe_write_impl(uv_loop_t* loop,
|
||||
uv_write_t* req,
|
||||
uv_pipe_t* handle,
|
||||
const uv_buf_t bufs[],
|
||||
unsigned int nbufs,
|
||||
uv_stream_t* send_handle,
|
||||
uv_write_cb cb) {
|
||||
static int uv__pipe_write_data(uv_loop_t* loop,
|
||||
uv_write_t* req,
|
||||
uv_pipe_t* handle,
|
||||
const uv_buf_t bufs[],
|
||||
size_t nbufs,
|
||||
uv_stream_t* send_handle,
|
||||
uv_write_cb cb,
|
||||
bool copy_always) {
|
||||
int err;
|
||||
int result;
|
||||
uv_buf_t write_buf;
|
||||
uv_tcp_t* tcp_send_handle;
|
||||
uv_write_t* ipc_header_req = NULL;
|
||||
uv_ipc_frame_uv_stream ipc_frame;
|
||||
|
||||
/* Only TCP handles are supported for sharing. */
|
||||
if (send_handle && ((send_handle->type != UV_TCP) ||
|
||||
(!(send_handle->flags & UV_HANDLE_BOUND) &&
|
||||
!(send_handle->flags & UV_HANDLE_CONNECTION)))) {
|
||||
return ERROR_NOT_SUPPORTED;
|
||||
}
|
||||
|
||||
assert(handle->handle != INVALID_HANDLE_VALUE);
|
||||
|
||||
@ -1331,7 +1307,6 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
|
||||
req->cb = cb;
|
||||
/* Private fields. */
|
||||
req->coalesced = 0;
|
||||
req->ipc_header = 0;
|
||||
req->event_handle = NULL;
|
||||
req->wait_handle = INVALID_HANDLE_VALUE;
|
||||
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
|
||||
@ -1340,7 +1315,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
|
||||
if (nbufs == 0) {
|
||||
/* Write empty buffer. */
|
||||
write_buf = uv_null_buf_;
|
||||
} else if (nbufs == 1) {
|
||||
} else if (nbufs == 1 && !copy_always) {
|
||||
/* Write directly from bufs[0]. */
|
||||
write_buf = bufs[0];
|
||||
} else {
|
||||
@ -1351,112 +1326,6 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
|
||||
return err;
|
||||
}
|
||||
|
||||
if (handle->ipc) {
|
||||
assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
|
||||
ipc_frame.header.flags = 0;
|
||||
|
||||
/* Use the IPC framing protocol. */
|
||||
if (send_handle) {
|
||||
tcp_send_handle = (uv_tcp_t*)send_handle;
|
||||
|
||||
if (handle->pipe.conn.ipc_pid == 0) {
|
||||
handle->pipe.conn.ipc_pid = uv_current_pid();
|
||||
}
|
||||
|
||||
err = uv_tcp_duplicate_socket(tcp_send_handle, handle->pipe.conn.ipc_pid,
|
||||
&ipc_frame.socket_info_ex.socket_info);
|
||||
if (err) {
|
||||
return err;
|
||||
}
|
||||
|
||||
ipc_frame.socket_info_ex.delayed_error = tcp_send_handle->delayed_error;
|
||||
|
||||
ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
|
||||
|
||||
if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
|
||||
ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
|
||||
}
|
||||
}
|
||||
|
||||
if (nbufs > 0) {
|
||||
ipc_frame.header.flags |= UV_IPC_RAW_DATA;
|
||||
ipc_frame.header.raw_data_length = write_buf.len;
|
||||
}
|
||||
|
||||
/*
|
||||
* Use the provided req if we're only doing a single write.
|
||||
* If we're doing multiple writes, use ipc_header_write_req to do
|
||||
* the first write, and then use the provided req for the second write.
|
||||
*/
|
||||
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
|
||||
ipc_header_req = req;
|
||||
} else {
|
||||
/*
|
||||
* Try to use the preallocated write req if it's available.
|
||||
* Otherwise allocate a new one.
|
||||
*/
|
||||
if (handle->pipe.conn.ipc_header_write_req.type != UV_WRITE) {
|
||||
ipc_header_req = (uv_write_t*)&handle->pipe.conn.ipc_header_write_req;
|
||||
} else {
|
||||
ipc_header_req = (uv_write_t*)uv__malloc(sizeof(uv_write_t));
|
||||
if (!ipc_header_req) {
|
||||
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
||||
}
|
||||
}
|
||||
|
||||
UV_REQ_INIT(ipc_header_req, UV_WRITE);
|
||||
ipc_header_req->handle = (uv_stream_t*) handle;
|
||||
ipc_header_req->cb = NULL;
|
||||
ipc_header_req->coalesced = 0;
|
||||
ipc_header_req->ipc_header = 1;
|
||||
}
|
||||
|
||||
/* Write the header or the whole frame. */
|
||||
memset(&ipc_header_req->u.io.overlapped, 0,
|
||||
sizeof(ipc_header_req->u.io.overlapped));
|
||||
|
||||
/* Using overlapped IO, but wait for completion before returning.
|
||||
This write is blocking because ipc_frame is on stack. */
|
||||
ipc_header_req->u.io.overlapped.hEvent = CreateEvent(NULL, 1, 0, NULL);
|
||||
if (!ipc_header_req->u.io.overlapped.hEvent) {
|
||||
uv_fatal_error(GetLastError(), "CreateEvent");
|
||||
}
|
||||
|
||||
result = WriteFile(handle->handle,
|
||||
&ipc_frame,
|
||||
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
|
||||
sizeof(ipc_frame) : sizeof(ipc_frame.header),
|
||||
NULL,
|
||||
&ipc_header_req->u.io.overlapped);
|
||||
if (!result && GetLastError() != ERROR_IO_PENDING) {
|
||||
err = GetLastError();
|
||||
CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
|
||||
return err;
|
||||
}
|
||||
|
||||
if (!result) {
|
||||
/* Request not completed immediately. Wait for it.*/
|
||||
if (WaitForSingleObject(ipc_header_req->u.io.overlapped.hEvent, INFINITE) !=
|
||||
WAIT_OBJECT_0) {
|
||||
err = GetLastError();
|
||||
CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
|
||||
return err;
|
||||
}
|
||||
}
|
||||
ipc_header_req->u.io.queued_bytes = 0;
|
||||
CloseHandle(ipc_header_req->u.io.overlapped.hEvent);
|
||||
ipc_header_req->u.io.overlapped.hEvent = NULL;
|
||||
|
||||
REGISTER_HANDLE_REQ(loop, handle, ipc_header_req);
|
||||
handle->reqs_pending++;
|
||||
handle->stream.conn.write_reqs_pending++;
|
||||
|
||||
/* If we don't have any raw data to write - we're done. */
|
||||
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if ((handle->flags &
|
||||
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
|
||||
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
|
||||
@ -1567,28 +1436,133 @@ static int uv_pipe_write_impl(uv_loop_t* loop,
|
||||
}
|
||||
|
||||
|
||||
int uv_pipe_write(uv_loop_t* loop,
|
||||
uv_write_t* req,
|
||||
uv_pipe_t* handle,
|
||||
const uv_buf_t bufs[],
|
||||
unsigned int nbufs,
|
||||
uv_write_cb cb) {
|
||||
return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, NULL, cb);
|
||||
static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
|
||||
DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
|
||||
|
||||
/* If the both ends of the IPC pipe are owned by the same process,
|
||||
* the remote end pid may not yet be set. If so, do it here.
|
||||
* TODO: this is weird; it'd probably better to use a handshake. */
|
||||
if (*pid == 0)
|
||||
*pid = GetCurrentProcessId();
|
||||
|
||||
return *pid;
|
||||
}
|
||||
|
||||
|
||||
int uv_pipe_write2(uv_loop_t* loop,
|
||||
int uv__pipe_write_ipc(uv_loop_t* loop,
|
||||
uv_write_t* req,
|
||||
uv_pipe_t* handle,
|
||||
const uv_buf_t data_bufs[],
|
||||
size_t data_buf_count,
|
||||
uv_stream_t* send_handle,
|
||||
uv_write_cb cb) {
|
||||
uv_buf_t stack_bufs[6];
|
||||
uv_buf_t* bufs;
|
||||
size_t buf_count, buf_index;
|
||||
uv__ipc_frame_header_t xfer_frame_header;
|
||||
uv__ipc_socket_xfer_info_t xfer_info;
|
||||
uv__ipc_frame_header_t data_frame_header;
|
||||
size_t data_length;
|
||||
size_t i;
|
||||
int err;
|
||||
|
||||
/* Compute the combined size of data buffers. */
|
||||
data_length = 0;
|
||||
for (i = 0; i < data_buf_count; i++)
|
||||
data_length += data_bufs[i].len;
|
||||
if (data_length > UINT32_MAX)
|
||||
return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
|
||||
|
||||
/* Prepare xfer frame payload. */
|
||||
if (send_handle) {
|
||||
uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
|
||||
|
||||
/* Verify that `send_handle` it is indeed a tcp handle. */
|
||||
if (send_tcp_handle->type != UV_TCP)
|
||||
return ERROR_NOT_SUPPORTED;
|
||||
|
||||
/* Export the tcp handle. */
|
||||
err = uv__tcp_xfer_export(
|
||||
send_tcp_handle, uv__pipe_get_ipc_remote_pid(handle), &xfer_info);
|
||||
if (err != 0)
|
||||
return err;
|
||||
}
|
||||
|
||||
/* Compute the number of uv_buf_t's required. */
|
||||
buf_count = 0;
|
||||
if (send_handle != NULL) {
|
||||
buf_count += 2; /* One for the frame header, one for the payload. */
|
||||
}
|
||||
if (data_buf_count > 0) {
|
||||
buf_count += 1 + data_buf_count; /* One extra for the frame header. */
|
||||
}
|
||||
|
||||
/* Use the on-stack buffer array if it is big enough; otherwise allocate
|
||||
* space for it on the heap. */
|
||||
if (buf_count < ARRAY_SIZE(stack_bufs)) {
|
||||
/* Use on-stack buffer array. */
|
||||
bufs = stack_bufs;
|
||||
} else {
|
||||
/* Use heap-allocated buffer array. */
|
||||
bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
|
||||
if (bufs == NULL)
|
||||
return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
|
||||
}
|
||||
buf_index = 0;
|
||||
|
||||
if (send_handle != NULL) {
|
||||
/* Add xfer frame header. */
|
||||
xfer_frame_header.type = UV__IPC_XFER_FRAME;
|
||||
xfer_frame_header.payload_length = sizeof xfer_info;
|
||||
bufs[buf_index++] =
|
||||
uv_buf_init((char*) &xfer_frame_header, sizeof xfer_frame_header);
|
||||
|
||||
/* Add xfer frame payload. */
|
||||
bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
|
||||
}
|
||||
|
||||
if (data_length > 0) {
|
||||
/* Add data frame header. */
|
||||
data_frame_header.type = UV__IPC_DATA_FRAME;
|
||||
data_frame_header.payload_length = (uint32_t) data_length;
|
||||
bufs[buf_index++] =
|
||||
uv_buf_init((char*) &data_frame_header, sizeof data_frame_header);
|
||||
|
||||
/* Add data buffers. */
|
||||
for (i = 0; i < data_buf_count; i++)
|
||||
bufs[buf_index++] = data_bufs[i];
|
||||
}
|
||||
|
||||
/* Write buffers. We set the `always_copy` flag, so it is not a problem that
|
||||
* some of the written data lives on the stack. */
|
||||
err = uv__pipe_write_data(
|
||||
loop, req, handle, bufs, buf_count, send_handle, cb, true);
|
||||
|
||||
/* If we had to heap-allocate the bufs array, free it now. */
|
||||
if (bufs != stack_bufs) {
|
||||
uv__free(bufs);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
int uv__pipe_write(uv_loop_t* loop,
|
||||
uv_write_t* req,
|
||||
uv_pipe_t* handle,
|
||||
const uv_buf_t bufs[],
|
||||
unsigned int nbufs,
|
||||
size_t nbufs,
|
||||
uv_stream_t* send_handle,
|
||||
uv_write_cb cb) {
|
||||
if (!handle->ipc) {
|
||||
return WSAEINVAL;
|
||||
if (handle->ipc) {
|
||||
/* IPC pipe write: use framing protocol. */
|
||||
return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
|
||||
} else {
|
||||
/* Non-IPC pipe write: put data on the wire directly. */
|
||||
assert(send_handle == NULL);
|
||||
return uv__pipe_write_data(
|
||||
loop, req, handle, bufs, nbufs, NULL, cb, false);
|
||||
}
|
||||
|
||||
return uv_pipe_write_impl(loop, req, handle, bufs, nbufs, send_handle, cb);
|
||||
}
|
||||
|
||||
|
||||
@ -1627,147 +1601,189 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
}
|
||||
|
||||
|
||||
void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
|
||||
uv__ipc_socket_info_ex* info,
|
||||
int tcp_connection) {
|
||||
uv__ipc_queue_item_t* item;
|
||||
static void uv__pipe_queue_ipc_xfer_info(
|
||||
uv_pipe_t* handle, uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
uv__ipc_xfer_queue_item_t* item;
|
||||
|
||||
item = (uv__ipc_queue_item_t*) uv__malloc(sizeof(*item));
|
||||
item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
|
||||
if (item == NULL)
|
||||
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
||||
|
||||
memcpy(&item->socket_info_ex, info, sizeof(item->socket_info_ex));
|
||||
item->tcp_connection = tcp_connection;
|
||||
QUEUE_INSERT_TAIL(&handle->pipe.conn.pending_ipc_info.queue, &item->member);
|
||||
handle->pipe.conn.pending_ipc_info.queue_len++;
|
||||
memcpy(&item->xfer_info, xfer_info, sizeof(item->xfer_info));
|
||||
QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
|
||||
handle->pipe.conn.ipc_xfer_queue_length++;
|
||||
}
|
||||
|
||||
|
||||
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
uv_req_t* req) {
|
||||
DWORD bytes, avail;
|
||||
uv_buf_t buf;
|
||||
uv_ipc_frame_uv_stream ipc_frame;
|
||||
/* Read an exact number of bytes from a pipe. If an error or end-of-file is
|
||||
* encountered before the requested number of bytes are read, an error is
|
||||
* returned. */
|
||||
static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
|
||||
DWORD bytes_read, bytes_read_now;
|
||||
|
||||
bytes_read = 0;
|
||||
while (bytes_read < count) {
|
||||
if (!ReadFile(h,
|
||||
(char*) buffer + bytes_read,
|
||||
count - bytes_read,
|
||||
&bytes_read_now,
|
||||
NULL)) {
|
||||
return GetLastError();
|
||||
}
|
||||
|
||||
bytes_read += bytes_read_now;
|
||||
}
|
||||
|
||||
assert(bytes_read == count);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static DWORD uv__pipe_read_data(uv_loop_t* loop,
|
||||
uv_pipe_t* handle,
|
||||
DWORD suggested_bytes,
|
||||
DWORD max_bytes) {
|
||||
DWORD bytes_read;
|
||||
uv_buf_t buf;
|
||||
|
||||
/* Ask the user for a buffer to read data into. */
|
||||
buf = uv_buf_init(NULL, 0);
|
||||
handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
|
||||
if (buf.base == NULL || buf.len == 0) {
|
||||
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
|
||||
return 0; /* Break out of read loop. */
|
||||
}
|
||||
|
||||
/* Ensure we read at most the smaller of:
|
||||
* (a) the length of the user-allocated buffer.
|
||||
* (b) the maximum data length as specified by the `max_bytes` argument.
|
||||
*/
|
||||
max_bytes = min(buf.len, max_bytes);
|
||||
|
||||
/* Read into the user buffer. */
|
||||
if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
|
||||
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
|
||||
return 0; /* Break out of read loop. */
|
||||
}
|
||||
|
||||
/* Call the read callback. */
|
||||
handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
|
||||
|
||||
return bytes_read;
|
||||
}
|
||||
|
||||
|
||||
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
DWORD* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
|
||||
int err;
|
||||
|
||||
if (*data_remaining > 0) {
|
||||
/* Read data frame payload. */
|
||||
DWORD bytes_read =
|
||||
uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
|
||||
*data_remaining -= bytes_read;
|
||||
return bytes_read;
|
||||
|
||||
} else {
|
||||
/* Start of a new IPC frame. */
|
||||
uv__ipc_frame_header_t frame_header;
|
||||
uv__ipc_socket_xfer_info_t xfer_info;
|
||||
|
||||
/* Read the IPC frame header. */
|
||||
err = uv__pipe_read_exactly(
|
||||
handle->handle, &frame_header, sizeof frame_header);
|
||||
if (err)
|
||||
goto error;
|
||||
|
||||
if (frame_header.type == UV__IPC_DATA_FRAME) {
|
||||
/* Data frame: capture payload length. Actual data will be read in
|
||||
* subsequent call to uv__pipe_read_ipc(). */
|
||||
*data_remaining = frame_header.payload_length;
|
||||
|
||||
/* Return number of bytes read. */
|
||||
return sizeof frame_header;
|
||||
|
||||
} else if (frame_header.type == UV__IPC_XFER_FRAME) {
|
||||
/* Xfer frame: read the payload. */
|
||||
assert(frame_header.payload_length == sizeof xfer_info);
|
||||
err =
|
||||
uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
|
||||
if (err)
|
||||
goto error;
|
||||
|
||||
/* Store the pending socket info. */
|
||||
uv__pipe_queue_ipc_xfer_info(handle, &xfer_info);
|
||||
|
||||
/* Return number of bytes read. */
|
||||
return sizeof frame_header + sizeof xfer_info;
|
||||
}
|
||||
|
||||
/* Invalid frame. */
|
||||
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
|
||||
}
|
||||
|
||||
error:
|
||||
uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
|
||||
return 0; /* Break out of read loop. */
|
||||
}
|
||||
|
||||
|
||||
void uv_process_pipe_read_req(uv_loop_t* loop,
|
||||
uv_pipe_t* handle,
|
||||
uv_req_t* req) {
|
||||
assert(handle->type == UV_NAMED_PIPE);
|
||||
|
||||
handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
|
||||
DECREASE_PENDING_REQ_COUNT(handle);
|
||||
eof_timer_stop(handle);
|
||||
|
||||
/* At this point, we're done with bookkeeping. If the user has stopped
|
||||
* reading the pipe in the meantime, there is nothing left to do, since there
|
||||
* is no callback that we can call. */
|
||||
if (!(handle->flags & UV_HANDLE_READING))
|
||||
return;
|
||||
|
||||
if (!REQ_SUCCESS(req)) {
|
||||
/* An error occurred doing the zero-read.
|
||||
* Note that if the read was cancelled by uv__pipe_interrupt_read(), the
|
||||
* request may indicate an ERROR_OPERATION_ABORTED error. This error isn't
|
||||
* relevant to the user; we'll restart the read by queueing a new read
|
||||
* request below. */
|
||||
if (handle->flags & UV_HANDLE_READING &&
|
||||
GET_REQ_ERROR(req) != ERROR_OPERATION_ABORTED) {
|
||||
uv_pipe_read_error_or_eof(loop,
|
||||
handle,
|
||||
GET_REQ_ERROR(req),
|
||||
uv_null_buf_);
|
||||
}
|
||||
/* An error occurred doing the zero-read. */
|
||||
DWORD err = GET_REQ_ERROR(req);
|
||||
|
||||
/* If the read was cancelled by uv__pipe_interrupt_read(), the request may
|
||||
* indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
|
||||
* the user; we'll start a new zero-read at the end of this function. */
|
||||
if (err != ERROR_OPERATION_ABORTED)
|
||||
uv_pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
|
||||
|
||||
} else {
|
||||
/* Do non-blocking reads until the buffer is empty */
|
||||
while (handle->flags & UV_HANDLE_READING) {
|
||||
if (!PeekNamedPipe(handle->handle,
|
||||
NULL,
|
||||
0,
|
||||
NULL,
|
||||
&avail,
|
||||
NULL)) {
|
||||
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
|
||||
/* The zero-read completed without error, indicating there is data
|
||||
* available in the kernel buffer. */
|
||||
DWORD avail;
|
||||
|
||||
/* Get the number of bytes available. */
|
||||
avail = 0;
|
||||
if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
|
||||
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
|
||||
|
||||
/* Read until we've either read all the bytes available, or the 'reading'
|
||||
* flag is cleared. */
|
||||
while (avail > 0 && handle->flags & UV_HANDLE_READING) {
|
||||
/* Depending on the type of pipe, read either IPC frames or raw data. */
|
||||
DWORD bytes_read =
|
||||
handle->ipc ? uv__pipe_read_ipc(loop, handle)
|
||||
: uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
|
||||
|
||||
/* If no bytes were read, treat this as an indication that an error
|
||||
* occurred, and break out of the read loop. */
|
||||
if (bytes_read == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
if (avail == 0) {
|
||||
/* There is nothing to read after all. */
|
||||
/* It is possible that more bytes were read than we thought were
|
||||
* available. To prevent `avail` from underflowing, break out of the loop
|
||||
* if this is the case. */
|
||||
if (bytes_read > avail)
|
||||
break;
|
||||
}
|
||||
|
||||
if (handle->ipc) {
|
||||
/* Use the IPC framing protocol to read the incoming data. */
|
||||
if (handle->pipe.conn.remaining_ipc_rawdata_bytes == 0) {
|
||||
/* We're reading a new frame. First, read the header. */
|
||||
assert(avail >= sizeof(ipc_frame.header));
|
||||
|
||||
if (!ReadFile(handle->handle,
|
||||
&ipc_frame.header,
|
||||
sizeof(ipc_frame.header),
|
||||
&bytes,
|
||||
NULL)) {
|
||||
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
|
||||
uv_null_buf_);
|
||||
break;
|
||||
}
|
||||
|
||||
assert(bytes == sizeof(ipc_frame.header));
|
||||
assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
|
||||
UV_IPC_TCP_CONNECTION));
|
||||
|
||||
if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
|
||||
assert(avail - sizeof(ipc_frame.header) >=
|
||||
sizeof(ipc_frame.socket_info_ex));
|
||||
|
||||
/* Read the TCP socket info. */
|
||||
if (!ReadFile(handle->handle,
|
||||
&ipc_frame.socket_info_ex,
|
||||
sizeof(ipc_frame) - sizeof(ipc_frame.header),
|
||||
&bytes,
|
||||
NULL)) {
|
||||
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
|
||||
uv_null_buf_);
|
||||
break;
|
||||
}
|
||||
|
||||
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
|
||||
|
||||
/* Store the pending socket info. */
|
||||
uv__pipe_insert_pending_socket(
|
||||
handle,
|
||||
&ipc_frame.socket_info_ex,
|
||||
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION);
|
||||
}
|
||||
|
||||
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
|
||||
handle->pipe.conn.remaining_ipc_rawdata_bytes =
|
||||
ipc_frame.header.raw_data_length;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
avail = min(avail, (DWORD)handle->pipe.conn.remaining_ipc_rawdata_bytes);
|
||||
}
|
||||
}
|
||||
|
||||
buf = uv_buf_init(NULL, 0);
|
||||
handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
|
||||
if (buf.base == NULL || buf.len == 0) {
|
||||
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
|
||||
break;
|
||||
}
|
||||
assert(buf.base != NULL);
|
||||
|
||||
if (ReadFile(handle->handle,
|
||||
buf.base,
|
||||
min(buf.len, avail),
|
||||
&bytes,
|
||||
NULL)) {
|
||||
/* Successful read */
|
||||
if (handle->ipc) {
|
||||
assert(handle->pipe.conn.remaining_ipc_rawdata_bytes >= bytes);
|
||||
handle->pipe.conn.remaining_ipc_rawdata_bytes =
|
||||
handle->pipe.conn.remaining_ipc_rawdata_bytes - bytes;
|
||||
}
|
||||
handle->read_cb((uv_stream_t*)handle, bytes, &buf);
|
||||
|
||||
/* Read again only if bytes == buf.len */
|
||||
if (bytes <= buf.len) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
|
||||
break;
|
||||
}
|
||||
/* Recompute the number of bytes available. */
|
||||
avail -= bytes_read;
|
||||
}
|
||||
}
|
||||
|
||||
@ -1776,8 +1792,6 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
!(handle->flags & UV_HANDLE_READ_PENDING)) {
|
||||
uv_pipe_queue_read(loop, handle);
|
||||
}
|
||||
|
||||
DECREASE_PENDING_REQ_COUNT(handle);
|
||||
}
|
||||
|
||||
|
||||
@ -1803,27 +1817,19 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
}
|
||||
}
|
||||
|
||||
if (req->ipc_header) {
|
||||
if (req == &handle->pipe.conn.ipc_header_write_req) {
|
||||
req->type = UV_UNKNOWN_REQ;
|
||||
} else {
|
||||
uv__free(req);
|
||||
}
|
||||
} else {
|
||||
err = GET_REQ_ERROR(req);
|
||||
err = GET_REQ_ERROR(req);
|
||||
|
||||
/* If this was a coalesced write, extract pointer to the user_provided
|
||||
* uv_write_t structure so we can pass the expected pointer to the
|
||||
* callback, then free the heap-allocated write req. */
|
||||
if (req->coalesced) {
|
||||
uv__coalesced_write_t* coalesced_write =
|
||||
container_of(req, uv__coalesced_write_t, req);
|
||||
req = coalesced_write->user_req;
|
||||
uv__free(coalesced_write);
|
||||
}
|
||||
if (req->cb) {
|
||||
req->cb(req, uv_translate_sys_error(err));
|
||||
}
|
||||
/* If this was a coalesced write, extract pointer to the user_provided
|
||||
* uv_write_t structure so we can pass the expected pointer to the callback,
|
||||
* then free the heap-allocated write req. */
|
||||
if (req->coalesced) {
|
||||
uv__coalesced_write_t* coalesced_write =
|
||||
container_of(req, uv__coalesced_write_t, req);
|
||||
req = coalesced_write->user_req;
|
||||
uv__free(coalesced_write);
|
||||
}
|
||||
if (req->cb) {
|
||||
req->cb(req, uv_translate_sys_error(err));
|
||||
}
|
||||
|
||||
handle->stream.conn.write_reqs_pending--;
|
||||
@ -2080,8 +2086,8 @@ int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
|
||||
|
||||
if (pipe->ipc) {
|
||||
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
|
||||
pipe->pipe.conn.ipc_pid = uv_os_getppid();
|
||||
assert(pipe->pipe.conn.ipc_pid != -1);
|
||||
pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
|
||||
assert(pipe->pipe.conn.ipc_remote_pid != -1);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
@ -2212,7 +2218,7 @@ cleanup:
|
||||
int uv_pipe_pending_count(uv_pipe_t* handle) {
|
||||
if (!handle->ipc)
|
||||
return 0;
|
||||
return handle->pipe.conn.pending_ipc_info.queue_len;
|
||||
return handle->pipe.conn.ipc_xfer_queue_length;
|
||||
}
|
||||
|
||||
|
||||
@ -2245,7 +2251,7 @@ int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
|
||||
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
|
||||
if (!handle->ipc)
|
||||
return UV_UNKNOWN_HANDLE;
|
||||
if (handle->pipe.conn.pending_ipc_info.queue_len == 0)
|
||||
if (handle->pipe.conn.ipc_xfer_queue_length == 0)
|
||||
return UV_UNKNOWN_HANDLE;
|
||||
else
|
||||
return UV_TCP;
|
||||
|
||||
@ -1142,7 +1142,8 @@ int uv_spawn(uv_loop_t* loop,
|
||||
if (fdopt->flags & UV_CREATE_PIPE &&
|
||||
fdopt->data.stream->type == UV_NAMED_PIPE &&
|
||||
((uv_pipe_t*) fdopt->data.stream)->ipc) {
|
||||
((uv_pipe_t*) fdopt->data.stream)->pipe.conn.ipc_pid = info.dwProcessId;
|
||||
((uv_pipe_t*) fdopt->data.stream)->pipe.conn.ipc_remote_pid =
|
||||
info.dwProcessId;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -134,7 +134,8 @@ int uv_write(uv_write_t* req,
|
||||
err = uv_tcp_write(loop, req, (uv_tcp_t*) handle, bufs, nbufs, cb);
|
||||
break;
|
||||
case UV_NAMED_PIPE:
|
||||
err = uv_pipe_write(loop, req, (uv_pipe_t*) handle, bufs, nbufs, cb);
|
||||
err = uv__pipe_write(
|
||||
loop, req, (uv_pipe_t*) handle, bufs, nbufs, NULL, cb);
|
||||
break;
|
||||
case UV_TTY:
|
||||
err = uv_tty_write(loop, req, (uv_tty_t*) handle, bufs, nbufs, cb);
|
||||
@ -156,25 +157,18 @@ int uv_write2(uv_write_t* req,
|
||||
uv_loop_t* loop = handle->loop;
|
||||
int err;
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_WRITABLE)) {
|
||||
if (send_handle == NULL) {
|
||||
return uv_write(req, handle, bufs, nbufs, cb);
|
||||
}
|
||||
|
||||
if (handle->type != UV_NAMED_PIPE || !((uv_pipe_t*) handle)->ipc) {
|
||||
return UV_EINVAL;
|
||||
} else if (!(handle->flags & UV_HANDLE_WRITABLE)) {
|
||||
return UV_EPIPE;
|
||||
}
|
||||
|
||||
err = ERROR_INVALID_PARAMETER;
|
||||
switch (handle->type) {
|
||||
case UV_NAMED_PIPE:
|
||||
err = uv_pipe_write2(loop,
|
||||
req,
|
||||
(uv_pipe_t*) handle,
|
||||
bufs,
|
||||
nbufs,
|
||||
send_handle,
|
||||
cb);
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
||||
err = uv__pipe_write(
|
||||
loop, req, (uv_pipe_t*) handle, bufs, nbufs, send_handle, cb);
|
||||
return uv_translate_sys_error(err);
|
||||
}
|
||||
|
||||
|
||||
@ -1192,13 +1192,46 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex,
|
||||
int tcp_connection) {
|
||||
int uv__tcp_xfer_export(uv_tcp_t* handle,
|
||||
int target_pid,
|
||||
uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
|
||||
/* We're about to share the socket with another process. Because this is a
|
||||
* listening socket, we assume that the other process will be accepting
|
||||
* connections on it. Thus, before sharing the socket with another process,
|
||||
* we call listen here in the parent process. */
|
||||
if (!(handle->flags & UV_HANDLE_LISTENING)) {
|
||||
if (!(handle->flags & UV_HANDLE_BOUND)) {
|
||||
return ERROR_NOT_SUPPORTED;
|
||||
}
|
||||
if (handle->delayed_error == 0 &&
|
||||
listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
|
||||
handle->delayed_error = WSAGetLastError();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (WSADuplicateSocketW(
|
||||
handle->socket, target_pid, &xfer_info->socket_info)) {
|
||||
return WSAGetLastError();
|
||||
}
|
||||
xfer_info->delayed_error = handle->delayed_error;
|
||||
xfer_info->flags = handle->flags & UV_HANDLE_CONNECTION;
|
||||
|
||||
/* Mark the local copy of the handle as 'shared' so we behave in a way that's
|
||||
* friendly to the process(es) that we share the socket with. */
|
||||
handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv__tcp_xfer_import(uv_tcp_t* tcp, uv__ipc_socket_xfer_info_t* xfer_info) {
|
||||
int err;
|
||||
SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
FROM_PROTOCOL_INFO,
|
||||
&socket_info_ex->socket_info,
|
||||
&xfer_info->socket_info,
|
||||
0,
|
||||
WSA_FLAG_OVERLAPPED);
|
||||
|
||||
@ -1206,26 +1239,21 @@ int uv_tcp_import(uv_tcp_t* tcp, uv__ipc_socket_info_ex* socket_info_ex,
|
||||
return WSAGetLastError();
|
||||
}
|
||||
|
||||
err = uv_tcp_set_socket(tcp->loop,
|
||||
tcp,
|
||||
socket,
|
||||
socket_info_ex->socket_info.iAddressFamily,
|
||||
1);
|
||||
err = uv_tcp_set_socket(
|
||||
tcp->loop, tcp, socket, xfer_info->socket_info.iAddressFamily, 1);
|
||||
if (err) {
|
||||
closesocket(socket);
|
||||
return err;
|
||||
}
|
||||
|
||||
if (tcp_connection) {
|
||||
tcp->delayed_error = xfer_info->delayed_error;
|
||||
tcp->flags |= UV_HANDLE_BOUND | UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
if (xfer_info->flags & UV_HANDLE_CONNECTION) {
|
||||
uv_connection_init((uv_stream_t*)tcp);
|
||||
tcp->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
|
||||
}
|
||||
|
||||
tcp->flags |= UV_HANDLE_BOUND;
|
||||
tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
tcp->delayed_error = socket_info_ex->delayed_error;
|
||||
|
||||
tcp->loop->active_tcp_streams++;
|
||||
return 0;
|
||||
}
|
||||
@ -1271,39 +1299,6 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
LPWSAPROTOCOL_INFOW protocol_info) {
|
||||
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
|
||||
/*
|
||||
* We're about to share the socket with another process. Because
|
||||
* this is a listening socket, we assume that the other process will
|
||||
* be accepting connections on it. So, before sharing the socket
|
||||
* with another process, we call listen here in the parent process.
|
||||
*/
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_LISTENING)) {
|
||||
if (!(handle->flags & UV_HANDLE_BOUND)) {
|
||||
return ERROR_INVALID_PARAMETER;
|
||||
}
|
||||
|
||||
if (!(handle->delayed_error)) {
|
||||
if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) {
|
||||
handle->delayed_error = WSAGetLastError();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (WSADuplicateSocketW(handle->socket, pid, protocol_info)) {
|
||||
return WSAGetLastError();
|
||||
}
|
||||
|
||||
handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
|
||||
if (handle->flags & UV_HANDLE_CONNECTION) {
|
||||
return UV_EINVAL;
|
||||
|
||||
@ -74,10 +74,6 @@
|
||||
static char *process_title;
|
||||
static CRITICAL_SECTION process_title_lock;
|
||||
|
||||
/* Cached copy of the process id, written once. */
|
||||
static DWORD current_pid = 0;
|
||||
|
||||
|
||||
/* Interval (in seconds) of the high-resolution clock. */
|
||||
static double hrtime_interval_ = 0;
|
||||
|
||||
@ -359,14 +355,6 @@ uv_pid_t uv_os_getppid(void) {
|
||||
}
|
||||
|
||||
|
||||
int uv_current_pid(void) {
|
||||
if (current_pid == 0) {
|
||||
current_pid = GetCurrentProcessId();
|
||||
}
|
||||
return current_pid;
|
||||
}
|
||||
|
||||
|
||||
char** uv_setup_args(int argc, char** argv) {
|
||||
return argv;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user