diff --git a/include/uv-win.h b/include/uv-win.h index 2d6093aa..e0bfa848 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -42,22 +42,6 @@ typedef struct uv_buf_t { char* base; } uv_buf_t; -/* - * Private uv_pipe_instance state. - */ -typedef enum { - UV_PIPEINSTANCE_CONNECTED = 0, - UV_PIPEINSTANCE_DISCONNECTED, - UV_PIPEINSTANCE_ACTIVE -} uv_pipeinstance_state; - -/* Used to store active pipe instances inside a linked list. */ -typedef struct uv_pipe_instance_s { - HANDLE handle; - uv_pipeinstance_state state; - struct uv_pipe_instance_s* next; -} uv_pipe_instance_t; - #define UV_REQ_PRIVATE_FIELDS \ union { \ /* Used by I/O operations */ \ @@ -66,13 +50,21 @@ typedef struct uv_pipe_instance_s { size_t queued_bytes; \ }; \ }; \ - int flags; \ uv_err_t error; \ struct uv_req_s* next_req; +#define UV_WRITE_PRIVATE_FIELDS \ + /* empty */ + +#define UV_CONNECT_PRIVATE_FIELDS \ + /* empty */ + +#define UV_SHUTDOWN_PRIVATE_FIELDS \ + /* empty */ + #define uv_stream_connection_fields \ unsigned int write_reqs_pending; \ - uv_req_t* shutdown_req; + uv_shutdown_t* shutdown_req; #define uv_stream_server_fields \ uv_connection_cb connection_cb; @@ -81,7 +73,7 @@ typedef struct uv_pipe_instance_s { unsigned int reqs_pending; \ uv_alloc_cb alloc_cb; \ uv_read_cb read_cb; \ - struct uv_req_s read_req; \ + uv_req_t read_req; \ union { \ struct { uv_stream_connection_fields }; \ struct { uv_stream_server_fields }; \ @@ -94,17 +86,19 @@ typedef struct uv_pipe_instance_s { }; \ SOCKET accept_socket; \ char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ - struct uv_req_s accept_req; + struct uv_req_s accept_req; \ #define uv_pipe_server_fields \ char* name; \ - uv_pipe_instance_t* connections; \ - struct uv_req_s accept_reqs[4]; + struct uv_pipe_accept_s { \ + UV_REQ_FIELDS \ + HANDLE pipeHandle; \ + struct uv_pipe_accept_s* next_pending; \ + } accept_reqs[4]; \ + struct uv_pipe_accept_s* pending_accepts; #define uv_pipe_connection_fields \ - uv_pipe_t* server; \ - uv_pipe_instance_t* connection; \ - uv_pipe_instance_t clientConnection; + HANDLE handle; #define UV_PIPE_PRIVATE_FIELDS \ union { \ @@ -120,6 +114,7 @@ typedef struct uv_pipe_instance_s { #define UV_ASYNC_PRIVATE_FIELDS \ struct uv_req_s async_req; \ + uv_async_cb async_cb; \ /* char to avoid alignment issues */ \ char volatile async_sent; diff --git a/include/uv.h b/include/uv.h index f3cd256a..334f4307 100644 --- a/include/uv.h +++ b/include/uv.h @@ -291,7 +291,7 @@ struct uv_write_s { }; int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, - uv_write_cb); + uv_write_cb cb); /* @@ -310,7 +310,7 @@ int uv_tcp_init(uv_tcp_t* handle); int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in); int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6); -/* +/* * uv_tcp_connect, uv_tcp_connect6 * These functions establish IPv4 and IPv6 TCP connections. Provide an * initialized TCP handle and an uninitialized uv_connect_t*. The callback @@ -319,7 +319,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6); struct uv_connect_s { UV_REQ_FIELDS uv_connect_cb cb; - uv_tcp_t* handle; + uv_stream_t* handle; UV_CONNECT_PRIVATE_FIELDS }; @@ -336,10 +336,10 @@ int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen); /* * A subclass of uv_stream_t representing a pipe stream or pipe server. */ -struct uv_pipe_s { - UV_HANDLE_FIELDS - UV_STREAM_FIELDS - UV_PIPE_PRIVATE_FIELDS +struct uv_pipe_s { + UV_HANDLE_FIELDS + UV_STREAM_FIELDS + UV_PIPE_PRIVATE_FIELDS }; int uv_pipe_init(uv_pipe_t* handle); @@ -348,7 +348,8 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name); int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb); -int uv_pipe_connect(uv_req_t* req, const char* name); +int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, + const char* name, uv_connect_cb cb); /* diff --git a/src/uv-win.c b/src/uv-win.c index 7166e37f..b9bdb7fa 100644 --- a/src/uv-win.c +++ b/src/uv-win.c @@ -24,6 +24,7 @@ #include #include #include +#include #include "uv.h" #include "uv-common.h" @@ -165,12 +166,7 @@ static LPFN_TRANSMITFILE pTransmitFile6; #define UV_HANDLE_BIND_ERROR 0x1000 #define UV_HANDLE_IPV6 0x2000 #define UV_HANDLE_PIPESERVER 0x4000 - -/* - * Private uv_req flags. - */ -/* The request is currently queued. */ -#define UV_REQ_PENDING 0x01 +#define UV_HANDLE_READ_PENDING 0x8000 /* Binary tree used to keep the list of timers sorted. */ @@ -518,12 +514,9 @@ void uv_init() { } -void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) { +static void uv_req_init(uv_req_t* req) { uv_counters()->req_init++; req->type = UV_UNKNOWN_REQ; - req->flags = 0; - req->handle = handle; - req->cb = cb; } @@ -602,7 +595,10 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) { static void uv_init_connection(uv_stream_t* handle) { handle->flags |= UV_HANDLE_CONNECTION; handle->write_reqs_pending = 0; - uv_req_init(&(handle->read_req), (uv_handle_t*)handle, NULL); + + uv_req_init((uv_req_t*) &(handle->read_req)); + handle->read_req.type = UV_READ; + handle->read_req.data = handle; } @@ -650,11 +646,10 @@ static void uv_tcp_endgame(uv_tcp_t* handle) { err = uv_new_sys_error(WSAGetLastError()); } if (handle->shutdown_req->cb) { - handle->shutdown_req->flags &= ~UV_REQ_PENDING; if (status == -1) { uv_last_error_ = err; } - ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status); + handle->shutdown_req->cb(handle->shutdown_req, status); } handle->reqs_pending--; } @@ -683,11 +678,10 @@ static void uv_pipe_endgame(uv_pipe_t* handle) { close_pipe(handle, &status, &err); if (handle->shutdown_req->cb) { - handle->shutdown_req->flags &= ~UV_REQ_PENDING; if (status == -1) { uv_last_error_ = err; } - ((uv_shutdown_cb)handle->shutdown_req->cb)(handle->shutdown_req, status); + handle->shutdown_req->cb(handle->shutdown_req, status); } handle->reqs_pending--; } @@ -952,9 +946,6 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { /* Prepare the uv_req structure. */ req = &handle->accept_req; - assert(!(req->flags & UV_REQ_PENDING)); - req->type = UV_ACCEPT; - req->flags |= UV_REQ_PENDING; /* choose family and extension function */ if ((handle->flags & UV_HANDLE_IPV6) != 0) { @@ -999,57 +990,55 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { handle->accept_socket = accept_socket; handle->reqs_pending++; - req->flags |= UV_REQ_PENDING; } -static void uv_pipe_queue_accept(uv_pipe_t* handle) { - uv_req_t* req; +static void uv_pipe_queue_accept(uv_pipe_t* handle, struct uv_pipe_accept_s* req) { HANDLE pipeHandle; - int i; assert(handle->flags & UV_HANDLE_LISTENING); + assert(req->pipeHandle == INVALID_HANDLE_VALUE); - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - req = &handle->accept_reqs[i]; - if (!(req->flags & UV_REQ_PENDING)) { - pipeHandle = CreateNamedPipe(handle->name, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, - PIPE_UNLIMITED_INSTANCES, - 65536, - 65536, - 0, - NULL); + pipeHandle = CreateNamedPipe(handle->name, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + NULL); - if (pipeHandle == INVALID_HANDLE_VALUE) { - continue; - } - - if (CreateIoCompletionPort(pipeHandle, - uv_iocp_, - (ULONG_PTR)handle, - 0) == NULL) { - continue; - } - - /* Prepare the overlapped structure. */ - memset(&(req->overlapped), 0, sizeof(req->overlapped)); - - if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && - GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { - /* Make this req pending reporting an error. */ - req->error = uv_new_sys_error(GetLastError()); - uv_insert_pending_req(req); - handle->reqs_pending++; - continue; - } - - req->data = pipeHandle; - req->flags |= UV_REQ_PENDING; - handle->reqs_pending++; - } + if (pipeHandle == INVALID_HANDLE_VALUE) { + req->error = uv_new_sys_error(GetLastError()); + uv_insert_pending_req((uv_req_t*) req); + handle->reqs_pending++; + return; } + + if (CreateIoCompletionPort(pipeHandle, + uv_iocp_, + (ULONG_PTR)handle, + 0) == NULL) { + req->error = uv_new_sys_error(GetLastError()); + uv_insert_pending_req((uv_req_t*) req); + handle->reqs_pending++; + return; + } + + /* Prepare the overlapped structure. */ + memset(&(req->overlapped), 0, sizeof(req->overlapped)); + + if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && + GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { + /* Make this req pending reporting an error. */ + req->error = uv_new_sys_error(GetLastError()); + uv_insert_pending_req((uv_req_t*) req); + handle->reqs_pending++; + return; + } + + req->pipeHandle = pipeHandle; + handle->reqs_pending++; } @@ -1060,11 +1049,10 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { DWORD bytes, flags; assert(handle->flags & UV_HANDLE_READING); + assert(!(handle->flags & UV_HANDLE_READ_PENDING)); req = &handle->read_req; - assert(!(req->flags & UV_REQ_PENDING)); memset(&req->overlapped, 0, sizeof(req->overlapped)); - req->type = UV_READ; buf.base = (char*) &uv_zero_; buf.len = 0; @@ -1085,7 +1073,7 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { return; } - req->flags |= UV_REQ_PENDING; + handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; } @@ -1095,16 +1083,15 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) { int result; assert(handle->flags & UV_HANDLE_READING); - assert(handle->connection); - assert(handle->connection->handle != INVALID_HANDLE_VALUE); + assert(!(handle->flags & UV_HANDLE_READ_PENDING)); + + assert(handle->handle != INVALID_HANDLE_VALUE); req = &handle->read_req; - assert(!(req->flags & UV_REQ_PENDING)); memset(&req->overlapped, 0, sizeof(req->overlapped)); - req->type = UV_READ; /* Do 0-read */ - result = ReadFile(handle->connection->handle, + result = ReadFile(handle->handle, &uv_zero_, 0, NULL, @@ -1118,7 +1105,7 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) { return; } - req->flags |= UV_REQ_PENDING; + handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; } @@ -1146,7 +1133,9 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - uv_req_init(&(handle->accept_req), (uv_handle_t*)handle, NULL); + uv_req_init(&(handle->accept_req)); + handle->accept_req.type = UV_ACCEPT; + handle->accept_req.data = handle; uv_tcp_queue_accept(handle); return 0; @@ -1179,36 +1168,27 @@ static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { - uv_pipe_instance_t* connection = server->connections; - /* Find a connection instance that has been connected, but not yet accepted. */ - while (connection) { - if (connection->state == UV_PIPEINSTANCE_CONNECTED) { - break; - } + struct uv_pipe_accept_s* req = server->pending_accepts; - connection = connection->next; - } - - if (!connection) { + if (!req) { /* No valid connections found, so we error out. */ - uv_set_sys_error(UV_ENOTCONN); + uv_set_sys_error(WSAEWOULDBLOCK); return -1; } - /* Make the connection instance active */ - connection->state = UV_PIPEINSTANCE_ACTIVE; - - /* Assign the connection to the client. */ - client->connection = connection; - client->server = server; + /* Initialize the client handle and copy the pipeHandle to the client */ + uv_pipe_init(client); + uv_init_connection((uv_stream_t*) client); + client->handle = req->pipeHandle; - uv_init_connection((uv_stream_t*)client); - client->flags |= UV_HANDLE_PIPESERVER; - uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL); + /* Prepare the req to pick up a new connection */ + server->pending_accepts = req->next_pending; + req->next_pending = NULL; + req->pipeHandle = INVALID_HANDLE_VALUE; if (!(server->flags & UV_HANDLE_CLOSING)) { - uv_pipe_queue_accept(server); + uv_pipe_queue_accept(server, req); } return 0; @@ -1250,7 +1230,7 @@ static int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb /* If reading was stopped and then started again, there could stell be a */ /* read request pending. */ - if (!(handle->read_req.flags & UV_REQ_PENDING)) + if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_tcp_queue_read(handle); return 0; @@ -1279,7 +1259,7 @@ static int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_c /* If reading was stopped and then started again, there could stell be a */ /* read request pending. */ - if (!(handle->read_req.flags & UV_REQ_PENDING)) + if (!(handle->flags & UV_HANDLE_READ_PENDING)) uv_pipe_queue_read(handle); return 0; @@ -1304,20 +1284,18 @@ int uv_read_stop(uv_stream_t* handle) { } -int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { +int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in address, uv_connect_cb cb) { int addrsize = sizeof(struct sockaddr_in); BOOL success; DWORD bytes; - uv_tcp_t* handle = (uv_tcp_t*)req->handle; - - assert(!(req->flags & UV_REQ_PENDING)); if (handle->flags & UV_HANDLE_BIND_ERROR) { uv_last_error_ = handle->error; return -1; } - if (addr.sin_family != AF_INET) { + if (address.sin_family != AF_INET) { uv_set_sys_error(WSAEFAULT); return -1; } @@ -1326,11 +1304,14 @@ int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { uv_tcp_bind(handle, uv_addr_ip4_any_) < 0) return -1; - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); success = pConnectEx(handle->socket, - (struct sockaddr*)&addr, + (struct sockaddr*) &address, addrsize, NULL, 0, @@ -1342,32 +1323,29 @@ int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { return -1; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; return 0; } -int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { +int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in6 address, uv_connect_cb cb) { int addrsize = sizeof(struct sockaddr_in6); BOOL success; DWORD bytes; - uv_tcp_t* handle = (uv_tcp_t*)req->handle; if (!uv_allow_ipv6) { uv_new_sys_error(UV_EAFNOSUPPORT); return -1; } - assert(!(req->flags & UV_REQ_PENDING)); - if (handle->flags & UV_HANDLE_BIND_ERROR) { uv_last_error_ = handle->error; return -1; } - if (addr.sin6_family != AF_INET6) { + if (address.sin6_family != AF_INET6) { uv_set_sys_error(WSAEFAULT); return -1; } @@ -1376,11 +1354,14 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { uv_tcp_bind6(handle, uv_addr_ip6_any_) < 0) return -1; - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); success = pConnectEx6(handle->socket, - (struct sockaddr*)&addr, + (struct sockaddr*) &address, addrsize, NULL, 0, @@ -1392,7 +1373,6 @@ int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { return -1; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; return 0; @@ -1429,25 +1409,26 @@ static size_t uv_count_bufs(uv_buf_t bufs[], int count) { } -int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { +static int uv_tcp_write(uv_write_t* req, uv_tcp_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { int result; DWORD bytes, err; - uv_tcp_t* handle = (uv_tcp_t*) req->handle; - assert(!(req->flags & UV_REQ_PENDING)); - - if (!(req->handle->flags & UV_HANDLE_CONNECTION)) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { uv_set_sys_error(WSAEINVAL); return -1; } - if (req->handle->flags & UV_HANDLE_SHUTTING) { + if (handle->flags & UV_HANDLE_SHUTTING) { uv_set_sys_error(WSAESHUTDOWN); return -1; } - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_WRITE; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); result = WSASend(handle->socket, (WSABUF*)bufs, @@ -1474,7 +1455,6 @@ int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { handle->write_queue_size += req->queued_bytes; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; handle->write_reqs_pending++; @@ -1482,34 +1462,34 @@ int uv_tcp_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { } -int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { +int uv_pipe_write(uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { int result; - uv_pipe_t* handle = (uv_pipe_t*) req->handle; - - assert(!(req->flags & UV_REQ_PENDING)); if (bufcnt != 1) { uv_set_sys_error(UV_ENOTSUP); return -1; } - assert(handle->connection); - assert(handle->connection->handle != INVALID_HANDLE_VALUE); + assert(handle->handle != INVALID_HANDLE_VALUE); - if (!(req->handle->flags & UV_HANDLE_CONNECTION)) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { uv_set_sys_error(UV_EINVAL); return -1; } - if (req->handle->flags & UV_HANDLE_SHUTTING) { + if (handle->flags & UV_HANDLE_SHUTTING) { uv_set_sys_error(UV_EOF); return -1; } - memset(&req->overlapped, 0, sizeof(req->overlapped)); + uv_req_init((uv_req_t*) req); req->type = UV_WRITE; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); - result = WriteFile(handle->connection->handle, + result = WriteFile(handle->handle, bufs[0].base, bufs[0].len, NULL, @@ -1529,7 +1509,6 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { handle->write_queue_size += req->queued_bytes; } - req->flags |= UV_REQ_PENDING; handle->reqs_pending++; handle->write_reqs_pending++; @@ -1537,22 +1516,21 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { } -int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { - if (req->handle->type == UV_TCP) { - return uv_tcp_write(req, bufs, bufcnt); - } else if (req->handle->type == UV_NAMED_PIPE) { - return uv_pipe_write(req, bufs, bufcnt); +int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { + if (handle->type == UV_TCP) { + return uv_tcp_write(req, (uv_tcp_t*) handle, bufs, bufcnt, cb); + } else if (handle->type == UV_NAMED_PIPE) { + return uv_pipe_write(req, (uv_pipe_t*) handle, bufs, bufcnt, cb); } + uv_set_sys_error(WSAEINVAL); return -1; } -int uv_shutdown(uv_req_t* req) { - uv_tcp_t* handle = (uv_tcp_t*) req->handle; - int status = 0; - - if (!(req->handle->flags & UV_HANDLE_CONNECTION)) { +int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { + if (!(handle->flags & UV_HANDLE_CONNECTION)) { uv_set_sys_error(WSAEINVAL); return -1; } @@ -1562,8 +1540,10 @@ int uv_shutdown(uv_req_t* req) { return -1; } + uv_req_init((uv_req_t*) req); req->type = UV_SHUTDOWN; - req->flags |= UV_REQ_PENDING; + req->handle = handle; + req->cb = cb; handle->flags |= UV_HANDLE_SHUTTING; handle->shutdown_req = req; @@ -1592,8 +1572,7 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; + handle->flags &= ~UV_HANDLE_READ_PENDING; if (req->error.code != UV_OK) { /* An error occurred doing the 0-read. */ @@ -1649,7 +1628,8 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { } /* Post another 0-read if still reading and not closing. */ - if (handle->flags & UV_HANDLE_READING) { + if ((handle->flags & UV_HANDLE_READING) && + !(handle->flags & UV_HANDLE_READ_PENDING)) { uv_tcp_queue_read(handle); } } @@ -1658,12 +1638,9 @@ static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { } -static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) { +static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - handle->write_queue_size -= req->queued_bytes; if (req->cb) { @@ -1684,9 +1661,6 @@ static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) { static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - /* If handle->accepted_socket is not a valid socket, then */ /* uv_queue_accept must have failed. This is a serious error. We stop */ /* accepting connections and report this error to the connection */ @@ -1723,12 +1697,9 @@ static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { } -static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_req_t* req) { +static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req) { assert(handle->type == UV_TCP); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - if (req->cb) { if (req->error.code == UV_OK) { if (setsockopt(handle->socket, @@ -1758,8 +1729,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; + handle->flags &= ~UV_HANDLE_READ_PENDING; if (req->error.code != UV_OK) { /* An error occurred doing the 0-read. */ @@ -1777,7 +1747,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { * This is so that ReadFile doesn't block if the read buffer is empty. */ mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT; - if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { /* We can't continue processing this read. */ handle->flags &= ~UV_HANDLE_READING; uv_set_sys_error(GetLastError()); @@ -1791,11 +1761,11 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { buf = handle->alloc_cb((uv_stream_t*)handle, 65536); assert(buf.len > 0); - if (ReadFile(handle->connection->handle, - buf.base, - buf.len, - &bytes, - NULL)) { + if (ReadFile(handle->handle, + buf.base, + buf.len, + &bytes, + NULL)) { if (bytes > 0) { /* Successful read */ handle->read_cb((uv_stream_t*)handle, bytes, buf); @@ -1816,7 +1786,7 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { err = GetLastError(); if (err == ERROR_NO_DATA) { /* Read buffer was completely empty, report a 0-byte read. */ - uv_set_sys_error(UV_EAGAIN); + uv_set_sys_error(WSAEWOULDBLOCK); handle->read_cb((uv_stream_t*)handle, 0, buf); } else { /* Ouch! serious error. */ @@ -1829,10 +1799,11 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { /* TODO: if the read callback stops reading we can't start reading again because the pipe will still be in nowait mode. */ - if (handle->flags & UV_HANDLE_READING) { + if ((handle->flags & UV_HANDLE_READING) && + !(handle->flags & UV_HANDLE_READ_PENDING)) { /* Switch back to blocking mode so that we can use IOCP for 0-reads */ mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; - if (SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + if (SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { /* Post another 0-read */ uv_pipe_queue_read(handle); } else { @@ -1851,12 +1822,9 @@ static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { } -static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) { +static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_write_t* req) { assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - handle->write_queue_size -= req->queued_bytes; if (req->cb) { @@ -1874,39 +1842,27 @@ static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) { } -static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) { - uv_pipe_instance_t* pipeInstance; +static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* raw_req) { + struct uv_pipe_accept_s* req = (struct uv_pipe_accept_s*) raw_req; assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - if (req->error.code == UV_OK) { - assert(req->data); + assert(req->pipeHandle != INVALID_HANDLE_VALUE); - /* Create the connection instance and add it to the connections list. */ - pipeInstance = (uv_pipe_instance_t*)malloc(sizeof(uv_pipe_instance_t)); - if (!pipeInstance) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - - pipeInstance->handle = req->data; - pipeInstance->state = UV_PIPEINSTANCE_CONNECTED; - pipeInstance->next = handle->connections; - handle->connections = pipeInstance; - - /* Clear the request. */ - req->data = NULL; - req->flags = 0; + req->next_pending = handle->pending_accepts; + handle->pending_accepts = req; if (handle->connection_cb) { handle->connection_cb((uv_handle_t*)handle, 0); } } else { - /* Ignore errors and continue listening */ - if (handle->flags & UV_HANDLE_LISTENING) { - uv_pipe_queue_accept(handle); + if (req->pipeHandle != INVALID_HANDLE_VALUE) { + CloseHandle(req->pipeHandle); + req->pipeHandle = INVALID_HANDLE_VALUE; + } + if (!(handle->flags & UV_HANDLE_CLOSING)) { + uv_pipe_queue_accept(handle, req); } } @@ -1914,12 +1870,9 @@ static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) { } -static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_req_t* req) { +static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_connect_t* req) { assert(handle->type == UV_NAMED_PIPE); - /* Mark the request non-pending */ - req->flags &= ~UV_REQ_PENDING; - if (req->cb) { if (req->error.code == UV_OK) { uv_init_connection((uv_stream_t*)handle); @@ -2172,10 +2125,12 @@ int uv_async_init(uv_async_t* handle, uv_async_cb async_cb) { handle->flags = 0; handle->async_sent = 0; handle->error = uv_ok_; + handle->async_cb = async_cb; req = &handle->async_req; - uv_req_init(req, (uv_handle_t*)handle, async_cb); + uv_req_init(req); req->type = UV_WAKEUP; + req->data = handle; uv_refs_++; @@ -2211,8 +2166,8 @@ static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) { assert(req->type == UV_WAKEUP); handle->async_sent = 0; - if (req->cb) { - ((uv_async_cb)req->cb)((uv_async_t*) handle, 0); + if (handle->async_cb) { + handle->async_cb((uv_async_t*) handle, 0); } if (handle->flags & UV_HANDLE_CLOSING) { uv_want_endgame((uv_handle_t*)handle); @@ -2220,15 +2175,15 @@ static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) { } -#define DELEGATE_STREAM_REQ(req, method) \ +#define DELEGATE_STREAM_REQ(req, method, handle_at) \ do { \ - switch (req->handle->type) { \ + switch (((uv_handle_t*) (req)->handle_at)->type) { \ case UV_TCP: \ - uv_process_tcp_##method##_req((uv_tcp_t*) req->handle, req); \ + uv_process_tcp_##method##_req((uv_tcp_t*) ((req)->handle_at), req); \ break; \ \ case UV_NAMED_PIPE: \ - uv_process_pipe_##method##_req((uv_pipe_t*) req->handle, req); \ + uv_process_pipe_##method##_req((uv_pipe_t*) ((req)->handle_at), req); \ break; \ \ default: \ @@ -2243,35 +2198,35 @@ static void uv_process_reqs() { while (req = uv_remove_pending_req()) { switch (req->type) { case UV_READ: - DELEGATE_STREAM_REQ(req, read); + DELEGATE_STREAM_REQ(req, read, data); break; case UV_WRITE: - DELEGATE_STREAM_REQ(req, write); + DELEGATE_STREAM_REQ((uv_write_t*) req, write, handle); break; case UV_ACCEPT: - DELEGATE_STREAM_REQ(req, accept); + DELEGATE_STREAM_REQ(req, accept, data); break; case UV_CONNECT: - DELEGATE_STREAM_REQ(req, connect); + DELEGATE_STREAM_REQ((uv_connect_t*) req, connect, handle); break; case UV_WAKEUP: - uv_process_async_wakeup_req((uv_async_t*) req->handle, req); + uv_process_async_wakeup_req((uv_async_t*) req->data, req); break; case UV_ARES_EVENT_REQ: - uv_process_ares_event_req((uv_ares_action_t*) req->handle, req); + uv_process_ares_event_req((uv_ares_action_t*) req->data, req); break; case UV_ARES_CLEANUP_REQ: - uv_process_ares_cleanup_req((uv_ares_task_t*) req->handle, req); + uv_process_ares_cleanup_req((uv_ares_task_t*) req->data, req); break; case UV_GETADDRINFO_REQ: - uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->handle, req); + uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->data, req); break; default: @@ -2505,8 +2460,9 @@ VOID CALLBACK uv_ares_socksignal_tp(void* parameter, BOOLEAN timerfired) { selhandle->write = (network_events.lNetworkEvents & (FD_WRITE | FD_CONNECT)) ? 1 : 0; uv_ares_req = &selhandle->ares_req; - uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL); + uv_req_init(uv_ares_req); uv_ares_req->type = UV_ARES_EVENT_REQ; + uv_ares_req->data = selhandle; /* post ares needs to called */ if (!PostQueuedCompletionStatus(uv_iocp_, @@ -2551,8 +2507,9 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) { /* Post request to cleanup the Task */ uv_ares_req = &uv_handle_ares->ares_req; - uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL); + uv_req_init(uv_ares_req); uv_ares_req->type = UV_ARES_CLEANUP_REQ; + uv_ares_req->data = uv_handle_ares; /* post ares done with socket - finish cleanup when all threads done. */ if (!PostQueuedCompletionStatus(uv_iocp_, @@ -2981,7 +2938,8 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle, } /* init request for Post handling */ - uv_req_init(&handle->getadddrinfo_req, (uv_handle_t*)handle, NULL); + uv_req_init(&handle->getadddrinfo_req); + handle->getadddrinfo_req.data = handle; handle->getadddrinfo_req.type = UV_GETADDRINFO_REQ; /* Ask thread to run. Treat this as a long operation */ @@ -3007,6 +2965,7 @@ int uv_pipe_init(uv_pipe_t* handle) { handle->type = UV_NAMED_PIPE; handle->reqs_pending = 0; + handle->pending_accepts = NULL; uv_counters()->pipe_init++; @@ -3018,30 +2977,27 @@ int uv_pipe_init(uv_pipe_t* handle) { /* TODO: make this work with UTF8 name */ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { int i; + struct uv_pipe_accept_s* req; if (!name) { + uv_set_sys_error(WSAEINVAL); return -1; } - handle->connections = NULL; - - /* Initialize accept requests. */ - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - handle->accept_reqs[i].flags = 0; - handle->accept_reqs[i].type = UV_ACCEPT; - handle->accept_reqs[i].handle = (uv_handle_t*)handle; - handle->accept_reqs[i].cb = NULL; - handle->accept_reqs[i].data = NULL; - uv_counters()->req_init++; - } - /* Make our own copy of the pipe name */ - handle->name = (char*)malloc(MAX_PIPENAME_LEN); + handle->name = strdup(name); if (!handle->name) { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); } - strcpy(handle->name, name); - handle->name[255] = '\0'; + + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + req = &handle->accept_reqs[i]; + uv_req_init((uv_req_t*) req); + req->type = UV_ACCEPT; + req->data = handle; + req->pipeHandle = INVALID_HANDLE_VALUE; + req->next_pending = NULL; + } handle->flags |= UV_HANDLE_PIPESERVER; return 0; @@ -3050,9 +3006,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { /* Starts listening for connections for the given pipe. */ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { - int i, maxInstances, errno; - HANDLE pipeHandle; - uv_pipe_instance_t* pipeInstance; + int i, errno; if (handle->flags & UV_HANDLE_LISTENING || handle->flags & UV_HANDLE_READING) { @@ -3068,32 +3022,36 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - uv_pipe_queue_accept(handle); + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + uv_pipe_queue_accept(handle, &handle->accept_reqs[i]); + } + return 0; } /* TODO: make this work with UTF8 name */ -int uv_pipe_connect(uv_req_t* req, const char* name) { +/* TODO: run this in the thread pool */ +int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, + const char* name, uv_connect_cb cb) { int errno; DWORD mode; - uv_pipe_t* handle = (uv_pipe_t*)req->handle; - - assert(!(req->flags & UV_REQ_PENDING)); + uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; - handle->connection = &handle->clientConnection; - handle->server = NULL; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); - handle->clientConnection.handle = CreateFile(name, - GENERIC_READ | GENERIC_WRITE, - 0, - NULL, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - NULL); + handle->handle = CreateFile(name, + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); - if (handle->clientConnection.handle == INVALID_HANDLE_VALUE && + if (handle->handle == INVALID_HANDLE_VALUE && GetLastError() != ERROR_IO_PENDING) { errno = GetLastError(); goto error; @@ -3101,12 +3059,12 @@ int uv_pipe_connect(uv_req_t* req, const char* name) { mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; - if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) { + if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { errno = GetLastError(); goto error; } - if (CreateIoCompletionPort(handle->clientConnection.handle, + if (CreateIoCompletionPort(handle->handle, uv_iocp_, (ULONG_PTR)handle, 0) == NULL) { @@ -3115,16 +3073,16 @@ int uv_pipe_connect(uv_req_t* req, const char* name) { } req->error = uv_ok_; - req->flags |= UV_REQ_PENDING; - handle->connection->state = UV_PIPEINSTANCE_ACTIVE; - uv_insert_pending_req(req); + uv_insert_pending_req((uv_req_t*) req); handle->reqs_pending++; return 0; error: - close_pipe(handle, NULL, NULL); + if (handle->handle != INVALID_HANDLE_VALUE) { + CloseHandle(handle->handle); + } req->error = uv_new_sys_error(errno); - uv_insert_pending_req(req); + uv_insert_pending_req((uv_req_t*) req); handle->reqs_pending++; return 0; } @@ -3132,107 +3090,19 @@ error: /* Cleans up uv_pipe_t (server or connection) and all resources associated with it */ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { - uv_pipe_instance_t* connection, *next, *cur, **prev; - HANDLE pipeHandle; int i; + HANDLE pipeHandle; if (handle->flags & UV_HANDLE_PIPESERVER) { - if (handle->flags & UV_HANDLE_CONNECTION) { - /* - * The handle is for a connection instance on the pipe server. - * To clean-up, we call DisconnectNamedPipe, and then uv_pipe_queue_accept will cleanup the allocated uv_pipe_instance_t. - */ - - connection = handle->connection; - if (connection && connection->handle != INVALID_HANDLE_VALUE) { - /* Disconnect the connection intance and return it to pending state. */ - if (DisconnectNamedPipe(connection->handle)) { - if (status) *status = 0; - } else { - if (status) *status = -1; - if (err) *err = uv_new_sys_error(GetLastError()); - } - - connection->state = UV_PIPEINSTANCE_DISCONNECTED; - connection->handle = NULL; - - cur = handle->connections; - handle->connection = NULL; - prev = &handle->server->connections; - - /* Remove the connection from the list. */ - while (connection) { - if (cur == connection) { - *prev = connection->next; - free(connection); - break; - } else { - prev = &connection->next; - connection = connection->next; - } - } - - /* Queue accept now that the instance is in pending state. */ - if (!(handle->server->flags & UV_HANDLE_CLOSING)) { - uv_pipe_queue_accept(handle->server); - } + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + pipeHandle = handle->accept_reqs[i].pipeHandle; + if (pipeHandle != INVALID_HANDLE_VALUE) { + CloseHandle(pipeHandle); } - } else { - /* - * The handle is for the pipe server. - * To clean-up we close every active and pending connection instance. - */ - - if (handle->name) { - free(handle->name); - handle->name = NULL; - } - - connection = handle->connections; - while (connection) { - pipeHandle = connection->handle; - - if (pipeHandle) { - DisconnectNamedPipe(pipeHandle); - CloseHandle(pipeHandle); - } - - next = connection->next; - free(connection); - connection = next; - } - - handle->connections = NULL; - - for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { - if (handle->accept_reqs[i].flags & UV_REQ_PENDING) { - pipeHandle = handle->accept_reqs[i].data; - assert(pipeHandle); - DisconnectNamedPipe(pipeHandle); - CloseHandle(pipeHandle); - handle->accept_reqs[i].flags = 0; - handle->reqs_pending--; - } - } - - if (status) *status = 0; } + } else { - /* - * The handle is for a connection instance on the pipe client. - * To clean-up we close the pipe handle. - */ - connection = handle->connection; - if (connection && connection->handle != INVALID_HANDLE_VALUE) { - if (CloseHandle(connection->handle)) { - connection->state = UV_PIPEINSTANCE_DISCONNECTED; - handle->connection = NULL; - if (status) *status = 0; - } else { - if (status) *status = -1; - if (err) *err = uv_new_sys_error(GetLastError()); - } - } + CloseHandle(handle->handle); } handle->flags |= UV_HANDLE_SHUT; diff --git a/test/test-connection-fail.c b/test/test-connection-fail.c index 6c748d8d..1c2d2121 100644 --- a/test/test-connection-fail.c +++ b/test/test-connection-fail.c @@ -67,7 +67,7 @@ static void timer_cb(uv_timer_t* handle, int status) { static void on_connect_with_close(uv_connect_t *req, int status) { - ASSERT(&tcp == req->handle); + ASSERT((uv_stream_t*) &tcp == req->handle); ASSERT(status == -1); ASSERT(uv_last_error().code == UV_ECONNREFUSED); connect_cb_calls++; diff --git a/test/test-getsockname.c b/test/test-getsockname.c index 9d265d37..f533806c 100644 --- a/test/test-getsockname.c +++ b/test/test-getsockname.c @@ -29,7 +29,7 @@ static int getsocknamecount = 0; static uv_tcp_t tcp; -static uv_req_t connect_req; +static uv_connect_t connect_req; static uv_tcp_t tcpServer; @@ -47,22 +47,23 @@ static void on_close(uv_handle_t* peer) { } -static void after_shutdown(uv_req_t* req, int status) { - uv_close(req->handle, on_close); +static void after_shutdown(uv_shutdown_t* req, int status) { + uv_close((uv_handle_t*) req->handle, on_close); free(req); } static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { - uv_req_t* req; + uv_shutdown_t* req; + int r; if (buf.base) { free(buf.base); } - req = (uv_req_t*) malloc(sizeof *req); - uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown); - uv_shutdown(req); + req = (uv_shutdown_t*) malloc(sizeof *req); + r = uv_shutdown(req, handle, after_shutdown); + ASSERT(r == 0); } @@ -102,16 +103,18 @@ static void on_connection(uv_handle_t* server, int status) { } -static void on_connect(void* req) { +static void on_connect(uv_connect_t* req, int status) { struct sockaddr sockname; int namelen = sizeof(sockname); - int status; + int r; - status = uv_getsockname(&tcp, &sockname, &namelen); - if (status != 0) { + ASSERT(status == 0); + + r = uv_getsockname(&tcp, &sockname, &namelen); + if (r != 0) { fprintf(stderr, "uv_getsockname error (connector) %d\n", uv_last_error().code); } - ASSERT(status == 0); + ASSERT(r == 0); getsocknamecount++; @@ -162,9 +165,7 @@ static void tcp_connector() { tcp.data = &connect_req; ASSERT(!r); - uv_req_init(&connect_req, (uv_handle_t*)(&tcp), (void *(*)(void *))on_connect); - - r = uv_tcp_connect(&connect_req, server_addr); + r = uv_tcp_connect(&connect_req, &tcp, server_addr, on_connect); ASSERT(!r); } diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index be3d97bd..e4d4f1d0 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -158,7 +158,7 @@ static void tcp_pinger_v6_new() { /* We are never doing multiple reads/connects at a time anyway. */ /* so these handles can be pre-initialized. */ - r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, + r = uv_tcp_connect6(&pinger->connect_req, &pinger->tcp, server_addr, pinger_on_connect); ASSERT(!r); } @@ -180,10 +180,7 @@ static void tcp_pinger_new() { /* We are never doing multiple reads/connects at a time anyway. */ /* so these handles can be pre-initialized. */ - uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp), - (void *(*)(void *))pinger_on_connect); - - r = uv_tcp_connect(&pinger->connect_req, server_addr); + r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_on_connect); ASSERT(!r); } @@ -204,8 +201,7 @@ static void pipe_pinger_new() { /* We are never doing multiple reads/connects at a time anyway. */ /* so these handles can be pre-initialized. */ - r = uv_tcp_connect6(&pinger->connect_req, &pinger->tcp, server_addr, - pinger_on_connect); + r = uv_pipe_connect(&pinger->connect_req, &pinger->pipe, TEST_PIPENAME, pinger_on_connect); ASSERT(!r); }