Split this off from endgame, so that it can be handled separately and earlier, rather than trying to detect inside endgame which case we are in. There appear to be some race conditions on the `handle` field still however, which this does not yet attempt to address.
2642 lines
77 KiB
C
2642 lines
77 KiB
C
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
* of this software and associated documentation files (the "Software"), to
|
|
* deal in the Software without restriction, including without limitation the
|
|
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
|
* sell copies of the Software, and to permit persons to whom the Software is
|
|
* furnished to do so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in
|
|
* all copies or substantial portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
* IN THE SOFTWARE.
|
|
*/
|
|
|
|
#include <assert.h>
|
|
#include <io.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
|
|
#include "handle-inl.h"
|
|
#include "internal.h"
|
|
#include "req-inl.h"
|
|
#include "stream-inl.h"
|
|
#include "uv-common.h"
|
|
#include "uv.h"
|
|
|
|
#include <aclapi.h>
|
|
#include <accctrl.h>
|
|
|
|
/* A zero-size buffer for use by uv_pipe_read */
|
|
static char uv_zero_[] = "";
|
|
|
|
/* Null uv_buf_t */
|
|
static const uv_buf_t uv_null_buf_ = { 0, NULL };
|
|
|
|
/* The timeout that the pipe will wait for the remote end to write data when
|
|
* the local ends wants to shut it down. */
|
|
static const int64_t eof_timeout = 50; /* ms */
|
|
|
|
static const int default_pending_pipe_instances = 4;
|
|
|
|
/* Pipe prefix */
|
|
static char pipe_prefix[] = "\\\\?\\pipe";
|
|
static const int pipe_prefix_len = sizeof(pipe_prefix) - 1;
|
|
|
|
/* IPC incoming xfer queue item. */
|
|
typedef struct {
|
|
uv__ipc_socket_xfer_type_t xfer_type;
|
|
uv__ipc_socket_xfer_info_t xfer_info;
|
|
QUEUE member;
|
|
} uv__ipc_xfer_queue_item_t;
|
|
|
|
/* IPC frame header flags. */
|
|
/* clang-format off */
|
|
enum {
|
|
UV__IPC_FRAME_HAS_DATA = 0x01,
|
|
UV__IPC_FRAME_HAS_SOCKET_XFER = 0x02,
|
|
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION = 0x04,
|
|
/* These are combinations of the flags above. */
|
|
UV__IPC_FRAME_XFER_FLAGS = 0x06,
|
|
UV__IPC_FRAME_VALID_FLAGS = 0x07
|
|
};
|
|
/* clang-format on */
|
|
|
|
/* IPC frame header. */
|
|
typedef struct {
|
|
uint32_t flags;
|
|
uint32_t reserved1; /* Ignored. */
|
|
uint32_t data_length; /* Must be zero if there is no data. */
|
|
uint32_t reserved2; /* Must be zero. */
|
|
} uv__ipc_frame_header_t;
|
|
|
|
/* To implement the IPC protocol correctly, these structures must have exactly
|
|
* the right size. */
|
|
STATIC_ASSERT(sizeof(uv__ipc_frame_header_t) == 16);
|
|
STATIC_ASSERT(sizeof(uv__ipc_socket_xfer_info_t) == 632);
|
|
|
|
/* Coalesced write request. */
|
|
typedef struct {
|
|
uv_write_t req; /* Internal heap-allocated write request. */
|
|
uv_write_t* user_req; /* Pointer to user-specified uv_write_t. */
|
|
} uv__coalesced_write_t;
|
|
|
|
|
|
static void eof_timer_init(uv_pipe_t* pipe);
|
|
static void eof_timer_start(uv_pipe_t* pipe);
|
|
static void eof_timer_stop(uv_pipe_t* pipe);
|
|
static void eof_timer_cb(uv_timer_t* timer);
|
|
static void eof_timer_destroy(uv_pipe_t* pipe);
|
|
static void eof_timer_close_cb(uv_handle_t* handle);
|
|
|
|
|
|
static void uv__unique_pipe_name(char* ptr, char* name, size_t size) {
|
|
snprintf(name, size, "\\\\?\\pipe\\uv\\%p-%lu", ptr, GetCurrentProcessId());
|
|
}
|
|
|
|
|
|
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
|
|
uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
|
|
|
|
handle->reqs_pending = 0;
|
|
handle->handle = INVALID_HANDLE_VALUE;
|
|
handle->name = NULL;
|
|
handle->pipe.conn.ipc_remote_pid = 0;
|
|
handle->pipe.conn.ipc_data_frame.payload_remaining = 0;
|
|
QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue);
|
|
handle->pipe.conn.ipc_xfer_queue_length = 0;
|
|
handle->ipc = ipc;
|
|
handle->pipe.conn.non_overlapped_writes_tail = NULL;
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static void uv__pipe_connection_init(uv_pipe_t* handle) {
|
|
assert(!(handle->flags & UV_HANDLE_PIPESERVER));
|
|
uv__connection_init((uv_stream_t*) handle);
|
|
handle->read_req.data = handle;
|
|
handle->pipe.conn.eof_timer = NULL;
|
|
}
|
|
|
|
|
|
static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) {
|
|
HANDLE pipeHandle;
|
|
|
|
/*
|
|
* Assume that we have a duplex pipe first, so attempt to
|
|
* connect with GENERIC_READ | GENERIC_WRITE.
|
|
*/
|
|
pipeHandle = CreateFileW(name,
|
|
GENERIC_READ | GENERIC_WRITE,
|
|
0,
|
|
NULL,
|
|
OPEN_EXISTING,
|
|
FILE_FLAG_OVERLAPPED,
|
|
NULL);
|
|
if (pipeHandle != INVALID_HANDLE_VALUE) {
|
|
*duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
|
|
return pipeHandle;
|
|
}
|
|
|
|
/*
|
|
* If the pipe is not duplex CreateFileW fails with
|
|
* ERROR_ACCESS_DENIED. In that case try to connect
|
|
* as a read-only or write-only.
|
|
*/
|
|
if (GetLastError() == ERROR_ACCESS_DENIED) {
|
|
pipeHandle = CreateFileW(name,
|
|
GENERIC_READ | FILE_WRITE_ATTRIBUTES,
|
|
0,
|
|
NULL,
|
|
OPEN_EXISTING,
|
|
FILE_FLAG_OVERLAPPED,
|
|
NULL);
|
|
|
|
if (pipeHandle != INVALID_HANDLE_VALUE) {
|
|
*duplex_flags = UV_HANDLE_READABLE;
|
|
return pipeHandle;
|
|
}
|
|
}
|
|
|
|
if (GetLastError() == ERROR_ACCESS_DENIED) {
|
|
pipeHandle = CreateFileW(name,
|
|
GENERIC_WRITE | FILE_READ_ATTRIBUTES,
|
|
0,
|
|
NULL,
|
|
OPEN_EXISTING,
|
|
FILE_FLAG_OVERLAPPED,
|
|
NULL);
|
|
|
|
if (pipeHandle != INVALID_HANDLE_VALUE) {
|
|
*duplex_flags = UV_HANDLE_WRITABLE;
|
|
return pipeHandle;
|
|
}
|
|
}
|
|
|
|
return INVALID_HANDLE_VALUE;
|
|
}
|
|
|
|
|
|
static void close_pipe(uv_pipe_t* pipe) {
|
|
assert(pipe->u.fd == -1 || pipe->u.fd > 2);
|
|
if (pipe->u.fd == -1)
|
|
CloseHandle(pipe->handle);
|
|
else
|
|
close(pipe->u.fd);
|
|
|
|
pipe->u.fd = -1;
|
|
pipe->handle = INVALID_HANDLE_VALUE;
|
|
}
|
|
|
|
|
|
static int uv__pipe_server(
|
|
HANDLE* pipeHandle_ptr, DWORD access,
|
|
char* name, size_t nameSize, char* random) {
|
|
HANDLE pipeHandle;
|
|
int err;
|
|
|
|
for (;;) {
|
|
uv__unique_pipe_name(random, name, nameSize);
|
|
|
|
pipeHandle = CreateNamedPipeA(name,
|
|
access | FILE_FLAG_FIRST_PIPE_INSTANCE,
|
|
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
|
|
NULL);
|
|
|
|
if (pipeHandle != INVALID_HANDLE_VALUE) {
|
|
/* No name collisions. We're done. */
|
|
break;
|
|
}
|
|
|
|
err = GetLastError();
|
|
if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
|
|
goto error;
|
|
}
|
|
|
|
/* Pipe name collision. Increment the random number and try again. */
|
|
random++;
|
|
}
|
|
|
|
*pipeHandle_ptr = pipeHandle;
|
|
|
|
return 0;
|
|
|
|
error:
|
|
if (pipeHandle != INVALID_HANDLE_VALUE)
|
|
CloseHandle(pipeHandle);
|
|
|
|
return err;
|
|
}
|
|
|
|
|
|
static int uv__create_pipe_pair(
|
|
HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
|
|
unsigned int server_flags, unsigned int client_flags,
|
|
int inherit_client, char* random) {
|
|
/* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
|
|
char pipe_name[64];
|
|
SECURITY_ATTRIBUTES sa;
|
|
DWORD server_access;
|
|
DWORD client_access;
|
|
HANDLE server_pipe;
|
|
HANDLE client_pipe;
|
|
int err;
|
|
|
|
server_pipe = INVALID_HANDLE_VALUE;
|
|
client_pipe = INVALID_HANDLE_VALUE;
|
|
|
|
server_access = 0;
|
|
if (server_flags & UV_READABLE_PIPE)
|
|
server_access |= PIPE_ACCESS_INBOUND;
|
|
if (server_flags & UV_WRITABLE_PIPE)
|
|
server_access |= PIPE_ACCESS_OUTBOUND;
|
|
if (server_flags & UV_NONBLOCK_PIPE)
|
|
server_access |= FILE_FLAG_OVERLAPPED;
|
|
server_access |= WRITE_DAC;
|
|
|
|
client_access = 0;
|
|
if (client_flags & UV_READABLE_PIPE)
|
|
client_access |= GENERIC_READ;
|
|
else
|
|
client_access |= FILE_READ_ATTRIBUTES;
|
|
if (client_flags & UV_WRITABLE_PIPE)
|
|
client_access |= GENERIC_WRITE;
|
|
else
|
|
client_access |= FILE_WRITE_ATTRIBUTES;
|
|
client_access |= WRITE_DAC;
|
|
|
|
/* Create server pipe handle. */
|
|
err = uv__pipe_server(&server_pipe,
|
|
server_access,
|
|
pipe_name,
|
|
sizeof(pipe_name),
|
|
random);
|
|
if (err)
|
|
goto error;
|
|
|
|
/* Create client pipe handle. */
|
|
sa.nLength = sizeof sa;
|
|
sa.lpSecurityDescriptor = NULL;
|
|
sa.bInheritHandle = inherit_client;
|
|
|
|
client_pipe = CreateFileA(pipe_name,
|
|
client_access,
|
|
0,
|
|
&sa,
|
|
OPEN_EXISTING,
|
|
(client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
|
|
NULL);
|
|
if (client_pipe == INVALID_HANDLE_VALUE) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
#ifndef NDEBUG
|
|
/* Validate that the pipe was opened in the right mode. */
|
|
{
|
|
DWORD mode;
|
|
BOOL r;
|
|
r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
|
|
if (r == TRUE) {
|
|
assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
|
|
} else {
|
|
fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
|
|
}
|
|
}
|
|
#endif
|
|
|
|
/* Do a blocking ConnectNamedPipe. This should not block because we have
|
|
* both ends of the pipe created. */
|
|
if (!ConnectNamedPipe(server_pipe, NULL)) {
|
|
if (GetLastError() != ERROR_PIPE_CONNECTED) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
}
|
|
|
|
*client_pipe_ptr = client_pipe;
|
|
*server_pipe_ptr = server_pipe;
|
|
return 0;
|
|
|
|
error:
|
|
if (server_pipe != INVALID_HANDLE_VALUE)
|
|
CloseHandle(server_pipe);
|
|
|
|
if (client_pipe != INVALID_HANDLE_VALUE)
|
|
CloseHandle(client_pipe);
|
|
|
|
return err;
|
|
}
|
|
|
|
|
|
int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
|
|
uv_file temp[2];
|
|
int err;
|
|
HANDLE readh;
|
|
HANDLE writeh;
|
|
|
|
/* Make the server side the inbound (read) end, */
|
|
/* so that both ends will have FILE_READ_ATTRIBUTES permission. */
|
|
/* TODO: better source of local randomness than &fds? */
|
|
read_flags |= UV_READABLE_PIPE;
|
|
write_flags |= UV_WRITABLE_PIPE;
|
|
err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
|
|
if (err != 0)
|
|
return err;
|
|
temp[0] = _open_osfhandle((intptr_t) readh, 0);
|
|
if (temp[0] == -1) {
|
|
if (errno == UV_EMFILE)
|
|
err = UV_EMFILE;
|
|
else
|
|
err = UV_UNKNOWN;
|
|
CloseHandle(readh);
|
|
CloseHandle(writeh);
|
|
return err;
|
|
}
|
|
temp[1] = _open_osfhandle((intptr_t) writeh, 0);
|
|
if (temp[1] == -1) {
|
|
if (errno == UV_EMFILE)
|
|
err = UV_EMFILE;
|
|
else
|
|
err = UV_UNKNOWN;
|
|
_close(temp[0]);
|
|
CloseHandle(writeh);
|
|
return err;
|
|
}
|
|
fds[0] = temp[0];
|
|
fds[1] = temp[1];
|
|
return 0;
|
|
}
|
|
|
|
|
|
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
|
|
uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
|
|
/* The parent_pipe is always the server_pipe and kept by libuv.
|
|
* The child_pipe is always the client_pipe and is passed to the child.
|
|
* The flags are specified with respect to their usage in the child. */
|
|
HANDLE server_pipe;
|
|
HANDLE client_pipe;
|
|
unsigned int server_flags;
|
|
unsigned int client_flags;
|
|
int err;
|
|
|
|
uv__pipe_connection_init(parent_pipe);
|
|
|
|
server_pipe = INVALID_HANDLE_VALUE;
|
|
client_pipe = INVALID_HANDLE_VALUE;
|
|
|
|
server_flags = 0;
|
|
client_flags = 0;
|
|
if (flags & UV_READABLE_PIPE) {
|
|
/* The server needs inbound (read) access too, otherwise CreateNamedPipe()
|
|
* won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
|
|
* the state of the write buffer when we're trying to shutdown the pipe. */
|
|
server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
|
|
client_flags |= UV_READABLE_PIPE;
|
|
}
|
|
if (flags & UV_WRITABLE_PIPE) {
|
|
server_flags |= UV_READABLE_PIPE;
|
|
client_flags |= UV_WRITABLE_PIPE;
|
|
}
|
|
server_flags |= UV_NONBLOCK_PIPE;
|
|
if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
|
|
client_flags |= UV_NONBLOCK_PIPE;
|
|
}
|
|
|
|
err = uv__create_pipe_pair(&server_pipe, &client_pipe,
|
|
server_flags, client_flags, 1, (char*) server_pipe);
|
|
if (err)
|
|
goto error;
|
|
|
|
if (CreateIoCompletionPort(server_pipe,
|
|
loop->iocp,
|
|
(ULONG_PTR) parent_pipe,
|
|
0) == NULL) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
parent_pipe->handle = server_pipe;
|
|
*child_pipe_ptr = client_pipe;
|
|
|
|
/* The server end is now readable and/or writable. */
|
|
if (flags & UV_READABLE_PIPE)
|
|
parent_pipe->flags |= UV_HANDLE_WRITABLE;
|
|
if (flags & UV_WRITABLE_PIPE)
|
|
parent_pipe->flags |= UV_HANDLE_READABLE;
|
|
|
|
return 0;
|
|
|
|
error:
|
|
if (server_pipe != INVALID_HANDLE_VALUE)
|
|
CloseHandle(server_pipe);
|
|
|
|
if (client_pipe != INVALID_HANDLE_VALUE)
|
|
CloseHandle(client_pipe);
|
|
|
|
return err;
|
|
}
|
|
|
|
|
|
static int uv__set_pipe_handle(uv_loop_t* loop,
|
|
uv_pipe_t* handle,
|
|
HANDLE pipeHandle,
|
|
int fd,
|
|
DWORD duplex_flags) {
|
|
NTSTATUS nt_status;
|
|
IO_STATUS_BLOCK io_status;
|
|
FILE_MODE_INFORMATION mode_info;
|
|
DWORD mode = PIPE_READMODE_BYTE | PIPE_WAIT;
|
|
DWORD current_mode = 0;
|
|
DWORD err = 0;
|
|
|
|
assert(handle->flags & UV_HANDLE_CONNECTION);
|
|
assert(!(handle->flags & UV_HANDLE_PIPESERVER));
|
|
if (handle->flags & UV_HANDLE_CLOSING)
|
|
return UV_EINVAL;
|
|
if (handle->handle != INVALID_HANDLE_VALUE)
|
|
return UV_EBUSY;
|
|
|
|
if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
|
|
err = GetLastError();
|
|
if (err == ERROR_ACCESS_DENIED) {
|
|
/*
|
|
* SetNamedPipeHandleState can fail if the handle doesn't have either
|
|
* GENERIC_WRITE or FILE_WRITE_ATTRIBUTES.
|
|
* But if the handle already has the desired wait and blocking modes
|
|
* we can continue.
|
|
*/
|
|
if (!GetNamedPipeHandleState(pipeHandle, ¤t_mode, NULL, NULL,
|
|
NULL, NULL, 0)) {
|
|
return uv_translate_sys_error(GetLastError());
|
|
} else if (current_mode & PIPE_NOWAIT) {
|
|
return UV_EACCES;
|
|
}
|
|
} else {
|
|
/* If this returns ERROR_INVALID_PARAMETER we probably opened
|
|
* something that is not a pipe. */
|
|
if (err == ERROR_INVALID_PARAMETER) {
|
|
return UV_ENOTSOCK;
|
|
}
|
|
return uv_translate_sys_error(err);
|
|
}
|
|
}
|
|
|
|
/* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
|
|
nt_status = pNtQueryInformationFile(pipeHandle,
|
|
&io_status,
|
|
&mode_info,
|
|
sizeof(mode_info),
|
|
FileModeInformation);
|
|
if (nt_status != STATUS_SUCCESS) {
|
|
return uv_translate_sys_error(err);
|
|
}
|
|
|
|
if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
|
|
mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
|
|
/* Non-overlapped pipe. */
|
|
handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
|
|
handle->pipe.conn.readfile_thread_handle = NULL;
|
|
InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock);
|
|
} else {
|
|
/* Overlapped pipe. Try to associate with IOCP. */
|
|
if (CreateIoCompletionPort(pipeHandle,
|
|
loop->iocp,
|
|
(ULONG_PTR) handle,
|
|
0) == NULL) {
|
|
handle->flags |= UV_HANDLE_EMULATE_IOCP;
|
|
}
|
|
}
|
|
|
|
handle->handle = pipeHandle;
|
|
handle->u.fd = fd;
|
|
handle->flags |= duplex_flags;
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_pipe_accept_t* req, BOOL firstInstance) {
|
|
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
|
|
|
|
req->pipeHandle =
|
|
CreateNamedPipeW(handle->name,
|
|
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED | WRITE_DAC |
|
|
(firstInstance ? FILE_FLAG_FIRST_PIPE_INSTANCE : 0),
|
|
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
|
|
PIPE_UNLIMITED_INSTANCES, 65536, 65536, 0, NULL);
|
|
|
|
if (req->pipeHandle == INVALID_HANDLE_VALUE) {
|
|
return 0;
|
|
}
|
|
|
|
/* Associate it with IOCP so we can get events. */
|
|
if (CreateIoCompletionPort(req->pipeHandle,
|
|
loop->iocp,
|
|
(ULONG_PTR) handle,
|
|
0) == NULL) {
|
|
uv_fatal_error(GetLastError(), "CreateIoCompletionPort");
|
|
}
|
|
|
|
/* Stash a handle in the server object for use from places such as
|
|
* getsockname and chmod. As we transfer ownership of these to client
|
|
* objects, we'll allocate new ones here. */
|
|
handle->handle = req->pipeHandle;
|
|
|
|
return 1;
|
|
}
|
|
|
|
|
|
static DWORD WINAPI pipe_shutdown_thread_proc(void* parameter) {
|
|
uv_loop_t* loop;
|
|
uv_pipe_t* handle;
|
|
uv_shutdown_t* req;
|
|
|
|
req = (uv_shutdown_t*) parameter;
|
|
assert(req);
|
|
handle = (uv_pipe_t*) req->handle;
|
|
assert(handle);
|
|
loop = handle->loop;
|
|
assert(loop);
|
|
|
|
FlushFileBuffers(handle->handle);
|
|
|
|
/* Post completed */
|
|
POST_COMPLETION_FOR_REQ(loop, req);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
void uv__pipe_shutdown(uv_loop_t* loop, uv_pipe_t* handle, uv_shutdown_t *req) {
|
|
DWORD result;
|
|
NTSTATUS nt_status;
|
|
IO_STATUS_BLOCK io_status;
|
|
FILE_PIPE_LOCAL_INFORMATION pipe_info;
|
|
|
|
assert(handle->flags & UV_HANDLE_CONNECTION);
|
|
assert(req != NULL);
|
|
assert(handle->stream.conn.write_reqs_pending == 0);
|
|
SET_REQ_SUCCESS(req);
|
|
|
|
if (handle->flags & UV_HANDLE_CLOSING) {
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
return;
|
|
}
|
|
|
|
/* Try to avoid flushing the pipe buffer in the thread pool. */
|
|
nt_status = pNtQueryInformationFile(handle->handle,
|
|
&io_status,
|
|
&pipe_info,
|
|
sizeof pipe_info,
|
|
FilePipeLocalInformation);
|
|
|
|
if (nt_status != STATUS_SUCCESS) {
|
|
SET_REQ_ERROR(req, pRtlNtStatusToDosError(nt_status));
|
|
handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
return;
|
|
}
|
|
|
|
if (pipe_info.OutboundQuota == pipe_info.WriteQuotaAvailable) {
|
|
/* Short-circuit, no need to call FlushFileBuffers:
|
|
* all writes have been read. */
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
return;
|
|
}
|
|
|
|
/* Run FlushFileBuffers in the thread pool. */
|
|
result = QueueUserWorkItem(pipe_shutdown_thread_proc,
|
|
req,
|
|
WT_EXECUTELONGFUNCTION);
|
|
if (!result) {
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
handle->flags |= UV_HANDLE_WRITABLE; /* Questionable. */
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
|
|
uv__ipc_xfer_queue_item_t* xfer_queue_item;
|
|
|
|
assert(handle->reqs_pending == 0);
|
|
assert(handle->flags & UV_HANDLE_CLOSING);
|
|
assert(!(handle->flags & UV_HANDLE_CLOSED));
|
|
|
|
if (handle->flags & UV_HANDLE_CONNECTION) {
|
|
/* Free pending sockets */
|
|
while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) {
|
|
QUEUE* q;
|
|
SOCKET socket;
|
|
|
|
q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue);
|
|
QUEUE_REMOVE(q);
|
|
xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
|
|
|
|
/* Materialize socket and close it */
|
|
socket = WSASocketW(FROM_PROTOCOL_INFO,
|
|
FROM_PROTOCOL_INFO,
|
|
FROM_PROTOCOL_INFO,
|
|
&xfer_queue_item->xfer_info.socket_info,
|
|
0,
|
|
WSA_FLAG_OVERLAPPED);
|
|
uv__free(xfer_queue_item);
|
|
|
|
if (socket != INVALID_SOCKET)
|
|
closesocket(socket);
|
|
}
|
|
handle->pipe.conn.ipc_xfer_queue_length = 0;
|
|
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
|
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
|
|
UnregisterWait(handle->read_req.wait_handle);
|
|
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
|
|
}
|
|
if (handle->read_req.event_handle != NULL) {
|
|
CloseHandle(handle->read_req.event_handle);
|
|
handle->read_req.event_handle = NULL;
|
|
}
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)
|
|
DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock);
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_PIPESERVER) {
|
|
assert(handle->pipe.serv.accept_reqs);
|
|
uv__free(handle->pipe.serv.accept_reqs);
|
|
handle->pipe.serv.accept_reqs = NULL;
|
|
}
|
|
|
|
uv__handle_close(handle);
|
|
}
|
|
|
|
|
|
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
|
|
if (handle->flags & UV_HANDLE_BOUND)
|
|
return;
|
|
handle->pipe.serv.pending_instances = count;
|
|
handle->flags |= UV_HANDLE_PIPESERVER;
|
|
}
|
|
|
|
|
|
/* Creates a pipe server. */
|
|
int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
|
|
uv_loop_t* loop = handle->loop;
|
|
int i, err, nameSize;
|
|
uv_pipe_accept_t* req;
|
|
|
|
if (handle->flags & UV_HANDLE_BOUND) {
|
|
return UV_EINVAL;
|
|
}
|
|
|
|
if (!name) {
|
|
return UV_EINVAL;
|
|
}
|
|
if (uv__is_closing(handle)) {
|
|
return UV_EINVAL;
|
|
}
|
|
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
|
|
handle->pipe.serv.pending_instances = default_pending_pipe_instances;
|
|
}
|
|
|
|
handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*)
|
|
uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances);
|
|
if (!handle->pipe.serv.accept_reqs) {
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
}
|
|
|
|
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
|
|
req = &handle->pipe.serv.accept_reqs[i];
|
|
UV_REQ_INIT(req, UV_ACCEPT);
|
|
req->data = handle;
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
req->next_pending = NULL;
|
|
}
|
|
|
|
/* Convert name to UTF16. */
|
|
nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
|
|
handle->name = uv__malloc(nameSize);
|
|
if (!handle->name) {
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
}
|
|
|
|
if (!MultiByteToWideChar(CP_UTF8,
|
|
0,
|
|
name,
|
|
-1,
|
|
handle->name,
|
|
nameSize / sizeof(WCHAR))) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
/*
|
|
* Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE.
|
|
* If this fails then there's already a pipe server for the given pipe name.
|
|
*/
|
|
if (!pipe_alloc_accept(loop,
|
|
handle,
|
|
&handle->pipe.serv.accept_reqs[0],
|
|
TRUE)) {
|
|
err = GetLastError();
|
|
if (err == ERROR_ACCESS_DENIED) {
|
|
err = WSAEADDRINUSE; /* Translates to UV_EADDRINUSE. */
|
|
} else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) {
|
|
err = WSAEACCES; /* Translates to UV_EACCES. */
|
|
}
|
|
goto error;
|
|
}
|
|
|
|
handle->pipe.serv.pending_accepts = NULL;
|
|
handle->flags |= UV_HANDLE_PIPESERVER;
|
|
handle->flags |= UV_HANDLE_BOUND;
|
|
|
|
return 0;
|
|
|
|
error:
|
|
if (handle->name) {
|
|
uv__free(handle->name);
|
|
handle->name = NULL;
|
|
}
|
|
|
|
return uv_translate_sys_error(err);
|
|
}
|
|
|
|
|
|
static DWORD WINAPI pipe_connect_thread_proc(void* parameter) {
|
|
uv_loop_t* loop;
|
|
uv_pipe_t* handle;
|
|
uv_connect_t* req;
|
|
HANDLE pipeHandle = INVALID_HANDLE_VALUE;
|
|
DWORD duplex_flags;
|
|
|
|
req = (uv_connect_t*) parameter;
|
|
assert(req);
|
|
handle = (uv_pipe_t*) req->handle;
|
|
assert(handle);
|
|
loop = handle->loop;
|
|
assert(loop);
|
|
|
|
/* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait
|
|
* up to 30 seconds for the pipe to become available with WaitNamedPipe. */
|
|
while (WaitNamedPipeW(handle->name, 30000)) {
|
|
/* The pipe is now available, try to connect. */
|
|
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
|
|
if (pipeHandle != INVALID_HANDLE_VALUE)
|
|
break;
|
|
|
|
SwitchToThread();
|
|
}
|
|
|
|
if (pipeHandle != INVALID_HANDLE_VALUE) {
|
|
SET_REQ_SUCCESS(req);
|
|
req->u.connect.pipeHandle = pipeHandle;
|
|
req->u.connect.duplex_flags = duplex_flags;
|
|
} else {
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
}
|
|
|
|
/* Post completed */
|
|
POST_COMPLETION_FOR_REQ(loop, req);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
|
|
const char* name, uv_connect_cb cb) {
|
|
uv_loop_t* loop = handle->loop;
|
|
int err, nameSize;
|
|
HANDLE pipeHandle = INVALID_HANDLE_VALUE;
|
|
DWORD duplex_flags;
|
|
|
|
UV_REQ_INIT(req, UV_CONNECT);
|
|
req->handle = (uv_stream_t*) handle;
|
|
req->cb = cb;
|
|
req->u.connect.pipeHandle = INVALID_HANDLE_VALUE;
|
|
req->u.connect.duplex_flags = 0;
|
|
|
|
if (handle->flags & UV_HANDLE_PIPESERVER) {
|
|
err = ERROR_INVALID_PARAMETER;
|
|
goto error;
|
|
}
|
|
if (handle->flags & UV_HANDLE_CONNECTION) {
|
|
err = ERROR_PIPE_BUSY;
|
|
goto error;
|
|
}
|
|
uv__pipe_connection_init(handle);
|
|
|
|
/* Convert name to UTF16. */
|
|
nameSize = MultiByteToWideChar(CP_UTF8, 0, name, -1, NULL, 0) * sizeof(WCHAR);
|
|
handle->name = uv__malloc(nameSize);
|
|
if (!handle->name) {
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
}
|
|
|
|
if (!MultiByteToWideChar(CP_UTF8,
|
|
0,
|
|
name,
|
|
-1,
|
|
handle->name,
|
|
nameSize / sizeof(WCHAR))) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
pipeHandle = open_named_pipe(handle->name, &duplex_flags);
|
|
if (pipeHandle == INVALID_HANDLE_VALUE) {
|
|
if (GetLastError() == ERROR_PIPE_BUSY) {
|
|
/* Wait for the server to make a pipe instance available. */
|
|
if (!QueueUserWorkItem(&pipe_connect_thread_proc,
|
|
req,
|
|
WT_EXECUTELONGFUNCTION)) {
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
handle->reqs_pending++;
|
|
|
|
return;
|
|
}
|
|
|
|
err = GetLastError();
|
|
goto error;
|
|
}
|
|
|
|
req->u.connect.pipeHandle = pipeHandle;
|
|
req->u.connect.duplex_flags = duplex_flags;
|
|
SET_REQ_SUCCESS(req);
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
handle->reqs_pending++;
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
return;
|
|
|
|
error:
|
|
if (handle->name) {
|
|
uv__free(handle->name);
|
|
handle->name = NULL;
|
|
}
|
|
|
|
if (pipeHandle != INVALID_HANDLE_VALUE)
|
|
CloseHandle(pipeHandle);
|
|
|
|
/* Make this req pending reporting an error. */
|
|
SET_REQ_ERROR(req, err);
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
handle->reqs_pending++;
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
return;
|
|
}
|
|
|
|
|
|
void uv__pipe_interrupt_read(uv_pipe_t* handle) {
|
|
BOOL r;
|
|
|
|
if (!(handle->flags & UV_HANDLE_READ_PENDING))
|
|
return; /* No pending reads. */
|
|
if (handle->flags & UV_HANDLE_CANCELLATION_PENDING)
|
|
return; /* Already cancelled. */
|
|
if (handle->handle == INVALID_HANDLE_VALUE)
|
|
return; /* Pipe handle closed. */
|
|
|
|
if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) {
|
|
/* Cancel asynchronous read. */
|
|
r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped);
|
|
assert(r || GetLastError() == ERROR_NOT_FOUND);
|
|
(void) r;
|
|
} else {
|
|
/* Cancel synchronous read (which is happening in the thread pool). */
|
|
HANDLE thread;
|
|
volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
|
|
|
|
EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock);
|
|
|
|
thread = *thread_ptr;
|
|
if (thread == NULL) {
|
|
/* The thread pool thread has not yet reached the point of blocking, we
|
|
* can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */
|
|
*thread_ptr = INVALID_HANDLE_VALUE;
|
|
|
|
} else {
|
|
/* Spin until the thread has acknowledged (by setting the thread to
|
|
* INVALID_HANDLE_VALUE) that it is past the point of blocking. */
|
|
while (thread != INVALID_HANDLE_VALUE) {
|
|
r = CancelSynchronousIo(thread);
|
|
assert(r || GetLastError() == ERROR_NOT_FOUND);
|
|
SwitchToThread(); /* Yield thread. */
|
|
thread = *thread_ptr;
|
|
}
|
|
}
|
|
|
|
LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock);
|
|
}
|
|
|
|
/* Set flag to indicate that read has been cancelled. */
|
|
handle->flags |= UV_HANDLE_CANCELLATION_PENDING;
|
|
}
|
|
|
|
|
|
void uv__pipe_read_stop(uv_pipe_t* handle) {
|
|
handle->flags &= ~UV_HANDLE_READING;
|
|
DECREASE_ACTIVE_COUNT(handle->loop, handle);
|
|
uv__pipe_interrupt_read(handle);
|
|
}
|
|
|
|
|
|
/* Cleans up uv_pipe_t (server or connection) and all resources associated with
|
|
* it. */
|
|
void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) {
|
|
int i;
|
|
HANDLE pipeHandle;
|
|
|
|
if (handle->flags & UV_HANDLE_READING) {
|
|
handle->flags &= ~UV_HANDLE_READING;
|
|
DECREASE_ACTIVE_COUNT(loop, handle);
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_LISTENING) {
|
|
handle->flags &= ~UV_HANDLE_LISTENING;
|
|
DECREASE_ACTIVE_COUNT(loop, handle);
|
|
}
|
|
|
|
handle->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
|
|
|
|
uv__handle_closing(handle);
|
|
|
|
uv__pipe_interrupt_read(handle);
|
|
|
|
if (handle->name) {
|
|
uv__free(handle->name);
|
|
handle->name = NULL;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_PIPESERVER) {
|
|
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
|
|
pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle;
|
|
if (pipeHandle != INVALID_HANDLE_VALUE) {
|
|
CloseHandle(pipeHandle);
|
|
handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE;
|
|
}
|
|
}
|
|
handle->handle = INVALID_HANDLE_VALUE;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_CONNECTION) {
|
|
eof_timer_destroy(handle);
|
|
}
|
|
|
|
if ((handle->flags & UV_HANDLE_CONNECTION)
|
|
&& handle->handle != INVALID_HANDLE_VALUE) {
|
|
/* This will eventually destroy the write queue for us too. */
|
|
close_pipe(handle);
|
|
}
|
|
|
|
if (handle->reqs_pending == 0)
|
|
uv__want_endgame(loop, (uv_handle_t*) handle);
|
|
}
|
|
|
|
|
|
static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_pipe_accept_t* req, BOOL firstInstance) {
|
|
assert(handle->flags & UV_HANDLE_LISTENING);
|
|
|
|
if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) {
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
handle->reqs_pending++;
|
|
return;
|
|
}
|
|
|
|
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
|
|
|
|
/* Prepare the overlapped structure. */
|
|
memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
|
|
|
|
if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) &&
|
|
GetLastError() != ERROR_IO_PENDING) {
|
|
if (GetLastError() == ERROR_PIPE_CONNECTED) {
|
|
SET_REQ_SUCCESS(req);
|
|
} else {
|
|
CloseHandle(req->pipeHandle);
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
/* Make this req pending reporting an error. */
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
}
|
|
uv__insert_pending_req(loop, (uv_req_t*) req);
|
|
handle->reqs_pending++;
|
|
return;
|
|
}
|
|
|
|
/* Wait for completion via IOCP */
|
|
handle->reqs_pending++;
|
|
}
|
|
|
|
|
|
int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
|
|
uv_loop_t* loop = server->loop;
|
|
uv_pipe_t* pipe_client;
|
|
uv_pipe_accept_t* req;
|
|
QUEUE* q;
|
|
uv__ipc_xfer_queue_item_t* item;
|
|
int err;
|
|
|
|
if (server->ipc) {
|
|
if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) {
|
|
/* No valid pending sockets. */
|
|
return WSAEWOULDBLOCK;
|
|
}
|
|
|
|
q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue);
|
|
QUEUE_REMOVE(q);
|
|
server->pipe.conn.ipc_xfer_queue_length--;
|
|
item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member);
|
|
|
|
err = uv__tcp_xfer_import(
|
|
(uv_tcp_t*) client, item->xfer_type, &item->xfer_info);
|
|
if (err != 0)
|
|
return err;
|
|
|
|
uv__free(item);
|
|
|
|
} else {
|
|
pipe_client = (uv_pipe_t*) client;
|
|
uv__pipe_connection_init(pipe_client);
|
|
|
|
/* Find a connection instance that has been connected, but not yet
|
|
* accepted. */
|
|
req = server->pipe.serv.pending_accepts;
|
|
|
|
if (!req) {
|
|
/* No valid connections found, so we error out. */
|
|
return WSAEWOULDBLOCK;
|
|
}
|
|
|
|
/* Initialize the client handle and copy the pipeHandle to the client */
|
|
pipe_client->handle = req->pipeHandle;
|
|
pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
|
|
|
|
/* Prepare the req to pick up a new connection */
|
|
server->pipe.serv.pending_accepts = req->next_pending;
|
|
req->next_pending = NULL;
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
|
|
server->handle = INVALID_HANDLE_VALUE;
|
|
if (!(server->flags & UV_HANDLE_CLOSING)) {
|
|
uv__pipe_queue_accept(loop, server, req, FALSE);
|
|
}
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* Starts listening for connections for the given pipe. */
|
|
int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
|
|
uv_loop_t* loop = handle->loop;
|
|
int i;
|
|
|
|
if (handle->flags & UV_HANDLE_LISTENING) {
|
|
handle->stream.serv.connection_cb = cb;
|
|
}
|
|
|
|
if (!(handle->flags & UV_HANDLE_BOUND)) {
|
|
return WSAEINVAL;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_READING) {
|
|
return WSAEISCONN;
|
|
}
|
|
|
|
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
|
|
return ERROR_NOT_SUPPORTED;
|
|
}
|
|
|
|
if (handle->ipc) {
|
|
return WSAEINVAL;
|
|
}
|
|
|
|
handle->flags |= UV_HANDLE_LISTENING;
|
|
INCREASE_ACTIVE_COUNT(loop, handle);
|
|
handle->stream.serv.connection_cb = cb;
|
|
|
|
/* First pipe handle should have already been created in uv_pipe_bind */
|
|
assert(handle->pipe.serv.accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
|
|
|
|
for (i = 0; i < handle->pipe.serv.pending_instances; i++) {
|
|
uv__pipe_queue_accept(loop, handle, &handle->pipe.serv.accept_reqs[i], i == 0);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) {
|
|
uv_read_t* req = (uv_read_t*) arg;
|
|
uv_pipe_t* handle = (uv_pipe_t*) req->data;
|
|
uv_loop_t* loop = handle->loop;
|
|
volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle;
|
|
CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock;
|
|
HANDLE thread;
|
|
DWORD bytes;
|
|
DWORD err;
|
|
|
|
assert(req->type == UV_READ);
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
err = 0;
|
|
|
|
/* Create a handle to the current thread. */
|
|
if (!DuplicateHandle(GetCurrentProcess(),
|
|
GetCurrentThread(),
|
|
GetCurrentProcess(),
|
|
&thread,
|
|
0,
|
|
FALSE,
|
|
DUPLICATE_SAME_ACCESS)) {
|
|
err = GetLastError();
|
|
goto out1;
|
|
}
|
|
|
|
/* The lock needs to be held when thread handle is modified. */
|
|
EnterCriticalSection(lock);
|
|
if (*thread_ptr == INVALID_HANDLE_VALUE) {
|
|
/* uv__pipe_interrupt_read() cancelled reading before we got here. */
|
|
err = ERROR_OPERATION_ABORTED;
|
|
} else {
|
|
/* Let main thread know which worker thread is doing the blocking read. */
|
|
assert(*thread_ptr == NULL);
|
|
*thread_ptr = thread;
|
|
}
|
|
LeaveCriticalSection(lock);
|
|
|
|
if (err)
|
|
goto out2;
|
|
|
|
/* Block the thread until data is available on the pipe, or the read is
|
|
* cancelled. */
|
|
if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL))
|
|
err = GetLastError();
|
|
|
|
/* Let the main thread know the worker is past the point of blocking. */
|
|
assert(thread == *thread_ptr);
|
|
*thread_ptr = INVALID_HANDLE_VALUE;
|
|
|
|
/* Briefly acquire the mutex. Since the main thread holds the lock while it
|
|
* is spinning trying to cancel this thread's I/O, we will block here until
|
|
* it stops doing that. */
|
|
EnterCriticalSection(lock);
|
|
LeaveCriticalSection(lock);
|
|
|
|
out2:
|
|
/* Close the handle to the current thread. */
|
|
CloseHandle(thread);
|
|
|
|
out1:
|
|
/* Set request status and post a completion record to the IOCP. */
|
|
if (err)
|
|
SET_REQ_ERROR(req, err);
|
|
else
|
|
SET_REQ_SUCCESS(req);
|
|
POST_COMPLETION_FOR_REQ(loop, req);
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
|
|
int result;
|
|
DWORD bytes;
|
|
uv_write_t* req = (uv_write_t*) parameter;
|
|
uv_pipe_t* handle = (uv_pipe_t*) req->handle;
|
|
uv_loop_t* loop = handle->loop;
|
|
|
|
assert(req != NULL);
|
|
assert(req->type == UV_WRITE);
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
result = WriteFile(handle->handle,
|
|
req->write_buffer.base,
|
|
req->write_buffer.len,
|
|
&bytes,
|
|
NULL);
|
|
|
|
if (!result) {
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
}
|
|
|
|
POST_COMPLETION_FOR_REQ(loop, req);
|
|
return 0;
|
|
}
|
|
|
|
|
|
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
|
|
uv_read_t* req;
|
|
uv_tcp_t* handle;
|
|
|
|
req = (uv_read_t*) context;
|
|
assert(req != NULL);
|
|
handle = (uv_tcp_t*)req->data;
|
|
assert(handle != NULL);
|
|
assert(!timed_out);
|
|
|
|
if (!PostQueuedCompletionStatus(handle->loop->iocp,
|
|
req->u.io.overlapped.InternalHigh,
|
|
0,
|
|
&req->u.io.overlapped)) {
|
|
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
|
|
}
|
|
}
|
|
|
|
|
|
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
|
|
uv_write_t* req;
|
|
uv_tcp_t* handle;
|
|
|
|
req = (uv_write_t*) context;
|
|
assert(req != NULL);
|
|
handle = (uv_tcp_t*)req->handle;
|
|
assert(handle != NULL);
|
|
assert(!timed_out);
|
|
|
|
if (!PostQueuedCompletionStatus(handle->loop->iocp,
|
|
req->u.io.overlapped.InternalHigh,
|
|
0,
|
|
&req->u.io.overlapped)) {
|
|
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
|
|
}
|
|
}
|
|
|
|
|
|
static void uv__pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
|
|
uv_read_t* req;
|
|
int result;
|
|
|
|
assert(handle->flags & UV_HANDLE_READING);
|
|
assert(!(handle->flags & UV_HANDLE_READ_PENDING));
|
|
|
|
assert(handle->handle != INVALID_HANDLE_VALUE);
|
|
|
|
req = &handle->read_req;
|
|
|
|
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
|
|
handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */
|
|
if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
|
|
req,
|
|
WT_EXECUTELONGFUNCTION)) {
|
|
/* Make this req pending reporting an error. */
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
goto error;
|
|
}
|
|
} else {
|
|
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
|
assert(req->event_handle != NULL);
|
|
req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
|
|
}
|
|
|
|
/* Do 0-read */
|
|
result = ReadFile(handle->handle,
|
|
&uv_zero_,
|
|
0,
|
|
NULL,
|
|
&req->u.io.overlapped);
|
|
|
|
if (!result && GetLastError() != ERROR_IO_PENDING) {
|
|
/* Make this req pending reporting an error. */
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
goto error;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
|
if (req->wait_handle == INVALID_HANDLE_VALUE) {
|
|
if (!RegisterWaitForSingleObject(&req->wait_handle,
|
|
req->event_handle, post_completion_read_wait, (void*) req,
|
|
INFINITE, WT_EXECUTEINWAITTHREAD)) {
|
|
SET_REQ_ERROR(req, GetLastError());
|
|
goto error;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/* Start the eof timer if there is one */
|
|
eof_timer_start(handle);
|
|
handle->flags |= UV_HANDLE_READ_PENDING;
|
|
handle->reqs_pending++;
|
|
return;
|
|
|
|
error:
|
|
uv__insert_pending_req(loop, (uv_req_t*)req);
|
|
handle->flags |= UV_HANDLE_READ_PENDING;
|
|
handle->reqs_pending++;
|
|
}
|
|
|
|
|
|
int uv__pipe_read_start(uv_pipe_t* handle,
|
|
uv_alloc_cb alloc_cb,
|
|
uv_read_cb read_cb) {
|
|
uv_loop_t* loop = handle->loop;
|
|
|
|
handle->flags |= UV_HANDLE_READING;
|
|
INCREASE_ACTIVE_COUNT(loop, handle);
|
|
handle->read_cb = read_cb;
|
|
handle->alloc_cb = alloc_cb;
|
|
|
|
/* If reading was stopped and then started again, there could still be a read
|
|
* request pending. */
|
|
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
|
|
handle->read_req.event_handle == NULL) {
|
|
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
|
|
if (handle->read_req.event_handle == NULL) {
|
|
uv_fatal_error(GetLastError(), "CreateEvent");
|
|
}
|
|
}
|
|
uv__pipe_queue_read(loop, handle);
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static void uv__insert_non_overlapped_write_req(uv_pipe_t* handle,
|
|
uv_write_t* req) {
|
|
req->next_req = NULL;
|
|
if (handle->pipe.conn.non_overlapped_writes_tail) {
|
|
req->next_req =
|
|
handle->pipe.conn.non_overlapped_writes_tail->next_req;
|
|
handle->pipe.conn.non_overlapped_writes_tail->next_req = (uv_req_t*)req;
|
|
handle->pipe.conn.non_overlapped_writes_tail = req;
|
|
} else {
|
|
req->next_req = (uv_req_t*)req;
|
|
handle->pipe.conn.non_overlapped_writes_tail = req;
|
|
}
|
|
}
|
|
|
|
|
|
static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
|
|
uv_write_t* req;
|
|
|
|
if (handle->pipe.conn.non_overlapped_writes_tail) {
|
|
req = (uv_write_t*)handle->pipe.conn.non_overlapped_writes_tail->next_req;
|
|
|
|
if (req == handle->pipe.conn.non_overlapped_writes_tail) {
|
|
handle->pipe.conn.non_overlapped_writes_tail = NULL;
|
|
} else {
|
|
handle->pipe.conn.non_overlapped_writes_tail->next_req =
|
|
req->next_req;
|
|
}
|
|
|
|
return req;
|
|
} else {
|
|
/* queue empty */
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
|
|
static void uv__queue_non_overlapped_write(uv_pipe_t* handle) {
|
|
uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
|
|
if (req) {
|
|
if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
|
|
req,
|
|
WT_EXECUTELONGFUNCTION)) {
|
|
uv_fatal_error(GetLastError(), "QueueUserWorkItem");
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
static int uv__build_coalesced_write_req(uv_write_t* user_req,
|
|
const uv_buf_t bufs[],
|
|
size_t nbufs,
|
|
uv_write_t** req_out,
|
|
uv_buf_t* write_buf_out) {
|
|
/* Pack into a single heap-allocated buffer:
|
|
* (a) a uv_write_t structure where libuv stores the actual state.
|
|
* (b) a pointer to the original uv_write_t.
|
|
* (c) data from all `bufs` entries.
|
|
*/
|
|
char* heap_buffer;
|
|
size_t heap_buffer_length, heap_buffer_offset;
|
|
uv__coalesced_write_t* coalesced_write_req; /* (a) + (b) */
|
|
char* data_start; /* (c) */
|
|
size_t data_length;
|
|
unsigned int i;
|
|
|
|
/* Compute combined size of all combined buffers from `bufs`. */
|
|
data_length = 0;
|
|
for (i = 0; i < nbufs; i++)
|
|
data_length += bufs[i].len;
|
|
|
|
/* The total combined size of data buffers should not exceed UINT32_MAX,
|
|
* because WriteFile() won't accept buffers larger than that. */
|
|
if (data_length > UINT32_MAX)
|
|
return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
|
|
|
|
/* Compute heap buffer size. */
|
|
heap_buffer_length = sizeof *coalesced_write_req + /* (a) + (b) */
|
|
data_length; /* (c) */
|
|
|
|
/* Allocate buffer. */
|
|
heap_buffer = uv__malloc(heap_buffer_length);
|
|
if (heap_buffer == NULL)
|
|
return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
|
|
|
|
/* Copy uv_write_t information to the buffer. */
|
|
coalesced_write_req = (uv__coalesced_write_t*) heap_buffer;
|
|
coalesced_write_req->req = *user_req; /* copy (a) */
|
|
coalesced_write_req->req.coalesced = 1;
|
|
coalesced_write_req->user_req = user_req; /* copy (b) */
|
|
heap_buffer_offset = sizeof *coalesced_write_req; /* offset (a) + (b) */
|
|
|
|
/* Copy data buffers to the heap buffer. */
|
|
data_start = &heap_buffer[heap_buffer_offset];
|
|
for (i = 0; i < nbufs; i++) {
|
|
memcpy(&heap_buffer[heap_buffer_offset],
|
|
bufs[i].base,
|
|
bufs[i].len); /* copy (c) */
|
|
heap_buffer_offset += bufs[i].len; /* offset (c) */
|
|
}
|
|
assert(heap_buffer_offset == heap_buffer_length);
|
|
|
|
/* Set out arguments and return. */
|
|
*req_out = &coalesced_write_req->req;
|
|
*write_buf_out = uv_buf_init(data_start, (unsigned int) data_length);
|
|
return 0;
|
|
}
|
|
|
|
|
|
static int uv__pipe_write_data(uv_loop_t* loop,
|
|
uv_write_t* req,
|
|
uv_pipe_t* handle,
|
|
const uv_buf_t bufs[],
|
|
size_t nbufs,
|
|
uv_write_cb cb,
|
|
int copy_always) {
|
|
int err;
|
|
int result;
|
|
uv_buf_t write_buf;
|
|
|
|
assert(handle->handle != INVALID_HANDLE_VALUE);
|
|
|
|
UV_REQ_INIT(req, UV_WRITE);
|
|
req->handle = (uv_stream_t*) handle;
|
|
req->send_handle = NULL;
|
|
req->cb = cb;
|
|
/* Private fields. */
|
|
req->coalesced = 0;
|
|
req->event_handle = NULL;
|
|
req->wait_handle = INVALID_HANDLE_VALUE;
|
|
|
|
/* Prepare the overlapped structure. */
|
|
memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
|
|
if (handle->flags & (UV_HANDLE_EMULATE_IOCP | UV_HANDLE_BLOCKING_WRITES)) {
|
|
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
|
|
if (req->event_handle == NULL) {
|
|
uv_fatal_error(GetLastError(), "CreateEvent");
|
|
}
|
|
req->u.io.overlapped.hEvent = (HANDLE) ((uintptr_t) req->event_handle | 1);
|
|
}
|
|
req->write_buffer = uv_null_buf_;
|
|
|
|
if (nbufs == 0) {
|
|
/* Write empty buffer. */
|
|
write_buf = uv_null_buf_;
|
|
} else if (nbufs == 1 && !copy_always) {
|
|
/* Write directly from bufs[0]. */
|
|
write_buf = bufs[0];
|
|
} else {
|
|
/* Coalesce all `bufs` into one big buffer. This also creates a new
|
|
* write-request structure that replaces the old one. */
|
|
err = uv__build_coalesced_write_req(req, bufs, nbufs, &req, &write_buf);
|
|
if (err != 0)
|
|
return err;
|
|
}
|
|
|
|
if ((handle->flags &
|
|
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) ==
|
|
(UV_HANDLE_BLOCKING_WRITES | UV_HANDLE_NON_OVERLAPPED_PIPE)) {
|
|
DWORD bytes;
|
|
result =
|
|
WriteFile(handle->handle, write_buf.base, write_buf.len, &bytes, NULL);
|
|
|
|
if (!result) {
|
|
err = GetLastError();
|
|
return err;
|
|
} else {
|
|
/* Request completed immediately. */
|
|
req->u.io.queued_bytes = 0;
|
|
}
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
handle->reqs_pending++;
|
|
handle->stream.conn.write_reqs_pending++;
|
|
POST_COMPLETION_FOR_REQ(loop, req);
|
|
return 0;
|
|
} else if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
|
|
req->write_buffer = write_buf;
|
|
uv__insert_non_overlapped_write_req(handle, req);
|
|
if (handle->stream.conn.write_reqs_pending == 0) {
|
|
uv__queue_non_overlapped_write(handle);
|
|
}
|
|
|
|
/* Request queued by the kernel. */
|
|
req->u.io.queued_bytes = write_buf.len;
|
|
handle->write_queue_size += req->u.io.queued_bytes;
|
|
} else if (handle->flags & UV_HANDLE_BLOCKING_WRITES) {
|
|
/* Using overlapped IO, but wait for completion before returning */
|
|
result = WriteFile(handle->handle,
|
|
write_buf.base,
|
|
write_buf.len,
|
|
NULL,
|
|
&req->u.io.overlapped);
|
|
|
|
if (!result && GetLastError() != ERROR_IO_PENDING) {
|
|
err = GetLastError();
|
|
CloseHandle(req->event_handle);
|
|
req->event_handle = NULL;
|
|
return err;
|
|
}
|
|
|
|
if (result) {
|
|
/* Request completed immediately. */
|
|
req->u.io.queued_bytes = 0;
|
|
} else {
|
|
/* Request queued by the kernel. */
|
|
req->u.io.queued_bytes = write_buf.len;
|
|
handle->write_queue_size += req->u.io.queued_bytes;
|
|
if (WaitForSingleObject(req->event_handle, INFINITE) !=
|
|
WAIT_OBJECT_0) {
|
|
err = GetLastError();
|
|
CloseHandle(req->event_handle);
|
|
req->event_handle = NULL;
|
|
return err;
|
|
}
|
|
}
|
|
CloseHandle(req->event_handle);
|
|
req->event_handle = NULL;
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
handle->reqs_pending++;
|
|
handle->stream.conn.write_reqs_pending++;
|
|
return 0;
|
|
} else {
|
|
result = WriteFile(handle->handle,
|
|
write_buf.base,
|
|
write_buf.len,
|
|
NULL,
|
|
&req->u.io.overlapped);
|
|
|
|
if (!result && GetLastError() != ERROR_IO_PENDING) {
|
|
return GetLastError();
|
|
}
|
|
|
|
if (result) {
|
|
/* Request completed immediately. */
|
|
req->u.io.queued_bytes = 0;
|
|
} else {
|
|
/* Request queued by the kernel. */
|
|
req->u.io.queued_bytes = write_buf.len;
|
|
handle->write_queue_size += req->u.io.queued_bytes;
|
|
}
|
|
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
|
if (!RegisterWaitForSingleObject(&req->wait_handle,
|
|
req->event_handle, post_completion_write_wait, (void*) req,
|
|
INFINITE, WT_EXECUTEINWAITTHREAD)) {
|
|
return GetLastError();
|
|
}
|
|
}
|
|
}
|
|
|
|
REGISTER_HANDLE_REQ(loop, handle, req);
|
|
handle->reqs_pending++;
|
|
handle->stream.conn.write_reqs_pending++;
|
|
|
|
return 0;
|
|
}
|
|
|
|
|
|
static DWORD uv__pipe_get_ipc_remote_pid(uv_pipe_t* handle) {
|
|
DWORD* pid = &handle->pipe.conn.ipc_remote_pid;
|
|
|
|
/* If the both ends of the IPC pipe are owned by the same process,
|
|
* the remote end pid may not yet be set. If so, do it here.
|
|
* TODO: this is weird; it'd probably better to use a handshake. */
|
|
if (*pid == 0)
|
|
*pid = GetCurrentProcessId();
|
|
|
|
return *pid;
|
|
}
|
|
|
|
|
|
int uv__pipe_write_ipc(uv_loop_t* loop,
|
|
uv_write_t* req,
|
|
uv_pipe_t* handle,
|
|
const uv_buf_t data_bufs[],
|
|
size_t data_buf_count,
|
|
uv_stream_t* send_handle,
|
|
uv_write_cb cb) {
|
|
uv_buf_t stack_bufs[6];
|
|
uv_buf_t* bufs;
|
|
size_t buf_count, buf_index;
|
|
uv__ipc_frame_header_t frame_header;
|
|
uv__ipc_socket_xfer_type_t xfer_type = UV__IPC_SOCKET_XFER_NONE;
|
|
uv__ipc_socket_xfer_info_t xfer_info;
|
|
uint64_t data_length;
|
|
size_t i;
|
|
int err;
|
|
|
|
/* Compute the combined size of data buffers. */
|
|
data_length = 0;
|
|
for (i = 0; i < data_buf_count; i++)
|
|
data_length += data_bufs[i].len;
|
|
if (data_length > UINT32_MAX)
|
|
return WSAENOBUFS; /* Maps to UV_ENOBUFS. */
|
|
|
|
/* Prepare the frame's socket xfer payload. */
|
|
if (send_handle != NULL) {
|
|
uv_tcp_t* send_tcp_handle = (uv_tcp_t*) send_handle;
|
|
|
|
/* Verify that `send_handle` it is indeed a tcp handle. */
|
|
if (send_tcp_handle->type != UV_TCP)
|
|
return ERROR_NOT_SUPPORTED;
|
|
|
|
/* Export the tcp handle. */
|
|
err = uv__tcp_xfer_export(send_tcp_handle,
|
|
uv__pipe_get_ipc_remote_pid(handle),
|
|
&xfer_type,
|
|
&xfer_info);
|
|
if (err != 0)
|
|
return err;
|
|
}
|
|
|
|
/* Compute the number of uv_buf_t's required. */
|
|
buf_count = 1 + data_buf_count; /* Frame header and data buffers. */
|
|
if (send_handle != NULL)
|
|
buf_count += 1; /* One extra for the socket xfer information. */
|
|
|
|
/* Use the on-stack buffer array if it is big enough; otherwise allocate
|
|
* space for it on the heap. */
|
|
if (buf_count < ARRAY_SIZE(stack_bufs)) {
|
|
/* Use on-stack buffer array. */
|
|
bufs = stack_bufs;
|
|
} else {
|
|
/* Use heap-allocated buffer array. */
|
|
bufs = uv__calloc(buf_count, sizeof(uv_buf_t));
|
|
if (bufs == NULL)
|
|
return ERROR_NOT_ENOUGH_MEMORY; /* Maps to UV_ENOMEM. */
|
|
}
|
|
buf_index = 0;
|
|
|
|
/* Initialize frame header and add it to the buffers list. */
|
|
memset(&frame_header, 0, sizeof frame_header);
|
|
bufs[buf_index++] = uv_buf_init((char*) &frame_header, sizeof frame_header);
|
|
|
|
if (send_handle != NULL) {
|
|
/* Add frame header flags. */
|
|
switch (xfer_type) {
|
|
case UV__IPC_SOCKET_XFER_TCP_CONNECTION:
|
|
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER |
|
|
UV__IPC_FRAME_XFER_IS_TCP_CONNECTION;
|
|
break;
|
|
case UV__IPC_SOCKET_XFER_TCP_SERVER:
|
|
frame_header.flags |= UV__IPC_FRAME_HAS_SOCKET_XFER;
|
|
break;
|
|
default:
|
|
assert(0); /* Unreachable. */
|
|
}
|
|
/* Add xfer info buffer. */
|
|
bufs[buf_index++] = uv_buf_init((char*) &xfer_info, sizeof xfer_info);
|
|
}
|
|
|
|
if (data_length > 0) {
|
|
/* Update frame header. */
|
|
frame_header.flags |= UV__IPC_FRAME_HAS_DATA;
|
|
frame_header.data_length = (uint32_t) data_length;
|
|
/* Add data buffers to buffers list. */
|
|
for (i = 0; i < data_buf_count; i++)
|
|
bufs[buf_index++] = data_bufs[i];
|
|
}
|
|
|
|
/* Write buffers. We set the `always_copy` flag, so it is not a problem that
|
|
* some of the written data lives on the stack. */
|
|
err = uv__pipe_write_data(loop, req, handle, bufs, buf_count, cb, 1);
|
|
|
|
/* If we had to heap-allocate the bufs array, free it now. */
|
|
if (bufs != stack_bufs) {
|
|
uv__free(bufs);
|
|
}
|
|
|
|
return err;
|
|
}
|
|
|
|
|
|
int uv__pipe_write(uv_loop_t* loop,
|
|
uv_write_t* req,
|
|
uv_pipe_t* handle,
|
|
const uv_buf_t bufs[],
|
|
size_t nbufs,
|
|
uv_stream_t* send_handle,
|
|
uv_write_cb cb) {
|
|
if (handle->ipc) {
|
|
/* IPC pipe write: use framing protocol. */
|
|
return uv__pipe_write_ipc(loop, req, handle, bufs, nbufs, send_handle, cb);
|
|
} else {
|
|
/* Non-IPC pipe write: put data on the wire directly. */
|
|
assert(send_handle == NULL);
|
|
return uv__pipe_write_data(loop, req, handle, bufs, nbufs, cb, 0);
|
|
}
|
|
}
|
|
|
|
|
|
static void uv__pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_buf_t buf) {
|
|
/* If there is an eof timer running, we don't need it any more, so discard
|
|
* it. */
|
|
eof_timer_destroy(handle);
|
|
|
|
uv_read_stop((uv_stream_t*) handle);
|
|
|
|
handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
|
|
}
|
|
|
|
|
|
static void uv__pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
|
|
uv_buf_t buf) {
|
|
/* If there is an eof timer running, we don't need it any more, so discard
|
|
* it. */
|
|
eof_timer_destroy(handle);
|
|
|
|
uv_read_stop((uv_stream_t*) handle);
|
|
|
|
handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
|
|
}
|
|
|
|
|
|
static void uv__pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
|
int error, uv_buf_t buf) {
|
|
if (error == ERROR_BROKEN_PIPE) {
|
|
uv__pipe_read_eof(loop, handle, buf);
|
|
} else {
|
|
uv__pipe_read_error(loop, handle, error, buf);
|
|
}
|
|
}
|
|
|
|
|
|
static void uv__pipe_queue_ipc_xfer_info(
|
|
uv_pipe_t* handle,
|
|
uv__ipc_socket_xfer_type_t xfer_type,
|
|
uv__ipc_socket_xfer_info_t* xfer_info) {
|
|
uv__ipc_xfer_queue_item_t* item;
|
|
|
|
item = (uv__ipc_xfer_queue_item_t*) uv__malloc(sizeof(*item));
|
|
if (item == NULL)
|
|
uv_fatal_error(ERROR_OUTOFMEMORY, "uv__malloc");
|
|
|
|
item->xfer_type = xfer_type;
|
|
item->xfer_info = *xfer_info;
|
|
|
|
QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member);
|
|
handle->pipe.conn.ipc_xfer_queue_length++;
|
|
}
|
|
|
|
|
|
/* Read an exact number of bytes from a pipe. If an error or end-of-file is
|
|
* encountered before the requested number of bytes are read, an error is
|
|
* returned. */
|
|
static int uv__pipe_read_exactly(HANDLE h, void* buffer, DWORD count) {
|
|
DWORD bytes_read, bytes_read_now;
|
|
|
|
bytes_read = 0;
|
|
while (bytes_read < count) {
|
|
if (!ReadFile(h,
|
|
(char*) buffer + bytes_read,
|
|
count - bytes_read,
|
|
&bytes_read_now,
|
|
NULL)) {
|
|
return GetLastError();
|
|
}
|
|
|
|
bytes_read += bytes_read_now;
|
|
}
|
|
|
|
assert(bytes_read == count);
|
|
return 0;
|
|
}
|
|
|
|
|
|
static DWORD uv__pipe_read_data(uv_loop_t* loop,
|
|
uv_pipe_t* handle,
|
|
DWORD suggested_bytes,
|
|
DWORD max_bytes) {
|
|
DWORD bytes_read;
|
|
uv_buf_t buf;
|
|
|
|
/* Ask the user for a buffer to read data into. */
|
|
buf = uv_buf_init(NULL, 0);
|
|
handle->alloc_cb((uv_handle_t*) handle, suggested_bytes, &buf);
|
|
if (buf.base == NULL || buf.len == 0) {
|
|
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
|
|
return 0; /* Break out of read loop. */
|
|
}
|
|
|
|
/* Ensure we read at most the smaller of:
|
|
* (a) the length of the user-allocated buffer.
|
|
* (b) the maximum data length as specified by the `max_bytes` argument.
|
|
*/
|
|
if (max_bytes > buf.len)
|
|
max_bytes = buf.len;
|
|
|
|
/* Read into the user buffer. */
|
|
if (!ReadFile(handle->handle, buf.base, max_bytes, &bytes_read, NULL)) {
|
|
uv__pipe_read_error_or_eof(loop, handle, GetLastError(), buf);
|
|
return 0; /* Break out of read loop. */
|
|
}
|
|
|
|
/* Call the read callback. */
|
|
handle->read_cb((uv_stream_t*) handle, bytes_read, &buf);
|
|
|
|
return bytes_read;
|
|
}
|
|
|
|
|
|
static DWORD uv__pipe_read_ipc(uv_loop_t* loop, uv_pipe_t* handle) {
|
|
uint32_t* data_remaining = &handle->pipe.conn.ipc_data_frame.payload_remaining;
|
|
int err;
|
|
|
|
if (*data_remaining > 0) {
|
|
/* Read frame data payload. */
|
|
DWORD bytes_read =
|
|
uv__pipe_read_data(loop, handle, *data_remaining, *data_remaining);
|
|
*data_remaining -= bytes_read;
|
|
return bytes_read;
|
|
|
|
} else {
|
|
/* Start of a new IPC frame. */
|
|
uv__ipc_frame_header_t frame_header;
|
|
uint32_t xfer_flags;
|
|
uv__ipc_socket_xfer_type_t xfer_type;
|
|
uv__ipc_socket_xfer_info_t xfer_info;
|
|
|
|
/* Read the IPC frame header. */
|
|
err = uv__pipe_read_exactly(
|
|
handle->handle, &frame_header, sizeof frame_header);
|
|
if (err)
|
|
goto error;
|
|
|
|
/* Validate that flags are valid. */
|
|
if ((frame_header.flags & ~UV__IPC_FRAME_VALID_FLAGS) != 0)
|
|
goto invalid;
|
|
/* Validate that reserved2 is zero. */
|
|
if (frame_header.reserved2 != 0)
|
|
goto invalid;
|
|
|
|
/* Parse xfer flags. */
|
|
xfer_flags = frame_header.flags & UV__IPC_FRAME_XFER_FLAGS;
|
|
if (xfer_flags & UV__IPC_FRAME_HAS_SOCKET_XFER) {
|
|
/* Socket coming -- determine the type. */
|
|
xfer_type = xfer_flags & UV__IPC_FRAME_XFER_IS_TCP_CONNECTION
|
|
? UV__IPC_SOCKET_XFER_TCP_CONNECTION
|
|
: UV__IPC_SOCKET_XFER_TCP_SERVER;
|
|
} else if (xfer_flags == 0) {
|
|
/* No socket. */
|
|
xfer_type = UV__IPC_SOCKET_XFER_NONE;
|
|
} else {
|
|
/* Invalid flags. */
|
|
goto invalid;
|
|
}
|
|
|
|
/* Parse data frame information. */
|
|
if (frame_header.flags & UV__IPC_FRAME_HAS_DATA) {
|
|
*data_remaining = frame_header.data_length;
|
|
} else if (frame_header.data_length != 0) {
|
|
/* Data length greater than zero but data flag not set -- invalid. */
|
|
goto invalid;
|
|
}
|
|
|
|
/* If no socket xfer info follows, return here. Data will be read in a
|
|
* subsequent invocation of uv__pipe_read_ipc(). */
|
|
if (xfer_type == UV__IPC_SOCKET_XFER_NONE)
|
|
return sizeof frame_header; /* Number of bytes read. */
|
|
|
|
/* Read transferred socket information. */
|
|
err = uv__pipe_read_exactly(handle->handle, &xfer_info, sizeof xfer_info);
|
|
if (err)
|
|
goto error;
|
|
|
|
/* Store the pending socket info. */
|
|
uv__pipe_queue_ipc_xfer_info(handle, xfer_type, &xfer_info);
|
|
|
|
/* Return number of bytes read. */
|
|
return sizeof frame_header + sizeof xfer_info;
|
|
}
|
|
|
|
invalid:
|
|
/* Invalid frame. */
|
|
err = WSAECONNABORTED; /* Maps to UV_ECONNABORTED. */
|
|
|
|
error:
|
|
uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
|
|
return 0; /* Break out of read loop. */
|
|
}
|
|
|
|
|
|
void uv__process_pipe_read_req(uv_loop_t* loop,
|
|
uv_pipe_t* handle,
|
|
uv_req_t* req) {
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING);
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
eof_timer_stop(handle);
|
|
|
|
/* At this point, we're done with bookkeeping. If the user has stopped
|
|
* reading the pipe in the meantime, there is nothing left to do, since there
|
|
* is no callback that we can call. */
|
|
if (!(handle->flags & UV_HANDLE_READING))
|
|
return;
|
|
|
|
if (!REQ_SUCCESS(req)) {
|
|
/* An error occurred doing the zero-read. */
|
|
DWORD err = GET_REQ_ERROR(req);
|
|
|
|
/* If the read was cancelled by uv__pipe_interrupt_read(), the request may
|
|
* indicate an ERROR_OPERATION_ABORTED error. This error isn't relevant to
|
|
* the user; we'll start a new zero-read at the end of this function. */
|
|
if (err != ERROR_OPERATION_ABORTED)
|
|
uv__pipe_read_error_or_eof(loop, handle, err, uv_null_buf_);
|
|
|
|
} else {
|
|
/* The zero-read completed without error, indicating there is data
|
|
* available in the kernel buffer. */
|
|
DWORD avail;
|
|
|
|
/* Get the number of bytes available. */
|
|
avail = 0;
|
|
if (!PeekNamedPipe(handle->handle, NULL, 0, NULL, &avail, NULL))
|
|
uv__pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
|
|
|
|
/* Read until we've either read all the bytes available, or the 'reading'
|
|
* flag is cleared. */
|
|
while (avail > 0 && handle->flags & UV_HANDLE_READING) {
|
|
/* Depending on the type of pipe, read either IPC frames or raw data. */
|
|
DWORD bytes_read =
|
|
handle->ipc ? uv__pipe_read_ipc(loop, handle)
|
|
: uv__pipe_read_data(loop, handle, avail, (DWORD) -1);
|
|
|
|
/* If no bytes were read, treat this as an indication that an error
|
|
* occurred, and break out of the read loop. */
|
|
if (bytes_read == 0)
|
|
break;
|
|
|
|
/* It is possible that more bytes were read than we thought were
|
|
* available. To prevent `avail` from underflowing, break out of the loop
|
|
* if this is the case. */
|
|
if (bytes_read > avail)
|
|
break;
|
|
|
|
/* Recompute the number of bytes available. */
|
|
avail -= bytes_read;
|
|
}
|
|
}
|
|
|
|
/* Start another zero-read request if necessary. */
|
|
if ((handle->flags & UV_HANDLE_READING) &&
|
|
!(handle->flags & UV_HANDLE_READ_PENDING)) {
|
|
uv__pipe_queue_read(loop, handle);
|
|
}
|
|
}
|
|
|
|
|
|
void uv__process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_write_t* req) {
|
|
int err;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
assert(handle->write_queue_size >= req->u.io.queued_bytes);
|
|
handle->write_queue_size -= req->u.io.queued_bytes;
|
|
|
|
UNREGISTER_HANDLE_REQ(loop, handle, req);
|
|
|
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
|
if (req->wait_handle != INVALID_HANDLE_VALUE) {
|
|
UnregisterWait(req->wait_handle);
|
|
req->wait_handle = INVALID_HANDLE_VALUE;
|
|
}
|
|
if (req->event_handle) {
|
|
CloseHandle(req->event_handle);
|
|
req->event_handle = NULL;
|
|
}
|
|
}
|
|
|
|
err = GET_REQ_ERROR(req);
|
|
|
|
/* If this was a coalesced write, extract pointer to the user_provided
|
|
* uv_write_t structure so we can pass the expected pointer to the callback,
|
|
* then free the heap-allocated write req. */
|
|
if (req->coalesced) {
|
|
uv__coalesced_write_t* coalesced_write =
|
|
container_of(req, uv__coalesced_write_t, req);
|
|
req = coalesced_write->user_req;
|
|
uv__free(coalesced_write);
|
|
}
|
|
if (req->cb) {
|
|
req->cb(req, uv_translate_sys_error(err));
|
|
}
|
|
|
|
handle->stream.conn.write_reqs_pending--;
|
|
|
|
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
|
|
handle->pipe.conn.non_overlapped_writes_tail) {
|
|
assert(handle->stream.conn.write_reqs_pending > 0);
|
|
uv__queue_non_overlapped_write(handle);
|
|
}
|
|
|
|
if (handle->stream.conn.write_reqs_pending == 0)
|
|
if (handle->flags & UV_HANDLE_SHUTTING)
|
|
uv__pipe_shutdown(loop, handle, handle->stream.conn.shutdown_req);
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_req_t* raw_req) {
|
|
uv_pipe_accept_t* req = (uv_pipe_accept_t*) raw_req;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
if (handle->flags & UV_HANDLE_CLOSING) {
|
|
/* The req->pipeHandle should be freed already in uv__pipe_close(). */
|
|
assert(req->pipeHandle == INVALID_HANDLE_VALUE);
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
return;
|
|
}
|
|
|
|
if (REQ_SUCCESS(req)) {
|
|
assert(req->pipeHandle != INVALID_HANDLE_VALUE);
|
|
req->next_pending = handle->pipe.serv.pending_accepts;
|
|
handle->pipe.serv.pending_accepts = req;
|
|
|
|
if (handle->stream.serv.connection_cb) {
|
|
handle->stream.serv.connection_cb((uv_stream_t*)handle, 0);
|
|
}
|
|
} else {
|
|
if (req->pipeHandle != INVALID_HANDLE_VALUE) {
|
|
CloseHandle(req->pipeHandle);
|
|
req->pipeHandle = INVALID_HANDLE_VALUE;
|
|
}
|
|
if (!(handle->flags & UV_HANDLE_CLOSING)) {
|
|
uv__pipe_queue_accept(loop, handle, req, FALSE);
|
|
}
|
|
}
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_connect_t* req) {
|
|
HANDLE pipeHandle;
|
|
DWORD duplex_flags;
|
|
int err;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
UNREGISTER_HANDLE_REQ(loop, handle, req);
|
|
|
|
err = 0;
|
|
if (REQ_SUCCESS(req)) {
|
|
pipeHandle = req->u.connect.pipeHandle;
|
|
duplex_flags = req->u.connect.duplex_flags;
|
|
err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags);
|
|
if (err)
|
|
CloseHandle(pipeHandle);
|
|
} else {
|
|
err = uv_translate_sys_error(GET_REQ_ERROR(req));
|
|
}
|
|
|
|
if (req->cb)
|
|
req->cb(req, err);
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
|
|
void uv__process_pipe_shutdown_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|
uv_shutdown_t* req) {
|
|
int err;
|
|
|
|
assert(handle->type == UV_NAMED_PIPE);
|
|
|
|
/* Clear the shutdown_req field so we don't go here again. */
|
|
handle->stream.conn.shutdown_req = NULL;
|
|
handle->flags &= ~UV_HANDLE_SHUTTING;
|
|
UNREGISTER_HANDLE_REQ(loop, handle, req);
|
|
|
|
if (handle->flags & UV_HANDLE_CLOSING) {
|
|
/* Already closing. Cancel the shutdown. */
|
|
err = UV_ECANCELED;
|
|
} else if (!REQ_SUCCESS(req)) {
|
|
/* An error occurred in trying to shutdown gracefully. */
|
|
err = uv_translate_sys_error(GET_REQ_ERROR(req));
|
|
} else {
|
|
if (handle->flags & UV_HANDLE_READABLE) {
|
|
/* Initialize and optionally start the eof timer. Only do this if the pipe
|
|
* is readable and we haven't seen EOF come in ourselves. */
|
|
eof_timer_init(handle);
|
|
|
|
/* If reading start the timer right now. Otherwise uv__pipe_queue_read will
|
|
* start it. */
|
|
if (handle->flags & UV_HANDLE_READ_PENDING) {
|
|
eof_timer_start(handle);
|
|
}
|
|
|
|
} else {
|
|
/* This pipe is not readable. We can just close it to let the other end
|
|
* know that we're done writing. */
|
|
close_pipe(handle);
|
|
}
|
|
err = 0;
|
|
}
|
|
|
|
if (req->cb)
|
|
req->cb(req, err);
|
|
|
|
DECREASE_PENDING_REQ_COUNT(handle);
|
|
}
|
|
|
|
|
|
static void eof_timer_init(uv_pipe_t* pipe) {
|
|
int r;
|
|
|
|
assert(pipe->pipe.conn.eof_timer == NULL);
|
|
assert(pipe->flags & UV_HANDLE_CONNECTION);
|
|
|
|
pipe->pipe.conn.eof_timer = (uv_timer_t*) uv__malloc(sizeof *pipe->pipe.conn.eof_timer);
|
|
|
|
r = uv_timer_init(pipe->loop, pipe->pipe.conn.eof_timer);
|
|
assert(r == 0); /* timers can't fail */
|
|
(void) r;
|
|
pipe->pipe.conn.eof_timer->data = pipe;
|
|
uv_unref((uv_handle_t*) pipe->pipe.conn.eof_timer);
|
|
}
|
|
|
|
|
|
static void eof_timer_start(uv_pipe_t* pipe) {
|
|
assert(pipe->flags & UV_HANDLE_CONNECTION);
|
|
|
|
if (pipe->pipe.conn.eof_timer != NULL) {
|
|
uv_timer_start(pipe->pipe.conn.eof_timer, eof_timer_cb, eof_timeout, 0);
|
|
}
|
|
}
|
|
|
|
|
|
static void eof_timer_stop(uv_pipe_t* pipe) {
|
|
assert(pipe->flags & UV_HANDLE_CONNECTION);
|
|
|
|
if (pipe->pipe.conn.eof_timer != NULL) {
|
|
uv_timer_stop(pipe->pipe.conn.eof_timer);
|
|
}
|
|
}
|
|
|
|
|
|
static void eof_timer_cb(uv_timer_t* timer) {
|
|
uv_pipe_t* pipe = (uv_pipe_t*) timer->data;
|
|
uv_loop_t* loop = timer->loop;
|
|
|
|
assert(pipe->type == UV_NAMED_PIPE);
|
|
|
|
/* This should always be true, since we start the timer only in
|
|
* uv__pipe_queue_read after successfully calling ReadFile, or in
|
|
* uv__process_pipe_shutdown_req if a read is pending, and we always
|
|
* immediately stop the timer in uv__process_pipe_read_req. */
|
|
assert(pipe->flags & UV_HANDLE_READ_PENDING);
|
|
|
|
/* If there are many packets coming off the iocp then the timer callback may
|
|
* be called before the read request is coming off the queue. Therefore we
|
|
* check here if the read request has completed but will be processed later.
|
|
*/
|
|
if ((pipe->flags & UV_HANDLE_READ_PENDING) &&
|
|
HasOverlappedIoCompleted(&pipe->read_req.u.io.overlapped)) {
|
|
return;
|
|
}
|
|
|
|
/* Force both ends off the pipe. */
|
|
close_pipe(pipe);
|
|
|
|
/* Stop reading, so the pending read that is going to fail will not be
|
|
* reported to the user. */
|
|
uv_read_stop((uv_stream_t*) pipe);
|
|
|
|
/* Report the eof and update flags. This will get reported even if the user
|
|
* stopped reading in the meantime. TODO: is that okay? */
|
|
uv__pipe_read_eof(loop, pipe, uv_null_buf_);
|
|
}
|
|
|
|
|
|
static void eof_timer_destroy(uv_pipe_t* pipe) {
|
|
assert(pipe->flags & UV_HANDLE_CONNECTION);
|
|
|
|
if (pipe->pipe.conn.eof_timer) {
|
|
uv_close((uv_handle_t*) pipe->pipe.conn.eof_timer, eof_timer_close_cb);
|
|
pipe->pipe.conn.eof_timer = NULL;
|
|
}
|
|
}
|
|
|
|
|
|
static void eof_timer_close_cb(uv_handle_t* handle) {
|
|
assert(handle->type == UV_TIMER);
|
|
uv__free(handle);
|
|
}
|
|
|
|
|
|
int uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
|
|
HANDLE os_handle = uv__get_osfhandle(file);
|
|
NTSTATUS nt_status;
|
|
IO_STATUS_BLOCK io_status;
|
|
FILE_ACCESS_INFORMATION access;
|
|
DWORD duplex_flags = 0;
|
|
int err;
|
|
|
|
if (os_handle == INVALID_HANDLE_VALUE)
|
|
return UV_EBADF;
|
|
if (pipe->flags & UV_HANDLE_PIPESERVER)
|
|
return UV_EINVAL;
|
|
if (pipe->flags & UV_HANDLE_CONNECTION)
|
|
return UV_EBUSY;
|
|
|
|
uv__pipe_connection_init(pipe);
|
|
uv__once_init();
|
|
/* In order to avoid closing a stdio file descriptor 0-2, duplicate the
|
|
* underlying OS handle and forget about the original fd.
|
|
* We could also opt to use the original OS handle and just never close it,
|
|
* but then there would be no reliable way to cancel pending read operations
|
|
* upon close.
|
|
*/
|
|
if (file <= 2) {
|
|
if (!DuplicateHandle(INVALID_HANDLE_VALUE,
|
|
os_handle,
|
|
INVALID_HANDLE_VALUE,
|
|
&os_handle,
|
|
0,
|
|
FALSE,
|
|
DUPLICATE_SAME_ACCESS))
|
|
return uv_translate_sys_error(GetLastError());
|
|
assert(os_handle != INVALID_HANDLE_VALUE);
|
|
file = -1;
|
|
}
|
|
|
|
/* Determine what kind of permissions we have on this handle.
|
|
* Cygwin opens the pipe in message mode, but we can support it,
|
|
* just query the access flags and set the stream flags accordingly.
|
|
*/
|
|
nt_status = pNtQueryInformationFile(os_handle,
|
|
&io_status,
|
|
&access,
|
|
sizeof(access),
|
|
FileAccessInformation);
|
|
if (nt_status != STATUS_SUCCESS)
|
|
return UV_EINVAL;
|
|
|
|
if (pipe->ipc) {
|
|
if (!(access.AccessFlags & FILE_WRITE_DATA) ||
|
|
!(access.AccessFlags & FILE_READ_DATA)) {
|
|
return UV_EINVAL;
|
|
}
|
|
}
|
|
|
|
if (access.AccessFlags & FILE_WRITE_DATA)
|
|
duplex_flags |= UV_HANDLE_WRITABLE;
|
|
if (access.AccessFlags & FILE_READ_DATA)
|
|
duplex_flags |= UV_HANDLE_READABLE;
|
|
|
|
err = uv__set_pipe_handle(pipe->loop,
|
|
pipe,
|
|
os_handle,
|
|
file,
|
|
duplex_flags);
|
|
if (err) {
|
|
if (file == -1)
|
|
CloseHandle(os_handle);
|
|
return err;
|
|
}
|
|
|
|
if (pipe->ipc) {
|
|
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
|
|
pipe->pipe.conn.ipc_remote_pid = uv_os_getppid();
|
|
assert(pipe->pipe.conn.ipc_remote_pid != (DWORD)(uv_pid_t) -1);
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
|
|
static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) {
|
|
NTSTATUS nt_status;
|
|
IO_STATUS_BLOCK io_status;
|
|
FILE_NAME_INFORMATION tmp_name_info;
|
|
FILE_NAME_INFORMATION* name_info;
|
|
WCHAR* name_buf;
|
|
unsigned int addrlen;
|
|
unsigned int name_size;
|
|
unsigned int name_len;
|
|
int err;
|
|
|
|
uv__once_init();
|
|
name_info = NULL;
|
|
|
|
if (handle->name != NULL) {
|
|
/* The user might try to query the name before we are connected,
|
|
* and this is just easier to return the cached value if we have it. */
|
|
name_buf = handle->name;
|
|
name_len = wcslen(name_buf);
|
|
|
|
/* check how much space we need */
|
|
addrlen = WideCharToMultiByte(CP_UTF8,
|
|
0,
|
|
name_buf,
|
|
name_len,
|
|
NULL,
|
|
0,
|
|
NULL,
|
|
NULL);
|
|
if (!addrlen) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(GetLastError());
|
|
return err;
|
|
} else if (addrlen >= *size) {
|
|
*size = addrlen + 1;
|
|
err = UV_ENOBUFS;
|
|
goto error;
|
|
}
|
|
|
|
addrlen = WideCharToMultiByte(CP_UTF8,
|
|
0,
|
|
name_buf,
|
|
name_len,
|
|
buffer,
|
|
addrlen,
|
|
NULL,
|
|
NULL);
|
|
if (!addrlen) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(GetLastError());
|
|
return err;
|
|
}
|
|
|
|
*size = addrlen;
|
|
buffer[addrlen] = '\0';
|
|
|
|
return 0;
|
|
}
|
|
|
|
if (handle->handle == INVALID_HANDLE_VALUE) {
|
|
*size = 0;
|
|
return UV_EINVAL;
|
|
}
|
|
|
|
/* NtQueryInformationFile will block if another thread is performing a
|
|
* blocking operation on the queried handle. If the pipe handle is
|
|
* synchronous, there may be a worker thread currently calling ReadFile() on
|
|
* the pipe handle, which could cause a deadlock. To avoid this, interrupt
|
|
* the read. */
|
|
if (handle->flags & UV_HANDLE_CONNECTION &&
|
|
handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
|
|
uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */
|
|
}
|
|
|
|
nt_status = pNtQueryInformationFile(handle->handle,
|
|
&io_status,
|
|
&tmp_name_info,
|
|
sizeof tmp_name_info,
|
|
FileNameInformation);
|
|
if (nt_status == STATUS_BUFFER_OVERFLOW) {
|
|
name_size = sizeof(*name_info) + tmp_name_info.FileNameLength;
|
|
name_info = uv__malloc(name_size);
|
|
if (!name_info) {
|
|
*size = 0;
|
|
err = UV_ENOMEM;
|
|
goto cleanup;
|
|
}
|
|
|
|
nt_status = pNtQueryInformationFile(handle->handle,
|
|
&io_status,
|
|
name_info,
|
|
name_size,
|
|
FileNameInformation);
|
|
}
|
|
|
|
if (nt_status != STATUS_SUCCESS) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(pRtlNtStatusToDosError(nt_status));
|
|
goto error;
|
|
}
|
|
|
|
if (!name_info) {
|
|
/* the struct on stack was used */
|
|
name_buf = tmp_name_info.FileName;
|
|
name_len = tmp_name_info.FileNameLength;
|
|
} else {
|
|
name_buf = name_info->FileName;
|
|
name_len = name_info->FileNameLength;
|
|
}
|
|
|
|
if (name_len == 0) {
|
|
*size = 0;
|
|
err = 0;
|
|
goto error;
|
|
}
|
|
|
|
name_len /= sizeof(WCHAR);
|
|
|
|
/* check how much space we need */
|
|
addrlen = WideCharToMultiByte(CP_UTF8,
|
|
0,
|
|
name_buf,
|
|
name_len,
|
|
NULL,
|
|
0,
|
|
NULL,
|
|
NULL);
|
|
if (!addrlen) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(GetLastError());
|
|
goto error;
|
|
} else if (pipe_prefix_len + addrlen >= *size) {
|
|
/* "\\\\.\\pipe" + name */
|
|
*size = pipe_prefix_len + addrlen + 1;
|
|
err = UV_ENOBUFS;
|
|
goto error;
|
|
}
|
|
|
|
memcpy(buffer, pipe_prefix, pipe_prefix_len);
|
|
addrlen = WideCharToMultiByte(CP_UTF8,
|
|
0,
|
|
name_buf,
|
|
name_len,
|
|
buffer+pipe_prefix_len,
|
|
*size-pipe_prefix_len,
|
|
NULL,
|
|
NULL);
|
|
if (!addrlen) {
|
|
*size = 0;
|
|
err = uv_translate_sys_error(GetLastError());
|
|
goto error;
|
|
}
|
|
|
|
addrlen += pipe_prefix_len;
|
|
*size = addrlen;
|
|
buffer[addrlen] = '\0';
|
|
|
|
err = 0;
|
|
|
|
error:
|
|
uv__free(name_info);
|
|
|
|
cleanup:
|
|
return err;
|
|
}
|
|
|
|
|
|
int uv_pipe_pending_count(uv_pipe_t* handle) {
|
|
if (!handle->ipc)
|
|
return 0;
|
|
return handle->pipe.conn.ipc_xfer_queue_length;
|
|
}
|
|
|
|
|
|
int uv_pipe_getsockname(const uv_pipe_t* handle, char* buffer, size_t* size) {
|
|
if (handle->flags & UV_HANDLE_BOUND)
|
|
return uv__pipe_getname(handle, buffer, size);
|
|
|
|
if (handle->flags & UV_HANDLE_CONNECTION ||
|
|
handle->handle != INVALID_HANDLE_VALUE) {
|
|
*size = 0;
|
|
return 0;
|
|
}
|
|
|
|
return UV_EBADF;
|
|
}
|
|
|
|
|
|
int uv_pipe_getpeername(const uv_pipe_t* handle, char* buffer, size_t* size) {
|
|
/* emulate unix behaviour */
|
|
if (handle->flags & UV_HANDLE_BOUND)
|
|
return UV_ENOTCONN;
|
|
|
|
if (handle->handle != INVALID_HANDLE_VALUE)
|
|
return uv__pipe_getname(handle, buffer, size);
|
|
|
|
if (handle->flags & UV_HANDLE_CONNECTION) {
|
|
if (handle->name != NULL)
|
|
return uv__pipe_getname(handle, buffer, size);
|
|
}
|
|
|
|
return UV_EBADF;
|
|
}
|
|
|
|
|
|
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
|
|
if (!handle->ipc)
|
|
return UV_UNKNOWN_HANDLE;
|
|
if (handle->pipe.conn.ipc_xfer_queue_length == 0)
|
|
return UV_UNKNOWN_HANDLE;
|
|
else
|
|
return UV_TCP;
|
|
}
|
|
|
|
int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
|
|
SID_IDENTIFIER_AUTHORITY sid_world = { SECURITY_WORLD_SID_AUTHORITY };
|
|
PACL old_dacl, new_dacl;
|
|
PSECURITY_DESCRIPTOR sd;
|
|
EXPLICIT_ACCESS ea;
|
|
PSID everyone;
|
|
int error;
|
|
|
|
if (handle == NULL || handle->handle == INVALID_HANDLE_VALUE)
|
|
return UV_EBADF;
|
|
|
|
if (mode != UV_READABLE &&
|
|
mode != UV_WRITABLE &&
|
|
mode != (UV_WRITABLE | UV_READABLE))
|
|
return UV_EINVAL;
|
|
|
|
if (!AllocateAndInitializeSid(&sid_world,
|
|
1,
|
|
SECURITY_WORLD_RID,
|
|
0, 0, 0, 0, 0, 0, 0,
|
|
&everyone)) {
|
|
error = GetLastError();
|
|
goto done;
|
|
}
|
|
|
|
if (GetSecurityInfo(handle->handle,
|
|
SE_KERNEL_OBJECT,
|
|
DACL_SECURITY_INFORMATION,
|
|
NULL,
|
|
NULL,
|
|
&old_dacl,
|
|
NULL,
|
|
&sd)) {
|
|
error = GetLastError();
|
|
goto clean_sid;
|
|
}
|
|
|
|
memset(&ea, 0, sizeof(EXPLICIT_ACCESS));
|
|
if (mode & UV_READABLE)
|
|
ea.grfAccessPermissions |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
|
|
if (mode & UV_WRITABLE)
|
|
ea.grfAccessPermissions |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
|
|
ea.grfAccessPermissions |= SYNCHRONIZE;
|
|
ea.grfAccessMode = SET_ACCESS;
|
|
ea.grfInheritance = NO_INHERITANCE;
|
|
ea.Trustee.TrusteeForm = TRUSTEE_IS_SID;
|
|
ea.Trustee.TrusteeType = TRUSTEE_IS_WELL_KNOWN_GROUP;
|
|
ea.Trustee.ptstrName = (LPTSTR)everyone;
|
|
|
|
if (SetEntriesInAcl(1, &ea, old_dacl, &new_dacl)) {
|
|
error = GetLastError();
|
|
goto clean_sd;
|
|
}
|
|
|
|
if (SetSecurityInfo(handle->handle,
|
|
SE_KERNEL_OBJECT,
|
|
DACL_SECURITY_INFORMATION,
|
|
NULL,
|
|
NULL,
|
|
new_dacl,
|
|
NULL)) {
|
|
error = GetLastError();
|
|
goto clean_dacl;
|
|
}
|
|
|
|
error = 0;
|
|
|
|
clean_dacl:
|
|
LocalFree((HLOCAL) new_dacl);
|
|
clean_sd:
|
|
LocalFree((HLOCAL) sd);
|
|
clean_sid:
|
|
FreeSid(everyone);
|
|
done:
|
|
return uv_translate_sys_error(error);
|
|
}
|