windows: Enable passing of TCP connections over IPC
This commit is contained in:
parent
31ff9863a1
commit
70925c3bb9
@ -318,7 +318,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
|
||||
uv_write_t ipc_header_write_req; \
|
||||
int ipc_pid; \
|
||||
uint64_t remaining_ipc_rawdata_bytes; \
|
||||
WSAPROTOCOL_INFOW* pending_socket_info; \
|
||||
struct { \
|
||||
WSAPROTOCOL_INFOW* socket_info; \
|
||||
int tcp_connection; \
|
||||
} pending_ipc_info; \
|
||||
uv_write_t* non_overlapped_writes_tail;
|
||||
|
||||
#define UV_PIPE_PRIVATE_FIELDS \
|
||||
|
||||
@ -71,7 +71,6 @@ int uv_is_active(uv_handle_t* handle) {
|
||||
|
||||
|
||||
void uv_close(uv_handle_t* handle, uv_close_cb cb) {
|
||||
uv_tcp_t* tcp;
|
||||
uv_pipe_t* pipe;
|
||||
uv_udp_t* udp;
|
||||
uv_process_t* process;
|
||||
@ -88,19 +87,7 @@ void uv_close(uv_handle_t* handle, uv_close_cb cb) {
|
||||
/* Handle-specific close actions */
|
||||
switch (handle->type) {
|
||||
case UV_TCP:
|
||||
tcp = (uv_tcp_t*)handle;
|
||||
/* If we don't shutdown before calling closesocket, windows will */
|
||||
/* silently discard the kernel send buffer and reset the connection. */
|
||||
if ((tcp->flags & UV_HANDLE_CONNECTION) &&
|
||||
!(tcp->flags & UV_HANDLE_SHUT)) {
|
||||
shutdown(tcp->socket, SD_SEND);
|
||||
tcp->flags |= UV_HANDLE_SHUTTING | UV_HANDLE_SHUT;
|
||||
}
|
||||
tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
|
||||
closesocket(tcp->socket);
|
||||
if (tcp->reqs_pending == 0) {
|
||||
uv_want_endgame(loop, handle);
|
||||
}
|
||||
uv_tcp_close((uv_tcp_t*)handle);
|
||||
return;
|
||||
|
||||
case UV_NAMED_PIPE:
|
||||
|
||||
@ -68,11 +68,12 @@ void uv_process_timers(uv_loop_t* loop);
|
||||
#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x00200000
|
||||
#define UV_HANDLE_TTY_SAVED_POSITION 0x00400000
|
||||
#define UV_HANDLE_TTY_SAVED_ATTRIBUTES 0x00800000
|
||||
#define UV_HANDLE_SHARED_TCP_SERVER 0x01000000
|
||||
#define UV_HANDLE_SHARED_TCP_SOCKET 0x01000000
|
||||
#define UV_HANDLE_TCP_NODELAY 0x02000000
|
||||
#define UV_HANDLE_TCP_KEEPALIVE 0x04000000
|
||||
#define UV_HANDLE_TCP_SINGLE_ACCEPT 0x08000000
|
||||
#define UV_HANDLE_TCP_ACCEPT_STATE_CHANGING 0x10000000
|
||||
#define UV_HANDLE_TCP_SOCKET_CLOSED 0x20000000
|
||||
|
||||
void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle);
|
||||
void uv_process_endgames(uv_loop_t* loop);
|
||||
@ -143,7 +144,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
|
||||
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);
|
||||
|
||||
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);
|
||||
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
|
||||
int tcp_connection);
|
||||
|
||||
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
LPWSAPROTOCOL_INFOW protocol_info);
|
||||
|
||||
@ -43,7 +43,8 @@ static const int default_pending_pipe_instances = 4;
|
||||
|
||||
/* IPC protocol flags. */
|
||||
#define UV_IPC_RAW_DATA 0x0001
|
||||
#define UV_IPC_UV_STREAM 0x0002
|
||||
#define UV_IPC_TCP_SERVER 0x0002
|
||||
#define UV_IPC_TCP_CONNECTION 0x0004
|
||||
|
||||
/* IPC frame header. */
|
||||
typedef struct {
|
||||
@ -79,7 +80,8 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
|
||||
handle->name = NULL;
|
||||
handle->ipc_pid = 0;
|
||||
handle->remaining_ipc_rawdata_bytes = 0;
|
||||
handle->pending_socket_info = NULL;
|
||||
handle->pending_ipc_info.socket_info = NULL;
|
||||
handle->pending_ipc_info.tcp_connection = 0;
|
||||
handle->ipc = ipc;
|
||||
handle->non_overlapped_writes_tail = NULL;
|
||||
|
||||
@ -356,9 +358,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
|
||||
handle->flags |= UV_HANDLE_CLOSED;
|
||||
|
||||
if (handle->flags & UV_HANDLE_CONNECTION) {
|
||||
if (handle->pending_socket_info) {
|
||||
free(handle->pending_socket_info);
|
||||
handle->pending_socket_info = NULL;
|
||||
if (handle->pending_ipc_info.socket_info) {
|
||||
free(handle->pending_ipc_info.socket_info);
|
||||
handle->pending_ipc_info.socket_info = NULL;
|
||||
}
|
||||
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
@ -711,13 +713,14 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
|
||||
uv_pipe_accept_t* req;
|
||||
|
||||
if (server->ipc) {
|
||||
if (!server->pending_socket_info) {
|
||||
if (!server->pending_ipc_info.socket_info) {
|
||||
/* No valid pending sockets. */
|
||||
uv__set_sys_error(loop, WSAEWOULDBLOCK);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
|
||||
return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
|
||||
server->pending_ipc_info.tcp_connection);
|
||||
} else {
|
||||
pipe_client = (uv_pipe_t*)client;
|
||||
|
||||
@ -1051,9 +1054,8 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Only TCP server handles are supported for sharing. */
|
||||
if (send_handle && (send_handle->type != UV_TCP ||
|
||||
send_handle->flags & UV_HANDLE_CONNECTION)) {
|
||||
/* Only TCP handles are supported for sharing. */
|
||||
if (send_handle && send_handle->type != UV_TCP) {
|
||||
uv__set_artificial_error(loop, UV_ENOTSUP);
|
||||
return -1;
|
||||
}
|
||||
@ -1091,7 +1093,11 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
|
||||
&ipc_frame.socket_info)) {
|
||||
return -1;
|
||||
}
|
||||
ipc_frame.header.flags |= UV_IPC_UV_STREAM;
|
||||
ipc_frame.header.flags |= UV_IPC_TCP_SERVER;
|
||||
|
||||
if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) {
|
||||
ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION;
|
||||
}
|
||||
}
|
||||
|
||||
if (bufcnt == 1) {
|
||||
@ -1132,7 +1138,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
|
||||
|
||||
result = WriteFile(handle->handle,
|
||||
&ipc_frame,
|
||||
ipc_frame.header.flags & UV_IPC_UV_STREAM ?
|
||||
ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
|
||||
sizeof(ipc_frame) : sizeof(ipc_frame.header),
|
||||
NULL,
|
||||
&ipc_header_req->overlapped);
|
||||
@ -1146,7 +1152,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
|
||||
ipc_header_req->queued_bytes = 0;
|
||||
} else {
|
||||
/* Request queued by the kernel. */
|
||||
ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_UV_STREAM ?
|
||||
ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ?
|
||||
sizeof(ipc_frame) : sizeof(ipc_frame.header);
|
||||
handle->write_queue_size += req->queued_bytes;
|
||||
}
|
||||
@ -1330,9 +1336,10 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
}
|
||||
|
||||
assert(bytes == sizeof(ipc_frame.header));
|
||||
assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA));
|
||||
assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA |
|
||||
UV_IPC_TCP_CONNECTION));
|
||||
|
||||
if (ipc_frame.header.flags & UV_IPC_UV_STREAM) {
|
||||
if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) {
|
||||
assert(avail - sizeof(ipc_frame.header) >=
|
||||
sizeof(ipc_frame.socket_info));
|
||||
|
||||
@ -1350,14 +1357,16 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
|
||||
|
||||
/* Store the pending socket info. */
|
||||
assert(!handle->pending_socket_info);
|
||||
handle->pending_socket_info =
|
||||
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info)));
|
||||
if (!handle->pending_socket_info) {
|
||||
assert(!handle->pending_ipc_info.socket_info);
|
||||
handle->pending_ipc_info.socket_info =
|
||||
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
|
||||
if (!handle->pending_ipc_info.socket_info) {
|
||||
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
|
||||
}
|
||||
|
||||
*(handle->pending_socket_info) = ipc_frame.socket_info;
|
||||
*(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
|
||||
handle->pending_ipc_info.tcp_connection =
|
||||
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
|
||||
}
|
||||
|
||||
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
|
||||
@ -1385,14 +1394,14 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||
handle->remaining_ipc_rawdata_bytes - bytes;
|
||||
if (handle->read2_cb) {
|
||||
handle->read2_cb(handle, bytes, buf,
|
||||
handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
|
||||
handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
|
||||
} else if (handle->read_cb) {
|
||||
handle->read_cb((uv_stream_t*)handle, bytes, buf);
|
||||
}
|
||||
|
||||
if (handle->pending_socket_info) {
|
||||
free(handle->pending_socket_info);
|
||||
handle->pending_socket_info = NULL;
|
||||
if (handle->pending_ipc_info.socket_info) {
|
||||
free(handle->pending_ipc_info.socket_info);
|
||||
handle->pending_ipc_info.socket_info = NULL;
|
||||
}
|
||||
} else {
|
||||
handle->read_cb((uv_stream_t*)handle, bytes, buf);
|
||||
|
||||
145
src/win/tcp.c
145
src/win/tcp.c
@ -199,6 +199,11 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
|
||||
assert(!(handle->flags & UV_HANDLE_CLOSED));
|
||||
handle->flags |= UV_HANDLE_CLOSED;
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_TCP_SOCKET_CLOSED)) {
|
||||
closesocket(handle->socket);
|
||||
handle->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
|
||||
}
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) {
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
|
||||
@ -218,6 +223,18 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
|
||||
handle->accept_reqs = NULL;
|
||||
}
|
||||
|
||||
if (handle->flags & UV_HANDLE_CONNECTION &&
|
||||
handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
|
||||
UnregisterWait(handle->read_req.wait_handle);
|
||||
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
|
||||
}
|
||||
if (handle->read_req.event_handle) {
|
||||
CloseHandle(handle->read_req.event_handle);
|
||||
handle->read_req.event_handle = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (handle->close_cb) {
|
||||
handle->close_cb((uv_handle_t*)handle);
|
||||
}
|
||||
@ -341,7 +358,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
|
||||
/* Prepare the overlapped structure. */
|
||||
memset(&(req->overlapped), 0, sizeof(req->overlapped));
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
|
||||
req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
|
||||
}
|
||||
|
||||
success = handle->func_acceptex(handle->socket,
|
||||
@ -415,6 +432,13 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
|
||||
buf.len = 0;
|
||||
}
|
||||
|
||||
/* Prepare the overlapped structure. */
|
||||
memset(&(req->overlapped), 0, sizeof(req->overlapped));
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
assert(req->event_handle);
|
||||
req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
|
||||
}
|
||||
|
||||
flags = 0;
|
||||
result = WSARecv(handle->socket,
|
||||
(WSABUF*)&buf,
|
||||
@ -434,6 +458,14 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
|
||||
/* The req will be processed with IOCP. */
|
||||
handle->flags |= UV_HANDLE_READ_PENDING;
|
||||
handle->reqs_pending++;
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
|
||||
req->wait_handle == INVALID_HANDLE_VALUE &&
|
||||
!RegisterWaitForSingleObject(&req->wait_handle,
|
||||
req->overlapped.hEvent, post_completion, (void*) req,
|
||||
INFINITE, WT_EXECUTEINWAITTHREAD)) {
|
||||
SET_REQ_ERROR(req, GetLastError());
|
||||
uv_insert_pending_req(loop, (uv_req_t*)req);
|
||||
}
|
||||
} else {
|
||||
/* Make this req pending reporting an error. */
|
||||
SET_REQ_ERROR(req, WSAGetLastError());
|
||||
@ -466,7 +498,7 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
|
||||
}
|
||||
}
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_SHARED_TCP_SERVER) &&
|
||||
if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
|
||||
listen(handle->socket, backlog) == SOCKET_ERROR) {
|
||||
uv__set_sys_error(loop, WSAGetLastError());
|
||||
return -1;
|
||||
@ -593,10 +625,18 @@ int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
|
||||
handle->read_cb = read_cb;
|
||||
handle->alloc_cb = alloc_cb;
|
||||
|
||||
/* If reading was stopped and then started again, there could stell be a */
|
||||
/* If reading was stopped and then started again, there could still be a */
|
||||
/* read request pending. */
|
||||
if (!(handle->flags & UV_HANDLE_READ_PENDING))
|
||||
if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
|
||||
!handle->read_req.event_handle) {
|
||||
handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
|
||||
if (!handle->read_req.event_handle) {
|
||||
uv_fatal_error(GetLastError(), "CreateEvent");
|
||||
}
|
||||
}
|
||||
uv_tcp_queue_read(loop, handle);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -790,6 +830,16 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
|
||||
req->cb = cb;
|
||||
memset(&req->overlapped, 0, sizeof(req->overlapped));
|
||||
|
||||
/* Prepare the overlapped structure. */
|
||||
memset(&(req->overlapped), 0, sizeof(req->overlapped));
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
|
||||
if (!req->event_handle) {
|
||||
uv_fatal_error(GetLastError(), "CreateEvent");
|
||||
}
|
||||
req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
|
||||
}
|
||||
|
||||
result = WSASend(handle->socket,
|
||||
(WSABUF*)bufs,
|
||||
bufcnt,
|
||||
@ -812,6 +862,14 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle,
|
||||
handle->write_reqs_pending++;
|
||||
handle->write_queue_size += req->queued_bytes;
|
||||
uv_ref(loop);
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
|
||||
req->wait_handle == INVALID_HANDLE_VALUE &&
|
||||
!RegisterWaitForSingleObject(&req->wait_handle,
|
||||
req->overlapped.hEvent, post_completion, (void*) req,
|
||||
INFINITE, WT_EXECUTEINWAITTHREAD)) {
|
||||
SET_REQ_ERROR(req, GetLastError());
|
||||
uv_insert_pending_req(loop, (uv_req_t*)req);
|
||||
}
|
||||
} else {
|
||||
/* Send failed due to an error. */
|
||||
uv__set_sys_error(loop, WSAGetLastError());
|
||||
@ -945,6 +1003,17 @@ void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
assert(handle->write_queue_size >= req->queued_bytes);
|
||||
handle->write_queue_size -= req->queued_bytes;
|
||||
|
||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||
if (req->wait_handle != INVALID_HANDLE_VALUE) {
|
||||
UnregisterWait(req->wait_handle);
|
||||
req->wait_handle = INVALID_HANDLE_VALUE;
|
||||
}
|
||||
if (req->event_handle) {
|
||||
CloseHandle(req->event_handle);
|
||||
req->event_handle = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (req->cb) {
|
||||
uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req));
|
||||
((uv_write_cb)req->cb)(req, loop->last_err.code == UV_OK ? 0 : -1);
|
||||
@ -1036,7 +1105,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
|
||||
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
|
||||
int tcp_connection) {
|
||||
SOCKET socket = WSASocketW(AF_INET,
|
||||
SOCK_STREAM,
|
||||
IPPROTO_IP,
|
||||
@ -1050,13 +1120,22 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
|
||||
}
|
||||
|
||||
tcp->flags |= UV_HANDLE_BOUND;
|
||||
tcp->flags |= UV_HANDLE_SHARED_TCP_SERVER;
|
||||
tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
if (tcp_connection) {
|
||||
uv_connection_init((uv_stream_t*)tcp);
|
||||
}
|
||||
|
||||
if (socket_protocol_info->iAddressFamily == AF_INET6) {
|
||||
tcp->flags |= UV_HANDLE_IPV6;
|
||||
}
|
||||
|
||||
return uv_tcp_set_socket(tcp->loop, tcp, socket, 1);
|
||||
if (uv_tcp_set_socket(tcp->loop, tcp, socket, 1) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tcp->loop->active_tcp_streams++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@ -1097,15 +1176,12 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
|
||||
|
||||
int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
LPWSAPROTOCOL_INFOW protocol_info) {
|
||||
assert(!(handle->flags & UV_HANDLE_CONNECTION));
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
|
||||
/*
|
||||
* We're about to share the socket with another process. Because
|
||||
* this is a listening socket, we assume that the other process will
|
||||
* be accepting connections on it. So, before sharing the socket
|
||||
* with another process, we call listen here in the parent process.
|
||||
* This needs to be modified if the socket is shared with
|
||||
* another process for anything other than accepting connections.
|
||||
*/
|
||||
|
||||
if (!(handle->flags & UV_HANDLE_LISTENING)) {
|
||||
@ -1117,8 +1193,7 @@ int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
uv__set_sys_error(handle->loop, WSAGetLastError());
|
||||
return -1;
|
||||
}
|
||||
|
||||
handle->flags |= UV_HANDLE_SHARED_TCP_SERVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (WSADuplicateSocketW(handle->socket, pid, protocol_info)) {
|
||||
@ -1126,6 +1201,8 @@ int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid,
|
||||
return -1;
|
||||
}
|
||||
|
||||
handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1162,3 +1239,45 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv_tcp_close(uv_tcp_t* tcp) {
|
||||
int non_ifs_lsp;
|
||||
int close_socket = 1;
|
||||
|
||||
/*
|
||||
* In order for winsock to do a graceful close there must not be
|
||||
* any pending reads.
|
||||
*/
|
||||
if (tcp->flags & UV_HANDLE_READ_PENDING) {
|
||||
/* Just do shutdown on non-shared sockets, which ensures graceful close. */
|
||||
if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
|
||||
shutdown(tcp->socket, SD_SEND);
|
||||
tcp->flags |= UV_HANDLE_SHUT;
|
||||
} else {
|
||||
/* Check if we have any non-IFS LSPs stacked on top of TCP */
|
||||
non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 :
|
||||
uv_tcp_non_ifs_lsp_ipv4;
|
||||
|
||||
if (!non_ifs_lsp) {
|
||||
/*
|
||||
* Shared socket with no non-IFS LSPs, request to cancel pending I/O.
|
||||
* The socket will be closed inside endgame.
|
||||
*/
|
||||
CancelIo((HANDLE)tcp->socket);
|
||||
close_socket = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
|
||||
|
||||
if (close_socket) {
|
||||
closesocket(tcp->socket);
|
||||
tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
|
||||
}
|
||||
|
||||
if (tcp->reqs_pending == 0) {
|
||||
uv_want_endgame(tcp->loop, (uv_handle_t*)tcp);
|
||||
}
|
||||
}
|
||||
|
||||
216
test/run-tests.c
216
test/run-tests.c
@ -32,6 +32,11 @@
|
||||
/* The time in milliseconds after which a single test times out. */
|
||||
#define TEST_TIMEOUT 5000
|
||||
|
||||
int ipc_helper(int listen_after_write);
|
||||
int ipc_helper_tcp_connection(void);
|
||||
int ipc_send_recv_helper(void);
|
||||
int stdio_over_pipes_helper(void);
|
||||
|
||||
static int maybe_run_test(int argc, char **argv);
|
||||
|
||||
|
||||
@ -51,212 +56,6 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
|
||||
|
||||
static uv_pipe_t channel;
|
||||
static uv_tcp_t tcp_server;
|
||||
static uv_write_t conn_notify_req;
|
||||
static int close_cb_called;
|
||||
static int connection_accepted;
|
||||
|
||||
static uv_pipe_t stdin_pipe;
|
||||
static uv_pipe_t stdout_pipe;
|
||||
static int on_pipe_read_called;
|
||||
static int after_write_called;
|
||||
|
||||
|
||||
static void close_cb(uv_handle_t* handle) {
|
||||
close_cb_called++;
|
||||
}
|
||||
|
||||
|
||||
static void close_conn_cb(uv_handle_t* handle) {
|
||||
free(handle);
|
||||
close_cb_called++;
|
||||
}
|
||||
|
||||
|
||||
void conn_notify_write_cb(uv_write_t* req, int status) {
|
||||
uv_close((uv_handle_t*)&tcp_server, close_cb);
|
||||
uv_close((uv_handle_t*)&channel, close_cb);
|
||||
}
|
||||
|
||||
|
||||
static void ipc_on_connection(uv_stream_t* server, int status) {
|
||||
int r;
|
||||
uv_buf_t buf;
|
||||
uv_tcp_t* conn;
|
||||
|
||||
if (!connection_accepted) {
|
||||
/*
|
||||
* Accept the connection and close it. Also let the other
|
||||
* side know.
|
||||
*/
|
||||
ASSERT(status == 0);
|
||||
ASSERT((uv_stream_t*)&tcp_server == server);
|
||||
|
||||
conn = malloc(sizeof(*conn));
|
||||
ASSERT(conn);
|
||||
|
||||
r = uv_tcp_init(server->loop, conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_accept(server, (uv_stream_t*)conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_close((uv_handle_t*)conn, close_conn_cb);
|
||||
|
||||
buf = uv_buf_init("accepted_connection\n", 20);
|
||||
r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
|
||||
NULL, conn_notify_write_cb);
|
||||
ASSERT(r == 0);
|
||||
|
||||
connection_accepted = 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static int ipc_helper(int listen_after_write) {
|
||||
/*
|
||||
* This is launched from test-ipc.c. stdin is a duplex channel that we
|
||||
* over which a handle will be transmitted. In this initial version only
|
||||
* data is transfered over the channel. XXX edit this comment after handle
|
||||
* transfer is added.
|
||||
*/
|
||||
|
||||
uv_write_t write_req;
|
||||
int r;
|
||||
uv_buf_t buf;
|
||||
|
||||
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_pipe_open(&channel, 0);
|
||||
|
||||
ASSERT(uv_is_readable((uv_stream_t*) &channel));
|
||||
ASSERT(uv_is_writable((uv_stream_t*) &channel));
|
||||
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
|
||||
ASSERT(r == 0);
|
||||
|
||||
if (!listen_after_write) {
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
|
||||
ASSERT(r == 0);
|
||||
}
|
||||
|
||||
buf = uv_buf_init("hello\n", 6);
|
||||
r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
|
||||
(uv_stream_t*)&tcp_server, NULL);
|
||||
ASSERT(r == 0);
|
||||
|
||||
if (listen_after_write) {
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
|
||||
ASSERT(r == 0);
|
||||
}
|
||||
|
||||
r = uv_run(uv_default_loop());
|
||||
ASSERT(r == 0);
|
||||
|
||||
ASSERT(connection_accepted == 1);
|
||||
ASSERT(close_cb_called == 3);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
|
||||
ASSERT(nread > 0);
|
||||
ASSERT(memcmp("hello world\n", buf.base, nread) == 0);
|
||||
on_pipe_read_called++;
|
||||
|
||||
free(buf.base);
|
||||
|
||||
uv_close((uv_handle_t*)&stdin_pipe, close_cb);
|
||||
uv_close((uv_handle_t*)&stdout_pipe, close_cb);
|
||||
}
|
||||
|
||||
|
||||
static uv_buf_t on_pipe_read_alloc(uv_handle_t* handle,
|
||||
size_t suggested_size) {
|
||||
uv_buf_t buf;
|
||||
buf.base = (char*)malloc(suggested_size);
|
||||
buf.len = suggested_size;
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
||||
static void after_pipe_write(uv_write_t* req, int status) {
|
||||
ASSERT(status == 0);
|
||||
after_write_called++;
|
||||
}
|
||||
|
||||
|
||||
static int stdio_over_pipes_helper() {
|
||||
/* Write several buffers to test that the write order is preserved. */
|
||||
char* buffers[] = {
|
||||
"he",
|
||||
"ll",
|
||||
"o ",
|
||||
"wo",
|
||||
"rl",
|
||||
"d",
|
||||
"\n"
|
||||
};
|
||||
|
||||
uv_write_t write_req[ARRAY_SIZE(buffers)];
|
||||
uv_buf_t buf[ARRAY_SIZE(buffers)];
|
||||
int r, i;
|
||||
uv_loop_t* loop = uv_default_loop();
|
||||
|
||||
ASSERT(UV_NAMED_PIPE == uv_guess_handle(0));
|
||||
ASSERT(UV_NAMED_PIPE == uv_guess_handle(1));
|
||||
|
||||
r = uv_pipe_init(loop, &stdin_pipe, 0);
|
||||
ASSERT(r == 0);
|
||||
r = uv_pipe_init(loop, &stdout_pipe, 0);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_pipe_open(&stdin_pipe, 0);
|
||||
uv_pipe_open(&stdout_pipe, 1);
|
||||
|
||||
/* Unref both stdio handles to make sure that all writes complete. */
|
||||
uv_unref(loop);
|
||||
uv_unref(loop);
|
||||
|
||||
for (i = 0; i < ARRAY_SIZE(buffers); i++) {
|
||||
buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i]));
|
||||
}
|
||||
|
||||
for (i = 0; i < ARRAY_SIZE(buffers); i++) {
|
||||
r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1,
|
||||
after_pipe_write);
|
||||
ASSERT(r == 0);
|
||||
}
|
||||
|
||||
uv_run(loop);
|
||||
|
||||
ASSERT(after_write_called == 7);
|
||||
ASSERT(on_pipe_read_called == 0);
|
||||
ASSERT(close_cb_called == 0);
|
||||
|
||||
uv_ref(loop);
|
||||
uv_ref(loop);
|
||||
|
||||
r = uv_read_start((uv_stream_t*)&stdin_pipe, on_pipe_read_alloc,
|
||||
on_pipe_read);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_run(loop);
|
||||
|
||||
ASSERT(after_write_called == 7);
|
||||
ASSERT(on_pipe_read_called == 1);
|
||||
ASSERT(close_cb_called == 2);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int maybe_run_test(int argc, char **argv) {
|
||||
if (strcmp(argv[1], "--list") == 0) {
|
||||
print_tests(stdout);
|
||||
@ -272,10 +71,13 @@ static int maybe_run_test(int argc, char **argv) {
|
||||
}
|
||||
|
||||
if (strcmp(argv[1], "ipc_send_recv_helper") == 0) {
|
||||
int ipc_send_recv_helper(void); /* See test-ipc-send-recv.c */
|
||||
return ipc_send_recv_helper();
|
||||
}
|
||||
|
||||
if (strcmp(argv[1], "ipc_helper_tcp_connection") == 0) {
|
||||
return ipc_helper_tcp_connection();
|
||||
}
|
||||
|
||||
if (strcmp(argv[1], "stdio_over_pipes_helper") == 0) {
|
||||
return stdio_over_pipes_helper();
|
||||
}
|
||||
|
||||
347
test/test-ipc.c
347
test/test-ipc.c
@ -27,17 +27,28 @@
|
||||
|
||||
static uv_pipe_t channel;
|
||||
static uv_tcp_t tcp_server;
|
||||
static uv_tcp_t tcp_connection;
|
||||
|
||||
static int exit_cb_called;
|
||||
static int read2_cb_called;
|
||||
static int tcp_write_cb_called;
|
||||
static int tcp_read_cb_called;
|
||||
static int on_pipe_read_called;
|
||||
static int local_conn_accepted;
|
||||
static int remote_conn_accepted;
|
||||
static int tcp_server_listening;
|
||||
|
||||
static uv_write_t write_req;
|
||||
static uv_pipe_t channel;
|
||||
static uv_tcp_t tcp_server;
|
||||
static uv_write_t conn_notify_req;
|
||||
static int close_cb_called;
|
||||
static int connection_accepted;
|
||||
static int tcp_conn_read_cb_called;
|
||||
static int tcp_conn_write_cb_called;
|
||||
|
||||
typedef struct {
|
||||
uv_connect_t conn_req;
|
||||
uv_write_t tcp_write_req;
|
||||
uv_tcp_t conn;
|
||||
} tcp_conn;
|
||||
|
||||
@ -49,7 +60,7 @@ static void close_server_conn_cb(uv_handle_t* handle) {
|
||||
}
|
||||
|
||||
|
||||
static void ipc_on_connection(uv_stream_t* server, int status) {
|
||||
static void on_connection(uv_stream_t* server, int status) {
|
||||
uv_tcp_t* conn;
|
||||
int r;
|
||||
|
||||
@ -156,7 +167,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
|
||||
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, on_connection);
|
||||
ASSERT(r == 0);
|
||||
|
||||
tcp_server_listening = 1;
|
||||
@ -214,32 +225,127 @@ void spawn_helper(uv_pipe_t* channel,
|
||||
}
|
||||
|
||||
|
||||
static int run_ipc_test(const char* helper) {
|
||||
static void on_tcp_write(uv_write_t* req, int status) {
|
||||
ASSERT(status == 0);
|
||||
ASSERT(req->handle == (uv_stream_t*)&tcp_connection);
|
||||
tcp_write_cb_called++;
|
||||
}
|
||||
|
||||
|
||||
static uv_buf_t on_read_alloc(uv_handle_t* handle, size_t suggested_size) {
|
||||
uv_buf_t buf;
|
||||
buf.base = (char*)malloc(suggested_size);
|
||||
buf.len = suggested_size;
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
||||
static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
|
||||
ASSERT(nread > 0);
|
||||
ASSERT(memcmp("hello again\n", buf.base, nread) == 0);
|
||||
ASSERT(tcp == (uv_stream_t*)&tcp_connection);
|
||||
free(buf.base);
|
||||
|
||||
tcp_read_cb_called++;
|
||||
|
||||
uv_close((uv_handle_t*)tcp, NULL);
|
||||
uv_close((uv_handle_t*)&channel, NULL);
|
||||
}
|
||||
|
||||
|
||||
static void on_read_connection(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
|
||||
uv_handle_type pending) {
|
||||
int r;
|
||||
uv_buf_t outbuf;
|
||||
uv_err_t err;
|
||||
|
||||
if (nread == 0) {
|
||||
/* Everything OK, but nothing read. */
|
||||
free(buf.base);
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread < 0) {
|
||||
err = uv_last_error(pipe->loop);
|
||||
if (err.code == UV_EOF) {
|
||||
free(buf.base);
|
||||
return;
|
||||
}
|
||||
|
||||
printf("error recving on channel: %s\n", uv_strerror(err));
|
||||
abort();
|
||||
}
|
||||
|
||||
fprintf(stderr, "got %d bytes\n", (int)nread);
|
||||
|
||||
ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE);
|
||||
read2_cb_called++;
|
||||
|
||||
/* Accept the pending TCP connection */
|
||||
ASSERT(pending == UV_TCP);
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_connection);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_connection);
|
||||
ASSERT(r == 0);
|
||||
|
||||
/* Make sure that the expected data is correctly multiplexed. */
|
||||
ASSERT(memcmp("hello\n", buf.base, nread) == 0);
|
||||
|
||||
/* Write/read to/from the connection */
|
||||
outbuf = uv_buf_init("world\n", 6);
|
||||
r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
|
||||
on_tcp_write);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
|
||||
ASSERT(r == 0);
|
||||
|
||||
free(buf.base);
|
||||
}
|
||||
|
||||
|
||||
static int run_ipc_test(const char* helper, uv_read2_cb read_cb) {
|
||||
uv_process_t process;
|
||||
int r;
|
||||
|
||||
spawn_helper(&channel, &process, helper);
|
||||
uv_read2_start((uv_stream_t*)&channel, on_alloc, on_read);
|
||||
uv_read2_start((uv_stream_t*)&channel, on_alloc, read_cb);
|
||||
|
||||
r = uv_run(uv_default_loop());
|
||||
ASSERT(r == 0);
|
||||
|
||||
ASSERT(local_conn_accepted == 1);
|
||||
ASSERT(remote_conn_accepted == 1);
|
||||
ASSERT(read2_cb_called == 1);
|
||||
ASSERT(exit_cb_called == 1);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
TEST_IMPL(ipc_listen_before_write) {
|
||||
return run_ipc_test("ipc_helper_listen_before_write");
|
||||
int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
|
||||
ASSERT(local_conn_accepted == 1);
|
||||
ASSERT(remote_conn_accepted == 1);
|
||||
ASSERT(read2_cb_called == 1);
|
||||
ASSERT(exit_cb_called == 1);
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
TEST_IMPL(ipc_listen_after_write) {
|
||||
return run_ipc_test("ipc_helper_listen_after_write");
|
||||
int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
|
||||
ASSERT(local_conn_accepted == 1);
|
||||
ASSERT(remote_conn_accepted == 1);
|
||||
ASSERT(read2_cb_called == 1);
|
||||
ASSERT(exit_cb_called == 1);
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
TEST_IMPL(ipc_tcp_connection) {
|
||||
int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
|
||||
ASSERT(read2_cb_called == 1);
|
||||
ASSERT(tcp_write_cb_called == 1);
|
||||
ASSERT(tcp_read_cb_called == 1);
|
||||
ASSERT(exit_cb_called == 1);
|
||||
return r;
|
||||
}
|
||||
|
||||
|
||||
@ -287,3 +393,220 @@ TEST_IMPL(listen_no_simultaneous_accepts) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
/* Everything here runs in a child process. */
|
||||
|
||||
tcp_conn conn;
|
||||
|
||||
|
||||
static void close_cb(uv_handle_t* handle) {
|
||||
close_cb_called++;
|
||||
}
|
||||
|
||||
|
||||
static void conn_notify_write_cb(uv_write_t* req, int status) {
|
||||
uv_close((uv_handle_t*)&tcp_server, close_cb);
|
||||
uv_close((uv_handle_t*)&channel, close_cb);
|
||||
}
|
||||
|
||||
|
||||
static void tcp_connection_write_cb(uv_write_t* req, int status) {
|
||||
ASSERT((uv_handle_t*)&conn.conn == (uv_handle_t*)req->handle);
|
||||
uv_close((uv_handle_t*)req->handle, close_cb);
|
||||
uv_close((uv_handle_t*)&channel, close_cb);
|
||||
uv_close((uv_handle_t*)&tcp_server, close_cb);
|
||||
tcp_conn_write_cb_called++;
|
||||
}
|
||||
|
||||
|
||||
static void on_tcp_child_process_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
|
||||
uv_buf_t outbuf;
|
||||
int r;
|
||||
|
||||
if (nread < 0) {
|
||||
if (uv_last_error(tcp->loop).code == UV_EOF) {
|
||||
free(buf.base);
|
||||
return;
|
||||
}
|
||||
|
||||
printf("error recving on tcp connection: %s\n",
|
||||
uv_strerror(uv_last_error(tcp->loop)));
|
||||
abort();
|
||||
}
|
||||
|
||||
ASSERT(nread > 0);
|
||||
ASSERT(memcmp("world\n", buf.base, nread) == 0);
|
||||
on_pipe_read_called++;
|
||||
free(buf.base);
|
||||
|
||||
/* Write to the socket */
|
||||
outbuf = uv_buf_init("hello again\n", 12);
|
||||
r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
|
||||
ASSERT(r == 0);
|
||||
|
||||
tcp_conn_read_cb_called++;
|
||||
}
|
||||
|
||||
|
||||
static void connect_child_process_cb(uv_connect_t* req, int status) {
|
||||
int r;
|
||||
|
||||
ASSERT(status == 0);
|
||||
r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
|
||||
ASSERT(r == 0);
|
||||
}
|
||||
|
||||
|
||||
static void ipc_on_connection(uv_stream_t* server, int status) {
|
||||
int r;
|
||||
uv_buf_t buf;
|
||||
|
||||
if (!connection_accepted) {
|
||||
/*
|
||||
* Accept the connection and close it. Also let the other
|
||||
* side know.
|
||||
*/
|
||||
ASSERT(status == 0);
|
||||
ASSERT((uv_stream_t*)&tcp_server == server);
|
||||
|
||||
r = uv_tcp_init(server->loop, &conn.conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_accept(server, (uv_stream_t*)&conn.conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_close((uv_handle_t*)&conn.conn, close_cb);
|
||||
|
||||
buf = uv_buf_init("accepted_connection\n", 20);
|
||||
r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
|
||||
NULL, conn_notify_write_cb);
|
||||
ASSERT(r == 0);
|
||||
|
||||
connection_accepted = 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
|
||||
int r;
|
||||
uv_buf_t buf;
|
||||
uv_tcp_t* conn;
|
||||
|
||||
ASSERT(status == 0);
|
||||
ASSERT((uv_stream_t*)&tcp_server == server);
|
||||
|
||||
conn = malloc(sizeof(*conn));
|
||||
ASSERT(conn);
|
||||
|
||||
r = uv_tcp_init(server->loop, conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_accept(server, (uv_stream_t*)conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
/* Send the accepted connection to the other process */
|
||||
buf = uv_buf_init("hello\n", 6);
|
||||
r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
|
||||
(uv_stream_t*)conn, NULL);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_read_start((uv_stream_t*)conn, on_read_alloc, on_tcp_child_process_read);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_close((uv_handle_t*)conn, close_cb);
|
||||
}
|
||||
|
||||
|
||||
int ipc_helper(int listen_after_write) {
|
||||
/*
|
||||
* This is launched from test-ipc.c. stdin is a duplex channel that we
|
||||
* over which a handle will be transmitted.
|
||||
*/
|
||||
|
||||
uv_write_t write_req;
|
||||
int r;
|
||||
uv_buf_t buf;
|
||||
|
||||
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_pipe_open(&channel, 0);
|
||||
|
||||
ASSERT(uv_is_readable((uv_stream_t*) &channel));
|
||||
ASSERT(uv_is_writable((uv_stream_t*) &channel));
|
||||
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
|
||||
ASSERT(r == 0);
|
||||
|
||||
if (!listen_after_write) {
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
|
||||
ASSERT(r == 0);
|
||||
}
|
||||
|
||||
buf = uv_buf_init("hello\n", 6);
|
||||
r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
|
||||
(uv_stream_t*)&tcp_server, NULL);
|
||||
ASSERT(r == 0);
|
||||
|
||||
if (listen_after_write) {
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
|
||||
ASSERT(r == 0);
|
||||
}
|
||||
|
||||
r = uv_run(uv_default_loop());
|
||||
ASSERT(r == 0);
|
||||
|
||||
ASSERT(connection_accepted == 1);
|
||||
ASSERT(close_cb_called == 3);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int ipc_helper_tcp_connection() {
|
||||
/*
|
||||
* This is launched from test-ipc.c. stdin is a duplex channel that we
|
||||
* over which a handle will be transmitted.
|
||||
*/
|
||||
|
||||
int r;
|
||||
struct sockaddr_in addr;
|
||||
|
||||
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_pipe_open(&channel, 0);
|
||||
|
||||
ASSERT(uv_is_readable((uv_stream_t*)&channel));
|
||||
ASSERT(uv_is_writable((uv_stream_t*)&channel));
|
||||
|
||||
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection_tcp_conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
/* Make a connection to the server */
|
||||
r = uv_tcp_init(uv_default_loop(), &conn.conn);
|
||||
ASSERT(r == 0);
|
||||
|
||||
addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
|
||||
r = uv_tcp_connect(&conn.conn_req, (uv_tcp_t*)&conn.conn, addr, connect_child_process_cb);
|
||||
ASSERT(r == 0);
|
||||
|
||||
r = uv_run(uv_default_loop());
|
||||
ASSERT(r == 0);
|
||||
|
||||
ASSERT(tcp_conn_read_cb_called == 1);
|
||||
ASSERT(tcp_conn_write_cb_called == 1);
|
||||
ASSERT(close_cb_called == 4);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -26,6 +26,7 @@ TEST_DECLARE (ipc_listen_before_write)
|
||||
TEST_DECLARE (ipc_listen_after_write)
|
||||
TEST_DECLARE (ipc_send_recv_pipe)
|
||||
TEST_DECLARE (ipc_send_recv_tcp)
|
||||
TEST_DECLARE (ipc_tcp_connection)
|
||||
TEST_DECLARE (tcp_ping_pong)
|
||||
TEST_DECLARE (tcp_ping_pong_v6)
|
||||
TEST_DECLARE (pipe_ping_pong)
|
||||
@ -180,6 +181,7 @@ TASK_LIST_START
|
||||
TEST_ENTRY (ipc_listen_after_write)
|
||||
TEST_ENTRY (ipc_send_recv_pipe)
|
||||
TEST_ENTRY (ipc_send_recv_tcp)
|
||||
TEST_ENTRY (ipc_tcp_connection)
|
||||
|
||||
TEST_ENTRY (tcp_ping_pong)
|
||||
TEST_HELPER (tcp_ping_pong, tcp4_echo_server)
|
||||
|
||||
@ -40,6 +40,7 @@ static uv_loop_t* loop;
|
||||
static char output[OUTPUT_SIZE];
|
||||
static int output_used;
|
||||
|
||||
|
||||
typedef struct {
|
||||
uv_write_t req;
|
||||
uv_buf_t buf;
|
||||
@ -155,3 +156,101 @@ TEST_IMPL(stdio_over_pipes) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/* Everything here runs in a child process. */
|
||||
|
||||
static int on_pipe_read_called;
|
||||
static int after_write_called;
|
||||
static uv_pipe_t stdin_pipe;
|
||||
static uv_pipe_t stdout_pipe;
|
||||
|
||||
static void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
|
||||
ASSERT(nread > 0);
|
||||
ASSERT(memcmp("hello world\n", buf.base, nread) == 0);
|
||||
on_pipe_read_called++;
|
||||
|
||||
free(buf.base);
|
||||
|
||||
uv_close((uv_handle_t*)&stdin_pipe, close_cb);
|
||||
uv_close((uv_handle_t*)&stdout_pipe, close_cb);
|
||||
}
|
||||
|
||||
|
||||
static void after_pipe_write(uv_write_t* req, int status) {
|
||||
ASSERT(status == 0);
|
||||
after_write_called++;
|
||||
}
|
||||
|
||||
|
||||
static uv_buf_t on_read_alloc(uv_handle_t* handle,
|
||||
size_t suggested_size) {
|
||||
uv_buf_t buf;
|
||||
buf.base = (char*)malloc(suggested_size);
|
||||
buf.len = suggested_size;
|
||||
return buf;
|
||||
}
|
||||
|
||||
|
||||
int stdio_over_pipes_helper() {
|
||||
/* Write several buffers to test that the write order is preserved. */
|
||||
char* buffers[] = {
|
||||
"he",
|
||||
"ll",
|
||||
"o ",
|
||||
"wo",
|
||||
"rl",
|
||||
"d",
|
||||
"\n"
|
||||
};
|
||||
|
||||
uv_write_t write_req[ARRAY_SIZE(buffers)];
|
||||
uv_buf_t buf[ARRAY_SIZE(buffers)];
|
||||
int r, i;
|
||||
uv_loop_t* loop = uv_default_loop();
|
||||
|
||||
ASSERT(UV_NAMED_PIPE == uv_guess_handle(0));
|
||||
ASSERT(UV_NAMED_PIPE == uv_guess_handle(1));
|
||||
|
||||
r = uv_pipe_init(loop, &stdin_pipe, 0);
|
||||
ASSERT(r == 0);
|
||||
r = uv_pipe_init(loop, &stdout_pipe, 0);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_pipe_open(&stdin_pipe, 0);
|
||||
uv_pipe_open(&stdout_pipe, 1);
|
||||
|
||||
/* Unref both stdio handles to make sure that all writes complete. */
|
||||
uv_unref(loop);
|
||||
uv_unref(loop);
|
||||
|
||||
for (i = 0; i < ARRAY_SIZE(buffers); i++) {
|
||||
buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i]));
|
||||
}
|
||||
|
||||
for (i = 0; i < ARRAY_SIZE(buffers); i++) {
|
||||
r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1,
|
||||
after_pipe_write);
|
||||
ASSERT(r == 0);
|
||||
}
|
||||
|
||||
uv_run(loop);
|
||||
|
||||
ASSERT(after_write_called == 7);
|
||||
ASSERT(on_pipe_read_called == 0);
|
||||
ASSERT(close_cb_called == 0);
|
||||
|
||||
uv_ref(loop);
|
||||
uv_ref(loop);
|
||||
|
||||
r = uv_read_start((uv_stream_t*)&stdin_pipe, on_read_alloc,
|
||||
on_pipe_read);
|
||||
ASSERT(r == 0);
|
||||
|
||||
uv_run(loop);
|
||||
|
||||
ASSERT(after_write_called == 7);
|
||||
ASSERT(on_pipe_read_called == 1);
|
||||
ASSERT(close_cb_called == 2);
|
||||
|
||||
return 0;
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user