diff --git a/include/uv-win.h b/include/uv-win.h index 31306b55..2d6093aa 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -46,13 +46,18 @@ typedef struct uv_buf_t { * Private uv_pipe_instance state. */ typedef enum { - UV_PIPEINSTANCE_DISCONNECTED = 0, - UV_PIPEINSTANCE_PENDING, - UV_PIPEINSTANCE_WAITING, - UV_PIPEINSTANCE_ACCEPTED, + UV_PIPEINSTANCE_CONNECTED = 0, + UV_PIPEINSTANCE_DISCONNECTED, UV_PIPEINSTANCE_ACTIVE } uv_pipeinstance_state; +/* Used to store active pipe instances inside a linked list. */ +typedef struct uv_pipe_instance_s { + HANDLE handle; + uv_pipeinstance_state state; + struct uv_pipe_instance_s* next; +} uv_pipe_instance_t; + #define UV_REQ_PRIVATE_FIELDS \ union { \ /* Used by I/O operations */ \ @@ -93,23 +98,14 @@ typedef enum { #define uv_pipe_server_fields \ char* name; \ - int connectionCount; \ uv_pipe_instance_t* connections; \ - uv_pipe_instance_t* acceptConnection; \ - uv_pipe_instance_t connectionsBuffer[4]; + struct uv_req_s accept_reqs[4]; #define uv_pipe_connection_fields \ uv_pipe_t* server; \ uv_pipe_instance_t* connection; \ uv_pipe_instance_t clientConnection; -#define UV_PIPE_PRIVATE_TYPEDEF \ - typedef struct uv_pipe_instance_s { \ - HANDLE handle; \ - uv_pipeinstance_state state; \ - uv_req_t accept_req; \ - } uv_pipe_instance_t; - #define UV_PIPE_PRIVATE_FIELDS \ union { \ struct { uv_pipe_server_fields }; \ diff --git a/include/uv.h b/include/uv.h index e5eada6c..d470a259 100644 --- a/include/uv.h +++ b/include/uv.h @@ -298,19 +298,17 @@ int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen); /* * A subclass of uv_stream_t representing a pipe stream or pipe server. */ -UV_PIPE_PRIVATE_TYPEDEF - -struct uv_pipe_s { - UV_HANDLE_FIELDS - UV_STREAM_FIELDS - UV_PIPE_PRIVATE_FIELDS +struct uv_pipe_s { + UV_HANDLE_FIELDS + UV_STREAM_FIELDS + UV_PIPE_PRIVATE_FIELDS }; int uv_pipe_init(uv_pipe_t* handle); int uv_pipe_bind(uv_pipe_t* handle, const char* name); -int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb); +int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb); int uv_pipe_connect(uv_req_t* req, const char* name); @@ -494,6 +492,7 @@ extern uint64_t uv_hrtime(void); /* the presence of this union forces similar struct layout */ union uv_any_handle { uv_tcp_t tcp; + uv_pipe_t pipe; uv_prepare_t prepare; uv_check_t check; uv_idle_t idle; diff --git a/src/uv-common.h b/src/uv-common.h index 74b4c4e3..fbe47973 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -29,6 +29,8 @@ #include "uv.h" +#define COUNTOF(a) (sizeof(a) / sizeof(a[0])) + /* * Subclass of uv_handle_t. Used for integration of c-ares. */ diff --git a/src/uv-unix.c b/src/uv-unix.c index b9cd6b88..ab8a5e6a 100644 --- a/src/uv-unix.c +++ b/src/uv-unix.c @@ -1629,7 +1629,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { } -int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { +int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { assert(0 && "implement me"); } diff --git a/src/uv-win.c b/src/uv-win.c index dad3e438..7166e37f 100644 --- a/src/uv-win.c +++ b/src/uv-win.c @@ -377,6 +377,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) { case ERROR_INVALID_PARAMETER: return UV_EINVAL; case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET; case ERROR_BROKEN_PIPE: return UV_EOF; + case ERROR_PIPE_BUSY: return UV_EBUSY; default: return UV_UNKNOWN; } } @@ -531,11 +532,6 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) { } -static uv_pipe_instance_t* uv_req_to_pipeinstance(uv_req_t* req) { - return CONTAINING_RECORD(req, uv_pipe_instance_t, accept_req); -} - - static void uv_insert_pending_req(uv_req_t* req) { req->next_req = NULL; if (uv_pending_reqs_tail_) { @@ -1009,29 +1005,38 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { static void uv_pipe_queue_accept(uv_pipe_t* handle) { uv_req_t* req; - uv_pipe_instance_t* instance; + HANDLE pipeHandle; int i; assert(handle->flags & UV_HANDLE_LISTENING); - /* This loop goes through every pipe instance and calls ConnectNamedPipe for every pending instance. - * TODO: Make this faster (we could maintain a linked list of pending instances). - */ - for (i = 0; i < handle->connectionCount; i++) { - instance = &handle->connections[i]; + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + req = &handle->accept_reqs[i]; + if (!(req->flags & UV_REQ_PENDING)) { + pipeHandle = CreateNamedPipe(handle->name, + PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, + PIPE_UNLIMITED_INSTANCES, + 65536, + 65536, + 0, + NULL); - if (instance->state == UV_PIPEINSTANCE_PENDING) { - /* Prepare the uv_req structure. */ - req = &instance->accept_req; - uv_req_init(req, (uv_handle_t*)handle, NULL); - assert(!(req->flags & UV_REQ_PENDING)); - req->type = UV_ACCEPT; - req->flags |= UV_REQ_PENDING; + if (pipeHandle == INVALID_HANDLE_VALUE) { + continue; + } + + if (CreateIoCompletionPort(pipeHandle, + uv_iocp_, + (ULONG_PTR)handle, + 0) == NULL) { + continue; + } /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); - if (!ConnectNamedPipe(instance->handle, &req->overlapped) && + if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { /* Make this req pending reporting an error. */ req->error = uv_new_sys_error(GetLastError()); @@ -1040,9 +1045,9 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle) { continue; } - instance->state = UV_PIPEINSTANCE_WAITING; - handle->reqs_pending++; + req->data = pipeHandle; req->flags |= UV_REQ_PENDING; + handle->reqs_pending++; } } } @@ -1174,16 +1179,28 @@ static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { - assert(server->acceptConnection); + uv_pipe_instance_t* connection = server->connections; + + /* Find a connection instance that has been connected, but not yet accepted. */ + while (connection) { + if (connection->state == UV_PIPEINSTANCE_CONNECTED) { + break; + } + + connection = connection->next; + } + + if (!connection) { + /* No valid connections found, so we error out. */ + uv_set_sys_error(UV_ENOTCONN); + return -1; + } /* Make the connection instance active */ - server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE; - - /* Move the connection instance from server to client */ - client->connection = server->acceptConnection; - server->acceptConnection = NULL; - - /* Remember the server */ + connection->state = UV_PIPEINSTANCE_ACTIVE; + + /* Assign the connection to the client. */ + client->connection = connection; client->server = server; uv_init_connection((uv_stream_t*)client); @@ -1858,15 +1875,30 @@ static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) { static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) { + uv_pipe_instance_t* pipeInstance; + assert(handle->type == UV_NAMED_PIPE); /* Mark the request non-pending */ req->flags &= ~UV_REQ_PENDING; if (req->error.code == UV_OK) { - /* Put the connection instance into accept state */ - handle->acceptConnection = uv_req_to_pipeinstance(req); - handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED; + assert(req->data); + + /* Create the connection instance and add it to the connections list. */ + pipeInstance = (uv_pipe_instance_t*)malloc(sizeof(uv_pipe_instance_t)); + if (!pipeInstance) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + + pipeInstance->handle = req->data; + pipeInstance->state = UV_PIPEINSTANCE_CONNECTED; + pipeInstance->next = handle->connections; + handle->connections = pipeInstance; + + /* Clear the request. */ + req->data = NULL; + req->flags = 0; if (handle->connection_cb) { handle->connection_cb((uv_handle_t*)handle, 0); @@ -2985,13 +3017,23 @@ int uv_pipe_init(uv_pipe_t* handle) { /* Creates a pipe server. */ /* TODO: make this work with UTF8 name */ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { + int i; + if (!name) { return -1; } handle->connections = NULL; - handle->acceptConnection = NULL; - handle->connectionCount = 0; + + /* Initialize accept requests. */ + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + handle->accept_reqs[i].flags = 0; + handle->accept_reqs[i].type = UV_ACCEPT; + handle->accept_reqs[i].handle = (uv_handle_t*)handle; + handle->accept_reqs[i].cb = NULL; + handle->accept_reqs[i].data = NULL; + uv_counters()->req_init++; + } /* Make our own copy of the pipe name */ handle->name = (char*)malloc(MAX_PIPENAME_LEN); @@ -3007,7 +3049,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) { /* Starts listening for connections for the given pipe. */ -int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { +int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { int i, maxInstances, errno; HANDLE pipeHandle; uv_pipe_instance_t* pipeInstance; @@ -3023,61 +3065,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { return -1; } - if (instanceCount <= sizeof(handle->connectionsBuffer)) { - /* Use preallocated connections buffer */ - handle->connections = handle->connectionsBuffer; - } else { - handle->connections = (uv_pipe_instance_t*)malloc(instanceCount * sizeof(uv_pipe_instance_t)); - if (!handle->connections) { - uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); - } - } - - maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount; - - for (i = 0; i < instanceCount; i++) { - pipeHandle = CreateNamedPipe(handle->name, - PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, - maxInstances, - 65536, - 65536, - 0, - NULL); - - if (pipeHandle == INVALID_HANDLE_VALUE) { - errno = GetLastError(); - goto error; - } - - if (CreateIoCompletionPort(pipeHandle, - uv_iocp_, - (ULONG_PTR)handle, - 0) == NULL) { - errno = GetLastError(); - goto error; - } - - pipeInstance = &handle->connections[i]; - pipeInstance->handle = pipeHandle; - pipeInstance->state = UV_PIPEINSTANCE_PENDING; - } - - /* We don't need the pipe name anymore. */ - free(handle->name); - handle->name = NULL; - - handle->connectionCount = instanceCount; handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; uv_pipe_queue_accept(handle); return 0; - -error: - close_pipe(handle, NULL, NULL); - uv_set_sys_error(errno); - return -1; } /* TODO: make this work with UTF8 name */ @@ -3140,28 +3132,46 @@ error: /* Cleans up uv_pipe_t (server or connection) and all resources associated with it */ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { - uv_pipe_instance_t* connection; + uv_pipe_instance_t* connection, *next, *cur, **prev; + HANDLE pipeHandle; int i; if (handle->flags & UV_HANDLE_PIPESERVER) { if (handle->flags & UV_HANDLE_CONNECTION) { /* * The handle is for a connection instance on the pipe server. - * To clean-up, we call DisconnectNamedPipe, and return the instance to pending state, - * which will be ready to accept another pipe connection in uv_pipe_queue_accept. + * To clean-up, we call DisconnectNamedPipe, and then uv_pipe_queue_accept will cleanup the allocated uv_pipe_instance_t. */ + connection = handle->connection; - if (connection && connection->state != UV_PIPEINSTANCE_PENDING && connection->handle != INVALID_HANDLE_VALUE) { - /* Disconnect the connection intance and return it to pending state */ + if (connection && connection->handle != INVALID_HANDLE_VALUE) { + /* Disconnect the connection intance and return it to pending state. */ if (DisconnectNamedPipe(connection->handle)) { - connection->state = UV_PIPEINSTANCE_PENDING; - handle->connection = NULL; if (status) *status = 0; } else { if (status) *status = -1; if (err) *err = uv_new_sys_error(GetLastError()); } + connection->state = UV_PIPEINSTANCE_DISCONNECTED; + connection->handle = NULL; + + cur = handle->connections; + handle->connection = NULL; + prev = &handle->server->connections; + + /* Remove the connection from the list. */ + while (connection) { + if (cur == connection) { + *prev = connection->next; + free(connection); + break; + } else { + prev = &connection->next; + connection = connection->next; + } + } + /* Queue accept now that the instance is in pending state. */ if (!(handle->server->flags & UV_HANDLE_CLOSING)) { uv_pipe_queue_accept(handle->server); @@ -3170,7 +3180,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { } else { /* * The handle is for the pipe server. - * To clean-up we close every connection instance that was made in uv_pipe_listen. + * To clean-up we close every active and pending connection instance. */ if (handle->name) { @@ -3178,23 +3188,31 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { handle->name = NULL; } - if (handle->connections) { - /* Go through the list of connections, and close each one with CloseHandle. */ - for (i = 0; i < handle->connectionCount; i++) { - connection = &handle->connections[i]; - if (connection->state != UV_PIPEINSTANCE_DISCONNECTED && connection->handle != INVALID_HANDLE_VALUE) { - CloseHandle(connection->handle); - connection->state = UV_PIPEINSTANCE_DISCONNECTED; - connection->handle = INVALID_HANDLE_VALUE; - } + connection = handle->connections; + while (connection) { + pipeHandle = connection->handle; + + if (pipeHandle) { + DisconnectNamedPipe(pipeHandle); + CloseHandle(pipeHandle); } - /* Free the connections buffer. */ - if (handle->connections != handle->connectionsBuffer) { - free(handle->connections); - } + next = connection->next; + free(connection); + connection = next; + } - handle->connections = NULL; + handle->connections = NULL; + + for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { + if (handle->accept_reqs[i].flags & UV_REQ_PENDING) { + pipeHandle = handle->accept_reqs[i].data; + assert(pipeHandle); + DisconnectNamedPipe(pipeHandle); + CloseHandle(pipeHandle); + handle->accept_reqs[i].flags = 0; + handle->reqs_pending--; + } } if (status) *status = 0; @@ -3202,7 +3220,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { } else { /* * The handle is for a connection instance on the pipe client. - * To clean-up + * To clean-up we close the pipe handle. */ connection = handle->connection; if (connection && connection->handle != INVALID_HANDLE_VALUE) { diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index 2d48c8ab..ad3676d7 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -265,6 +265,13 @@ static void maybe_connect_some() { uv_req_init(req, (uv_handle_t*)pipe, connect_cb); r = uv_pipe_connect(req, TEST_PIPENAME); ASSERT(r == 0); + +#ifdef _WIN32 + /* HACK: This is temporary to give the pipes server enough time to create new handles. + * This will go away once uv_pipe_connect can deal with UV_EBUSY. + */ + Sleep(1); +#endif } } } @@ -403,7 +410,7 @@ HELPER_IMPL(pipe_pump_server) { ASSERT(r == 0); r = uv_pipe_bind(&pipeServer, TEST_PIPENAME); ASSERT(r == 0); - r = uv_pipe_listen(&pipeServer, MAX_WRITE_HANDLES, connection_cb); + r = uv_pipe_listen(&pipeServer, connection_cb); ASSERT(r == 0); uv_run(); diff --git a/test/echo-server.c b/test/echo-server.c index 0809858a..4dc0e20c 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -247,7 +247,7 @@ static int pipe_echo_start(char* pipeName) { return 1; } - r = uv_pipe_listen(&pipeServer, 1, on_connection); + r = uv_pipe_listen(&pipeServer, on_connection); if (r) { /* TODO: Error codes */ fprintf(stderr, "Listen error on IPv6\n");