diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 31336351..812b2741 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -318,7 +318,10 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_write_t ipc_header_write_req; \ int ipc_pid; \ uint64_t remaining_ipc_rawdata_bytes; \ - WSAPROTOCOL_INFOW* pending_socket_info; \ + struct { \ + WSAPROTOCOL_INFOW* socket_info; \ + int tcp_connection; \ + } pending_ipc_info; \ uv_write_t* non_overlapped_writes_tail; #define UV_PIPE_PRIVATE_FIELDS \ diff --git a/src/win/handle.c b/src/win/handle.c index a1ff7275..2e053b00 100644 --- a/src/win/handle.c +++ b/src/win/handle.c @@ -71,7 +71,6 @@ int uv_is_active(uv_handle_t* handle) { void uv_close(uv_handle_t* handle, uv_close_cb cb) { - uv_tcp_t* tcp; uv_pipe_t* pipe; uv_udp_t* udp; uv_process_t* process; @@ -88,19 +87,7 @@ void uv_close(uv_handle_t* handle, uv_close_cb cb) { /* Handle-specific close actions */ switch (handle->type) { case UV_TCP: - tcp = (uv_tcp_t*)handle; - /* If we don't shutdown before calling closesocket, windows will */ - /* silently discard the kernel send buffer and reset the connection. */ - if ((tcp->flags & UV_HANDLE_CONNECTION) && - !(tcp->flags & UV_HANDLE_SHUT)) { - shutdown(tcp->socket, SD_SEND); - tcp->flags |= UV_HANDLE_SHUTTING | UV_HANDLE_SHUT; - } - tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING); - closesocket(tcp->socket); - if (tcp->reqs_pending == 0) { - uv_want_endgame(loop, handle); - } + uv_tcp_close((uv_tcp_t*)handle); return; case UV_NAMED_PIPE: diff --git a/src/win/internal.h b/src/win/internal.h index bd5ec1ae..470aed1c 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -68,11 +68,12 @@ void uv_process_timers(uv_loop_t* loop); #define UV_HANDLE_NON_OVERLAPPED_PIPE 0x00200000 #define UV_HANDLE_TTY_SAVED_POSITION 0x00400000 #define UV_HANDLE_TTY_SAVED_ATTRIBUTES 0x00800000 -#define UV_HANDLE_SHARED_TCP_SERVER 0x01000000 +#define UV_HANDLE_SHARED_TCP_SOCKET 0x01000000 #define UV_HANDLE_TCP_NODELAY 0x02000000 #define UV_HANDLE_TCP_KEEPALIVE 0x04000000 #define UV_HANDLE_TCP_SINGLE_ACCEPT 0x08000000 #define UV_HANDLE_TCP_ACCEPT_STATE_CHANGING 0x10000000 +#define UV_HANDLE_TCP_SOCKET_CLOSED 0x20000000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); @@ -143,7 +144,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle); -int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info); +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info, + int tcp_connection); int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid, LPWSAPROTOCOL_INFOW protocol_info); diff --git a/src/win/pipe.c b/src/win/pipe.c index 83220a23..e2e385cb 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -42,8 +42,9 @@ static const int64_t eof_timeout = 50; /* ms */ static const int default_pending_pipe_instances = 4; /* IPC protocol flags. */ -#define UV_IPC_RAW_DATA 0x0001 -#define UV_IPC_UV_STREAM 0x0002 +#define UV_IPC_RAW_DATA 0x0001 +#define UV_IPC_TCP_SERVER 0x0002 +#define UV_IPC_TCP_CONNECTION 0x0004 /* IPC frame header. */ typedef struct { @@ -79,7 +80,8 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->name = NULL; handle->ipc_pid = 0; handle->remaining_ipc_rawdata_bytes = 0; - handle->pending_socket_info = NULL; + handle->pending_ipc_info.socket_info = NULL; + handle->pending_ipc_info.tcp_connection = 0; handle->ipc = ipc; handle->non_overlapped_writes_tail = NULL; @@ -356,9 +358,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { handle->flags |= UV_HANDLE_CLOSED; if (handle->flags & UV_HANDLE_CONNECTION) { - if (handle->pending_socket_info) { - free(handle->pending_socket_info); - handle->pending_socket_info = NULL; + if (handle->pending_ipc_info.socket_info) { + free(handle->pending_ipc_info.socket_info); + handle->pending_ipc_info.socket_info = NULL; } if (handle->flags & UV_HANDLE_EMULATE_IOCP) { @@ -711,13 +713,14 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) { uv_pipe_accept_t* req; if (server->ipc) { - if (!server->pending_socket_info) { + if (!server->pending_ipc_info.socket_info) { /* No valid pending sockets. */ uv__set_sys_error(loop, WSAEWOULDBLOCK); return -1; } - return uv_tcp_import((uv_tcp_t*)client, server->pending_socket_info); + return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info, + server->pending_ipc_info.tcp_connection); } else { pipe_client = (uv_pipe_t*)client; @@ -1051,9 +1054,8 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, return -1; } - /* Only TCP server handles are supported for sharing. */ - if (send_handle && (send_handle->type != UV_TCP || - send_handle->flags & UV_HANDLE_CONNECTION)) { + /* Only TCP handles are supported for sharing. */ + if (send_handle && send_handle->type != UV_TCP) { uv__set_artificial_error(loop, UV_ENOTSUP); return -1; } @@ -1091,7 +1093,11 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, &ipc_frame.socket_info)) { return -1; } - ipc_frame.header.flags |= UV_IPC_UV_STREAM; + ipc_frame.header.flags |= UV_IPC_TCP_SERVER; + + if (tcp_send_handle->flags & UV_HANDLE_CONNECTION) { + ipc_frame.header.flags |= UV_IPC_TCP_CONNECTION; + } } if (bufcnt == 1) { @@ -1132,7 +1138,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, result = WriteFile(handle->handle, &ipc_frame, - ipc_frame.header.flags & UV_IPC_UV_STREAM ? + ipc_frame.header.flags & UV_IPC_TCP_SERVER ? sizeof(ipc_frame) : sizeof(ipc_frame.header), NULL, &ipc_header_req->overlapped); @@ -1146,7 +1152,7 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, ipc_header_req->queued_bytes = 0; } else { /* Request queued by the kernel. */ - ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_UV_STREAM ? + ipc_header_req->queued_bytes = ipc_frame.header.flags & UV_IPC_TCP_SERVER ? sizeof(ipc_frame) : sizeof(ipc_frame.header); handle->write_queue_size += req->queued_bytes; } @@ -1330,9 +1336,10 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, } assert(bytes == sizeof(ipc_frame.header)); - assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA)); + assert(ipc_frame.header.flags <= (UV_IPC_TCP_SERVER | UV_IPC_RAW_DATA | + UV_IPC_TCP_CONNECTION)); - if (ipc_frame.header.flags & UV_IPC_UV_STREAM) { + if (ipc_frame.header.flags & UV_IPC_TCP_SERVER) { assert(avail - sizeof(ipc_frame.header) >= sizeof(ipc_frame.socket_info)); @@ -1350,14 +1357,16 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header)); /* Store the pending socket info. */ - assert(!handle->pending_socket_info); - handle->pending_socket_info = - (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_socket_info))); - if (!handle->pending_socket_info) { + assert(!handle->pending_ipc_info.socket_info); + handle->pending_ipc_info.socket_info = + (WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info))); + if (!handle->pending_ipc_info.socket_info) { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); } - *(handle->pending_socket_info) = ipc_frame.socket_info; + *(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info; + handle->pending_ipc_info.tcp_connection = + ipc_frame.header.flags & UV_IPC_TCP_CONNECTION; } if (ipc_frame.header.flags & UV_IPC_RAW_DATA) { @@ -1385,14 +1394,14 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, handle->remaining_ipc_rawdata_bytes - bytes; if (handle->read2_cb) { handle->read2_cb(handle, bytes, buf, - handle->pending_socket_info ? UV_TCP : UV_UNKNOWN_HANDLE); + handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE); } else if (handle->read_cb) { handle->read_cb((uv_stream_t*)handle, bytes, buf); } - if (handle->pending_socket_info) { - free(handle->pending_socket_info); - handle->pending_socket_info = NULL; + if (handle->pending_ipc_info.socket_info) { + free(handle->pending_ipc_info.socket_info); + handle->pending_ipc_info.socket_info = NULL; } } else { handle->read_cb((uv_stream_t*)handle, bytes, buf); diff --git a/src/win/tcp.c b/src/win/tcp.c index ab656a1c..21409ede 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -199,6 +199,11 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { assert(!(handle->flags & UV_HANDLE_CLOSED)); handle->flags |= UV_HANDLE_CLOSED; + if (!(handle->flags & UV_HANDLE_TCP_SOCKET_CLOSED)) { + closesocket(handle->socket); + handle->flags |= UV_HANDLE_TCP_SOCKET_CLOSED; + } + if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) { if (handle->flags & UV_HANDLE_EMULATE_IOCP) { for (i = 0; i < uv_simultaneous_server_accepts; i++) { @@ -218,6 +223,18 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { handle->accept_reqs = NULL; } + if (handle->flags & UV_HANDLE_CONNECTION && + handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(handle->read_req.wait_handle); + handle->read_req.wait_handle = INVALID_HANDLE_VALUE; + } + if (handle->read_req.event_handle) { + CloseHandle(handle->read_req.event_handle); + handle->read_req.event_handle = NULL; + } + } + if (handle->close_cb) { handle->close_cb((uv_handle_t*)handle); } @@ -341,7 +358,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); if (handle->flags & UV_HANDLE_EMULATE_IOCP) { - req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1); + req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1); } success = handle->func_acceptex(handle->socket, @@ -415,6 +432,13 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { buf.len = 0; } + /* Prepare the overlapped structure. */ + memset(&(req->overlapped), 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + assert(req->event_handle); + req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1); + } + flags = 0; result = WSARecv(handle->socket, (WSABUF*)&buf, @@ -434,6 +458,14 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { /* The req will be processed with IOCP. */ handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; + if (handle->flags & UV_HANDLE_EMULATE_IOCP && + req->wait_handle == INVALID_HANDLE_VALUE && + !RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, (uv_req_t*)req); + } } else { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, WSAGetLastError()); @@ -466,7 +498,7 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { } } - if (!(handle->flags & UV_HANDLE_SHARED_TCP_SERVER) && + if (!(handle->flags & UV_HANDLE_SHARED_TCP_SOCKET) && listen(handle->socket, backlog) == SOCKET_ERROR) { uv__set_sys_error(loop, WSAGetLastError()); return -1; @@ -593,10 +625,18 @@ int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, handle->read_cb = read_cb; handle->alloc_cb = alloc_cb; - /* If reading was stopped and then started again, there could stell be a */ + /* If reading was stopped and then started again, there could still be a */ /* read request pending. */ - if (!(handle->flags & UV_HANDLE_READ_PENDING)) + if (!(handle->flags & UV_HANDLE_READ_PENDING)) { + if (handle->flags & UV_HANDLE_EMULATE_IOCP && + !handle->read_req.event_handle) { + handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!handle->read_req.event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } uv_tcp_queue_read(loop, handle); + } return 0; } @@ -790,6 +830,16 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle, req->cb = cb; memset(&req->overlapped, 0, sizeof(req->overlapped)); + /* Prepare the overlapped structure. */ + memset(&(req->overlapped), 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + req->overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1); + } + result = WSASend(handle->socket, (WSABUF*)bufs, bufcnt, @@ -812,6 +862,14 @@ int uv_tcp_write(uv_loop_t* loop, uv_write_t* req, uv_tcp_t* handle, handle->write_reqs_pending++; handle->write_queue_size += req->queued_bytes; uv_ref(loop); + if (handle->flags & UV_HANDLE_EMULATE_IOCP && + req->wait_handle == INVALID_HANDLE_VALUE && + !RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, (uv_req_t*)req); + } } else { /* Send failed due to an error. */ uv__set_sys_error(loop, WSAGetLastError()); @@ -945,6 +1003,17 @@ void uv_process_tcp_write_req(uv_loop_t* loop, uv_tcp_t* handle, assert(handle->write_queue_size >= req->queued_bytes); handle->write_queue_size -= req->queued_bytes; + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; + } + } + if (req->cb) { uv__set_sys_error(loop, GET_REQ_SOCK_ERROR(req)); ((uv_write_cb)req->cb)(req, loop->last_err.code == UV_OK ? 0 : -1); @@ -1036,7 +1105,8 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, } -int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { +int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info, + int tcp_connection) { SOCKET socket = WSASocketW(AF_INET, SOCK_STREAM, IPPROTO_IP, @@ -1050,13 +1120,22 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info) { } tcp->flags |= UV_HANDLE_BOUND; - tcp->flags |= UV_HANDLE_SHARED_TCP_SERVER; + tcp->flags |= UV_HANDLE_SHARED_TCP_SOCKET; + + if (tcp_connection) { + uv_connection_init((uv_stream_t*)tcp); + } if (socket_protocol_info->iAddressFamily == AF_INET6) { tcp->flags |= UV_HANDLE_IPV6; } - return uv_tcp_set_socket(tcp->loop, tcp, socket, 1); + if (uv_tcp_set_socket(tcp->loop, tcp, socket, 1) != 0) { + return -1; + } + + tcp->loop->active_tcp_streams++; + return 0; } @@ -1097,28 +1176,24 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) { int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid, LPWSAPROTOCOL_INFOW protocol_info) { - assert(!(handle->flags & UV_HANDLE_CONNECTION)); + if (!(handle->flags & UV_HANDLE_CONNECTION)) { + /* + * We're about to share the socket with another process. Because + * this is a listening socket, we assume that the other process will + * be accepting connections on it. So, before sharing the socket + * with another process, we call listen here in the parent process. + */ - /* - * We're about to share the socket with another process. Because - * this is a listening socket, we assume that the other process will - * be accepting connections on it. So, before sharing the socket - * with another process, we call listen here in the parent process. - * This needs to be modified if the socket is shared with - * another process for anything other than accepting connections. - */ - - if (!(handle->flags & UV_HANDLE_LISTENING)) { - if (!(handle->flags & UV_HANDLE_BOUND)) { - uv__set_artificial_error(handle->loop, UV_EINVAL); - return -1; + if (!(handle->flags & UV_HANDLE_LISTENING)) { + if (!(handle->flags & UV_HANDLE_BOUND)) { + uv__set_artificial_error(handle->loop, UV_EINVAL); + return -1; + } + if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) { + uv__set_sys_error(handle->loop, WSAGetLastError()); + return -1; + } } - if (listen(handle->socket, SOMAXCONN) == SOCKET_ERROR) { - uv__set_sys_error(handle->loop, WSAGetLastError()); - return -1; - } - - handle->flags |= UV_HANDLE_SHARED_TCP_SERVER; } if (WSADuplicateSocketW(handle->socket, pid, protocol_info)) { @@ -1126,6 +1201,8 @@ int uv_tcp_duplicate_socket(uv_tcp_t* handle, int pid, return -1; } + handle->flags |= UV_HANDLE_SHARED_TCP_SOCKET; + return 0; } @@ -1161,4 +1238,46 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) { } return 0; -} \ No newline at end of file +} + + +void uv_tcp_close(uv_tcp_t* tcp) { + int non_ifs_lsp; + int close_socket = 1; + + /* + * In order for winsock to do a graceful close there must not be + * any pending reads. + */ + if (tcp->flags & UV_HANDLE_READ_PENDING) { + /* Just do shutdown on non-shared sockets, which ensures graceful close. */ + if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) { + shutdown(tcp->socket, SD_SEND); + tcp->flags |= UV_HANDLE_SHUT; + } else { + /* Check if we have any non-IFS LSPs stacked on top of TCP */ + non_ifs_lsp = (tcp->flags & UV_HANDLE_IPV6) ? uv_tcp_non_ifs_lsp_ipv6 : + uv_tcp_non_ifs_lsp_ipv4; + + if (!non_ifs_lsp) { + /* + * Shared socket with no non-IFS LSPs, request to cancel pending I/O. + * The socket will be closed inside endgame. + */ + CancelIo((HANDLE)tcp->socket); + close_socket = 0; + } + } + } + + tcp->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING); + + if (close_socket) { + closesocket(tcp->socket); + tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED; + } + + if (tcp->reqs_pending == 0) { + uv_want_endgame(tcp->loop, (uv_handle_t*)tcp); + } +} diff --git a/test/run-tests.c b/test/run-tests.c index 8f100233..db7ea70b 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -32,6 +32,11 @@ /* The time in milliseconds after which a single test times out. */ #define TEST_TIMEOUT 5000 +int ipc_helper(int listen_after_write); +int ipc_helper_tcp_connection(void); +int ipc_send_recv_helper(void); +int stdio_over_pipes_helper(void); + static int maybe_run_test(int argc, char **argv); @@ -51,212 +56,6 @@ int main(int argc, char **argv) { } -static uv_pipe_t channel; -static uv_tcp_t tcp_server; -static uv_write_t conn_notify_req; -static int close_cb_called; -static int connection_accepted; - -static uv_pipe_t stdin_pipe; -static uv_pipe_t stdout_pipe; -static int on_pipe_read_called; -static int after_write_called; - - -static void close_cb(uv_handle_t* handle) { - close_cb_called++; -} - - -static void close_conn_cb(uv_handle_t* handle) { - free(handle); - close_cb_called++; -} - - -void conn_notify_write_cb(uv_write_t* req, int status) { - uv_close((uv_handle_t*)&tcp_server, close_cb); - uv_close((uv_handle_t*)&channel, close_cb); -} - - -static void ipc_on_connection(uv_stream_t* server, int status) { - int r; - uv_buf_t buf; - uv_tcp_t* conn; - - if (!connection_accepted) { - /* - * Accept the connection and close it. Also let the other - * side know. - */ - ASSERT(status == 0); - ASSERT((uv_stream_t*)&tcp_server == server); - - conn = malloc(sizeof(*conn)); - ASSERT(conn); - - r = uv_tcp_init(server->loop, conn); - ASSERT(r == 0); - - r = uv_accept(server, (uv_stream_t*)conn); - ASSERT(r == 0); - - uv_close((uv_handle_t*)conn, close_conn_cb); - - buf = uv_buf_init("accepted_connection\n", 20); - r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, - NULL, conn_notify_write_cb); - ASSERT(r == 0); - - connection_accepted = 1; - } -} - - -static int ipc_helper(int listen_after_write) { - /* - * This is launched from test-ipc.c. stdin is a duplex channel that we - * over which a handle will be transmitted. In this initial version only - * data is transfered over the channel. XXX edit this comment after handle - * transfer is added. - */ - - uv_write_t write_req; - int r; - uv_buf_t buf; - - r = uv_pipe_init(uv_default_loop(), &channel, 1); - ASSERT(r == 0); - - uv_pipe_open(&channel, 0); - - ASSERT(uv_is_readable((uv_stream_t*) &channel)); - ASSERT(uv_is_writable((uv_stream_t*) &channel)); - - r = uv_tcp_init(uv_default_loop(), &tcp_server); - ASSERT(r == 0); - - r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT)); - ASSERT(r == 0); - - if (!listen_after_write) { - r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); - ASSERT(r == 0); - } - - buf = uv_buf_init("hello\n", 6); - r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, - (uv_stream_t*)&tcp_server, NULL); - ASSERT(r == 0); - - if (listen_after_write) { - r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); - ASSERT(r == 0); - } - - r = uv_run(uv_default_loop()); - ASSERT(r == 0); - - ASSERT(connection_accepted == 1); - ASSERT(close_cb_called == 3); - - return 0; -} - - -void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { - ASSERT(nread > 0); - ASSERT(memcmp("hello world\n", buf.base, nread) == 0); - on_pipe_read_called++; - - free(buf.base); - - uv_close((uv_handle_t*)&stdin_pipe, close_cb); - uv_close((uv_handle_t*)&stdout_pipe, close_cb); -} - - -static uv_buf_t on_pipe_read_alloc(uv_handle_t* handle, - size_t suggested_size) { - uv_buf_t buf; - buf.base = (char*)malloc(suggested_size); - buf.len = suggested_size; - return buf; -} - - -static void after_pipe_write(uv_write_t* req, int status) { - ASSERT(status == 0); - after_write_called++; -} - - -static int stdio_over_pipes_helper() { - /* Write several buffers to test that the write order is preserved. */ - char* buffers[] = { - "he", - "ll", - "o ", - "wo", - "rl", - "d", - "\n" - }; - - uv_write_t write_req[ARRAY_SIZE(buffers)]; - uv_buf_t buf[ARRAY_SIZE(buffers)]; - int r, i; - uv_loop_t* loop = uv_default_loop(); - - ASSERT(UV_NAMED_PIPE == uv_guess_handle(0)); - ASSERT(UV_NAMED_PIPE == uv_guess_handle(1)); - - r = uv_pipe_init(loop, &stdin_pipe, 0); - ASSERT(r == 0); - r = uv_pipe_init(loop, &stdout_pipe, 0); - ASSERT(r == 0); - - uv_pipe_open(&stdin_pipe, 0); - uv_pipe_open(&stdout_pipe, 1); - - /* Unref both stdio handles to make sure that all writes complete. */ - uv_unref(loop); - uv_unref(loop); - - for (i = 0; i < ARRAY_SIZE(buffers); i++) { - buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i])); - } - - for (i = 0; i < ARRAY_SIZE(buffers); i++) { - r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1, - after_pipe_write); - ASSERT(r == 0); - } - - uv_run(loop); - - ASSERT(after_write_called == 7); - ASSERT(on_pipe_read_called == 0); - ASSERT(close_cb_called == 0); - - uv_ref(loop); - uv_ref(loop); - - r = uv_read_start((uv_stream_t*)&stdin_pipe, on_pipe_read_alloc, - on_pipe_read); - ASSERT(r == 0); - - uv_run(loop); - - ASSERT(after_write_called == 7); - ASSERT(on_pipe_read_called == 1); - ASSERT(close_cb_called == 2); - - return 0; -} - - static int maybe_run_test(int argc, char **argv) { if (strcmp(argv[1], "--list") == 0) { print_tests(stdout); @@ -272,10 +71,13 @@ static int maybe_run_test(int argc, char **argv) { } if (strcmp(argv[1], "ipc_send_recv_helper") == 0) { - int ipc_send_recv_helper(void); /* See test-ipc-send-recv.c */ return ipc_send_recv_helper(); } + if (strcmp(argv[1], "ipc_helper_tcp_connection") == 0) { + return ipc_helper_tcp_connection(); + } + if (strcmp(argv[1], "stdio_over_pipes_helper") == 0) { return stdio_over_pipes_helper(); } diff --git a/test/test-ipc.c b/test/test-ipc.c index ba122af9..d70d5b23 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -27,17 +27,28 @@ static uv_pipe_t channel; static uv_tcp_t tcp_server; +static uv_tcp_t tcp_connection; static int exit_cb_called; static int read2_cb_called; +static int tcp_write_cb_called; +static int tcp_read_cb_called; +static int on_pipe_read_called; static int local_conn_accepted; static int remote_conn_accepted; static int tcp_server_listening; - static uv_write_t write_req; +static uv_pipe_t channel; +static uv_tcp_t tcp_server; +static uv_write_t conn_notify_req; +static int close_cb_called; +static int connection_accepted; +static int tcp_conn_read_cb_called; +static int tcp_conn_write_cb_called; typedef struct { uv_connect_t conn_req; + uv_write_t tcp_write_req; uv_tcp_t conn; } tcp_conn; @@ -49,7 +60,7 @@ static void close_server_conn_cb(uv_handle_t* handle) { } -static void ipc_on_connection(uv_stream_t* server, int status) { +static void on_connection(uv_stream_t* server, int status) { uv_tcp_t* conn; int r; @@ -156,7 +167,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); ASSERT(r == 0); - r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + r = uv_listen((uv_stream_t*)&tcp_server, 12, on_connection); ASSERT(r == 0); tcp_server_listening = 1; @@ -214,32 +225,127 @@ void spawn_helper(uv_pipe_t* channel, } -static int run_ipc_test(const char* helper) { +static void on_tcp_write(uv_write_t* req, int status) { + ASSERT(status == 0); + ASSERT(req->handle == (uv_stream_t*)&tcp_connection); + tcp_write_cb_called++; +} + + +static uv_buf_t on_read_alloc(uv_handle_t* handle, size_t suggested_size) { + uv_buf_t buf; + buf.base = (char*)malloc(suggested_size); + buf.len = suggested_size; + return buf; +} + + +static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { + ASSERT(nread > 0); + ASSERT(memcmp("hello again\n", buf.base, nread) == 0); + ASSERT(tcp == (uv_stream_t*)&tcp_connection); + free(buf.base); + + tcp_read_cb_called++; + + uv_close((uv_handle_t*)tcp, NULL); + uv_close((uv_handle_t*)&channel, NULL); +} + + +static void on_read_connection(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending) { + int r; + uv_buf_t outbuf; + uv_err_t err; + + if (nread == 0) { + /* Everything OK, but nothing read. */ + free(buf.base); + return; + } + + if (nread < 0) { + err = uv_last_error(pipe->loop); + if (err.code == UV_EOF) { + free(buf.base); + return; + } + + printf("error recving on channel: %s\n", uv_strerror(err)); + abort(); + } + + fprintf(stderr, "got %d bytes\n", (int)nread); + + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); + read2_cb_called++; + + /* Accept the pending TCP connection */ + ASSERT(pending == UV_TCP); + r = uv_tcp_init(uv_default_loop(), &tcp_connection); + ASSERT(r == 0); + + r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_connection); + ASSERT(r == 0); + + /* Make sure that the expected data is correctly multiplexed. */ + ASSERT(memcmp("hello\n", buf.base, nread) == 0); + + /* Write/read to/from the connection */ + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1, + on_tcp_write); + ASSERT(r == 0); + + r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read); + ASSERT(r == 0); + + free(buf.base); +} + + +static int run_ipc_test(const char* helper, uv_read2_cb read_cb) { uv_process_t process; int r; spawn_helper(&channel, &process, helper); - uv_read2_start((uv_stream_t*)&channel, on_alloc, on_read); + uv_read2_start((uv_stream_t*)&channel, on_alloc, read_cb); r = uv_run(uv_default_loop()); ASSERT(r == 0); - ASSERT(local_conn_accepted == 1); - ASSERT(remote_conn_accepted == 1); - ASSERT(read2_cb_called == 1); - ASSERT(exit_cb_called == 1); - return 0; } TEST_IMPL(ipc_listen_before_write) { - return run_ipc_test("ipc_helper_listen_before_write"); + int r = run_ipc_test("ipc_helper_listen_before_write", on_read); + ASSERT(local_conn_accepted == 1); + ASSERT(remote_conn_accepted == 1); + ASSERT(read2_cb_called == 1); + ASSERT(exit_cb_called == 1); + return r; } TEST_IMPL(ipc_listen_after_write) { - return run_ipc_test("ipc_helper_listen_after_write"); + int r = run_ipc_test("ipc_helper_listen_after_write", on_read); + ASSERT(local_conn_accepted == 1); + ASSERT(remote_conn_accepted == 1); + ASSERT(read2_cb_called == 1); + ASSERT(exit_cb_called == 1); + return r; +} + + +TEST_IMPL(ipc_tcp_connection) { + int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection); + ASSERT(read2_cb_called == 1); + ASSERT(tcp_write_cb_called == 1); + ASSERT(tcp_read_cb_called == 1); + ASSERT(exit_cb_called == 1); + return r; } @@ -287,3 +393,220 @@ TEST_IMPL(listen_no_simultaneous_accepts) { return 0; } #endif + + +/* Everything here runs in a child process. */ + +tcp_conn conn; + + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + + +static void conn_notify_write_cb(uv_write_t* req, int status) { + uv_close((uv_handle_t*)&tcp_server, close_cb); + uv_close((uv_handle_t*)&channel, close_cb); +} + + +static void tcp_connection_write_cb(uv_write_t* req, int status) { + ASSERT((uv_handle_t*)&conn.conn == (uv_handle_t*)req->handle); + uv_close((uv_handle_t*)req->handle, close_cb); + uv_close((uv_handle_t*)&channel, close_cb); + uv_close((uv_handle_t*)&tcp_server, close_cb); + tcp_conn_write_cb_called++; +} + + +static void on_tcp_child_process_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { + uv_buf_t outbuf; + int r; + + if (nread < 0) { + if (uv_last_error(tcp->loop).code == UV_EOF) { + free(buf.base); + return; + } + + printf("error recving on tcp connection: %s\n", + uv_strerror(uv_last_error(tcp->loop))); + abort(); + } + + ASSERT(nread > 0); + ASSERT(memcmp("world\n", buf.base, nread) == 0); + on_pipe_read_called++; + free(buf.base); + + /* Write to the socket */ + outbuf = uv_buf_init("hello again\n", 12); + r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb); + ASSERT(r == 0); + + tcp_conn_read_cb_called++; +} + + +static void connect_child_process_cb(uv_connect_t* req, int status) { + int r; + + ASSERT(status == 0); + r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read); + ASSERT(r == 0); +} + + +static void ipc_on_connection(uv_stream_t* server, int status) { + int r; + uv_buf_t buf; + + if (!connection_accepted) { + /* + * Accept the connection and close it. Also let the other + * side know. + */ + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + r = uv_tcp_init(server->loop, &conn.conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)&conn.conn); + ASSERT(r == 0); + + uv_close((uv_handle_t*)&conn.conn, close_cb); + + buf = uv_buf_init("accepted_connection\n", 20); + r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, + NULL, conn_notify_write_cb); + ASSERT(r == 0); + + connection_accepted = 1; + } +} + + +static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) { + int r; + uv_buf_t buf; + uv_tcp_t* conn; + + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); + + conn = malloc(sizeof(*conn)); + ASSERT(conn); + + r = uv_tcp_init(server->loop, conn); + ASSERT(r == 0); + + r = uv_accept(server, (uv_stream_t*)conn); + ASSERT(r == 0); + + /* Send the accepted connection to the other process */ + buf = uv_buf_init("hello\n", 6); + r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1, + (uv_stream_t*)conn, NULL); + ASSERT(r == 0); + + r = uv_read_start((uv_stream_t*)conn, on_read_alloc, on_tcp_child_process_read); + ASSERT(r == 0); + + uv_close((uv_handle_t*)conn, close_cb); +} + + +int ipc_helper(int listen_after_write) { + /* + * This is launched from test-ipc.c. stdin is a duplex channel that we + * over which a handle will be transmitted. + */ + + uv_write_t write_req; + int r; + uv_buf_t buf; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + uv_pipe_open(&channel, 0); + + ASSERT(uv_is_readable((uv_stream_t*) &channel)); + ASSERT(uv_is_writable((uv_stream_t*) &channel)); + + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + ASSERT(r == 0); + + if (!listen_after_write) { + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + } + + buf = uv_buf_init("hello\n", 6); + r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, + (uv_stream_t*)&tcp_server, NULL); + ASSERT(r == 0); + + if (listen_after_write) { + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + } + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(connection_accepted == 1); + ASSERT(close_cb_called == 3); + + return 0; +} + + +int ipc_helper_tcp_connection() { + /* + * This is launched from test-ipc.c. stdin is a duplex channel that we + * over which a handle will be transmitted. + */ + + int r; + struct sockaddr_in addr; + + r = uv_pipe_init(uv_default_loop(), &channel, 1); + ASSERT(r == 0); + + uv_pipe_open(&channel, 0); + + ASSERT(uv_is_readable((uv_stream_t*)&channel)); + ASSERT(uv_is_writable((uv_stream_t*)&channel)); + + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_tcp_bind(&tcp_server, uv_ip4_addr("0.0.0.0", TEST_PORT)); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection_tcp_conn); + ASSERT(r == 0); + + /* Make a connection to the server */ + r = uv_tcp_init(uv_default_loop(), &conn.conn); + ASSERT(r == 0); + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + r = uv_tcp_connect(&conn.conn_req, (uv_tcp_t*)&conn.conn, addr, connect_child_process_cb); + ASSERT(r == 0); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(tcp_conn_read_cb_called == 1); + ASSERT(tcp_conn_write_cb_called == 1); + ASSERT(close_cb_called == 4); + + return 0; +} \ No newline at end of file diff --git a/test/test-list.h b/test/test-list.h index 634d5677..5743243b 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -26,6 +26,7 @@ TEST_DECLARE (ipc_listen_before_write) TEST_DECLARE (ipc_listen_after_write) TEST_DECLARE (ipc_send_recv_pipe) TEST_DECLARE (ipc_send_recv_tcp) +TEST_DECLARE (ipc_tcp_connection) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) TEST_DECLARE (pipe_ping_pong) @@ -180,6 +181,7 @@ TASK_LIST_START TEST_ENTRY (ipc_listen_after_write) TEST_ENTRY (ipc_send_recv_pipe) TEST_ENTRY (ipc_send_recv_tcp) + TEST_ENTRY (ipc_tcp_connection) TEST_ENTRY (tcp_ping_pong) TEST_HELPER (tcp_ping_pong, tcp4_echo_server) diff --git a/test/test-stdio-over-pipes.c b/test/test-stdio-over-pipes.c index 7c0a692b..2116d164 100644 --- a/test/test-stdio-over-pipes.c +++ b/test/test-stdio-over-pipes.c @@ -40,6 +40,7 @@ static uv_loop_t* loop; static char output[OUTPUT_SIZE]; static int output_used; + typedef struct { uv_write_t req; uv_buf_t buf; @@ -155,3 +156,101 @@ TEST_IMPL(stdio_over_pipes) { return 0; } + +/* Everything here runs in a child process. */ + +static int on_pipe_read_called; +static int after_write_called; +static uv_pipe_t stdin_pipe; +static uv_pipe_t stdout_pipe; + +static void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { + ASSERT(nread > 0); + ASSERT(memcmp("hello world\n", buf.base, nread) == 0); + on_pipe_read_called++; + + free(buf.base); + + uv_close((uv_handle_t*)&stdin_pipe, close_cb); + uv_close((uv_handle_t*)&stdout_pipe, close_cb); +} + + +static void after_pipe_write(uv_write_t* req, int status) { + ASSERT(status == 0); + after_write_called++; +} + + +static uv_buf_t on_read_alloc(uv_handle_t* handle, + size_t suggested_size) { + uv_buf_t buf; + buf.base = (char*)malloc(suggested_size); + buf.len = suggested_size; + return buf; +} + + +int stdio_over_pipes_helper() { + /* Write several buffers to test that the write order is preserved. */ + char* buffers[] = { + "he", + "ll", + "o ", + "wo", + "rl", + "d", + "\n" + }; + + uv_write_t write_req[ARRAY_SIZE(buffers)]; + uv_buf_t buf[ARRAY_SIZE(buffers)]; + int r, i; + uv_loop_t* loop = uv_default_loop(); + + ASSERT(UV_NAMED_PIPE == uv_guess_handle(0)); + ASSERT(UV_NAMED_PIPE == uv_guess_handle(1)); + + r = uv_pipe_init(loop, &stdin_pipe, 0); + ASSERT(r == 0); + r = uv_pipe_init(loop, &stdout_pipe, 0); + ASSERT(r == 0); + + uv_pipe_open(&stdin_pipe, 0); + uv_pipe_open(&stdout_pipe, 1); + + /* Unref both stdio handles to make sure that all writes complete. */ + uv_unref(loop); + uv_unref(loop); + + for (i = 0; i < ARRAY_SIZE(buffers); i++) { + buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i])); + } + + for (i = 0; i < ARRAY_SIZE(buffers); i++) { + r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1, + after_pipe_write); + ASSERT(r == 0); + } + + uv_run(loop); + + ASSERT(after_write_called == 7); + ASSERT(on_pipe_read_called == 0); + ASSERT(close_cb_called == 0); + + uv_ref(loop); + uv_ref(loop); + + r = uv_read_start((uv_stream_t*)&stdin_pipe, on_read_alloc, + on_pipe_read); + ASSERT(r == 0); + + uv_run(loop); + + ASSERT(after_write_called == 7); + ASSERT(on_pipe_read_called == 1); + ASSERT(close_cb_called == 2); + + return 0; +} \ No newline at end of file