windows ipc fixes

This commit is contained in:
Igor Zinkovsky 2011-10-06 00:58:25 -07:00 committed by Ryan Dahl
parent 90e88aabf6
commit 34f719d7a5
4 changed files with 59 additions and 53 deletions

View File

@ -43,11 +43,6 @@ typedef struct uv_buf_t {
char* base;
} uv_buf_t;
typedef struct uv_duplicate_socket_info_s {
WSAPROTOCOL_INFOW socket_info;
struct uv_duplicate_socket_info_s* next;
} uv_duplicate_socket_info_t;
typedef int uv_file;
RB_HEAD(uv_timer_tree_s, uv_timer_s);
@ -103,7 +98,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
struct uv_req_s* next_req;
#define UV_WRITE_PRIVATE_FIELDS \
/* empty */
int ipc_header;
#define UV_CONNECT_PRIVATE_FIELDS \
/* empty */
@ -181,7 +176,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_write_t ipc_header_write_req; \
int ipc_pid; \
uint64_t remaining_ipc_rawdata_bytes; \
uv_duplicate_socket_info_t* pending_ipc_sockets;
WSAPROTOCOL_INFOW* pending_socket_info;
#define UV_PIPE_PRIVATE_FIELDS \
HANDLE handle; \

View File

@ -66,8 +66,6 @@ void uv_process_timers(uv_loop_t* loop);
#define UV_HANDLE_TTY_RAW 0x80000
#define UV_HANDLE_USE_IPC_PROTOCOL 0x100000
#define UV_HANDLE_EMULATE_IOCP 0x200000
#define UV_HANDLE_DUPLICATED_SOCKET 0x400000
#define UV_HANDLE_WINSOCK_EXT_INIT 0x800000
void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle);
void uv_process_endgames(uv_loop_t* loop);

View File

@ -77,7 +77,9 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->name = NULL;
handle->ipc_pid = 0;
handle->remaining_ipc_rawdata_bytes = 0;
handle->pending_ipc_sockets = NULL;
handle->pending_socket_info = NULL;
uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
if (ipc) {
handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL;
@ -196,7 +198,6 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
uv_duplicate_socket_info_t* socket_info, *next_socket_info;
if (handle->flags & UV_HANDLE_SHUTTING &&
!(handle->flags & UV_HANDLE_SHUT) &&
@ -256,11 +257,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
handle->flags |= UV_HANDLE_CLOSED;
if (handle->flags & UV_HANDLE_CONNECTION) {
next_socket_info = handle->pending_ipc_sockets;
while (next_socket_info) {
socket_info = next_socket_info;
next_socket_info = next_socket_info->next;
free(socket_info);
if (handle->pending_socket_info) {
free(handle->pending_socket_info);
handle->pending_socket_info = NULL;
}
}
@ -582,24 +581,18 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
int r;
uv_loop_t* loop = server->loop;
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
uv_duplicate_socket_info_t* pending_socket;
if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
pending_socket = server->pending_ipc_sockets;
if (!pending_socket) {
if (!server->pending_socket_info) {
/* No valid pending sockets. */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
return -1;
}
server->pending_ipc_sockets = pending_socket->next;
r = uv_tcp_import((uv_tcp_t*)client, &pending_socket->socket_info);
free(pending_socket);
return r;
return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info);
} else {
pipe_client = (uv_pipe_t*)client;
@ -754,7 +747,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
uv_stream_t* send_handle, uv_write_cb cb) {
int result;
uv_tcp_t* tcp_send_handle;
uv_req_t* ipc_header_req;
uv_write_t* ipc_header_req;
DWORD written;
uv_ipc_frame_uv_stream ipc_frame;
@ -784,6 +777,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
req->type = UV_WRITE;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->ipc_header = 0;
memset(&req->overlapped, 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
@ -809,16 +803,26 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
* the first write, and then use the provided req for the second write.
*/
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
ipc_header_req = (uv_req_t*)req;
ipc_header_req = req;
} else {
ipc_header_req = (uv_req_t*)&handle->ipc_header_write_req;
/* Initialize the req if needed. */
/*
* Try to use the preallocated write req if it's available.
* Otherwise allocate a new one.
*/
if (handle->ipc_header_write_req.type != UV_WRITE) {
uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
handle->ipc_header_write_req.type = UV_WRITE;
handle->ipc_header_write_req.handle = (uv_stream_t*) handle;
handle->ipc_header_write_req.cb = NULL;
ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req;
} else {
ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t));
if (!handle->accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
}
uv_req_init(loop, (uv_req_t*) ipc_header_req);
ipc_header_req->type = UV_WRITE;
ipc_header_req->handle = (uv_stream_t*) handle;
ipc_header_req->cb = NULL;
ipc_header_req->ipc_header = 1;
}
/* Write the header or the whole frame. */
@ -947,7 +951,6 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
DWORD bytes, avail;
uv_buf_t buf;
uv_ipc_frame_uv_stream ipc_frame;
uv_duplicate_socket_info_t* pending_ipc_socket;
assert(handle->type == UV_NAMED_PIPE);
@ -1015,16 +1018,15 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
/* Insert a new pending socket entry. */
pending_ipc_socket =
(uv_duplicate_socket_info_t*)malloc(sizeof(*pending_ipc_socket));
if (!pending_ipc_socket) {
/* Store the pending socket info. */
assert(!handle->pending_socket_info);
handle->pending_socket_info =
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info)));
if (!handle->pending_socket_info) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
pending_ipc_socket->socket_info = ipc_frame.socket_info;
pending_ipc_socket->next = handle->pending_ipc_sockets;
handle->pending_ipc_sockets = pending_ipc_socket;
*(handle->pending_socket_info) = ipc_frame.socket_info;
}
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
@ -1052,10 +1054,15 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
handle->remaining_ipc_rawdata_bytes - bytes;
if (handle->read2_cb) {
handle->read2_cb(handle, bytes, buf,
handle->pending_ipc_sockets ? UV_TCP : UV_UNKNOWN_HANDLE);
handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
} else if (handle->read_cb) {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
if (handle->pending_socket_info) {
free(handle->pending_socket_info);
handle->pending_socket_info = NULL;
}
} else {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
@ -1087,12 +1094,20 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
handle->write_queue_size -= req->queued_bytes;
if (req->cb) {
if (!REQ_SUCCESS(req)) {
uv__set_sys_error(loop, GET_REQ_ERROR(req));
((uv_write_cb)req->cb)(req, -1);
if (req->ipc_header) {
if (req == &handle->ipc_header_write_req) {
req->type = UV_UNKNOWN_REQ;
} else {
((uv_write_cb)req->cb)(req, 0);
free(req);
}
} else {
if (req->cb) {
if (!REQ_SUCCESS(req)) {
uv__set_sys_error(loop, GET_REQ_ERROR(req));
((uv_write_cb)req->cb)(req, -1);
} else {
((uv_write_cb)req->cb)(req, 0);
}
}
}

View File

@ -103,6 +103,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
handle->socket = INVALID_SOCKET;
handle->type = UV_TCP;
handle->reqs_pending = 0;
handle->func_acceptex = NULL;
handle->func_connectex = NULL;
loop->counters.tcp_init++;
@ -410,12 +412,11 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;
if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if (!handle->func_acceptex) {
if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}
if (listen(handle->socket, backlog) == SOCKET_ERROR) {
@ -548,12 +549,11 @@ int uv__tcp_connect(uv_connect_t* req,
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;
if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if (!handle->func_connectex) {
if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}
uv_req_init(loop, (uv_req_t*) req);
@ -609,12 +609,11 @@ int uv__tcp_connect6(uv_connect_t* req,
uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0)
return -1;
if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if (!handle->func_connectex) {
if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}
uv_req_init(loop, (uv_req_t*) req);
@ -952,7 +951,6 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
}
tcp->flags |= UV_HANDLE_BOUND;
tcp->flags |= UV_HANDLE_DUPLICATED_SOCKET;
return uv_tcp_set_socket(tcp->loop, tcp, socket, 1);
}