diff --git a/Makefile.am b/Makefile.am index 75ec751b..3aa40c61 100644 --- a/Makefile.am +++ b/Makefile.am @@ -158,6 +158,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-pipe-bind-error.c \ test/test-pipe-connect-error.c \ test/test-pipe-getsockname.c \ + test/test-pipe-sendmsg.c \ test/test-pipe-server-close.c \ test/test-platform-output.c \ test/test-poll-close.c \ diff --git a/checksparse.sh b/checksparse.sh index 54cd5805..d2dc3a59 100755 --- a/checksparse.sh +++ b/checksparse.sh @@ -115,6 +115,7 @@ test/test-pass-always.c test/test-ping-pong.c test/test-pipe-bind-error.c test/test-pipe-connect-error.c +test/test-pipe-sendmsg.c test/test-pipe-server-close.c test/test-platform-output.c test/test-poll-close.c diff --git a/include/uv-unix.h b/include/uv-unix.h index eea5a3f5..b1509058 100644 --- a/include/uv-unix.h +++ b/include/uv-unix.h @@ -230,6 +230,7 @@ typedef struct { uv_connection_cb connection_cb; \ int delayed_error; \ int accepted_fd; \ + void* queued_fds; \ UV_STREAM_PRIVATE_PLATFORM_FIELDS \ #define UV_TCP_PRIVATE_FIELDS /* empty */ diff --git a/include/uv-win.h b/include/uv-win.h index db8f8618..c2542801 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -424,10 +424,9 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_write_t ipc_header_write_req; \ int ipc_pid; \ uint64_t remaining_ipc_rawdata_bytes; \ - unsigned char reserved[sizeof(void*)]; \ struct { \ - WSAPROTOCOL_INFOW* socket_info; \ - int tcp_connection; \ + void* queue[2]; \ + int queue_len; \ } pending_ipc_info; \ uv_write_t* non_overlapped_writes_tail; diff --git a/include/uv.h b/include/uv.h index cf5dca50..fbb57af7 100644 --- a/include/uv.h +++ b/include/uv.h @@ -398,17 +398,6 @@ typedef void (*uv_alloc_cb)(uv_handle_t* handle, typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf); - -/* - * Just like the uv_read_cb except that if the pending parameter is true - * then you can use uv_accept() to pull the new handle into the process. - * If no handle is pending then pending will be UV_UNKNOWN_HANDLE. - */ -typedef void (*uv_read2_cb)(uv_pipe_t* pipe, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending); - typedef void (*uv_write_cb)(uv_write_t* req, int status); typedef void (*uv_connect_cb)(uv_connect_t* req, int status); typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); @@ -611,7 +600,6 @@ UV_EXTERN uv_buf_t uv_buf_init(char* base, unsigned int len); size_t write_queue_size; \ uv_alloc_cb alloc_cb; \ uv_read_cb read_cb; \ - uv_read2_cb read2_cb; \ /* private */ \ UV_STREAM_PRIVATE_FIELDS @@ -660,13 +648,6 @@ UV_EXTERN int uv_read_start(uv_stream_t*, UV_EXTERN int uv_read_stop(uv_stream_t*); -/* - * Extended read methods for receiving handles over a pipe. The pipe must be - * initialized with ipc == 1. - */ -UV_EXTERN int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb, - uv_read2_cb read_cb); - /* * Write data to stream. Buffers are written in order. Example: @@ -1213,6 +1194,15 @@ UV_EXTERN int uv_pipe_getsockname(const uv_pipe_t* handle, */ UV_EXTERN void uv_pipe_pending_instances(uv_pipe_t* handle, int count); +/* + * Used to receive handles over ipc pipes. + * + * First - call `uv_pipe_pending_count`, if it is > 0 - initialize handle + * using type, returned by `uv_pipe_pending_type` and call + * `uv_accept(pipe, handle)`. + */ +UV_EXTERN int uv_pipe_pending_count(uv_pipe_t* handle); +UV_EXTERN uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle); /* * uv_poll_t is a subclass of uv_handle_t. diff --git a/src/unix/internal.h b/src/unix/internal.h index 4a4656a5..59cae1bb 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -120,6 +120,8 @@ # define O_CLOEXEC 0x00100000 #endif +typedef struct uv__stream_queued_fds_s uv__stream_queued_fds_t; + /* handle flags */ enum { UV_CLOSING = 0x01, /* uv_close() called but not finished. */ @@ -142,6 +144,13 @@ typedef enum { UV_CLOCK_FAST = 1 /* Use the fastest clock with <= 1ms granularity. */ } uv_clocktype_t; +struct uv__stream_queued_fds_s { + unsigned int size; + unsigned int offset; + int fds[1]; +}; + + /* core */ int uv__nonblock(int fd, int set); int uv__close(int fd); @@ -226,6 +235,7 @@ void uv__tcp_close(uv_tcp_t* handle); void uv__timer_close(uv_timer_t* handle); void uv__udp_close(uv_udp_t* handle); void uv__udp_finish_close(uv_udp_t* handle); +uv_handle_type uv__handle_type(int fd); #if defined(__APPLE__) int uv___stream_fd(uv_stream_t* handle); diff --git a/src/unix/pipe.c b/src/unix/pipe.c index 34c118b7..a26c3dbc 100644 --- a/src/unix/pipe.c +++ b/src/unix/pipe.c @@ -246,3 +246,31 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) { void uv_pipe_pending_instances(uv_pipe_t* handle, int count) { } + + +int uv_pipe_pending_count(uv_pipe_t* handle) { + uv__stream_queued_fds_t* queued_fds; + + if (!handle->ipc) + return 0; + + if (handle->accepted_fd == -1) + return 0; + + if (handle->queued_fds == NULL) + return 1; + + queued_fds = handle->queued_fds; + return queued_fds->offset + 1; +} + + +uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) { + if (!handle->ipc) + return UV_UNKNOWN_HANDLE; + + if (handle->accepted_fd == -1) + return UV_UNKNOWN_HANDLE; + else + return uv__handle_type(handle->accepted_fd); +} diff --git a/src/unix/stream.c b/src/unix/stream.c index ad6856b4..d79b1109 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -112,13 +112,13 @@ void uv__stream_init(uv_loop_t* loop, uv__handle_init(loop, (uv_handle_t*)stream, type); stream->read_cb = NULL; - stream->read2_cb = NULL; stream->alloc_cb = NULL; stream->close_cb = NULL; stream->connection_cb = NULL; stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; + stream->queued_fds = NULL; stream->delayed_error = 0; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); @@ -570,6 +570,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { if (server->accepted_fd == -1) return -EAGAIN; + err = 0; switch (client->type) { case UV_NAMED_PIPE: case UV_TCP: @@ -579,8 +580,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { if (err) { /* TODO handle error */ uv__close(server->accepted_fd); - server->accepted_fd = -1; - return err; + goto done; } break; @@ -588,8 +588,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); if (err) { uv__close(server->accepted_fd); - server->accepted_fd = -1; - return err; + goto done; } break; @@ -597,9 +596,33 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { assert(0); } - uv__io_start(server->loop, &server->io_watcher, UV__POLLIN); - server->accepted_fd = -1; - return 0; +done: + /* Process queued fds */ + if (server->queued_fds != NULL) { + uv__stream_queued_fds_t* queued_fds; + + queued_fds = server->queued_fds; + + /* Read first */ + server->accepted_fd = queued_fds->fds[0]; + + /* All read, free */ + assert(queued_fds->offset > 0); + if (--queued_fds->offset == 0) { + free(queued_fds); + server->queued_fds = NULL; + } else { + /* Shift rest */ + memmove(queued_fds->fds, + queued_fds->fds + 1, + queued_fds->offset * sizeof(*queued_fds->fds)); + } + } else { + server->accepted_fd = -1; + if (err == 0) + uv__io_start(server->loop, &server->io_watcher, UV__POLLIN); + } + return err; } @@ -777,12 +800,12 @@ start: msg.msg_flags = 0; msg.msg_control = (void*) scratch; - msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send)); + msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send)); cmsg = CMSG_FIRSTHDR(&msg); cmsg->cmsg_level = SOL_SOCKET; cmsg->cmsg_type = SCM_RIGHTS; - cmsg->cmsg_len = msg.msg_controllen; + cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send)); /* silence aliasing warning */ { @@ -913,7 +936,7 @@ static void uv__write_callbacks(uv_stream_t* stream) { } -static uv_handle_type uv__handle_type(int fd) { +uv_handle_type uv__handle_type(int fd) { struct sockaddr_storage ss; socklen_t len; int type; @@ -947,24 +970,106 @@ static uv_handle_type uv__handle_type(int fd) { } -static void uv__stream_read_cb(uv_stream_t* stream, - int status, - const uv_buf_t* buf, - uv_handle_type type) { - if (stream->read_cb != NULL) - stream->read_cb(stream, status, buf); - else - stream->read2_cb((uv_pipe_t*) stream, status, buf, type); -} - - static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { stream->flags |= UV_STREAM_READ_EOF; uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) uv__handle_stop(stream); uv__stream_osx_interrupt_select(stream); - uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, UV_EOF, buf); +} + + +static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { + uv__stream_queued_fds_t* queued_fds; + unsigned int queue_size; + + queued_fds = stream->queued_fds; + if (queued_fds == NULL) { + queue_size = 8; + queued_fds = malloc((queue_size - 1) * sizeof(*queued_fds->fds) + + sizeof(*queued_fds)); + if (queued_fds == NULL) + return -ENOMEM; + queued_fds->size = queue_size; + queued_fds->offset = 0; + stream->queued_fds = queued_fds; + + /* Grow */ + } else if (queued_fds->size == queued_fds->offset) { + queue_size = queued_fds->size + 8; + queued_fds = realloc(queued_fds, + (queue_size - 1) * sizeof(*queued_fds->fds) + + sizeof(*queued_fds)); + + /* + * Allocation failure, report back. + * NOTE: if it is fatal - sockets will be closed in uv__stream_close + */ + if (queued_fds == NULL) + return -ENOMEM; + queued_fds->size = queue_size; + stream->queued_fds = queued_fds; + } + + /* Put fd in a queue */ + queued_fds->fds[queued_fds->offset++] = fd; + + return 0; +} + + +#define UV__CMSG_FD_COUNT 64 +#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int)) + + +static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) { + struct cmsghdr* cmsg; + + for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) { + char* start; + char* end; + int err; + void* pv; + int* pi; + unsigned int i; + unsigned int count; + + if (cmsg->cmsg_type != SCM_RIGHTS) { + fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", + cmsg->cmsg_type); + continue; + } + + /* silence aliasing warning */ + pv = CMSG_DATA(cmsg); + pi = pv; + + /* Count available fds */ + start = (char*) cmsg; + end = (char*) cmsg + cmsg->cmsg_len; + count = 0; + while (start + CMSG_LEN(count * sizeof(*pi)) < end) + count++; + assert(start + CMSG_LEN(count * sizeof(*pi)) == end); + + for (i = 0; i < count; i++) { + /* Already has accepted fd, queue now */ + if (stream->accepted_fd != -1) { + err = uv__stream_queue_fd(stream, pi[i]); + if (err != 0) { + /* Close rest */ + for (; i < count; i++) + uv__close(pi[i]); + return err; + } + } else { + stream->accepted_fd = pi[i]; + } + } + } + + return 0; } @@ -972,9 +1077,10 @@ static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; struct msghdr msg; - struct cmsghdr* cmsg; - char cmsg_space[64]; + char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)]; int count; + int err; + int is_ipc; stream->flags &= ~UV_STREAM_READ_PARTIAL; @@ -983,10 +1089,12 @@ static void uv__read(uv_stream_t* stream) { */ count = 32; + is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc; + /* XXX: Maybe instead of having UV_STREAM_READING we just test if * tcp->read_cb is NULL or not? */ - while ((stream->read_cb || stream->read2_cb) + while (stream->read_cb && (stream->flags & UV_STREAM_READING) && (count-- > 0)) { assert(stream->alloc_cb != NULL); @@ -994,29 +1102,28 @@ static void uv__read(uv_stream_t* stream) { stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf); if (buf.len == 0) { /* User indicates it can't or won't handle the read. */ - uv__stream_read_cb(stream, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, UV_ENOBUFS, &buf); return; } assert(buf.base != NULL); assert(uv__stream_fd(stream) >= 0); - if (stream->read_cb) { + if (!is_ipc) { do { nread = read(uv__stream_fd(stream), buf.base, buf.len); } while (nread < 0 && errno == EINTR); } else { - assert(stream->read2_cb); - /* read2_cb uses recvmsg */ + /* ipc uses recvmsg */ msg.msg_flags = 0; msg.msg_iov = (struct iovec*) &buf; msg.msg_iovlen = 1; msg.msg_name = NULL; msg.msg_namelen = 0; /* Set up to receive a descriptor even if one isn't in the message */ - msg.msg_controllen = 64; - msg.msg_control = (void*) cmsg_space; + msg.msg_controllen = sizeof(cmsg_space); + msg.msg_control = cmsg_space; do { nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0); @@ -1032,10 +1139,10 @@ static void uv__read(uv_stream_t* stream) { uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN); uv__stream_osx_interrupt_select(stream); } - uv__stream_read_cb(stream, 0, &buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, 0, &buf); } else { /* Error. User should call uv_close(). */ - uv__stream_read_cb(stream, -errno, &buf, UV_UNKNOWN_HANDLE); + stream->read_cb(stream, -errno, &buf); assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) && "stream->read_cb(status=-1) did not call uv_close()"); } @@ -1047,50 +1154,14 @@ static void uv__read(uv_stream_t* stream) { /* Successful read */ ssize_t buflen = buf.len; - if (stream->read_cb) { - stream->read_cb(stream, nread, &buf); - } else { - assert(stream->read2_cb); - - /* - * XXX: Some implementations can send multiple file descriptors in a - * single message. We should be using CMSG_NXTHDR() to walk the - * chain to get at them all. This would require changing the API to - * hand these back up the caller, is a pain. - */ - - for (cmsg = CMSG_FIRSTHDR(&msg); - msg.msg_controllen > 0 && cmsg != NULL; - cmsg = CMSG_NXTHDR(&msg, cmsg)) { - - if (cmsg->cmsg_type == SCM_RIGHTS) { - if (stream->accepted_fd != -1) { - fprintf(stderr, "(libuv) ignoring extra FD received\n"); - } - - /* silence aliasing warning */ - { - void* pv = CMSG_DATA(cmsg); - int* pi = pv; - stream->accepted_fd = *pi; - } - - } else { - fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", - cmsg->cmsg_type); - } - } - - - if (stream->accepted_fd >= 0) { - stream->read2_cb((uv_pipe_t*) stream, - nread, - &buf, - uv__handle_type(stream->accepted_fd)); - } else { - stream->read2_cb((uv_pipe_t*) stream, nread, &buf, UV_UNKNOWN_HANDLE); + if (is_ipc) { + err = uv__stream_recv_cmsg(stream, &msg); + if (err != 0) { + stream->read_cb(stream, err, NULL); + return; } } + stream->read_cb(stream, nread, &buf); /* Return if we didn't fill the buffer, there is no more data to read. */ if (nread < buflen) { @@ -1102,6 +1173,10 @@ static void uv__read(uv_stream_t* stream) { } +#undef UV__CMSG_FD_COUNT +#undef UV__CMSG_FD_SIZE + + int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) && "uv_shutdown (unix) only supports uv_handle_t right now"); @@ -1371,10 +1446,9 @@ int uv_try_write(uv_stream_t* stream, } -static int uv__read_start_common(uv_stream_t* stream, - uv_alloc_cb alloc_cb, - uv_read_cb read_cb, - uv_read2_cb read2_cb) { +int uv_read_start(uv_stream_t* stream, + uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -1394,7 +1468,6 @@ static int uv__read_start_common(uv_stream_t* stream, assert(alloc_cb); stream->read_cb = read_cb; - stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN); @@ -1405,18 +1478,6 @@ static int uv__read_start_common(uv_stream_t* stream, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { - return uv__read_start_common(stream, alloc_cb, read_cb, NULL); -} - - -int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, - uv_read2_cb read_cb) { - return uv__read_start_common(stream, alloc_cb, NULL, read_cb); -} - - int uv_read_stop(uv_stream_t* stream) { /* Sanity check. We're going to stop the handle unless it's primed for * writing but that means there should be some kind of write action in @@ -1435,7 +1496,6 @@ int uv_read_stop(uv_stream_t* stream) { uv__stream_osx_interrupt_select(stream); stream->read_cb = NULL; - stream->read2_cb = NULL; stream->alloc_cb = NULL; return 0; } @@ -1469,6 +1529,9 @@ int uv___stream_fd(uv_stream_t* handle) { void uv__stream_close(uv_stream_t* handle) { + unsigned int i; + uv__stream_queued_fds_t* queued_fds; + #if defined(__APPLE__) /* Terminate select loop first */ if (handle->select != NULL) { @@ -1506,6 +1569,15 @@ void uv__stream_close(uv_stream_t* handle) { handle->accepted_fd = -1; } + /* Close all queued fds */ + if (handle->queued_fds != NULL) { + queued_fds = handle->queued_fds; + for (i = 0; i < queued_fds->offset; i++) + uv__close(queued_fds->fds[i]); + free(handle->queued_fds); + handle->queued_fds = NULL; + } + assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT)); } diff --git a/src/win/internal.h b/src/win/internal.h index cf6c8584..592ee15d 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -166,8 +166,6 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); -int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read2_cb read_cb); int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb); int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, diff --git a/src/win/pipe.c b/src/win/pipe.c index 69c5cde6..b25050b5 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -31,6 +31,17 @@ #include "stream-inl.h" #include "req-inl.h" +typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t; + +struct uv__ipc_queue_item_s { + /* + * NOTE: It is important for socket_info to be the first field, + * because we will we assigning it to the pending_ipc_info.socket_info + */ + WSAPROTOCOL_INFOW socket_info; + QUEUE member; + int tcp_connection; +}; /* A zero-size buffer for use by uv_pipe_read */ static char uv_zero_[] = ""; @@ -86,8 +97,8 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->name = NULL; handle->ipc_pid = 0; handle->remaining_ipc_rawdata_bytes = 0; - handle->pending_ipc_info.socket_info = NULL; - handle->pending_ipc_info.tcp_connection = 0; + QUEUE_INIT(&handle->pending_ipc_info.queue); + handle->pending_ipc_info.queue_len = 0; handle->ipc = ipc; handle->non_overlapped_writes_tail = NULL; @@ -287,6 +298,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { NTSTATUS nt_status; IO_STATUS_BLOCK io_status; FILE_PIPE_LOCAL_INFORMATION pipe_info; + uv__ipc_queue_item_t* item; if ((handle->flags & UV_HANDLE_CONNECTION) && handle->shutdown_req != NULL && @@ -362,10 +374,28 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { assert(!(handle->flags & UV_HANDLE_CLOSED)); if (handle->flags & UV_HANDLE_CONNECTION) { - if (handle->pending_ipc_info.socket_info) { - free(handle->pending_ipc_info.socket_info); - handle->pending_ipc_info.socket_info = NULL; + /* Free pending sockets */ + while (!QUEUE_EMPTY(&handle->pending_ipc_info.queue)) { + QUEUE* q; + SOCKET socket; + + q = QUEUE_HEAD(&handle->pending_ipc_info.queue); + QUEUE_REMOVE(q); + item = QUEUE_DATA(q, uv__ipc_queue_item_t, member); + + /* Materialize socket and close it */ + socket = WSASocketW(FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + &item->socket_info, + 0, + WSA_FLAG_OVERLAPPED); + free(item); + + if (socket != INVALID_SOCKET) + closesocket(socket); } + handle->pending_ipc_info.queue_len = 0; if (handle->flags & UV_HANDLE_EMULATE_IOCP) { if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { @@ -720,15 +750,29 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { uv_loop_t* loop = server->loop; uv_pipe_t* pipe_client; uv_pipe_accept_t* req; + QUEUE* q; + uv__ipc_queue_item_t* item; + int err; if (server->ipc) { - if (!server->pending_ipc_info.socket_info) { + if (QUEUE_EMPTY(&server->pending_ipc_info.queue)) { /* No valid pending sockets. */ return WSAEWOULDBLOCK; } - return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info, - server->pending_ipc_info.tcp_connection); + q = QUEUE_HEAD(&server->pending_ipc_info.queue); + QUEUE_REMOVE(q); + server->pending_ipc_info.queue_len--; + item = QUEUE_DATA(q, uv__ipc_queue_item_t, member); + + err = uv_tcp_import((uv_tcp_t*)client, + &item->socket_info, + item->tcp_connection); + if (err != 0) + return err; + + free(item); + } else { pipe_client = (uv_pipe_t*)client; @@ -956,14 +1000,14 @@ error: } -static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb, uv_read2_cb read2_cb) { +int uv_pipe_read_start(uv_pipe_t* handle, + uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { uv_loop_t* loop = handle->loop; handle->flags |= UV_HANDLE_READING; INCREASE_ACTIVE_COUNT(loop, handle); handle->read_cb = read_cb; - handle->read2_cb = read2_cb; handle->alloc_cb = alloc_cb; /* If reading was stopped and then started again, there could still be a */ @@ -975,18 +1019,6 @@ static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb, } -int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { - return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL); -} - - -int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, - uv_read2_cb read_cb) { - return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb); -} - - static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle, uv_write_t* req) { req->next_req = NULL; @@ -1314,11 +1346,7 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle, handle->flags &= ~UV_HANDLE_READABLE; uv_read_stop((uv_stream_t*) handle); - if (handle->read2_cb) { - handle->read2_cb(handle, UV_EOF, &uv_null_buf_, UV_UNKNOWN_HANDLE); - } else { - handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_); - } + handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_); } @@ -1330,14 +1358,7 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_read_stop((uv_stream_t*) handle); - if (handle->read2_cb) { - handle->read2_cb(handle, - uv_translate_sys_error(error), - &buf, - UV_UNKNOWN_HANDLE); - } else { - handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf); - } + handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf); } @@ -1351,6 +1372,23 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, } +void uv__pipe_insert_pending_socket(uv_pipe_t* handle, + WSAPROTOCOL_INFOW* info, + int tcp_connection) { + WSAPROTOCOL_INFOW* socket_info; + uv__ipc_queue_item_t* item; + + item = (uv__ipc_queue_item_t*) malloc(sizeof(*item)); + if (item == NULL) + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + + memcpy(&item->socket_info, info, sizeof(item->socket_info)); + item->tcp_connection = tcp_connection; + QUEUE_INSERT_TAIL(&handle->pending_ipc_info.queue, &item->member); + handle->pending_ipc_info.queue_len++; +} + + void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req) { DWORD bytes, avail; @@ -1426,16 +1464,10 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); /* Store the pending socket info. */ - assert(!handle->pending_ipc_info.socket_info); - handle->pending_ipc_info.socket_info = - (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info))); - if (!handle->pending_ipc_info.socket_info) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - - *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info; - handle->pending_ipc_info.tcp_connection = - ipc_frame.header.flags & UV_IPC_TCP_CONNECTION; + uv__pipe_insert_pending_socket( + handle, + &ipc_frame.socket_info, + ipc_frame.header.flags & UV_IPC_TCP_CONNECTION); } if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { @@ -1450,11 +1482,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, handle->alloc_cb((uv_handle_t*) handle, avail, &buf); if (buf.len == 0) { - if (handle->read2_cb) { - handle->read2_cb(handle, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE); - } else if (handle->read_cb) { - handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf); - } + handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf); break; } assert(buf.base != NULL); @@ -1469,20 +1497,8 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, assert(handle->remaining_ipc_rawdata_bytes >= bytes); handle->remaining_ipc_rawdata_bytes = handle->remaining_ipc_rawdata_bytes - bytes; - if (handle->read2_cb) { - handle->read2_cb(handle, bytes, &buf, - handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE); - } else if (handle->read_cb) { - handle->read_cb((uv_stream_t*)handle, bytes, &buf); - } - - if (handle->pending_ipc_info.socket_info) { - free(handle->pending_ipc_info.socket_info); - handle->pending_ipc_info.socket_info = NULL; - } - } else { - handle->read_cb((uv_stream_t*)handle, bytes, &buf); } + handle->read_cb((uv_stream_t*)handle, bytes, &buf); /* Read again only if bytes == buf.len */ if (bytes <= buf.len) { @@ -1859,3 +1875,20 @@ error: free(name_info); return err; } + + +int uv_pipe_pending_count(uv_pipe_t* handle) { + if (!handle->ipc) + return 0; + return handle->pending_ipc_info.queue_len; +} + + +uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) { + if (!handle->ipc) + return UV_UNKNOWN_HANDLE; + if (handle->pending_ipc_info.queue_len == 0) + return UV_UNKNOWN_HANDLE; + else + return UV_TCP; +} diff --git a/src/win/stream.c b/src/win/stream.c index 0abca3ad..6553ab11 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -96,31 +96,6 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, } -int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, - uv_read2_cb read_cb) { - int err; - - if (handle->flags & UV_HANDLE_READING) { - return UV_EALREADY; - } - - if (!(handle->flags & UV_HANDLE_READABLE)) { - return UV_ENOTCONN; - } - - err = ERROR_INVALID_PARAMETER; - switch (handle->type) { - case UV_NAMED_PIPE: - err = uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb); - break; - default: - assert(0); - } - - return uv_translate_sys_error(err); -} - - int uv_read_stop(uv_stream_t* handle) { int err; diff --git a/src/win/tcp.c b/src/win/tcp.c index f77db3b9..d3df3ae3 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -1099,9 +1099,9 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info, int tcp_connection) { int err; - SOCKET socket = WSASocketW(AF_INET, - SOCK_STREAM, - IPPROTO_IP, + SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, + FROM_PROTOCOL_INFO, socket_protocol_info, 0, WSA_FLAG_OVERLAPPED); diff --git a/test/benchmark-multi-accept.c b/test/benchmark-multi-accept.c index b1b0c1e3..da0c76df 100644 --- a/test/benchmark-multi-accept.c +++ b/test/benchmark-multi-accept.c @@ -83,10 +83,9 @@ static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status); static void ipc_write_cb(uv_write_t* req, int status); static void ipc_close_cb(uv_handle_t* handle); static void ipc_connect_cb(uv_connect_t* req, int status); -static void ipc_read2_cb(uv_pipe_t* ipc_pipe, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type type); +static void ipc_read_cb(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf); static void ipc_alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); @@ -155,9 +154,9 @@ static void ipc_connect_cb(uv_connect_t* req, int status) { struct ipc_client_ctx* ctx; ctx = container_of(req, struct ipc_client_ctx, connect_req); ASSERT(0 == status); - ASSERT(0 == uv_read2_start((uv_stream_t*) &ctx->ipc_pipe, - ipc_alloc_cb, - ipc_read2_cb)); + ASSERT(0 == uv_read_start((uv_stream_t*) &ctx->ipc_pipe, + ipc_alloc_cb, + ipc_read_cb)); } @@ -171,16 +170,20 @@ static void ipc_alloc_cb(uv_handle_t* handle, } -static void ipc_read2_cb(uv_pipe_t* ipc_pipe, - ssize_t nread, - const uv_buf_t* buf, - uv_handle_type type) { +static void ipc_read_cb(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf) { struct ipc_client_ctx* ctx; uv_loop_t* loop; + uv_handle_type type; + uv_pipe_t* ipc_pipe; + ipc_pipe = (uv_pipe_t*) handle; ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe); loop = ipc_pipe->loop; + ASSERT(1 == uv_pipe_pending_count(ipc_pipe)); + type = uv_pipe_pending_type(ipc_pipe); if (type == UV_TCP) ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle)); else if (type == UV_NAMED_PIPE) @@ -188,7 +191,7 @@ static void ipc_read2_cb(uv_pipe_t* ipc_pipe, else ASSERT(0); - ASSERT(0 == uv_accept((uv_stream_t*) &ctx->ipc_pipe, ctx->server_handle)); + ASSERT(0 == uv_accept(handle, ctx->server_handle)); uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL); } diff --git a/test/test-ipc-send-recv.c b/test/test-ipc-send-recv.c index b2b5aa0e..d9b91333 100644 --- a/test/test-ipc-send-recv.c +++ b/test/test-ipc-send-recv.c @@ -60,15 +60,20 @@ static void alloc_cb(uv_handle_t* handle, } -static void recv_cb(uv_pipe_t* handle, +static void recv_cb(uv_stream_t* handle, ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { + const uv_buf_t* buf) { + uv_handle_type pending; + uv_pipe_t* pipe; int r; - ASSERT(pending == ctx.expected_type); - ASSERT(handle == &ctx.channel); + pipe = (uv_pipe_t*) handle; + ASSERT(pipe == &ctx.channel); ASSERT(nread >= 0); + ASSERT(1 == uv_pipe_pending_count(pipe)); + + pending = uv_pipe_pending_type(pipe); + ASSERT(pending == ctx.expected_type); if (pending == UV_NAMED_PIPE) r = uv_pipe_init(ctx.channel.loop, &ctx.recv.pipe, 0); @@ -78,7 +83,7 @@ static void recv_cb(uv_pipe_t* handle, abort(); ASSERT(r == 0); - r = uv_accept((uv_stream_t*)&ctx.channel, &ctx.recv.stream); + r = uv_accept(handle, &ctx.recv.stream); ASSERT(r == 0); uv_close((uv_handle_t*)&ctx.channel, NULL); @@ -103,7 +108,7 @@ static int run_test(void) { NULL); ASSERT(r == 0); - r = uv_read2_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb); + r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb); ASSERT(r == 0); r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); @@ -165,16 +170,21 @@ static void write2_cb(uv_write_t* req, int status) { } -static void read2_cb(uv_pipe_t* handle, - ssize_t nread, - const uv_buf_t* rdbuf, - uv_handle_type pending) { +static void read_cb(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* rdbuf) { uv_buf_t wrbuf; + uv_pipe_t* pipe; + uv_handle_type pending; int r; - ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP); - ASSERT(handle == &ctx.channel); + pipe = (uv_pipe_t*) handle; + ASSERT(pipe == &ctx.channel); ASSERT(nread >= 0); + ASSERT(1 == uv_pipe_pending_count(pipe)); + + pending = uv_pipe_pending_type(pipe); + ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP); wrbuf = uv_buf_init(".", 1); @@ -186,7 +196,7 @@ static void read2_cb(uv_pipe_t* handle, abort(); ASSERT(r == 0); - r = uv_accept((uv_stream_t*)handle, &ctx.recv.stream); + r = uv_accept(handle, &ctx.recv.stream); ASSERT(r == 0); r = uv_write2(&ctx.write_req, @@ -215,7 +225,7 @@ int ipc_send_recv_helper(void) { ASSERT(1 == uv_is_writable((uv_stream_t*)&ctx.channel)); ASSERT(0 == uv_is_closing((uv_handle_t*)&ctx.channel)); - r = uv_read2_start((uv_stream_t*)&ctx.channel, alloc_cb, read2_cb); + r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb); ASSERT(r == 0); r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); diff --git a/test/test-ipc.c b/test/test-ipc.c index cc44d32e..61b649b6 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -30,7 +30,7 @@ static uv_tcp_t tcp_server; static uv_tcp_t tcp_connection; static int exit_cb_called; -static int read2_cb_called; +static int read_cb_called; static int tcp_write_cb_called; static int tcp_read_cb_called; static int on_pipe_read_called; @@ -138,13 +138,16 @@ static void make_many_connections(void) { } -static void on_read(uv_pipe_t* pipe, +static void on_read(uv_stream_t* handle, ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { + const uv_buf_t* buf) { int r; + uv_pipe_t* pipe; + uv_handle_type pending; uv_buf_t outbuf; + pipe = (uv_pipe_t*) handle; + if (nread == 0) { /* Everything OK, but nothing read. */ free(buf->base); @@ -163,9 +166,11 @@ static void on_read(uv_pipe_t* pipe, fprintf(stderr, "got %d bytes\n", (int)nread); + pending = uv_pipe_pending_type(pipe); if (!tcp_server_listening) { + ASSERT(1 == uv_pipe_pending_count(pipe)); ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE); - read2_cb_called++; + read_cb_called++; /* Accept the pending TCP server, and start listening on it. */ ASSERT(pending == UV_TCP); @@ -191,6 +196,7 @@ static void on_read(uv_pipe_t* pipe, make_many_connections(); } else if (memcmp("accepted_connection\n", buf->base, nread) == 0) { /* Remote server has accepted a connection. Close the channel. */ + ASSERT(0 == uv_pipe_pending_count(pipe)); ASSERT(pending == UV_UNKNOWN_HANDLE); remote_conn_accepted = 1; uv_close((uv_handle_t*)&channel, NULL); @@ -267,13 +273,15 @@ static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) { } -static void on_read_connection(uv_pipe_t* pipe, +static void on_read_connection(uv_stream_t* handle, ssize_t nread, - const uv_buf_t* buf, - uv_handle_type pending) { + const uv_buf_t* buf) { int r; uv_buf_t outbuf; + uv_pipe_t* pipe; + uv_handle_type pending; + pipe = (uv_pipe_t*) handle; if (nread == 0) { /* Everything OK, but nothing read. */ free(buf->base); @@ -292,15 +300,18 @@ static void on_read_connection(uv_pipe_t* pipe, fprintf(stderr, "got %d bytes\n", (int)nread); + ASSERT(1 == uv_pipe_pending_count(pipe)); + pending = uv_pipe_pending_type(pipe); + ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE); - read2_cb_called++; + read_cb_called++; /* Accept the pending TCP connection */ ASSERT(pending == UV_TCP); r = uv_tcp_init(uv_default_loop(), &tcp_connection); ASSERT(r == 0); - r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_connection); + r = uv_accept(handle, (uv_stream_t*)&tcp_connection); ASSERT(r == 0); /* Make sure that the expected data is correctly multiplexed. */ @@ -319,12 +330,12 @@ static void on_read_connection(uv_pipe_t* pipe, } -static int run_ipc_test(const char* helper, uv_read2_cb read_cb) { +static int run_ipc_test(const char* helper, uv_read_cb read_cb) { uv_process_t process; int r; spawn_helper(&channel, &process, helper); - uv_read2_start((uv_stream_t*)&channel, on_alloc, read_cb); + uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb); r = uv_run(uv_default_loop(), UV_RUN_DEFAULT); ASSERT(r == 0); @@ -338,7 +349,7 @@ TEST_IMPL(ipc_listen_before_write) { int r = run_ipc_test("ipc_helper_listen_before_write", on_read); ASSERT(local_conn_accepted == 1); ASSERT(remote_conn_accepted == 1); - ASSERT(read2_cb_called == 1); + ASSERT(read_cb_called == 1); ASSERT(exit_cb_called == 1); return r; } @@ -348,7 +359,7 @@ TEST_IMPL(ipc_listen_after_write) { int r = run_ipc_test("ipc_helper_listen_after_write", on_read); ASSERT(local_conn_accepted == 1); ASSERT(remote_conn_accepted == 1); - ASSERT(read2_cb_called == 1); + ASSERT(read_cb_called == 1); ASSERT(exit_cb_called == 1); return r; } @@ -356,7 +367,7 @@ TEST_IMPL(ipc_listen_after_write) { TEST_IMPL(ipc_tcp_connection) { int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection); - ASSERT(read2_cb_called == 1); + ASSERT(read_cb_called == 1); ASSERT(tcp_write_cb_called == 1); ASSERT(tcp_read_cb_called == 1); ASSERT(exit_cb_called == 1); diff --git a/test/test-list.h b/test/test-list.h index 0252acc3..345df511 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -96,6 +96,7 @@ TEST_DECLARE (pipe_connect_bad_name) TEST_DECLARE (pipe_connect_to_file) TEST_DECLARE (pipe_getsockname) TEST_DECLARE (pipe_getsockname_abstract) +TEST_DECLARE (pipe_sendmsg) TEST_DECLARE (pipe_server_close) TEST_DECLARE (connection_fail) TEST_DECLARE (connection_fail_doesnt_auto_close) @@ -362,6 +363,7 @@ TASK_LIST_START TEST_ENTRY (pipe_listen_without_bind) TEST_ENTRY (pipe_getsockname) TEST_ENTRY (pipe_getsockname_abstract) + TEST_ENTRY (pipe_sendmsg) TEST_ENTRY (connection_fail) TEST_ENTRY (connection_fail_doesnt_auto_close) diff --git a/test/test-pipe-sendmsg.c b/test/test-pipe-sendmsg.c new file mode 100644 index 00000000..f6d893b4 --- /dev/null +++ b/test/test-pipe-sendmsg.c @@ -0,0 +1,169 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + + +#ifndef _WIN32 + +#include +#include +#include +#include +#include +#include +#include + + +/* NOTE: size should be divisible by 2 */ +static uv_pipe_t incoming[4]; +static unsigned int incoming_count; +static unsigned int close_called; + + +static void set_nonblocking(uv_os_sock_t sock) { + int r; +#ifdef _WIN32 + unsigned long on = 1; + r = ioctlsocket(sock, FIONBIO, &on); + ASSERT(r == 0); +#else + int flags = fcntl(sock, F_GETFL, 0); + ASSERT(flags >= 0); + r = fcntl(sock, F_SETFL, flags | O_NONBLOCK); + ASSERT(r >= 0); +#endif +} + + + + +static void close_cb(uv_handle_t* handle) { + close_called++; +} + + +static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { + static char base[1]; + + buf->base = base; + buf->len = sizeof(base); +} + + +static void read_cb(uv_stream_t* handle, + ssize_t nread, + const uv_buf_t* buf) { + uv_pipe_t* p; + uv_pipe_t* inc; + uv_handle_type pending; + unsigned int i; + + p = (uv_pipe_t*) handle; + ASSERT(nread >= 0); + + while (uv_pipe_pending_count(p) != 0) { + pending = uv_pipe_pending_type(p); + ASSERT(pending == UV_NAMED_PIPE); + + ASSERT(incoming_count < ARRAY_SIZE(incoming)); + inc = &incoming[incoming_count++]; + ASSERT(0 == uv_pipe_init(p->loop, inc, 0)); + ASSERT(0 == uv_accept(handle, (uv_stream_t*) inc)); + } + + if (incoming_count != ARRAY_SIZE(incoming)) + return; + + ASSERT(0 == uv_read_stop((uv_stream_t*) p)); + uv_close((uv_handle_t*) p, close_cb); + for (i = 0; i < ARRAY_SIZE(incoming); i++) + uv_close((uv_handle_t*) &incoming[i], close_cb); +} + + +TEST_IMPL(pipe_sendmsg) { + uv_pipe_t p; + int r; + int fds[2]; + int send_fds[ARRAY_SIZE(incoming)]; + struct msghdr msg; + char scratch[64]; + struct cmsghdr *cmsg; + unsigned int i; + uv_buf_t buf; + + ASSERT(0 == socketpair(AF_UNIX, SOCK_STREAM, 0, fds)); + for (i = 0; i < ARRAY_SIZE(send_fds); i += 2) + ASSERT(0 == socketpair(AF_UNIX, SOCK_STREAM, 0, send_fds + i)); + ASSERT(i == ARRAY_SIZE(send_fds)); + ASSERT(0 == uv_pipe_init(uv_default_loop(), &p, 1)); + ASSERT(0 == uv_pipe_open(&p, fds[1])); + + buf = uv_buf_init("X", 1); + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = (struct iovec*) &buf; + msg.msg_iovlen = 1; + msg.msg_flags = 0; + + msg.msg_control = (void*) scratch; + msg.msg_controllen = CMSG_LEN(sizeof(send_fds)); + ASSERT(sizeof(scratch) >= msg.msg_controllen); + + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = msg.msg_controllen; + + /* silence aliasing warning */ + { + void* pv = CMSG_DATA(cmsg); + int* pi = pv; + for (i = 0; i < ARRAY_SIZE(send_fds); i++) + pi[i] = send_fds[i]; + } + + set_nonblocking(fds[1]); + ASSERT(0 == uv_read_start((uv_stream_t*) &p, alloc_cb, read_cb)); + + do + r = sendmsg(fds[0], &msg, 0); + while (r == -1 && errno == EINTR); + ASSERT(r == 1); + + uv_run(uv_default_loop(), UV_RUN_DEFAULT); + ASSERT(ARRAY_SIZE(incoming) == incoming_count); + ASSERT(ARRAY_SIZE(incoming) + 1 == close_called); + close(fds[0]); + + MAKE_VALGRIND_HAPPY(); + return 0; +} + +#else /* !_WIN32 */ + +TEST_IMPL(pipe_sendmsg) { + MAKE_VALGRIND_HAPPY(); + return 0; +} + +#endif /* _WIN32 */ diff --git a/uv.gyp b/uv.gyp index d84755aa..ddf303e4 100644 --- a/uv.gyp +++ b/uv.gyp @@ -338,6 +338,7 @@ 'test/test-pipe-bind-error.c', 'test/test-pipe-connect-error.c', 'test/test-pipe-getsockname.c', + 'test/test-pipe-sendmsg.c', 'test/test-pipe-server-close.c', 'test/test-platform-output.c', 'test/test-poll.c',