diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 1b6d86df..72709c8a 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -91,8 +91,6 @@ typedef int uv_file; #define UV_STREAM_PRIVATE_FIELDS \ - uv_read_cb read_cb; \ - uv_alloc_cb alloc_cb; \ uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ ev_io read_watcher; \ diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 6610e016..7ede6882 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -98,7 +98,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); struct uv_req_s* next_req; #define UV_WRITE_PRIVATE_FIELDS \ - /* empty */ + int ipc_header; #define UV_CONNECT_PRIVATE_FIELDS \ /* empty */ @@ -120,6 +120,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); UV_REQ_FIELDS \ SOCKET accept_socket; \ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ + HANDLE event_handle; \ + HANDLE wait_handle; \ struct uv_tcp_accept_s* next_pending; \ } uv_tcp_accept_t; @@ -132,8 +134,6 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define UV_STREAM_PRIVATE_FIELDS \ unsigned int reqs_pending; \ - uv_alloc_cb alloc_cb; \ - uv_read_cb read_cb; \ uv_req_t read_req; \ union { \ struct { uv_stream_connection_fields }; \ @@ -142,10 +142,12 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define uv_tcp_server_fields \ uv_tcp_accept_t* accept_reqs; \ - uv_tcp_accept_t* pending_accepts; + uv_tcp_accept_t* pending_accepts; \ + LPFN_ACCEPTEX func_acceptex; #define uv_tcp_connection_fields \ - uv_buf_t read_buffer; + uv_buf_t read_buffer; \ + LPFN_CONNECTEX func_connectex; #define UV_TCP_PRIVATE_FIELDS \ SOCKET socket; \ @@ -166,11 +168,15 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_alloc_cb alloc_cb; #define uv_pipe_server_fields \ - uv_pipe_accept_t accept_reqs[4]; \ - uv_pipe_accept_t* pending_accepts; + uv_pipe_accept_t accept_reqs[4]; \ + uv_pipe_accept_t* pending_accepts; #define uv_pipe_connection_fields \ - uv_timer_t* eof_timer; + uv_timer_t* eof_timer; \ + uv_write_t ipc_header_write_req; \ + int ipc_pid; \ + uint64_t remaining_ipc_rawdata_bytes; \ + WSAPROTOCOL_INFOW* pending_socket_info; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/include/uv.h b/include/uv.h index 824b3c41..8ea7ab0e 100644 --- a/include/uv.h +++ b/include/uv.h @@ -41,114 +41,12 @@ extern "C" { typedef intptr_t ssize_t; #endif -typedef struct uv_loop_s uv_loop_t; -typedef struct uv_ares_task_s uv_ares_task_t; -typedef struct uv_err_s uv_err_t; -typedef struct uv_handle_s uv_handle_t; -typedef struct uv_stream_s uv_stream_t; -typedef struct uv_tcp_s uv_tcp_t; -typedef struct uv_udp_s uv_udp_t; -typedef struct uv_pipe_s uv_pipe_t; -typedef struct uv_tty_s uv_tty_t; -typedef struct uv_timer_s uv_timer_t; -typedef struct uv_prepare_s uv_prepare_t; -typedef struct uv_check_s uv_check_t; -typedef struct uv_idle_s uv_idle_t; -typedef struct uv_async_s uv_async_t; -typedef struct uv_getaddrinfo_s uv_getaddrinfo_t; -typedef struct uv_process_s uv_process_t; -typedef struct uv_counters_s uv_counters_t; -/* Request types */ -typedef struct uv_req_s uv_req_t; -typedef struct uv_shutdown_s uv_shutdown_t; -typedef struct uv_write_s uv_write_t; -typedef struct uv_connect_s uv_connect_t; -typedef struct uv_udp_send_s uv_udp_send_t; -typedef struct uv_fs_s uv_fs_t; -/* uv_fs_event_t is a subclass of uv_handle_t. */ -typedef struct uv_fs_event_s uv_fs_event_t; -typedef struct uv_work_s uv_work_t; - #if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) # include "uv-private/uv-unix.h" #else # include "uv-private/uv-win.h" #endif - -/* - * This function must be called before any other functions in libuv. - * - * All functions besides uv_run() are non-blocking. - * - * All callbacks in libuv are made asynchronously. That is they are never - * made by the function that takes them as a parameter. - */ -uv_loop_t* uv_loop_new(); -void uv_loop_delete(uv_loop_t*); - - -/* - * Returns the default loop. - */ -uv_loop_t* uv_default_loop(); - -/* - * This function starts the event loop. It blocks until the reference count - * of the loop drops to zero. - */ -int uv_run(uv_loop_t*); - -/* - * Manually modify the event loop's reference count. Useful if the user wants - * to have a handle or timeout that doesn't keep the loop alive. - */ -void uv_ref(uv_loop_t*); -void uv_unref(uv_loop_t*); - -void uv_update_time(uv_loop_t*); -int64_t uv_now(uv_loop_t*); - - -/* - * The status parameter is 0 if the request completed successfully, - * and should be -1 if the request was cancelled or failed. - * For uv_close_cb, -1 means that the handle was closed due to an error. - * Error details can be obtained by calling uv_last_error(). - * - * In the case of uv_read_cb the uv_buf_t returned should be freed by the - * user. - */ -typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size); -typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); -typedef void (*uv_write_cb)(uv_write_t* req, int status); -typedef void (*uv_connect_cb)(uv_connect_t* req, int status); -typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); -typedef void (*uv_connection_cb)(uv_stream_t* server, int status); -typedef void (*uv_close_cb)(uv_handle_t* handle); -typedef void (*uv_timer_cb)(uv_timer_t* handle, int status); -/* TODO: do these really need a status argument? */ -typedef void (*uv_async_cb)(uv_async_t* handle, int status); -typedef void (*uv_prepare_cb)(uv_prepare_t* handle, int status); -typedef void (*uv_check_cb)(uv_check_t* handle, int status); -typedef void (*uv_idle_cb)(uv_idle_t* handle, int status); -typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* handle, int status, - struct addrinfo* res); -typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal); -typedef void (*uv_fs_cb)(uv_fs_t* req); -typedef void (*uv_work_cb)(uv_work_t* req); -typedef void (*uv_after_work_cb)(uv_work_t* req); - -/* -* This will be called repeatedly after the uv_fs_event_t is initialized. -* If uv_fs_event_t was initialized with a directory the filename parameter -* will be a relative path to a file contained in the directory. -* The events paramenter is an ORed mask of enum uv_fs_event elements. -*/ -typedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char* filename, - int events, int status); - - /* Expand this list if necessary. */ typedef enum { UV_UNKNOWN = -1, @@ -232,6 +130,116 @@ typedef enum { } uv_req_type; + +typedef struct uv_loop_s uv_loop_t; +typedef struct uv_ares_task_s uv_ares_task_t; +typedef struct uv_err_s uv_err_t; +typedef struct uv_handle_s uv_handle_t; +typedef struct uv_stream_s uv_stream_t; +typedef struct uv_tcp_s uv_tcp_t; +typedef struct uv_udp_s uv_udp_t; +typedef struct uv_pipe_s uv_pipe_t; +typedef struct uv_tty_s uv_tty_t; +typedef struct uv_timer_s uv_timer_t; +typedef struct uv_prepare_s uv_prepare_t; +typedef struct uv_check_s uv_check_t; +typedef struct uv_idle_s uv_idle_t; +typedef struct uv_async_s uv_async_t; +typedef struct uv_getaddrinfo_s uv_getaddrinfo_t; +typedef struct uv_process_s uv_process_t; +typedef struct uv_counters_s uv_counters_t; +/* Request types */ +typedef struct uv_req_s uv_req_t; +typedef struct uv_shutdown_s uv_shutdown_t; +typedef struct uv_write_s uv_write_t; +typedef struct uv_connect_s uv_connect_t; +typedef struct uv_udp_send_s uv_udp_send_t; +typedef struct uv_fs_s uv_fs_t; +/* uv_fs_event_t is a subclass of uv_handle_t. */ +typedef struct uv_fs_event_s uv_fs_event_t; +typedef struct uv_work_s uv_work_t; + + +/* + * This function must be called before any other functions in libuv. + * + * All functions besides uv_run() are non-blocking. + * + * All callbacks in libuv are made asynchronously. That is they are never + * made by the function that takes them as a parameter. + */ +uv_loop_t* uv_loop_new(); +void uv_loop_delete(uv_loop_t*); + + +/* + * Returns the default loop. + */ +uv_loop_t* uv_default_loop(); + +/* + * This function starts the event loop. It blocks until the reference count + * of the loop drops to zero. + */ +int uv_run(uv_loop_t*); + +/* + * Manually modify the event loop's reference count. Useful if the user wants + * to have a handle or timeout that doesn't keep the loop alive. + */ +void uv_ref(uv_loop_t*); +void uv_unref(uv_loop_t*); + +void uv_update_time(uv_loop_t*); +int64_t uv_now(uv_loop_t*); + + +/* + * The status parameter is 0 if the request completed successfully, + * and should be -1 if the request was cancelled or failed. + * For uv_close_cb, -1 means that the handle was closed due to an error. + * Error details can be obtained by calling uv_last_error(). + * + * In the case of uv_read_cb the uv_buf_t returned should be freed by the + * user. + */ +typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size); +typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); +/* + * Just like the uv_read_cb except that if the pending parameter is true + * then you can use uv_accept() to pull the new handle into the process. + * If no handle is pending then pending will be UV_UNKNOWN_HANDLE. + */ +typedef void (*uv_read2_cb)(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending); +typedef void (*uv_write_cb)(uv_write_t* req, int status); +typedef void (*uv_connect_cb)(uv_connect_t* req, int status); +typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); +typedef void (*uv_connection_cb)(uv_stream_t* server, int status); +typedef void (*uv_close_cb)(uv_handle_t* handle); +typedef void (*uv_timer_cb)(uv_timer_t* handle, int status); +/* TODO: do these really need a status argument? */ +typedef void (*uv_async_cb)(uv_async_t* handle, int status); +typedef void (*uv_prepare_cb)(uv_prepare_t* handle, int status); +typedef void (*uv_check_cb)(uv_check_t* handle, int status); +typedef void (*uv_idle_cb)(uv_idle_t* handle, int status); +typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* handle, int status, + struct addrinfo* res); +typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal); +typedef void (*uv_fs_cb)(uv_fs_t* req); +typedef void (*uv_work_cb)(uv_work_t* req); +typedef void (*uv_after_work_cb)(uv_work_t* req); + +/* +* This will be called repeatedly after the uv_fs_event_t is initialized. +* If uv_fs_event_t was initialized with a directory the filename parameter +* will be a relative path to a file contained in the directory. +* The events paramenter is an ORed mask of enum uv_fs_event elements. +*/ +typedef void (*uv_fs_event_cb)(uv_fs_event_t* handle, const char* filename, + int events, int status); + + struct uv_err_s { /* read-only */ uv_err_code code; @@ -330,6 +338,9 @@ uv_buf_t uv_buf_init(char* base, size_t len); #define UV_STREAM_FIELDS \ /* number of bytes queued for writing */ \ size_t write_queue_size; \ + uv_alloc_cb alloc_cb; \ + uv_read_cb read_cb; \ + uv_read2_cb read2_cb; \ /* private */ \ UV_STREAM_PRIVATE_FIELDS @@ -338,8 +349,8 @@ uv_buf_t uv_buf_init(char* base, size_t len); * * uv_stream is an abstract class. * - * uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t - * and soon uv_file_t. + * uv_stream_t is the parent class of uv_tcp_t, uv_pipe_t, uv_tty_t, and + * soon uv_file_t. */ struct uv_stream_s { UV_HANDLE_FIELDS @@ -375,13 +386,12 @@ int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb); int uv_read_stop(uv_stream_t*); -typedef enum { - UV_STDIN = 0, - UV_STDOUT, - UV_STDERR -} uv_std_type; +/* + * Extended read methods for receiving handles over a pipe. The pipe must be + * initialized with ipc == 1. + */ +int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read2_cb read_cb); -uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type); /* * Write data to stream. Buffers are written in order. Example: @@ -404,10 +414,14 @@ uv_stream_t* uv_std_handle(uv_loop_t*, uv_std_type type); int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb); +int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb); + /* uv_write_t is a subclass of uv_req_t */ struct uv_write_s { UV_REQ_FIELDS uv_write_cb cb; + uv_stream_t* send_handle; uv_stream_t* handle; UV_WRITE_PRIVATE_FIELDS }; @@ -648,9 +662,14 @@ struct uv_pipe_s { UV_HANDLE_FIELDS UV_STREAM_FIELDS UV_PIPE_PRIVATE_FIELDS + int ipc; /* non-zero if this pipe is used for passing handles */ }; -int uv_pipe_init(uv_loop_t*, uv_pipe_t* handle); +/* + * Initialize a pipe. The last argument is a boolean to indicate if + * this pipe will be used for handle passing between processes. + */ +int uv_pipe_init(uv_loop_t*, uv_pipe_t* handle, int ipc); /* * Opens an existing file descriptor or HANDLE as a pipe. diff --git a/src/unix/core.c b/src/unix/core.c index 719327a9..c834aaae 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -790,10 +790,3 @@ size_t uv__strlcpy(char* dst, const char* src, size_t size) { return src - org; } - - -uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) { - assert(0 && "implement me"); - return NULL; -} - diff --git a/src/unix/pipe.c b/src/unix/pipe.c index 86c11dea..dabdcd6c 100644 --- a/src/unix/pipe.c +++ b/src/unix/pipe.c @@ -29,10 +29,12 @@ #include #include -int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle) { + +int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE); loop->counters.pipe_init++; handle->pipe_fname = NULL; + handle->ipc = ipc; return 0; } diff --git a/src/unix/process.c b/src/unix/process.c index 487f2075..06af65d5 100644 --- a/src/unix/process.c +++ b/src/unix/process.c @@ -1,4 +1,3 @@ - /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. * * Permission is hereby granted, free of charge, to any person obtaining a copy @@ -63,6 +62,34 @@ static void uv__chld(EV_P_ ev_child* watcher, int revents) { } } + +/* + * Used for initializing stdio streams like options.stdin_stream. Returns + * zero on success. + */ +static int uv__process_init_pipe(uv_pipe_t* handle, int fds[2]) { + if (handle->type != UV_NAMED_PIPE) { + errno = EINVAL; + return -1; + } + + if (handle->ipc) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + return -1; + } + } else { + if (pipe(fds) < 0) { + return -1; + } + } + + uv__cloexec(fds[0], 1); + uv__cloexec(fds[1], 1); + + return 0; +} + + #ifndef SPAWN_WAIT_EXEC # define SPAWN_WAIT_EXEC 1 #endif @@ -89,43 +116,19 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, process->exit_cb = options.exit_cb; - if (options.stdin_stream) { - if (options.stdin_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stdin_pipe) < 0) { - goto error; - } - uv__cloexec(stdin_pipe[0], 1); - uv__cloexec(stdin_pipe[1], 1); + if (options.stdin_stream && + uv__process_init_pipe(options.stdin_stream, stdin_pipe)) { + goto error; } - if (options.stdout_stream) { - if (options.stdout_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stdout_pipe) < 0) { - goto error; - } - uv__cloexec(stdout_pipe[0], 1); - uv__cloexec(stdout_pipe[1], 1); + if (options.stdout_stream && + uv__process_init_pipe(options.stdout_stream, stdout_pipe)) { + goto error; } - if (options.stderr_stream) { - if (options.stderr_stream->type != UV_NAMED_PIPE) { - errno = EINVAL; - goto error; - } - - if (pipe(stderr_pipe) < 0) { - goto error; - } - uv__cloexec(stderr_pipe[0], 1); - uv__cloexec(stderr_pipe[1], 1); + if (options.stderr_stream && + uv__process_init_pipe(options.stderr_stream, stderr_pipe)) { + goto error; } /* This pipe is used by the parent to wait until @@ -154,7 +157,7 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, goto error; } # else - if (pipe(signal_pipe) < 0) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, signal_pipe) < 0) { goto error; } uv__cloexec(signal_pipe[0], 1); diff --git a/src/unix/stream.c b/src/unix/stream.c index f7c0a684..02681582 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -29,6 +29,8 @@ #include #include +#include + static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); @@ -349,14 +351,43 @@ static void uv__write(uv_stream_t* stream) { * inside the iov each time we write. So there is no need to offset it. */ - do { - if (iovcnt == 1) { - n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); - } else { - n = writev(stream->fd, iov, iovcnt); + if (req->send_handle) { + struct msghdr msg; + char scratch[64]; + struct cmsghdr *cmsg; + int fd_to_send = req->send_handle->fd; + + assert(fd_to_send >= 0); + + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = iov; + msg.msg_iovlen = iovcnt; + msg.msg_flags = 0; + + msg.msg_control = (void*) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + *(int*) CMSG_DATA(cmsg) = fd_to_send; + + do { + n = sendmsg(stream->fd, &msg, 0); } + while (n == -1 && errno == EINTR); + } else { + do { + if (iovcnt == 1) { + n = write(stream->fd, iov[0].iov_base, iov[0].iov_len); + } else { + n = writev(stream->fd, iov, iovcnt); + } + } + while (n == -1 && errno == EINTR); } - while (n == -1 && errno == EINTR); if (n < 0) { if (errno != EAGAIN) { @@ -447,12 +478,17 @@ static void uv__write_callbacks(uv_stream_t* stream) { static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; + struct msghdr msg; + struct cmsghdr* cmsg; + char cmsg_space[64]; + int received_fd = -1; struct ev_loop* ev = stream->loop->ev; /* XXX: Maybe instead of having UV_READING we just test if * tcp->read_cb is NULL or not? */ - while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) { + while ((stream->read_cb || stream->read2_cb) && + stream->flags & UV_READING) { assert(stream->alloc_cb); buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); @@ -460,10 +496,29 @@ static void uv__read(uv_stream_t* stream) { assert(buf.base); assert(stream->fd >= 0); - do { - nread = read(stream->fd, buf.base, buf.len); + if (stream->read_cb) { + do { + nread = read(stream->fd, buf.base, buf.len); + } + while (nread < 0 && errno == EINTR); + } else { + assert(stream->read2_cb); + /* read2_cb uses recvmsg */ + msg.msg_flags = 0; + msg.msg_iov = (struct iovec*) &buf; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + /* Set up to receive a descriptor even if one isn't in the message */ + msg.msg_controllen = 64; + msg.msg_control = (void *) cmsg_space; + + do { + nread = recvmsg(stream->fd, &msg, 0); + } + while (nread < 0 && errno == EINTR); } - while (nread < 0 && errno == EINTR); + if (nread < 0) { /* Error */ @@ -473,24 +528,78 @@ static void uv__read(uv_stream_t* stream) { ev_io_start(ev, &stream->read_watcher); } uv__set_sys_error(stream->loop, EAGAIN); - stream->read_cb(stream, 0, buf); + + if (stream->read_cb) { + stream->read_cb(stream, 0, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, 0, buf, UV_UNKNOWN_HANDLE); + } + return; } else { /* Error. User should call uv_close(). */ uv__set_sys_error(stream->loop, errno); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } + assert(!ev_is_active(&stream->read_watcher)); return; } + } else if (nread == 0) { /* EOF */ uv__set_artificial_error(stream->loop, UV_EOF); ev_io_stop(ev, &stream->read_watcher); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } return; } else { /* Successful read */ - stream->read_cb(stream, nread, buf); + + if (stream->read_cb) { + stream->read_cb(stream, nread, buf); + } else { + assert(stream->read2_cb); + + /* + * XXX: Some implementations can send multiple file descriptors in a + * single message. We should be using CMSG_NXTHDR() to walk the + * chain to get at them all. This would require changing the API to + * hand these back up the caller, is a pain. + */ + + for (cmsg = CMSG_FIRSTHDR(&msg); + msg.msg_controllen > 0 && cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + + if (cmsg->cmsg_type == SCM_RIGHTS) { + if (stream->accepted_fd != -1) { + fprintf(stderr, "(libuv) ignoring extra FD received\n"); + } + + stream->accepted_fd = *(int *) CMSG_DATA(cmsg); + + } else { + fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", + cmsg->cmsg_type); + } + } + + + if (stream->accepted_fd >= 0) { + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP); + } else { + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_UNKNOWN_HANDLE); + } + } } } } @@ -672,11 +781,8 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr, } -/* The buffers to be written must remain valid until the callback is called. - * This is not required for the uv_buf_t array. - */ -int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, - uv_write_cb cb) { +int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { int empty_queue; assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || @@ -688,6 +794,13 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, return -1; } + if (send_handle) { + if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc) { + uv__set_sys_error(stream->loop, EOPNOTSUPP); + return -1; + } + } + empty_queue = (stream->write_queue_size == 0); /* Initialize the req */ @@ -695,6 +808,7 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, req->cb = cb; req->handle = stream; req->error = 0; + req->send_handle = send_handle; req->type = UV_WRITE; ngx_queue_init(&req->queue); @@ -737,7 +851,17 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { +/* The buffers to be written must remain valid until the callback is called. + * This is not required for the uv_buf_t array. + */ +int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { + return uv_write2(req, stream, bufs, bufcnt, NULL, cb); +} + + +int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -759,6 +883,7 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) assert(alloc_cb); stream->read_cb = read_cb; + stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; /* These should have been set by uv_tcp_init. */ @@ -769,6 +894,18 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) } +int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv__read_start_common(stream, alloc_cb, read_cb, NULL); +} + + +int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + return uv__read_start_common(stream, alloc_cb, NULL, read_cb); +} + + int uv_read_stop(uv_stream_t* stream) { uv_tcp_t* tcp = (uv_tcp_t*)stream; @@ -776,6 +913,7 @@ int uv_read_stop(uv_stream_t* stream) { ev_io_stop(tcp->loop->ev, &tcp->read_watcher); tcp->read_cb = NULL; + tcp->read2_cb = NULL; tcp->alloc_cb = NULL; return 0; } diff --git a/src/win/internal.h b/src/win/internal.h index f8762145..7753e70b 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -44,27 +44,28 @@ void uv_process_timers(uv_loop_t* loop); */ /* Private uv_handle flags */ -#define UV_HANDLE_CLOSING 0x0001 -#define UV_HANDLE_CLOSED 0x0002 -#define UV_HANDLE_BOUND 0x0004 -#define UV_HANDLE_LISTENING 0x0008 -#define UV_HANDLE_CONNECTION 0x0010 -#define UV_HANDLE_CONNECTED 0x0020 -#define UV_HANDLE_READING 0x0040 -#define UV_HANDLE_ACTIVE 0x0040 -#define UV_HANDLE_EOF 0x0080 -#define UV_HANDLE_SHUTTING 0x0100 -#define UV_HANDLE_SHUT 0x0200 -#define UV_HANDLE_ENDGAME_QUEUED 0x0400 -#define UV_HANDLE_BIND_ERROR 0x1000 -#define UV_HANDLE_IPV6 0x2000 -#define UV_HANDLE_PIPESERVER 0x4000 -#define UV_HANDLE_READ_PENDING 0x8000 -#define UV_HANDLE_GIVEN_OS_HANDLE 0x10000 -#define UV_HANDLE_UV_ALLOCED 0x20000 -#define UV_HANDLE_SYNC_BYPASS_IOCP 0x40000 -#define UV_HANDLE_ZERO_READ 0x80000 -#define UV_HANDLE_TTY_RAW 0x100000 +#define UV_HANDLE_CLOSING 0x0001 +#define UV_HANDLE_CLOSED 0x0002 +#define UV_HANDLE_BOUND 0x0004 +#define UV_HANDLE_LISTENING 0x0008 +#define UV_HANDLE_CONNECTION 0x0010 +#define UV_HANDLE_CONNECTED 0x0020 +#define UV_HANDLE_READING 0x0040 +#define UV_HANDLE_ACTIVE 0x0040 +#define UV_HANDLE_EOF 0x0080 +#define UV_HANDLE_SHUTTING 0x0100 +#define UV_HANDLE_SHUT 0x0200 +#define UV_HANDLE_ENDGAME_QUEUED 0x0400 +#define UV_HANDLE_BIND_ERROR 0x1000 +#define UV_HANDLE_IPV6 0x2000 +#define UV_HANDLE_PIPESERVER 0x4000 +#define UV_HANDLE_READ_PENDING 0x8000 +#define UV_HANDLE_UV_ALLOCED 0x10000 +#define UV_HANDLE_SYNC_BYPASS_IOCP 0x20000 +#define UV_HANDLE_ZERO_READ 0x40000 +#define UV_HANDLE_TTY_RAW 0x80000 +#define UV_HANDLE_USE_IPC_PROTOCOL 0x100000 +#define UV_HANDLE_EMULATE_IOCP 0x200000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); @@ -97,8 +98,8 @@ uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped); void uv_insert_pending_req(uv_loop_t* loop, uv_req_t* req); void uv_process_reqs(uv_loop_t* loop); -#define POST_COMPLETION_FOR_REQ(loop, req) \ - if (!PostQueuedCompletionStatus((loop)->iocp, \ +#define POST_COMPLETION_FOR_REQ(loop, req) \ + if (!PostQueuedCompletionStatus((loop)->iocp, \ 0, \ 0, \ &((req)->overlapped))) { \ @@ -135,6 +136,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info); + /* * UDP @@ -149,19 +152,21 @@ void uv_udp_endgame(uv_loop_t* loop, uv_udp_t* handle); /* * Pipes */ -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle); int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, char* name, size_t nameSize); void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle); int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client); +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb); int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb); +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb); void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req); @@ -267,6 +272,10 @@ void uv_fs_event_close(uv_loop_t* loop, uv_fs_event_t* handle); void uv_fs_event_endgame(uv_loop_t* loop, uv_fs_event_t* handle); +/* Utils */ +int uv_parent_pid(); + + /* * Error handling */ diff --git a/src/win/pipe.c b/src/win/pipe.c index c997a1e2..34ff0245 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -20,6 +20,7 @@ */ #include +#include #include #include @@ -38,6 +39,22 @@ static const uv_buf_t uv_null_buf_ = { 0, NULL }; /* when the local ends wants to shut it down. */ static const int64_t eof_timeout = 50; /* ms */ +/* IPC protocol flags. */ +#define UV_IPC_RAW_DATA 0x0001 +#define UV_IPC_UV_STREAM 0x0002 + +/* IPC frame header. */ +typedef struct { + int flags; + uint64_t raw_data_length; +} uv_ipc_frame_header_t; + +/* IPC frame, which contains an imported TCP socket stream. */ +typedef struct { + uv_ipc_frame_header_t header; + WSAPROTOCOL_INFOW socket_info; +} uv_ipc_frame_uv_stream; + static void eof_timer_init(uv_pipe_t* pipe); static void eof_timer_start(uv_pipe_t* pipe); static void eof_timer_stop(uv_pipe_t* pipe); @@ -51,13 +68,22 @@ static void uv_unique_pipe_name(char* ptr, char* name, size_t size) { } -int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle) { +int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { uv_stream_init(loop, (uv_stream_t*)handle); handle->type = UV_NAMED_PIPE; handle->reqs_pending = 0; handle->handle = INVALID_HANDLE_VALUE; handle->name = NULL; + handle->ipc_pid = 0; + handle->remaining_ipc_rawdata_bytes = 0; + handle->pending_socket_info = NULL; + + uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); + + if (ipc) { + handle->flags |= UV_HANDLE_USE_IPC_PROTOCOL; + } loop->counters.pipe_init++; @@ -65,24 +91,6 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle) { } -int uv_pipe_init_with_handle(uv_loop_t* loop, uv_pipe_t* handle, - HANDLE pipeHandle) { - int err = uv_pipe_init(loop, handle); - - if (!err) { - /* - * At this point we don't know whether the pipe will be used as a client - * or a server. So, we assume that it will be a client until - * uv_listen is called. - */ - handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; - } - - return err; -} - - static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); handle->eof_timer = NULL; @@ -131,7 +139,6 @@ int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access, uv_pipe_connection_init(handle); handle->handle = pipeHandle; - handle->flags |= UV_HANDLE_GIVEN_OS_HANDLE; err = 0; done: @@ -192,7 +199,6 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; - if (handle->flags & UV_HANDLE_SHUTTING && !(handle->flags & UV_HANDLE_SHUT) && handle->write_reqs_pending == 0) { @@ -250,6 +256,13 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { assert(!(handle->flags & UV_HANDLE_CLOSED)); handle->flags |= UV_HANDLE_CLOSED; + if (handle->flags & UV_HANDLE_CONNECTION) { + if (handle->pending_socket_info) { + free(handle->pending_socket_info); + handle->pending_socket_info = NULL; + } + } + /* Remember the state of this flag because the close callback is */ /* allowed to clobber or free the handle's memory */ uv_alloced = handle->flags & UV_HANDLE_UV_ALLOCED; @@ -567,30 +580,44 @@ static void uv_pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, } -int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { +int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { uv_loop_t* loop = server->loop; - /* Find a connection instance that has been connected, but not yet */ - /* accepted. */ - uv_pipe_accept_t* req = server->pending_accepts; + uv_pipe_t* pipe_client; + uv_pipe_accept_t* req; - if (!req) { - /* No valid connections found, so we error out. */ - uv__set_sys_error(loop, WSAEWOULDBLOCK); - return -1; - } + if (server->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + if (!server->pending_socket_info) { + /* No valid pending sockets. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } - /* Initialize the client handle and copy the pipeHandle to the client */ - uv_pipe_connection_init(client); - client->handle = req->pipeHandle; + return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info); + } else { + pipe_client = (uv_pipe_t*)client; - /* Prepare the req to pick up a new connection */ - server->pending_accepts = req->next_pending; - req->next_pending = NULL; - req->pipeHandle = INVALID_HANDLE_VALUE; + /* Find a connection instance that has been connected, but not yet */ + /* accepted. */ + req = server->pending_accepts; - if (!(server->flags & UV_HANDLE_CLOSING) && - !(server->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { - uv_pipe_queue_accept(loop, server, req, FALSE); + if (!req) { + /* No valid connections found, so we error out. */ + uv__set_sys_error(loop, WSAEWOULDBLOCK); + return -1; + } + + /* Initialize the client handle and copy the pipeHandle to the client */ + uv_pipe_connection_init(pipe_client); + pipe_client->handle = req->pipeHandle; + + /* Prepare the req to pick up a new connection */ + server->pending_accepts = req->next_pending; + req->next_pending = NULL; + req->pipeHandle = INVALID_HANDLE_VALUE; + + if (!(server->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(loop, server, req, FALSE); + } } return 0; @@ -602,11 +629,8 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { uv_loop_t* loop = handle->loop; int i, errno; - uv_pipe_accept_t* req; - HANDLE pipeHandle; - if (!(handle->flags & UV_HANDLE_BOUND) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_BOUND)) { uv__set_artificial_error(loop, UV_EINVAL); return -1; } @@ -617,8 +641,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { return -1; } - if (!(handle->flags & UV_HANDLE_PIPESERVER) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_PIPESERVER)) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -626,30 +649,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - if (handle->flags & UV_HANDLE_GIVEN_OS_HANDLE) { - handle->flags |= UV_HANDLE_PIPESERVER; - pipeHandle = handle->handle; - assert(pipeHandle != INVALID_HANDLE_VALUE); - req = &handle->accept_reqs[0]; - uv_req_init(loop, (uv_req_t*) req); - req->pipeHandle = pipeHandle; - req->type = UV_ACCEPT; - req->data = handle; - req->next_pending = NULL; + /* First pipe handle should have already been created in uv_pipe_bind */ + assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - if (uv_set_pipe_handle(loop, handle, pipeHandle)) { - uv__set_sys_error(loop, GetLastError()); - return -1; - } - - uv_pipe_queue_accept(loop, handle, req, TRUE); - } else { - /* First pipe handle should have already been created in uv_pipe_bind */ - assert(handle->accept_reqs[0].pipeHandle != INVALID_HANDLE_VALUE); - - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); - } + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + uv_pipe_queue_accept(loop, handle, &handle->accept_reqs[i], i == 0); } return 0; @@ -693,8 +697,8 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { } -int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { uv_loop_t* loop = handle->loop; if (!(handle->flags & UV_HANDLE_CONNECTION)) { @@ -714,9 +718,10 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, handle->flags |= UV_HANDLE_READING; handle->read_cb = read_cb; + handle->read2_cb = read2_cb; handle->alloc_cb = alloc_cb; - /* If reading was stopped and then started again, there could stell be a */ + /* If reading was stopped and then started again, there could still be a */ /* read request pending. */ if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_pipe_queue_read(loop, handle); @@ -725,11 +730,33 @@ int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, } -int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, - uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { - int result; +int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL); +} - if (bufcnt != 1) { + +int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb); +} + + +static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, + uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { + int result; + uv_tcp_t* tcp_send_handle; + uv_write_t* ipc_header_req; + DWORD written; + uv_ipc_frame_uv_stream ipc_frame; + + if (bufcnt != 1 && (bufcnt != 0 || !send_handle)) { + uv__set_artificial_error(loop, UV_ENOTSUP); + return -1; + } + + if (send_handle && send_handle->type != UV_TCP) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -750,8 +777,86 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, req->type = UV_WRITE; req->handle = (uv_stream_t*) handle; req->cb = cb; + req->ipc_header = 0; memset(&req->overlapped, 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol. */ + if (send_handle) { + tcp_send_handle = (uv_tcp_t*)send_handle; + if (WSADuplicateSocketW(tcp_send_handle->socket, handle->ipc_pid, + &ipc_frame.socket_info)) { + uv__set_sys_error(loop, WSAGetLastError()); + return -1; + } + ipc_frame.header.flags |= UV_IPC_UV_STREAM; + } + + if (bufcnt == 1) { + ipc_frame.header.flags |= UV_IPC_RAW_DATA; + ipc_frame.header.raw_data_length = bufs[0].len; + } + + /* + * Use the provided req if we're only doing a single write. + * If we're doing multiple writes, use ipc_header_write_req to do + * the first write, and then use the provided req for the second write. + */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + ipc_header_req = req; + } else { + /* + * Try to use the preallocated write req if it's available. + * Otherwise allocate a new one. + */ + if (handle->ipc_header_write_req.type != UV_WRITE) { + ipc_header_req = (uv_write_t*)&handle->ipc_header_write_req; + } else { + ipc_header_req = (uv_write_t*)malloc(sizeof(uv_write_t)); + if (!handle->accept_reqs) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + } + + uv_req_init(loop, (uv_req_t*) ipc_header_req); + ipc_header_req->type = UV_WRITE; + ipc_header_req->handle = (uv_stream_t*) handle; + ipc_header_req->cb = NULL; + ipc_header_req->ipc_header = 1; + } + + /* Write the header or the whole frame. */ + memset(&ipc_header_req->overlapped, 0, sizeof(ipc_header_req->overlapped)); + + result = WriteFile(handle->handle, + &ipc_frame, + ipc_frame.header.flags & UV_IPC_UV_STREAM ? + sizeof(ipc_frame) : sizeof(ipc_frame.header), + &written, + &ipc_header_req->overlapped); + if (!result && GetLastError() != ERROR_IO_PENDING) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + req->queued_bytes = written; + handle->write_queue_size += req->queued_bytes; + } + + handle->reqs_pending++; + handle->write_reqs_pending++; + + /* If we don't have any raw data to write - we're done. */ + if (!(ipc_frame.header.flags & UV_IPC_RAW_DATA)) { + return 0; + } + } + result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, @@ -779,6 +884,23 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, } +int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, NULL, cb); +} + + +int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, + uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { + if (!(handle->flags & UV_HANDLE_USE_IPC_PROTOCOL)) { + uv__set_artificial_error(loop, UV_EINVAL); + return -1; + } + + return uv_pipe_write_impl(loop, req, handle, bufs, bufcnt, send_handle, cb); +} + + static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_buf_t buf) { /* If there is an eof timer running, we don't need it any more, */ @@ -789,7 +911,11 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, uv_read_stop((uv_stream_t*) handle); uv__set_artificial_error(loop, UV_EOF); - handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, uv_null_buf_, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*) handle, -1, uv_null_buf_); + } } @@ -802,7 +928,11 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_read_stop((uv_stream_t*) handle); uv__set_sys_error(loop, error); - handle->read_cb((uv_stream_t*)handle, -1, buf); + if (handle->read2_cb) { + handle->read2_cb(handle, -1, buf, UV_UNKNOWN_HANDLE); + } else { + handle->read_cb((uv_stream_t*)handle, -1, buf); + } } @@ -820,6 +950,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req) { DWORD bytes, avail; uv_buf_t buf; + uv_ipc_frame_uv_stream ipc_frame; assert(handle->type == UV_NAMED_PIPE); @@ -838,11 +969,11 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, /* Do non-blocking reads until the buffer is empty */ while (handle->flags & UV_HANDLE_READING) { if (!PeekNamedPipe(handle->handle, - NULL, - 0, - NULL, - &avail, - NULL)) { + NULL, + 0, + NULL, + &avail, + NULL)) { uv_pipe_read_error_or_eof(loop, handle, GetLastError(), uv_null_buf_); break; } @@ -852,6 +983,62 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, break; } + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + /* Use the IPC framing protocol to read the incoming data. */ + if (handle->remaining_ipc_rawdata_bytes == 0) { + /* We're reading a new frame. First, read the header. */ + assert(avail >= sizeof(ipc_frame.header)); + + if (!ReadFile(handle->handle, + &ipc_frame.header, + sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame.header)); + + if (ipc_frame.header.flags & UV_IPC_UV_STREAM) { + assert(avail - sizeof(ipc_frame.header) >= + sizeof(ipc_frame.socket_info)); + + /* Read the TCP socket info. */ + if (!ReadFile(handle->handle, + &ipc_frame.socket_info, + sizeof(ipc_frame) - sizeof(ipc_frame.header), + &bytes, + NULL)) { + uv_pipe_read_error_or_eof(loop, handle, GetLastError(), + uv_null_buf_); + break; + } + + assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); + + /* Store the pending socket info. */ + assert(!handle->pending_socket_info); + handle->pending_socket_info = + (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info))); + if (!handle->pending_socket_info) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + + *(handle->pending_socket_info) = ipc_frame.socket_info; + } + + if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { + handle->remaining_ipc_rawdata_bytes = + ipc_frame.header.raw_data_length; + continue; + } + } else { + avail = min(avail, (DWORD)handle->remaining_ipc_rawdata_bytes); + } + } + buf = handle->alloc_cb((uv_handle_t*) handle, avail); assert(buf.len > 0); @@ -861,7 +1048,25 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, &bytes, NULL)) { /* Successful read */ - handle->read_cb((uv_stream_t*)handle, bytes, buf); + if (handle->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + assert(handle->remaining_ipc_rawdata_bytes >= bytes); + handle->remaining_ipc_rawdata_bytes = + handle->remaining_ipc_rawdata_bytes - bytes; + if (handle->read2_cb) { + handle->read2_cb(handle, bytes, buf, + handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE); + } else if (handle->read_cb) { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + + if (handle->pending_socket_info) { + free(handle->pending_socket_info); + handle->pending_socket_info = NULL; + } + } else { + handle->read_cb((uv_stream_t*)handle, bytes, buf); + } + /* Read again only if bytes == buf.len */ if (bytes <= buf.len) { break; @@ -889,12 +1094,20 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, handle->write_queue_size -= req->queued_bytes; - if (req->cb) { - if (!REQ_SUCCESS(req)) { - uv__set_sys_error(loop, GET_REQ_ERROR(req)); - ((uv_write_cb)req->cb)(req, -1); + if (req->ipc_header) { + if (req == &handle->ipc_header_write_req) { + req->type = UV_UNKNOWN_REQ; } else { - ((uv_write_cb)req->cb)(req, 0); + free(req); + } + } else { + if (req->cb) { + if (!REQ_SUCCESS(req)) { + uv__set_sys_error(loop, GET_REQ_ERROR(req)); + ((uv_write_cb)req->cb)(req, -1); + } else { + ((uv_write_cb)req->cb)(req, 0); + } } } @@ -927,8 +1140,7 @@ void uv_process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, CloseHandle(req->pipeHandle); req->pipeHandle = INVALID_HANDLE_VALUE; } - if (!(handle->flags & UV_HANDLE_CLOSING) && - !(handle->flags & UV_HANDLE_GIVEN_OS_HANDLE)) { + if (!(handle->flags & UV_HANDLE_CLOSING)) { uv_pipe_queue_accept(loop, handle, req, FALSE); } } @@ -1065,6 +1277,21 @@ static void eof_timer_close_cb(uv_handle_t* handle) { void uv_pipe_open(uv_pipe_t* pipe, uv_file file) { - assert(0 && "implement me"); -} + HANDLE os_handle; + + /* Special-case stdin with ipc. */ + if (file == 0 && pipe->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + os_handle = (HANDLE)_get_osfhandle(file); + if (os_handle == INVALID_HANDLE_VALUE || + uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { + return; + } + + uv_pipe_connection_init(pipe); + pipe->ipc_pid = uv_parent_pid(); + assert(pipe->ipc_pid != -1); + + pipe->handle = os_handle; + } +} diff --git a/src/win/process.c b/src/win/process.c index 4db04832..da72d55e 100644 --- a/src/win/process.c +++ b/src/win/process.c @@ -45,7 +45,7 @@ typedef struct env_var { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); \ } \ if (!uv_utf8_to_utf16(s, t, size / sizeof(wchar_t))) { \ - uv__set_sys_error(loop, GetLastError()); \ + uv__set_sys_error(loop, GetLastError()); \ err = -1; \ goto done; \ } @@ -739,7 +739,8 @@ void uv_process_close(uv_loop_t* loop, uv_process_t* handle) { static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, - HANDLE* child_pipe, DWORD server_access, DWORD child_access) { + HANDLE* child_pipe, DWORD server_access, DWORD child_access, + int overlapped) { int err; SECURITY_ATTRIBUTES sa = { sizeof(SECURITY_ATTRIBUTES), NULL, TRUE }; char pipe_name[64]; @@ -767,7 +768,7 @@ static int uv_create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* server_pipe, 0, &sa, OPEN_EXISTING, - 0, + overlapped ? FILE_FLAG_OVERLAPPED : 0, NULL); if (*child_pipe == INVALID_HANDLE_VALUE) { @@ -848,7 +849,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, wchar_t* path = NULL; int size; BOOL result; - wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, *env = NULL, *cwd = NULL; + wchar_t* application_path = NULL, *application = NULL, *arguments = NULL, + *env = NULL, *cwd = NULL; HANDLE* child_stdio = process->child_stdio; STARTUPINFOW startup; PROCESS_INFORMATION info; @@ -904,12 +906,23 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, /* Create stdio pipes. */ if (options.stdin_stream) { - err = uv_create_stdio_pipe_pair( - loop, - options.stdin_stream, - &child_stdio[0], - PIPE_ACCESS_OUTBOUND, - GENERIC_READ | FILE_WRITE_ATTRIBUTES); + if (options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_DUPLEX, + GENERIC_READ | FILE_WRITE_ATTRIBUTES | GENERIC_WRITE, + 1); + } else { + err = uv_create_stdio_pipe_pair( + loop, + options.stdin_stream, + &child_stdio[0], + PIPE_ACCESS_OUTBOUND, + GENERIC_READ | FILE_WRITE_ATTRIBUTES, + 0); + } } else { err = duplicate_std_handle(loop, STD_INPUT_HANDLE, &child_stdio[0]); } @@ -922,7 +935,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, loop, options.stdout_stream, &child_stdio[1], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_OUTPUT_HANDLE, &child_stdio[1]); } @@ -936,7 +950,8 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, options.stderr_stream, &child_stdio[2], PIPE_ACCESS_INBOUND, - GENERIC_WRITE); + GENERIC_WRITE, + 0); } else { err = duplicate_std_handle(loop, STD_ERROR_HANDLE, &child_stdio[2]); } @@ -969,6 +984,11 @@ int uv_spawn(uv_loop_t* loop, uv_process_t* process, process->process_handle = info.hProcess; process->pid = info.dwProcessId; + if (options.stdin_stream && + options.stdin_stream->flags & UV_HANDLE_USE_IPC_PROTOCOL) { + options.stdin_stream->ipc_pid = info.dwProcessId; + } + /* Setup notifications for when the child process exits. */ result = RegisterWaitForSingleObject(&process->wait_handle, process->process_handle, exit_wait_callback, (void*)process, INFINITE, diff --git a/src/win/stdio.c b/src/win/stdio.c deleted file mode 100644 index b65e7fb5..00000000 --- a/src/win/stdio.c +++ /dev/null @@ -1,75 +0,0 @@ -/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to - * deal in the Software without restriction, including without limitation the - * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or - * sell copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING - * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS - * IN THE SOFTWARE. - */ - -#include -#include - -#include "uv.h" -#include "../uv-common.h" -#include "internal.h" - - -static uv_pipe_t* uv_make_pipe_for_std_handle(uv_loop_t* loop, HANDLE handle) { - uv_pipe_t* pipe = NULL; - - pipe = (uv_pipe_t*)malloc(sizeof(uv_pipe_t)); - if (!pipe) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - - if (uv_pipe_init_with_handle(loop, pipe, handle)) { - free(pipe); - return NULL; - } - - pipe->flags |= UV_HANDLE_UV_ALLOCED; - return pipe; -} - - -uv_stream_t* uv_std_handle(uv_loop_t* loop, uv_std_type type) { - HANDLE handle; - - switch (type) { - case UV_STDIN: - handle = GetStdHandle(STD_INPUT_HANDLE); - if (handle == INVALID_HANDLE_VALUE) { - return NULL; - } - - /* Assume only named pipes for now. */ - return (uv_stream_t*)uv_make_pipe_for_std_handle(loop, handle); - break; - - case UV_STDOUT: - return NULL; - break; - - case UV_STDERR: - return NULL; - break; - - default: - assert(0); - uv__set_artificial_error(loop, UV_EINVAL); - return NULL; - } -} diff --git a/src/win/stream.c b/src/win/stream.c index c38e06bb..f1211784 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -62,13 +62,11 @@ int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { int uv_accept(uv_stream_t* server, uv_stream_t* client) { - assert(client->type == server->type); - switch (server->type) { case UV_TCP: return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); case UV_NAMED_PIPE: - return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); + return uv_pipe_accept((uv_pipe_t*)server, client); default: assert(0); return -1; @@ -92,6 +90,18 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, } +int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb); + default: + assert(0); + return -1; + } +} + + int uv_read_stop(uv_stream_t* handle) { if (handle->type == UV_TTY) { return uv_tty_read_stop((uv_tty_t*) handle); @@ -121,6 +131,21 @@ int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, } +int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_stream_t* send_handle, uv_write_cb cb) { + uv_loop_t* loop = handle->loop; + + switch (handle->type) { + case UV_NAMED_PIPE: + return uv_pipe_write2(loop, req, (uv_pipe_t*) handle, bufs, bufcnt, send_handle, cb); + default: + assert(0); + uv__set_sys_error(loop, WSAEINVAL); + return -1; + } +} + + int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { uv_loop_t* loop = handle->loop; diff --git a/src/win/tcp.c b/src/win/tcp.c index ee95aa11..897ea5e9 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -47,7 +47,7 @@ static unsigned int active_tcp_streams = 0; static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, - SOCKET socket) { + SOCKET socket, int imported) { DWORD yes = 1; assert(handle->socket == INVALID_SOCKET); @@ -70,8 +70,12 @@ static int uv_tcp_set_socket(uv_loop_t* loop, uv_tcp_t* handle, loop->iocp, (ULONG_PTR)socket, 0) == NULL) { - uv__set_sys_error(loop, GetLastError()); - return -1; + if (imported) { + handle->flags |= UV_HANDLE_EMULATE_IOCP; + } else { + uv__set_sys_error(loop, GetLastError()); + return -1; + } } if (pSetFileCompletionNotificationModes) { @@ -99,6 +103,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { handle->socket = INVALID_SOCKET; handle->type = UV_TCP; handle->reqs_pending = 0; + handle->func_acceptex = NULL; + handle->func_connectex = NULL; loop->counters.tcp_init++; @@ -109,6 +115,8 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) { void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { int status; int sys_error; + unsigned int i; + uv_tcp_accept_t* req; if (handle->flags & UV_HANDLE_CONNECTION && handle->flags & UV_HANDLE_SHUTTING && @@ -139,6 +147,20 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { handle->flags |= UV_HANDLE_CLOSED; if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) { + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + for (i = 0; i < uv_simultaneous_server_accepts; i++) { + req = &handle->accept_reqs[i]; + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; + } + } + } + free(handle->accept_reqs); handle->accept_reqs = NULL; } @@ -169,7 +191,7 @@ static int uv__bind(uv_tcp_t* handle, return -1; } - if (uv_tcp_set_socket(handle->loop, handle, sock) == -1) { + if (uv_tcp_set_socket(handle->loop, handle, sock, 0) == -1) { closesocket(sock); return -1; } @@ -218,24 +240,40 @@ int uv__tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) { } +static void CALLBACK post_completion(void* context, BOOLEAN timed_out) { + uv_tcp_accept_t* req; + uv_tcp_t* handle; + + req = (uv_tcp_accept_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->data; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { uv_loop_t* loop = handle->loop; BOOL success; DWORD bytes; SOCKET accept_socket; short family; - LPFN_ACCEPTEX pAcceptExFamily; assert(handle->flags & UV_HANDLE_LISTENING); assert(req->accept_socket == INVALID_SOCKET); /* choose family and extension function */ - if ((handle->flags & UV_HANDLE_IPV6) != 0) { + if (handle->flags & UV_HANDLE_IPV6) { family = AF_INET6; - pAcceptExFamily = pAcceptEx6; } else { family = AF_INET; - pAcceptExFamily = pAcceptEx; } /* Open a socket for the accepted connection. */ @@ -249,15 +287,18 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1); + } - success = pAcceptExFamily(handle->socket, - accept_socket, - (void*)req->accept_buffer, - 0, - sizeof(struct sockaddr_storage), - sizeof(struct sockaddr_storage), - &bytes, - &req->overlapped); + success = handle->func_acceptex(handle->socket, + accept_socket, + (void*)req->accept_buffer, + 0, + sizeof(struct sockaddr_storage), + sizeof(struct sockaddr_storage), + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -268,6 +309,15 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* The req will be processed with IOCP. */ req->accept_socket = accept_socket; handle->reqs_pending++; + if (handle->flags & UV_HANDLE_EMULATE_IOCP && + req->wait_handle == INVALID_HANDLE_VALUE && + !RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, (uv_req_t*)req); + return; + } } else { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, WSAGetLastError()); @@ -275,6 +325,11 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { handle->reqs_pending++; /* Destroy the preallocated client socket. */ closesocket(accept_socket); + /* Destroy the event handle */ + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + CloseHandle(req->overlapped.hEvent); + req->event_handle = NULL; + } } } @@ -357,6 +412,13 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!handle->func_acceptex) { + if(!uv_get_acceptex_function(handle->socket, &handle->func_acceptex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + } + if (listen(handle->socket, backlog) == SOCKET_ERROR) { uv__set_sys_error(loop, WSAGetLastError()); return -1; @@ -378,6 +440,17 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { req->type = UV_ACCEPT; req->accept_socket = INVALID_SOCKET; req->data = handle; + + req->wait_handle = INVALID_HANDLE_VALUE; + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } else { + req->event_handle = NULL; + } + uv_tcp_queue_accept(handle, req); } @@ -402,7 +475,7 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { return -1; } - if (uv_tcp_set_socket(client->loop, client, req->accept_socket) == -1) { + if (uv_tcp_set_socket(client->loop, client, req->accept_socket, 0) == -1) { closesocket(req->accept_socket); rv = -1; } else { @@ -476,19 +549,26 @@ int uv__tcp_connect(uv_connect_t* req, uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; + if (!handle->func_connectex) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ @@ -529,19 +609,26 @@ int uv__tcp_connect6(uv_connect_t* req, uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0) return -1; + if (!handle->func_connectex) { + if(!uv_get_connectex_function(handle->socket, &handle->func_connectex)) { + uv__set_sys_error(loop, WSAEAFNOSUPPORT); + return -1; + } + } + uv_req_init(loop, (uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); - success = pConnectEx6(handle->socket, - (struct sockaddr*) &address, - addrsize, - NULL, - 0, - &bytes, - &req->overlapped); + success = handle->func_connectex(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { handle->reqs_pending++; @@ -848,3 +935,22 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, DECREASE_PENDING_REQ_COUNT(handle); } + + +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { + SOCKET socket = WSASocketW(AF_INET, + SOCK_STREAM, + IPPROTO_IP, + socket_protocol_info, + 0, + WSA_FLAG_OVERLAPPED); + + if (socket == INVALID_SOCKET) { + uv__set_sys_error(tcp->loop, WSAGetLastError()); + return -1; + } + + tcp->flags |= UV_HANDLE_BOUND; + + return uv_tcp_set_socket(tcp->loop, tcp, socket, 1); +} diff --git a/src/win/util.c b/src/win/util.c index cb2d4438..cc6f93cf 100644 --- a/src/win/util.c +++ b/src/win/util.c @@ -25,6 +25,7 @@ #include "uv.h" #include "internal.h" +#include "Tlhelp32.h" int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size, @@ -95,11 +96,13 @@ done: return retVal; } + void uv_loadavg(double avg[3]) { /* Can't be implemented */ avg[0] = avg[1] = avg[2] = 0; } + double uv_get_free_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -112,6 +115,7 @@ double uv_get_free_memory(void) { return (double)memory_status.ullAvailPhys; } + double uv_get_total_memory(void) { MEMORYSTATUSEX memory_status; memory_status.dwLength = sizeof(memory_status); @@ -123,3 +127,26 @@ double uv_get_total_memory(void) { return (double)memory_status.ullTotalPhys; } + + +int uv_parent_pid() { + int parent_pid = -1; + HANDLE handle; + PROCESSENTRY32 pe; + int current_pid = GetCurrentProcessId(); + + pe.dwSize = sizeof(PROCESSENTRY32); + handle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); + + if (Process32First(handle, &pe)) { + do { + if (pe.th32ProcessID == current_pid) { + parent_pid = pe.th32ParentProcessID; + break; + } + } while( Process32Next(handle, &pe)); + } + + CloseHandle(handle); + return parent_pid; +} diff --git a/src/win/winsock.c b/src/win/winsock.c index 1f56b3d7..e37a60a9 100644 --- a/src/win/winsock.c +++ b/src/win/winsock.c @@ -25,21 +25,6 @@ #include "../uv-common.h" #include "internal.h" - -/* Winsock extension functions (ipv4) */ -LPFN_CONNECTEX pConnectEx; -LPFN_ACCEPTEX pAcceptEx; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -LPFN_DISCONNECTEX pDisconnectEx; -LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -LPFN_CONNECTEX pConnectEx6; -LPFN_ACCEPTEX pAcceptEx6; -LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -LPFN_DISCONNECTEX pDisconnectEx6; -LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ int uv_allow_ipv6; @@ -74,6 +59,18 @@ static BOOL uv_get_extension_function(SOCKET socket, GUID guid, } +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target) { + const GUID wsaid_acceptex = WSAID_ACCEPTEX; + return uv_get_extension_function(socket, wsaid_acceptex, (void**)target); +} + + +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target) { + const GUID wsaid_connectex = WSAID_CONNECTEX; + return uv_get_extension_function(socket, wsaid_connectex, (void**)target); +} + + void uv_winsock_init() { const GUID wsaid_connectex = WSAID_CONNECTEX; const GUID wsaid_acceptex = WSAID_ACCEPTEX; @@ -83,7 +80,6 @@ void uv_winsock_init() { WSADATA wsa_data; int errorno; - SOCKET dummy; SOCKET dummy6; /* Initialize winsock */ @@ -96,58 +92,10 @@ void uv_winsock_init() { uv_addr_ip4_any_ = uv_ip4_addr("0.0.0.0", 0); uv_addr_ip6_any_ = uv_ip6_addr("::", 0); - /* Retrieve the needed winsock extension function pointers. */ - dummy = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); - if (dummy == INVALID_SOCKET) { - uv_fatal_error(WSAGetLastError(), "socket"); - } - - if (!uv_get_extension_function(dummy, - wsaid_connectex, - (void**)&pConnectEx) || - !uv_get_extension_function(dummy, - wsaid_acceptex, - (void**)&pAcceptEx) || - !uv_get_extension_function(dummy, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs) || - !uv_get_extension_function(dummy, - wsaid_disconnectex, - (void**)&pDisconnectEx) || - !uv_get_extension_function(dummy, - wsaid_transmitfile, - (void**)&pTransmitFile)) { - uv_fatal_error(WSAGetLastError(), - "WSAIoctl(SIO_GET_EXTENSION_FUNCTION_POINTER)"); - } - - if (closesocket(dummy) == SOCKET_ERROR) { - uv_fatal_error(WSAGetLastError(), "closesocket"); - } - - /* optional IPv6 versions of winsock extension functions */ + /* Detect IPV6 support */ dummy6 = socket(AF_INET6, SOCK_STREAM, IPPROTO_IP); if (dummy6 != INVALID_SOCKET) { uv_allow_ipv6 = TRUE; - - if (!uv_get_extension_function(dummy6, - wsaid_connectex, - (void**)&pConnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_acceptex, - (void**)&pAcceptEx6) || - !uv_get_extension_function(dummy6, - wsaid_getacceptexsockaddrs, - (void**)&pGetAcceptExSockAddrs6) || - !uv_get_extension_function(dummy6, - wsaid_disconnectex, - (void**)&pDisconnectEx6) || - !uv_get_extension_function(dummy6, - wsaid_transmitfile, - (void**)&pTransmitFile6)) { - uv_allow_ipv6 = FALSE; - } - if (closesocket(dummy6) == SOCKET_ERROR) { uv_fatal_error(WSAGetLastError(), "closesocket"); } diff --git a/src/win/winsock.h b/src/win/winsock.h index 2c9fb92d..f879cc65 100644 --- a/src/win/winsock.h +++ b/src/win/winsock.h @@ -109,24 +109,12 @@ #define IPV6_V6ONLY 27 #endif - -/* Winsock extension functions (ipv4) */ -extern LPFN_CONNECTEX pConnectEx; -extern LPFN_ACCEPTEX pAcceptEx; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs; -extern LPFN_DISCONNECTEX pDisconnectEx; -extern LPFN_TRANSMITFILE pTransmitFile; - -/* Winsock extension functions (ipv6) */ -extern LPFN_CONNECTEX pConnectEx6; -extern LPFN_ACCEPTEX pAcceptEx6; -extern LPFN_GETACCEPTEXSOCKADDRS pGetAcceptExSockAddrs6; -extern LPFN_DISCONNECTEX pDisconnectEx6; -extern LPFN_TRANSMITFILE pTransmitFile6; - /* Whether ipv6 is supported */ extern int uv_allow_ipv6; +BOOL uv_get_acceptex_function(SOCKET socket, LPFN_ACCEPTEX* target); +BOOL uv_get_connectex_function(SOCKET socket, LPFN_CONNECTEX* target); + /* Ip address used to bind to any port at any interface */ extern struct sockaddr_in uv_addr_ip4_any_; extern struct sockaddr_in6 uv_addr_ip6_any_; diff --git a/test/benchmark-pound.c b/test/benchmark-pound.c index 1f56e27f..af7ce247 100644 --- a/test/benchmark-pound.c +++ b/test/benchmark-pound.c @@ -222,7 +222,7 @@ static void tcp_make_connect(conn_rec* p) { static void pipe_make_connect(conn_rec* p) { int r; - r = uv_pipe_init(loop, (uv_pipe_t*)&p->stream); + r = uv_pipe_init(loop, (uv_pipe_t*)&p->stream, 0); ASSERT(r == 0); r = uv_pipe_connect(&((pipe_conn_rec*)p)->conn_req, (uv_pipe_t*)&p->stream, TEST_PIPENAME, connect_cb); diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index d0b09301..27e8abe0 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -253,7 +253,7 @@ static void maybe_connect_some() { } else { pipe = &pipe_write_handles[max_connect_socket++]; - r = uv_pipe_init(loop, pipe); + r = uv_pipe_init(loop, pipe, 0); ASSERT(r == 0); req = (uv_connect_t*) req_alloc(); @@ -277,7 +277,7 @@ static void connection_cb(uv_stream_t* s, int status) { ASSERT(r == 0); } else { stream = (uv_stream_t*)malloc(sizeof(uv_pipe_t)); - r = uv_pipe_init(loop, (uv_pipe_t*)stream); + r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); ASSERT(r == 0); } @@ -396,7 +396,7 @@ HELPER_IMPL(pipe_pump_server) { /* Server */ server = (uv_stream_t*)&pipeServer; - r = uv_pipe_init(loop, &pipeServer); + r = uv_pipe_init(loop, &pipeServer, 0); ASSERT(r == 0); r = uv_pipe_bind(&pipeServer, TEST_PIPENAME); ASSERT(r == 0); diff --git a/test/benchmark-spawn.c b/test/benchmark-spawn.c index 6e5493d5..d34f42b9 100644 --- a/test/benchmark-spawn.c +++ b/test/benchmark-spawn.c @@ -113,7 +113,7 @@ static void spawn() { options.args = args; options.exit_cb = exit_cb; - uv_pipe_init(loop, &out); + uv_pipe_init(loop, &out, 0); options.stdout_stream = &out; r = uv_spawn(loop, &process, options); diff --git a/test/echo-server.c b/test/echo-server.c index 453ada66..8b175441 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -151,7 +151,7 @@ static void on_connection(uv_stream_t* server, int status) { case PIPE: stream = malloc(sizeof(uv_pipe_t)); ASSERT(stream != NULL); - r = uv_pipe_init(loop, (uv_pipe_t*)stream); + r = uv_pipe_init(loop, (uv_pipe_t*)stream, 0); ASSERT(r == 0); break; @@ -248,7 +248,7 @@ static int pipe_echo_start(char* pipeName) { server = (uv_handle_t*)&pipeServer; serverType = PIPE; - r = uv_pipe_init(loop, &pipeServer); + r = uv_pipe_init(loop, &pipeServer, 0); if (r) { fprintf(stderr, "uv_pipe_init: %s\n", uv_strerror(uv_last_error(loop))); diff --git a/test/run-tests.c b/test/run-tests.c index a1878691..fa7b8b8f 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -22,6 +22,7 @@ #include #include +#include "uv.h" #include "runner.h" #include "task.h" @@ -48,12 +49,115 @@ int main(int argc, char **argv) { } +static uv_pipe_t channel; +static uv_tcp_t tcp_server; +static uv_write_t conn_notify_req; +static int close_cb_called; +static int connection_accepted; + + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + + +static void close_conn_cb(uv_handle_t* handle) { + free(handle); + close_cb_called++; +} + + +void conn_notify_write_cb(uv_write_t* req, int status) { + uv_close((uv_handle_t*)&tcp_server, close_cb); + uv_close((uv_handle_t*)&channel, close_cb); +} + + +static void ipc_on_connection(uv_stream_t* server, int status) { + int r; + uv_buf_t buf; + uv_tcp_t* conn; + + if (!connection_accepted) { + /* + * Accept the connection and close it. Also let the other + * side know. + */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_conn_cb); + + buf = uv_buf_init("accepted_connection\n", 20); + r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, + NULL, conn_notify_write_cb); + ASSERT(r == 0); + + connection_accepted = 1; + } +} + + +static int ipc_helper() { + /* + * This is launched from test-ipc.c. stdin is a duplex channel that we + * over which a handle will be transmitted. In this initial version only + * data is transfered over the channel. XXX edit this comment after handle + * transfer is added. + */ + + uv_write_t write_req; + int r; + uv_buf_t buf; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + uv_pipe_open(&channel, 0); + + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + buf = uv_buf_init("hello\n", 6); + r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, + (uv_stream_t*)&tcp_server, NULL); + ASSERT(r == 0); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(connection_accepted == 1); + ASSERT(close_cb_called == 3); + + return 0; +} + + static int maybe_run_test(int argc, char **argv) { if (strcmp(argv[1], "--list") == 0) { print_tests(stdout); return 0; } + if (strcmp(argv[1], "ipc_helper") == 0) { + return ipc_helper(); + } + if (strcmp(argv[1], "spawn_helper1") == 0) { return 1; } diff --git a/test/test-ipc.c b/test/test-ipc.c new file mode 100644 index 00000000..0024cdee --- /dev/null +++ b/test/test-ipc.c @@ -0,0 +1,221 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include + +static char exepath[1024]; +static size_t exepath_size = 1024; +static char* args[3]; +static uv_pipe_t channel; +static uv_tcp_t tcp_server; + +static int exit_cb_called; +static int read2_cb_called; +static int local_conn_accepted; +static int remote_conn_accepted; +static int tcp_server_listening; + +static uv_write_t write_req; + +typedef struct { + uv_connect_t conn_req; + uv_tcp_t conn; +} tcp_conn; + +#define CONN_COUNT 100 + + +static void close_server_conn_cb(uv_handle_t* handle) { + free(handle); +} + + +static void ipc_on_connection(uv_stream_t* server, int status) { + uv_tcp_t* conn; + int r; + + if (!local_conn_accepted) { + /* Accept the connection and close it. Also and close the server. */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_server_conn_cb); + uv_close((uv_handle_t*)server, NULL); + local_conn_accepted = 1; + } +} + + +static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { + printf("exit_cb\n"); + exit_cb_called++; + ASSERT(exit_status == 0); + uv_close((uv_handle_t*)process, NULL); +} + + +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { + return uv_buf_init(malloc(suggested_size), suggested_size); +} + + +static void close_client_conn_cb(uv_handle_t* handle) { + tcp_conn* p = (tcp_conn*)handle->data; + free(p); +} + + +static void connect_cb(uv_connect_t* req, int status) { + uv_close((uv_handle_t*)req->handle, close_client_conn_cb); +} + + +static void make_many_connections() { + tcp_conn* conn; + struct sockaddr_in addr; + int r, i; + + for (i = 0; i < CONN_COUNT; i++) { + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(uv_default_loop(), &conn->conn); + ASSERT(r == 0); + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_tcp_connect(&conn->conn_req, (uv_tcp_t*)&conn->conn, addr, connect_cb); + ASSERT(r == 0); + + conn->conn.data = conn; + } +} + + +static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending) { + int r; + uv_buf_t outbuf; + uv_err_t err; + + if (nread == 0) { + /* Everything OK, but nothing read. */ + free(buf.base); + return; + } + + if (nread < 0) { + err = uv_last_error(pipe->loop); + if (err.code == UV_EOF) { + free(buf.base); + return; + } + + printf("error recving on channel: %s\n", uv_strerror(err)); + abort(); + } + + fprintf(stderr, "got %d bytes\n", (int)nread); + + if (!tcp_server_listening) { + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); + read2_cb_called++; + + /* Accept the pending TCP server, and start listening on it. */ + ASSERT(pending == UV_TCP); + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + tcp_server_listening = 1; + + /* Make sure that the expected data is correctly multiplexed. */ + ASSERT(memcmp("hello\n", buf.base, nread) == 0); + + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); + ASSERT(r == 0); + + /* Create a bunch of connections to get both servers to accept. */ + make_many_connections(); + } else if (memcmp("accepted_connection\n", buf.base, nread) == 0) { + /* Remote server has accepted a connection. Close the channel. */ + ASSERT(pending == UV_UNKNOWN_HANDLE); + remote_conn_accepted = 1; + uv_close((uv_handle_t*)&channel, NULL); + } + + free(buf.base); +} + + +TEST_IMPL(ipc) { + int r; + uv_process_options_t options; + uv_process_t process; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + memset(&options, 0, sizeof(uv_process_options_t)); + + r = uv_exepath(exepath, &exepath_size); + ASSERT(r == 0); + exepath[exepath_size] = '\0'; + args[0] = exepath; + args[1] = "ipc_helper"; + args[2] = NULL; + options.file = exepath; + options.args = args; + options.exit_cb = exit_cb; + options.stdin_stream = &channel; + + r = uv_spawn(uv_default_loop(), &process, options); + ASSERT(r == 0); + + uv_read2_start((uv_stream_t*)&channel, on_alloc, on_read); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(local_conn_accepted == 1); + ASSERT(remote_conn_accepted == 1); + ASSERT(read2_cb_called == 1); + ASSERT(exit_cb_called == 1); + return 0; +} diff --git a/test/test-list.h b/test/test-list.h index 95224ff4..d11257aa 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -20,6 +20,7 @@ */ TEST_DECLARE (tty) +TEST_DECLARE (ipc) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) TEST_DECLARE (tcp_ref) @@ -111,6 +112,7 @@ HELPER_DECLARE (pipe_echo_server) TASK_LIST_START TEST_ENTRY (tty) + TEST_ENTRY (ipc) TEST_ENTRY (tcp_ref) diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index f452fce5..0e59166c 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -204,7 +204,7 @@ static void pipe_pinger_new() { pinger->pongs = 0; /* Try to connec to the server and do NUM_PINGS ping-pongs. */ - r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe); + r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe, 0); pinger->stream.pipe.data = pinger; ASSERT(!r); diff --git a/test/test-pipe-bind-error.c b/test/test-pipe-bind-error.c index 832ce023..3443f19d 100644 --- a/test/test-pipe-bind-error.c +++ b/test/test-pipe-bind-error.c @@ -45,12 +45,12 @@ TEST_IMPL(pipe_bind_error_addrinuse) { uv_pipe_t server1, server2; int r; - r = uv_pipe_init(uv_default_loop(), &server1); + r = uv_pipe_init(uv_default_loop(), &server1, 0); ASSERT(r == 0); r = uv_pipe_bind(&server1, TEST_PIPENAME); ASSERT(r == 0); - r = uv_pipe_init(uv_default_loop(), &server2); + r = uv_pipe_init(uv_default_loop(), &server2, 0); ASSERT(r == 0); r = uv_pipe_bind(&server2, TEST_PIPENAME); ASSERT(r == -1); @@ -79,7 +79,7 @@ TEST_IMPL(pipe_bind_error_addrnotavail) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_pipe_bind(&server, BAD_PIPENAME); @@ -100,7 +100,7 @@ TEST_IMPL(pipe_bind_error_inval) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_pipe_bind(&server, TEST_PIPENAME); ASSERT(r == 0); @@ -123,7 +123,7 @@ TEST_IMPL(pipe_listen_without_bind) { uv_pipe_t server; int r; - r = uv_pipe_init(uv_default_loop(), &server); + r = uv_pipe_init(uv_default_loop(), &server, 0); ASSERT(r == 0); r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); ASSERT(r == -1); diff --git a/test/test-spawn.c b/test/test-spawn.c index 653f9ac9..238e6c9c 100644 --- a/test/test-spawn.c +++ b/test/test-spawn.c @@ -67,7 +67,7 @@ static void kill_cb(uv_process_t* process, int exit_status, int term_signal) { } -uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { uv_buf_t buf; buf.base = output + output_used; buf.len = OUTPUT_SIZE - output_used; @@ -138,7 +138,7 @@ TEST_IMPL(spawn_stdout) { init_process_options("spawn_helper2", exit_cb); - uv_pipe_init(uv_default_loop(), &out); + uv_pipe_init(uv_default_loop(), &out, 0); options.stdout_stream = &out; r = uv_spawn(uv_default_loop(), &process, options); @@ -169,8 +169,8 @@ int r; init_process_options("spawn_helper3", exit_cb); - uv_pipe_init(uv_default_loop(), &out); - uv_pipe_init(uv_default_loop(), &in); + uv_pipe_init(uv_default_loop(), &out, 0); + uv_pipe_init(uv_default_loop(), &in, 0); options.stdout_stream = &out; options.stdin_stream = ∈ @@ -229,7 +229,7 @@ TEST_IMPL(spawn_detect_pipe_name_collisions_on_windows) { init_process_options("spawn_helper2", exit_cb); - uv_pipe_init(uv_default_loop(), &out); + uv_pipe_init(uv_default_loop(), &out, 0); options.stdout_stream = &out; /* Create a pipe that'll cause a collision. */ diff --git a/uv.gyp b/uv.gyp index 1328c12c..a4298377 100644 --- a/uv.gyp +++ b/uv.gyp @@ -117,7 +117,6 @@ 'src/win/pipe.c', 'src/win/process.c', 'src/win/req.c', - 'src/win/stdio.c', 'src/win/stream.c', 'src/win/tcp.c', 'src/win/tty.c', @@ -262,6 +261,7 @@ 'test/test-getsockname.c', 'test/test-hrtime.c', 'test/test-idle.c', + 'test/test-ipc.c', 'test/test-list.h', 'test/test-loop-handles.c', 'test/test-pass-always.c',