ipc on windows

This commit is contained in:
Igor Zinkovsky 2011-09-29 17:58:58 -07:00 committed by Ryan Dahl
parent 61fab8d1ba
commit 81c4043c83
14 changed files with 766 additions and 294 deletions

View File

@ -43,6 +43,11 @@ typedef struct uv_buf_t {
char* base;
} uv_buf_t;
typedef struct uv_duplicate_socket_info_s {
WSAPROTOCOL_INFOW socket_info;
struct uv_duplicate_socket_info_s* next;
} uv_duplicate_socket_info_t;
typedef int uv_file;
RB_HEAD(uv_timer_tree_s, uv_timer_s);
@ -120,6 +125,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
UV_REQ_FIELDS \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
HANDLE event_handle; \
HANDLE wait_handle; \
struct uv_tcp_accept_s* next_pending; \
} uv_tcp_accept_t;
@ -140,10 +147,12 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
#define uv_tcp_server_fields \
uv_tcp_accept_t* accept_reqs; \
uv_tcp_accept_t* pending_accepts;
uv_tcp_accept_t* pending_accepts; \
LPFN_ACCEPTEX func_acceptex;
#define uv_tcp_connection_fields \
uv_buf_t read_buffer;
uv_buf_t read_buffer; \
LPFN_CONNECTEX func_connectex;
#define UV_TCP_PRIVATE_FIELDS \
SOCKET socket; \
@ -164,11 +173,15 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_alloc_cb alloc_cb;
#define uv_pipe_server_fields \
uv_pipe_accept_t accept_reqs[4]; \
uv_pipe_accept_t* pending_accepts;
uv_pipe_accept_t accept_reqs[4]; \
uv_pipe_accept_t* pending_accepts;
#define uv_pipe_connection_fields \
uv_timer_t* eof_timer;
uv_timer_t* eof_timer; \
uv_write_t ipc_header_write_req; \
int ipc_pid; \
uint64_t remaining_ipc_rawdata_bytes; \
uv_duplicate_socket_info_t* pending_ipc_sockets;
#define UV_PIPE_PRIVATE_FIELDS \
HANDLE handle; \

View File

@ -41,6 +41,12 @@ extern "C" {
typedef intptr_t ssize_t;
#endif
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
# include "uv-private/uv-unix.h"
#else
# include "uv-private/uv-win.h"
#endif
/* Expand this list if necessary. */
typedef enum {
UV_UNKNOWN = -1,
@ -153,12 +159,6 @@ typedef struct uv_fs_s uv_fs_t;
typedef struct uv_fs_event_s uv_fs_event_t;
typedef struct uv_work_s uv_work_t;
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
# include "uv-private/uv-unix.h"
#else
# include "uv-private/uv-win.h"
#endif
/*
* This function must be called before any other functions in libuv.
@ -392,13 +392,6 @@ int uv_read_stop(uv_stream_t*);
*/
int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read2_cb read_cb);
typedef enum {
UV_STDIN = 0,
UV_STDOUT,
UV_STDERR
} uv_std_type;
uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type);
/*
* Write data to stream. Buffers are written in order. Example:

View File

@ -790,10 +790,3 @@ size_t uv__strlcpy(char* dst, const char* src, size_t size) {
return src - org;
}
uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) {
assert(0 && "implement me");
return NULL;
}

View File

@ -44,27 +44,30 @@ void uv_process_timers(uv_loop_t* loop);
*/
/* Private uv_handle flags */
#define UV_HANDLE_CLOSING 0x0001
#define UV_HANDLE_CLOSED 0x0002
#define UV_HANDLE_BOUND 0x0004
#define UV_HANDLE_LISTENING 0x0008
#define UV_HANDLE_CONNECTION 0x0010
#define UV_HANDLE_CONNECTED 0x0020
#define UV_HANDLE_READING 0x0040
#define UV_HANDLE_ACTIVE 0x0040
#define UV_HANDLE_EOF 0x0080
#define UV_HANDLE_SHUTTING 0x0100
#define UV_HANDLE_SHUT 0x0200
#define UV_HANDLE_ENDGAME_QUEUED 0x0400
#define UV_HANDLE_BIND_ERROR 0x1000
#define UV_HANDLE_IPV6 0x2000
#define UV_HANDLE_PIPESERVER 0x4000
#define UV_HANDLE_READ_PENDING 0x8000
#define UV_HANDLE_GIVEN_OS_HANDLE 0x10000
#define UV_HANDLE_UV_ALLOCED 0x20000
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000
#define UV_HANDLE_ZERO_READ 0x80000
#define UV_HANDLE_TTY_RAW 0x100000
#define UV_HANDLE_CLOSING 0x0001
#define UV_HANDLE_CLOSED 0x0002
#define UV_HANDLE_BOUND 0x0004
#define UV_HANDLE_LISTENING 0x0008
#define UV_HANDLE_CONNECTION 0x0010
#define UV_HANDLE_CONNECTED 0x0020
#define UV_HANDLE_READING 0x0040
#define UV_HANDLE_ACTIVE 0x0040
#define UV_HANDLE_EOF 0x0080
#define UV_HANDLE_SHUTTING 0x0100
#define UV_HANDLE_SHUT 0x0200
#define UV_HANDLE_ENDGAME_QUEUED 0x0400
#define UV_HANDLE_BIND_ERROR 0x1000
#define UV_HANDLE_IPV6 0x2000
#define UV_HANDLE_PIPESERVER 0x4000
#define UV_HANDLE_READ_PENDING 0x8000
#define UV_HANDLE_UV_ALLOCED 0x10000
#define UV_HANDLE_SYNC_BYPASS_IOCP 0x20000
#define UV_HANDLE_ZERO_READ 0x40000
#define UV_HANDLE_TTY_RAW 0x80000
#define UV_HANDLE_USE_IPC_PROTOCOL 0x100000
#define UV_HANDLE_EMULATE_IOCP 0x200000
#define UV_HANDLE_DUPLICATED_SOCKET 0x400000
#define UV_HANDLE_WINSOCK_EXT_INIT 0x800000
void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle);
void uv_process_endgames(uv_loop_t* loop);
@ -97,8 +100,8 @@ uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped);
void uv_insert_pending_req(uv_loop_t* loop, uv_req_t* req);
void uv_process_reqs(uv_loop_t* loop);
#define POST_COMPLETION_FOR_REQ(loop, req) \
if (!PostQueuedCompletionStatus((loop)->iocp, \
#define POST_COMPLETION_FOR_REQ(loop, req) \
if (!PostQueuedCompletionStatus((loop)->iocp, \
0, \
0, \
&((req)->overlapped))) { \
@ -135,6 +138,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle);
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info);
/*
* UDP
@ -149,19 +154,21 @@ void uv_udp_endgame(uv_loop_t* loop, uv_udp_t* handle);
/*
* Pipes
*/
int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle,
HANDLE pipeHandle);
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
char* name, size_t nameSize);
void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err);
void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle);
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client);
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client);
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb);
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read2_cb read_cb);
int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_write_cb cb);
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb);
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* req);
@ -267,6 +274,10 @@ void uv_fs_event_close(uv_loop_t* loop, uv_fs_event_t* handle);
void uv_fs_event_endgame(uv_loop_t* loop, uv_fs_event_t* handle);
/* Utils */
int uv_parent_pid();
/*
* Error handling
*/

View File

@ -20,6 +20,7 @@
*/
#include <assert.h>
#include <io.h>
#include <string.h>
#include <stdio.h>
@ -38,6 +39,22 @@ static const uv_buf_t uv_null_buf_ = { 0, NULL };
/* when the local ends wants to shut it down. */
static const int64_t eof_timeout = 50; /* ms */
/* IPC protocol flags. */
#define UV_IPC_RAW_DATA 0x0001
#define UV_IPC_UV_STREAM 0x0002
/* IPC frame header. */
typedef struct {
int flags;
uint64_t raw_data_length;
} uv_ipc_frame_header_t;
/* IPC frame, which contains an imported TCP socket stream. */
typedef struct {
uv_ipc_frame_header_t header;
WSAPROTOCOL_INFOW socket_info;
} uv_ipc_frame_uv_stream;
static void eof_timer_init(uv_pipe_t* pipe);
static void eof_timer_start(uv_pipe_t* pipe);
static void eof_timer_stop(uv_pipe_t* pipe);
@ -58,7 +75,13 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->reqs_pending = 0;
handle->handle = INVALID_HANDLE_VALUE;
handle->name = NULL;
handle->ipc = ipc;
handle->ipc_pid = 0;
handle->remaining_ipc_rawdata_bytes = 0;
handle->pending_ipc_sockets = NULL;
if (ipc) {
handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL;
}
loop->counters.pipe_init++;
@ -66,24 +89,6 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
}
int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle,
HANDLE pipeHandle) {
int err = uv_pipe_init(loop, handle);
if (!err) {
/*
* At this point we don't know whether the pipe will be used as a client
* or a server. So, we assume that it will be a client until
* uv_listen is called.
*/
handle->handle = pipeHandle;
handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE;
}
return err;
}
static void uv_pipe_connection_init(uv_pipe_t* handle) {
uv_connection_init((uv_stream_t*) handle);
handle->eof_timer = NULL;
@ -132,7 +137,6 @@ int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
uv_pipe_connection_init(handle);
handle->handle = pipeHandle;
handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE;
err = 0;
done:
@ -192,7 +196,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_PIPE_LOCAL_INFORMATION pipe_info;
uv_duplicate_socket_info_t* socket_info, *next_socket_info;
if (handle->flags & UV_HANDLE_SHUTTING &&
!(handle->flags & UV_HANDLE_SHUT) &&
@ -251,6 +255,15 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->flags |= UV_HANDLE_CLOSED;
if (handle->flags & UV_HANDLE_CONNECTION) {
next_socket_info = handle->pending_ipc_sockets;
while (next_socket_info) {
socket_info = next_socket_info;
next_socket_info = next_socket_info->next;
free(socket_info);
}
}
/* Remember the state of this flag because the close callback is */
/* allowed to clobber or free the handle's memory */
uv_alloced = handle->flags & UV_HANDLE_UV_ALLOCED;
@ -568,30 +581,50 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle,
}
int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
int r;
uv_loop_t* loop = server->loop;
/* Find a connection instance that has been connected, but not yet */
/* accepted. */
uv_pipe_accept_t* req = server->pending_accepts;
uv_pipe_t* pipe_client;
uv_pipe_accept_t* req;
uv_duplicate_socket_info_t* pending_socket;
if (!req) {
/* No valid connections found, so we error out. */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
return -1;
}
if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
pending_socket = server->pending_ipc_sockets;
if (!pending_socket) {
/* No valid pending sockets. */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
return -1;
}
/* Initialize the client handle and copy the pipeHandle to the client */
uv_pipe_connection_init(client);
client->handle = req->pipeHandle;
server->pending_ipc_sockets = pending_socket->next;
r = uv_tcp_import((uv_tcp_t*)client, &pending_socket->socket_info);
free(pending_socket);
return r;
} else {
pipe_client = (uv_pipe_t*)client;
/* Prepare the req to pick up a new connection */
server->pending_accepts = req->next_pending;
req->next_pending = NULL;
req->pipeHandle = INVALID_HANDLE_VALUE;
/* Find a connection instance that has been connected, but not yet */
/* accepted. */
req = server->pending_accepts;
if (!(server->flags & UV_HANDLE_CLOSING) &&
!(server->flags & UV_HANDLE_GIVEN_OS_HANDLE)) {
uv_pipe_queue_accept(loop, server, req, FALSE);
if (!req) {
/* No valid connections found, so we error out. */
uv__set_sys_error(loop, WSAEWOULDBLOCK);
return -1;
}
/* Initialize the client handle and copy the pipeHandle to the client */
uv_pipe_connection_init(pipe_client);
pipe_client->handle = req->pipeHandle;
/* Prepare the req to pick up a new connection */
server->pending_accepts = req->next_pending;
req->next_pending = NULL;
req->pipeHandle = INVALID_HANDLE_VALUE;
if (!(server->flags & UV_HANDLE_CLOSING)) {
uv_pipe_queue_accept(loop, server, req, FALSE);
}
}
return 0;
@ -603,11 +636,8 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
uv_loop_t* loop = handle->loop;
int i, errno;
uv_pipe_accept_t* req;
HANDLE pipeHandle;
if (!(handle->flags & UV_HANDLE_BOUND) &&
!(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) {
if (!(handle->flags & UV_HANDLE_BOUND)) {
uv__set_artificial_error(loop, UV_EINVAL);
return -1;
}
@ -618,8 +648,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
return -1;
}
if (!(handle->flags & UV_HANDLE_PIPESERVER) &&
!(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) {
if (!(handle->flags & UV_HANDLE_PIPESERVER)) {
uv__set_artificial_error(loop, UV_ENOTSUP);
return -1;
}
@ -627,30 +656,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
if (handle->flags & UV_HANDLE_GIVEN_OS_HANDLE) {
handle->flags |= UV_HANDLE_PIPESERVER;
pipeHandle = handle->handle;
assert(pipeHandle != INVALID_HANDLE_VALUE);
req = &handle->accept_reqs[0];
uv_req_init(loop, (uv_req_t*) req);
req->pipeHandle = pipeHandle;
req->type = UV_ACCEPT;
req->data = handle;
req->next_pending = NULL;
/* First pipe handle should have already been created in uv_pipe_bind */
assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
if (uv_set_pipe_handle(loop, handle, pipeHandle)) {
uv__set_sys_error(loop, GetLastError());
return -1;
}
uv_pipe_queue_accept(loop, handle, req, TRUE);
} else {
/* First pipe handle should have already been created in uv_pipe_bind */
assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE);
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
}
for (i = 0; i < COUNTOF(handle->accept_reqs); i++) {
uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0);
}
return 0;
@ -694,8 +704,8 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
}
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb, uv_read2_cb read2_cb) {
uv_loop_t* loop = handle->loop;
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
@ -715,9 +725,10 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
handle->flags |= UV_HANDLE_READING;
handle->read_cb = read_cb;
handle->read2_cb = read2_cb;
handle->alloc_cb = alloc_cb;
/* If reading was stopped and then started again, there could stell be a */
/* If reading was stopped and then started again, there could still be a */
/* read request pending. */
if (!(handle->flags & UV_HANDLE_READ_PENDING))
uv_pipe_queue_read(loop, handle);
@ -726,11 +737,33 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
}
int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_write_cb cb) {
int result;
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
}
if (bufcnt != 1) {
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
uv_read2_cb read_cb) {
return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
}
static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
uv_stream_t* send_handle, uv_write_cb cb) {
int result;
uv_tcp_t* tcp_send_handle;
uv_req_t* ipc_header_req;
DWORD written;
uv_ipc_frame_uv_stream ipc_frame;
if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) {
uv__set_artificial_error(loop, UV_ENOTSUP);
return -1;
}
if (send_handle && send_handle->type != UV_TCP) {
uv__set_artificial_error(loop, UV_ENOTSUP);
return -1;
}
@ -753,6 +786,73 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
/* Use the IPC framing protocol. */
if (send_handle) {
tcp_send_handle = (uv_tcp_t*)send_handle;
if (WSADuplicateSocketW(tcp_send_handle->socket, handle->ipc_pid,
&ipc_frame.socket_info)) {
uv__set_sys_error(loop, WSAGetLastError());
return -1;
}
ipc_frame.header.flags |= UV_IPC_UV_STREAM;
}
if (bufcnt == 1) {
ipc_frame.header.flags |= UV_IPC_RAW_DATA;
ipc_frame.header.raw_data_length = bufs[0].len;
}
/*
* Use the provided req if we're only doing a single write.
* If we're doing multiple writes, use ipc_header_write_req to do
* the first write, and then use the provided req for the second write.
*/
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
ipc_header_req = (uv_req_t*)req;
} else {
ipc_header_req = (uv_req_t*)&handle->ipc_header_write_req;
/* Initialize the req if needed. */
if (handle->ipc_header_write_req.type != UV_WRITE) {
uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
handle->ipc_header_write_req.type = UV_WRITE;
handle->ipc_header_write_req.handle = (uv_stream_t*) handle;
handle->ipc_header_write_req.cb = NULL;
}
}
/* Write the header or the whole frame. */
memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped));
result = WriteFile(handle->handle,
&ipc_frame,
ipc_frame.header.flags & UV_IPC_UV_STREAM ?
sizeof(ipc_frame) : sizeof(ipc_frame.header),
&written,
&ipc_header_req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
uv__set_sys_error(loop, GetLastError());
return -1;
}
if (result) {
/* Request completed immediately. */
req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
req->queued_bytes = written;
handle->write_queue_size += req->queued_bytes;
}
handle->reqs_pending++;
handle->write_reqs_pending++;
/* If we don't have any raw data to write - we're done. */
if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) {
return 0;
}
}
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
@ -780,6 +880,23 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
}
int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_write_cb cb) {
return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb);
}
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) {
if (!(handle->flags & UV_HANDLE_USE_IPC_PROTOCOL)) {
uv__set_artificial_error(loop, UV_EINVAL);
return -1;
}
return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb);
}
static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
uv_buf_t buf) {
/* If there is an eof timer running, we don't need it any more, */
@ -790,7 +907,11 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
uv_read_stop((uv_stream_t*) handle);
uv__set_artificial_error(loop, UV_EOF);
handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_);
if (handle->read2_cb) {
handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE);
} else {
handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_);
}
}
@ -803,7 +924,11 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
uv_read_stop((uv_stream_t*) handle);
uv__set_sys_error(loop, error);
handle->read_cb((uv_stream_t*)handle, -1, buf);
if (handle->read2_cb) {
handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE);
} else {
handle->read_cb((uv_stream_t*)handle, -1, buf);
}
}
@ -821,6 +946,8 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
uv_req_t* req) {
DWORD bytes, avail;
uv_buf_t buf;
uv_ipc_frame_uv_stream ipc_frame;
uv_duplicate_socket_info_t* pending_ipc_socket;
assert(handle->type == UV_NAMED_PIPE);
@ -839,11 +966,11 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
/* Do non-blocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
if (!PeekNamedPipe(handle->handle,
NULL,
0,
NULL,
&avail,
NULL)) {
NULL,
0,
NULL,
&avail,
NULL)) {
uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_);
break;
}
@ -853,6 +980,63 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
break;
}
if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
/* Use the IPC framing protocol to read the incoming data. */
if (handle->remaining_ipc_rawdata_bytes == 0) {
/* We're reading a new frame. First, read the header. */
assert(avail >= sizeof(ipc_frame.header));
if (!ReadFile(handle->handle,
&ipc_frame.header,
sizeof(ipc_frame.header),
&bytes,
NULL)) {
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
uv_null_buf_);
break;
}
assert(bytes == sizeof(ipc_frame.header));
if (ipc_frame.header.flags & UV_IPC_UV_STREAM) {
assert(avail - sizeof(ipc_frame.header) >=
sizeof(ipc_frame.socket_info));
/* Read the TCP socket info. */
if (!ReadFile(handle->handle,
&ipc_frame.socket_info,
sizeof(ipc_frame) - sizeof(ipc_frame.header),
&bytes,
NULL)) {
uv_pipe_read_error_or_eof(loop, handle, GetLastError(),
uv_null_buf_);
break;
}
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
/* Insert a new pending socket entry. */
pending_ipc_socket =
(uv_duplicate_socket_info_t*)malloc(sizeof(*pending_ipc_socket));
if (!pending_ipc_socket) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
pending_ipc_socket->socket_info = ipc_frame.socket_info;
pending_ipc_socket->next = handle->pending_ipc_sockets;
handle->pending_ipc_sockets = pending_ipc_socket;
}
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
handle->remaining_ipc_rawdata_bytes =
ipc_frame.header.raw_data_length;
continue;
}
} else {
avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes);
}
}
buf = handle->alloc_cb((uv_handle_t*) handle, avail);
assert(buf.len > 0);
@ -862,7 +1046,20 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
&bytes,
NULL)) {
/* Successful read */
handle->read_cb((uv_stream_t*)handle, bytes, buf);
if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
assert(handle->remaining_ipc_rawdata_bytes >= bytes);
handle->remaining_ipc_rawdata_bytes =
handle->remaining_ipc_rawdata_bytes - bytes;
if (handle->read2_cb) {
handle->read2_cb(handle, bytes, buf,
handle->pending_ipc_sockets ? UV_TCP : UV_UNKNOWN_HANDLE);
} else if (handle->read_cb) {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
} else {
handle->read_cb((uv_stream_t*)handle, bytes, buf);
}
/* Read again only if bytes == buf.len */
if (bytes <= buf.len) {
break;
@ -928,8 +1125,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle,
CloseHandle(req->pipeHandle);
req->pipeHandle = INVALID_HANDLE_VALUE;
}
if (!(handle->flags & UV_HANDLE_CLOSING) &&
!(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) {
if (!(handle->flags & UV_HANDLE_CLOSING)) {
uv_pipe_queue_accept(loop, handle, req, FALSE);
}
}
@ -1066,6 +1262,21 @@ static void eof_timer_close_cb(uv_handle_t* handle) {
void uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
assert(0 && "implement me");
}
HANDLE os_handle;
/* Special-case stdin with ipc. */
if (file == 0 && pipe->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
os_handle = (HANDLE)_get_osfhandle(file);
if (os_handle == INVALID_HANDLE_VALUE ||
uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) {
return;
}
uv_pipe_connection_init(pipe);
pipe->ipc_pid = uv_parent_pid();
assert(pipe->ipc_pid != -1);
pipe->handle = os_handle;
}
}

View File

@ -45,7 +45,7 @@ typedef struct env_var {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); \
} \
if (!uv_utf8_to_utf16(s, t, size / sizeof(wchar_t))) { \
uv__set_sys_error(loop, GetLastError()); \
uv__set_sys_error(loop, GetLastError()); \
err = -1; \
goto done; \
}
@ -739,7 +739,8 @@ void uv_process_close(uv_loop_t* loop, uv_process_t* handle) {
static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe,
HANDLE* child_pipe, DWORD server_access, DWORD child_access) {
HANDLE* child_pipe, DWORD server_access, DWORD child_access,
int overlapped) {
int err;
SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE };
char pipe_name[64];
@ -767,7 +768,7 @@ static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe,
0,
&sa,
OPEN_EXISTING,
0,
overlapped ? FILE_FLAG_OVERLAPPED : 0,
NULL);
if (*child_pipe == INVALID_HANDLE_VALUE) {
@ -848,7 +849,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
wchar_t* path = NULL;
int size;
BOOL result;
wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, *env = NULL, *cwd = NULL;
wchar_t* application_path = NULL, *application = NULL, *arguments = NULL,
*env = NULL, *cwd = NULL;
HANDLE* child_stdio = process->child_stdio;
STARTUPINFOW startup;
PROCESS_INFORMATION info;
@ -904,12 +906,23 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
/* Create stdio pipes. */
if (options.stdin_stream) {
err = uv_create_stdio_pipe_pair(
loop,
options.stdin_stream,
&child_stdio[0],
PIPE_ACCESS_OUTBOUND,
GENERIC_READ | FILE_WRITE_ATTRIBUTES);
if (options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
err = uv_create_stdio_pipe_pair(
loop,
options.stdin_stream,
&child_stdio[0],
PIPE_ACCESS_DUPLEX,
GENERIC_READ | FILE_WRITE_ATTRIBUTES | GENERIC_WRITE,
1);
} else {
err = uv_create_stdio_pipe_pair(
loop,
options.stdin_stream,
&child_stdio[0],
PIPE_ACCESS_OUTBOUND,
GENERIC_READ | FILE_WRITE_ATTRIBUTES,
0);
}
} else {
err = duplicate_std_handle(loop, STD_INPUT_HANDLE, &child_stdio[0]);
}
@ -922,7 +935,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
loop, options.stdout_stream,
&child_stdio[1],
PIPE_ACCESS_INBOUND,
GENERIC_WRITE);
GENERIC_WRITE,
0);
} else {
err = duplicate_std_handle(loop, STD_OUTPUT_HANDLE, &child_stdio[1]);
}
@ -936,7 +950,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
options.stderr_stream,
&child_stdio[2],
PIPE_ACCESS_INBOUND,
GENERIC_WRITE);
GENERIC_WRITE,
0);
} else {
err = duplicate_std_handle(loop, STD_ERROR_HANDLE, &child_stdio[2]);
}
@ -969,6 +984,11 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process,
process->process_handle = info.hProcess;
process->pid = info.dwProcessId;
if (options.stdin_stream &&
options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) {
options.stdin_stream->ipc_pid = info.dwProcessId;
}
/* Setup notifications for when the child process exits. */
result = RegisterWaitForSingleObject(&process->wait_handle,
process->process_handle, exit_wait_callback, (void*)process, INFINITE,

View File

@ -62,13 +62,11 @@ int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
assert(client->type == server->type);
switch (server->type) {
case UV_TCP:
return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client);
case UV_NAMED_PIPE:
return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client);
return uv_pipe_accept((uv_pipe_t*)server, client);
default:
assert(0);
return -1;
@ -92,6 +90,18 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,
}
int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,
uv_read2_cb read_cb) {
switch (handle->type) {
case UV_NAMED_PIPE:
return uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb);
default:
assert(0);
return -1;
}
}
int uv_read_stop(uv_stream_t* handle) {
if (handle->type == UV_TTY) {
return uv_tty_read_stop((uv_tty_t*) handle);
@ -121,6 +131,21 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
}
int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
uv_stream_t* send_handle, uv_write_cb cb) {
uv_loop_t* loop = handle->loop;
switch (handle->type) {
case UV_NAMED_PIPE:
return uv_pipe_write2(loop, req, (uv_pipe_t*) handle, bufs, bufcnt, send_handle, cb);
default:
assert(0);
uv__set_sys_error(loop, WSAEINVAL);
return -1;
}
}
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
uv_loop_t* loop = handle->loop;

View File

@ -47,7 +47,7 @@ static unsigned int active_tcp_streams = 0;
static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle,
SOCKET socket) {
SOCKET socket, int imported) {
DWORD yes = 1;
assert(handle->socket == INVALID_SOCKET);
@ -70,8 +70,12 @@ static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle,
loop->iocp,
(ULONG_PTR)socket,
0) == NULL) {
uv__set_sys_error(loop, GetLastError());
return -1;
if (imported) {
handle->flags |= UV_HANDLE_EMULATE_IOCP;
} else {
uv__set_sys_error(loop, GetLastError());
return -1;
}
}
if (pSetFileCompletionNotificationModes) {
@ -109,6 +113,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
int status;
int sys_error;
unsigned int i;
uv_tcp_accept_t* req;
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->flags & UV_HANDLE_SHUTTING &&
@ -139,6 +145,20 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) {
handle->flags |= UV_HANDLE_CLOSED;
if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) {
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
req = &handle->accept_reqs[i];
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
}
free(handle->accept_reqs);
handle->accept_reqs = NULL;
}
@ -169,7 +189,7 @@ static int uv__bind(uv_tcp_t* handle,
return -1;
}
if (uv_tcp_set_socket(handle->loop, handle, sock) == -1) {
if (uv_tcp_set_socket(handle->loop, handle, sock, 0) == -1) {
closesocket(sock);
return -1;
}
@ -218,24 +238,40 @@ int uv__tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
static void CALLBACK post_completion(void* context, BOOLEAN timed_out) {
uv_tcp_accept_t* req;
uv_tcp_t* handle;
req = (uv_tcp_accept_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->data;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->overlapped.InternalHigh,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
uv_loop_t* loop = handle->loop;
BOOL success;
DWORD bytes;
SOCKET accept_socket;
short family;
LPFN_ACCEPTEX pAcceptExFamily;
assert(handle->flags & UV_HANDLE_LISTENING);
assert(req->accept_socket == INVALID_SOCKET);
/* choose family and extension function */
if ((handle->flags & UV_HANDLE_IPV6) != 0) {
if (handle->flags & UV_HANDLE_IPV6) {
family = AF_INET6;
pAcceptExFamily = pAcceptEx6;
} else {
family = AF_INET;
pAcceptExFamily = pAcceptEx;
}
/* Open a socket for the accepted connection. */
@ -249,15 +285,18 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
}
success = pAcceptExFamily(handle->socket,
accept_socket,
(void*)req->accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
&bytes,
&req->overlapped);
success = handle->func_acceptex(handle->socket,
accept_socket,
(void*)req->accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
&bytes,
&req->overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
@ -268,6 +307,15 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
/* The req will be processed with IOCP. */
req->accept_socket = accept_socket;
handle->reqs_pending++;
if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
req->wait_handle == INVALID_HANDLE_VALUE &&
!RegisterWaitForSingleObject(&req->wait_handle,
req->overlapped.hEvent, post_completion, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
return;
}
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
@ -275,6 +323,11 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
/* Destroy the event handle */
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
CloseHandle(req->overlapped.hEvent);
req->event_handle = NULL;
}
}
}
@ -357,6 +410,14 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;
if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}
if (listen(handle->socket, backlog) == SOCKET_ERROR) {
uv__set_sys_error(loop, WSAGetLastError());
return -1;
@ -378,6 +439,17 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
req->type = UV_ACCEPT;
req->accept_socket = INVALID_SOCKET;
req->data = handle;
req->wait_handle = INVALID_HANDLE_VALUE;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
} else {
req->event_handle = NULL;
}
uv_tcp_queue_accept(handle, req);
}
@ -402,7 +474,7 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
return -1;
}
if (uv_tcp_set_socket(client->loop, client, req->accept_socket) == -1) {
if (uv_tcp_set_socket(client->loop, client, req->accept_socket, 0) == -1) {
closesocket(req->accept_socket);
rv = -1;
} else {
@ -476,19 +548,27 @@ int uv__tcp_connect(uv_connect_t* req,
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;
if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_CONNECT;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
success = pConnectEx(handle->socket,
(struct sockaddr*) &address,
addrsize,
NULL,
0,
&bytes,
&req->overlapped);
success = handle->func_connectex(handle->socket,
(struct sockaddr*) &address,
addrsize,
NULL,
0,
&bytes,
&req->overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
@ -529,19 +609,27 @@ int uv__tcp_connect6(uv_connect_t* req,
uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0)
return -1;
if (!(handle->flags & UV_HANDLE_WINSOCK_EXT_INIT)) {
if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) {
uv__set_sys_error(loop, WSAEAFNOSUPPORT);
return -1;
}
handle->flags |= UV_HANDLE_WINSOCK_EXT_INIT;
}
uv_req_init(loop, (uv_req_t*) req);
req->type = UV_CONNECT;
req->handle = (uv_stream_t*) handle;
req->cb = cb;
memset(&req->overlapped, 0, sizeof(req->overlapped));
success = pConnectEx6(handle->socket,
(struct sockaddr*) &address,
addrsize,
NULL,
0,
&bytes,
&req->overlapped);
success = handle->func_connectex(handle->socket,
(struct sockaddr*) &address,
addrsize,
NULL,
0,
&bytes,
&req->overlapped);
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
handle->reqs_pending++;
@ -848,3 +936,23 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
DECREASE_PENDING_REQ_COUNT(handle);
}
int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) {
SOCKET socket = WSASocketW(AF_INET,
SOCK_STREAM,
IPPROTO_IP,
socket_protocol_info,
0,
WSA_FLAG_OVERLAPPED);
if (socket == INVALID_SOCKET) {
uv__set_sys_error(tcp->loop, WSAGetLastError());
return -1;
}
tcp->flags |= UV_HANDLE_BOUND;
tcp->flags |= UV_HANDLE_DUPLICATED_SOCKET;
return uv_tcp_set_socket(tcp->loop, tcp, socket, 1);
}

View File

@ -25,6 +25,7 @@
#include "uv.h"
#include "internal.h"
#include "Tlhelp32.h"
int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size,
@ -95,11 +96,13 @@ done:
return retVal;
}
void uv_loadavg(double avg[3]) {
/* Can't be implemented */
avg[0] = avg[1] = avg[2] = 0;
}
double uv_get_free_memory(void) {
MEMORYSTATUSEX memory_status;
memory_status.dwLength = sizeof(memory_status);
@ -112,6 +115,7 @@ double uv_get_free_memory(void) {
return (double)memory_status.ullAvailPhys;
}
double uv_get_total_memory(void) {
MEMORYSTATUSEX memory_status;
memory_status.dwLength = sizeof(memory_status);
@ -123,3 +127,26 @@ double uv_get_total_memory(void) {
return (double)memory_status.ullTotalPhys;
}
int uv_parent_pid() {
int parent_pid = -1;
HANDLE handle;
PROCESSENTRY32 pe;
int current_pid = GetCurrentProcessId();
pe.dwSize = sizeof(PROCESSENTRY32);
handle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
if (Process32First(handle, &pe)) {
do {
if (pe.th32ProcessID == current_pid) {
parent_pid = pe.th32ParentProcessID;
break;
}
} while( Process32Next(handle, &pe));
}
CloseHandle(handle);
return parent_pid;
}

View File

@ -25,21 +25,6 @@
#include "../uv-common.h"
#include "internal.h"
/* Winsock extension functions (ipv4) */
LPFN_CONNECTEX pConnectEx;
LPFN_ACCEPTEX pAcceptEx;
LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs;
LPFN_DISCONNECTEX pDisconnectEx;
LPFN_TRANSMITFILE pTransmitFile;
/* Winsock extension functions (ipv6) */
LPFN_CONNECTEX pConnectEx6;
LPFN_ACCEPTEX pAcceptEx6;
LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6;
LPFN_DISCONNECTEX pDisconnectEx6;
LPFN_TRANSMITFILE pTransmitFile6;
/* Whether ipv6 is supported */
int uv_allow_ipv6;
@ -74,6 +59,18 @@ static BOOL uv_get_extension_function(SOCKET socket, GUID guid,
}
BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target) {
const GUID wsaid_acceptex = WSAID_ACCEPTEX;
return uv_get_extension_function(socket, wsaid_acceptex, (void**)target);
}
BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target) {
const GUID wsaid_connectex = WSAID_CONNECTEX;
return uv_get_extension_function(socket, wsaid_connectex, (void**)target);
}
void uv_winsock_init() {
const GUID wsaid_connectex = WSAID_CONNECTEX;
const GUID wsaid_acceptex = WSAID_ACCEPTEX;
@ -83,7 +80,6 @@ void uv_winsock_init() {
WSADATA wsa_data;
int errorno;
SOCKET dummy;
SOCKET dummy6;
/* Initialize winsock */
@ -96,58 +92,10 @@ void uv_winsock_init() {
uv_addr_ip4_any_ = uv_ip4_addr("0.0.0.0", 0);
uv_addr_ip6_any_ = uv_ip6_addr("::", 0);
/* Retrieve the needed winsock extension function pointers. */
dummy = socket(AF_INET, SOCK_STREAM, IPPROTO_IP);
if (dummy == INVALID_SOCKET) {
uv_fatal_error(WSAGetLastError(), "socket");
}
if (!uv_get_extension_function(dummy,
wsaid_connectex,
(void**)&pConnectEx) ||
!uv_get_extension_function(dummy,
wsaid_acceptex,
(void**)&pAcceptEx) ||
!uv_get_extension_function(dummy,
wsaid_getacceptexsockaddrs,
(void**)&pGetAcceptExSockAddrs) ||
!uv_get_extension_function(dummy,
wsaid_disconnectex,
(void**)&pDisconnectEx) ||
!uv_get_extension_function(dummy,
wsaid_transmitfile,
(void**)&pTransmitFile)) {
uv_fatal_error(WSAGetLastError(),
"WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)");
}
if (closesocket(dummy) == SOCKET_ERROR) {
uv_fatal_error(WSAGetLastError(), "closesocket");
}
/* optional IPv6 versions of winsock extension functions */
/* Detect IPV6 support */
dummy6 = socket(AF_INET6, SOCK_STREAM, IPPROTO_IP);
if (dummy6 != INVALID_SOCKET) {
uv_allow_ipv6 = TRUE;
if (!uv_get_extension_function(dummy6,
wsaid_connectex,
(void**)&pConnectEx6) ||
!uv_get_extension_function(dummy6,
wsaid_acceptex,
(void**)&pAcceptEx6) ||
!uv_get_extension_function(dummy6,
wsaid_getacceptexsockaddrs,
(void**)&pGetAcceptExSockAddrs6) ||
!uv_get_extension_function(dummy6,
wsaid_disconnectex,
(void**)&pDisconnectEx6) ||
!uv_get_extension_function(dummy6,
wsaid_transmitfile,
(void**)&pTransmitFile6)) {
uv_allow_ipv6 = FALSE;
}
if (closesocket(dummy6) == SOCKET_ERROR) {
uv_fatal_error(WSAGetLastError(), "closesocket");
}

View File

@ -109,24 +109,12 @@
#define IPV6_V6ONLY 27
#endif
/* Winsock extension functions (ipv4) */
extern LPFN_CONNECTEX pConnectEx;
extern LPFN_ACCEPTEX pAcceptEx;
extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs;
extern LPFN_DISCONNECTEX pDisconnectEx;
extern LPFN_TRANSMITFILE pTransmitFile;
/* Winsock extension functions (ipv6) */
extern LPFN_CONNECTEX pConnectEx6;
extern LPFN_ACCEPTEX pAcceptEx6;
extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6;
extern LPFN_DISCONNECTEX pDisconnectEx6;
extern LPFN_TRANSMITFILE pTransmitFile6;
/* Whether ipv6 is supported */
extern int uv_allow_ipv6;
BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target);
BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target);
/* Ip address used to bind to any port at any interface */
extern struct sockaddr_in uv_addr_ip4_any_;
extern struct sockaddr_in6 uv_addr_ip6_any_;

View File

@ -22,9 +22,9 @@
#include <stdio.h>
#include <string.h>
#include "uv.h"
#include "runner.h"
#include "task.h"
#include "uv.h"
/* Actual tests and helpers are defined in test-list.h */
#include "test-list.h"
@ -49,10 +49,61 @@ int main(int argc, char **argv) {
}
static uv_tcp_t server;
static uv_pipe_t channel;
static uv_tcp_t tcp_server;
static uv_write_t conn_notify_req;
static int close_cb_called;
static int connection_accepted;
static void close_cb(uv_handle_t* handle) {
close_cb_called++;
}
static void close_conn_cb(uv_handle_t* handle) {
free(handle);
close_cb_called++;
}
void conn_notify_write_cb(uv_write_t* req, int status) {
uv_close((uv_handle_t*)&tcp_server, close_cb);
uv_close((uv_handle_t*)&channel, close_cb);
}
static void ipc_on_connection(uv_stream_t* server, int status) {
int r;
uv_buf_t buf;
uv_tcp_t* conn;
if (!connection_accepted) {
/*
* Accept the connection and close it. Also let the other
* side know.
*/
ASSERT(status == 0);
ASSERT((uv_stream_t*)&tcp_server == server);
conn = malloc(sizeof(*conn));
ASSERT(conn);
r = uv_tcp_init(server->loop, conn);
ASSERT(r == 0);
r = uv_accept(server, (uv_stream_t*)conn);
ASSERT(r == 0);
uv_close((uv_handle_t*)conn, close_conn_cb);
buf = uv_buf_init("accepted_connection\n", 20);
r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
NULL, conn_notify_write_cb);
ASSERT(r == 0);
connection_accepted = 1;
}
}
@ -63,7 +114,7 @@ static int ipc_helper() {
* data is transfered over the channel. XXX edit this comment after handle
* transfer is added.
*/
uv_pipe_t channel;
uv_write_t write_req;
int r;
uv_buf_t buf;
@ -73,23 +124,26 @@ static int ipc_helper() {
uv_pipe_open(&channel, 0);
r = uv_tcp_init(uv_default_loop(), &server);
r = uv_tcp_init(uv_default_loop(), &tcp_server);
ASSERT(r == 0);
r = uv_tcp_bind(&server, uv_ip4_addr("0.0.0.0", TEST_PORT));
r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT));
ASSERT(r == 0);
r = uv_listen((uv_stream_t*)&server, 12, ipc_on_connection);
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
ASSERT(r == 0);
buf = uv_buf_init("hello\n", 6);
r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
(uv_stream_t*)&server, NULL);
(uv_stream_t*)&tcp_server, NULL);
ASSERT(r == 0);
r = uv_run(uv_default_loop());
ASSERT(r == 0);
ASSERT(connection_accepted == 1);
ASSERT(close_cb_called == 3);
return 0;
}

View File

@ -33,12 +33,46 @@ static uv_tcp_t tcp_server;
static int exit_cb_called;
static int read2_cb_called;
static int local_conn_accepted;
static int remote_conn_accepted;
static int tcp_server_listening;
static uv_write_t write_req;
typedef struct {
uv_connect_t conn_req;
uv_tcp_t conn;
} tcp_conn;
#define CONN_COUNT 100
static void close_server_conn_cb(uv_handle_t* handle) {
free(handle);
}
static void ipc_on_connection(uv_stream_t* server, int status) {
ASSERT(status == 0);
ASSERT((uv_stream_t*)&tcp_server == server);
uv_tcp_t* conn;
int r;
if (!local_conn_accepted) {
/* Accept the connection and close it. Also and close the server. */
ASSERT(status == 0);
ASSERT((uv_stream_t*)&tcp_server == server);
conn = malloc(sizeof(*conn));
ASSERT(conn);
r = uv_tcp_init(server->loop, conn);
ASSERT(r == 0);
r = uv_accept(server, (uv_stream_t*)conn);
ASSERT(r == 0);
uv_close((uv_handle_t*)conn, close_server_conn_cb);
uv_close((uv_handle_t*)server, NULL);
local_conn_accepted = 1;
}
}
@ -55,6 +89,39 @@ static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) {
}
static void close_client_conn_cb(uv_handle_t* handle) {
tcp_conn* p = (tcp_conn*)handle->data;
free(p);
}
static void connect_cb(uv_connect_t* req, int status) {
uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
}
static void make_many_connections() {
tcp_conn* conn;
struct sockaddr_in addr;
int r, i;
for (i = 0; i < CONN_COUNT; i++) {
conn = malloc(sizeof(*conn));
ASSERT(conn);
r = uv_tcp_init(uv_default_loop(), &conn->conn);
ASSERT(r == 0);
addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb);
ASSERT(r == 0);
conn->conn.data = conn;
}
}
static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
int r;
@ -78,27 +145,40 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
abort();
}
ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE);
read2_cb_called++;
/* Accept the pending TCP server, and start listening on it. */
ASSERT(pending == UV_TCP);
r = uv_tcp_init(uv_default_loop(), &tcp_server);
ASSERT(r == 0);
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
ASSERT(r == 0);
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
ASSERT(r == 0);
/* Make sure that the expected data is correctly multiplexed. */
ASSERT(memcmp("hello\n", buf.base, nread) == 0);
fprintf(stderr, "got %d bytes\n", (int)nread);
outbuf = uv_buf_init("world\n", 6);
r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
ASSERT(r == 0);
if (!tcp_server_listening) {
ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE);
read2_cb_called++;
/* Accept the pending TCP server, and start listening on it. */
ASSERT(pending == UV_TCP);
r = uv_tcp_init(uv_default_loop(), &tcp_server);
ASSERT(r == 0);
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
ASSERT(r == 0);
r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection);
ASSERT(r == 0);
tcp_server_listening = 1;
/* Make sure that the expected data is correctly multiplexed. */
ASSERT(memcmp("hello\n", buf.base, nread) == 0);
outbuf = uv_buf_init("world\n", 6);
r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
ASSERT(r == 0);
/* Create a bunch of connections to get both servers to accept. */
make_many_connections();
} else if (memcmp("accepted_connection\n", buf.base, nread) == 0) {
/* Remote server has accepted a connection. Close the channel. */
ASSERT(pending == UV_UNKNOWN_HANDLE);
remote_conn_accepted = 1;
uv_close((uv_handle_t*)&channel, NULL);
}
free(buf.base);
}
@ -133,6 +213,8 @@ TEST_IMPL(ipc) {
r = uv_run(uv_default_loop());
ASSERT(r == 0);
ASSERT(local_conn_accepted == 1);
ASSERT(remote_conn_accepted == 1);
ASSERT(read2_cb_called == 1);
ASSERT(exit_cb_called == 1);
return 0;

1
uv.gyp
View File

@ -117,7 +117,6 @@
'src/win/pipe.c',
'src/win/process.c',
'src/win/req.c',
'src/win/stdio.c',
'src/win/stream.c',
'src/win/tcp.c',
'src/win/tty.c',