From 8448ee4bf62f0a7ae7cab389e3cf4b473669ebd4 Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Thu, 11 Aug 2011 13:18:07 -0700 Subject: [PATCH] Windows: Do simultaneous pending AcceptEx calls. --- include/uv-win.h | 13 +++++-- src/win/tcp.c | 95 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 76 insertions(+), 32 deletions(-) diff --git a/include/uv-win.h b/include/uv-win.h index c4871d95..83dbebe3 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -75,7 +75,13 @@ typedef struct uv_buf_t { UV_REQ_FIELDS \ HANDLE pipeHandle; \ struct uv_pipe_accept_s* next_pending; \ - } uv_pipe_accept_t; + } uv_pipe_accept_t; \ + typedef struct uv_tcp_accept_s { \ + UV_REQ_FIELDS \ + SOCKET accept_socket; \ + char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ + struct uv_tcp_accept_s* next_pending; \ + } uv_tcp_accept_t; #define uv_stream_connection_fields \ unsigned int write_reqs_pending; \ @@ -95,9 +101,8 @@ typedef struct uv_buf_t { }; #define uv_tcp_server_fields \ - SOCKET accept_socket; \ - char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \ - struct uv_req_s accept_req; + uv_tcp_accept_t* accept_reqs; \ + uv_tcp_accept_t* pending_accepts; #define uv_tcp_connection_fields \ uv_buf_t read_buffer; diff --git a/src/win/tcp.c b/src/win/tcp.c index c7a647a6..548f6cb4 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -109,6 +109,12 @@ const unsigned int uv_active_tcp_streams_threshold = 50; +/* + * Number of simultaneous pending AcceptEx calls. + */ +const unsigned int uv_simultaneous_server_accepts = 32; + + /* Pointers to winsock extension functions to be retrieved dynamically */ static LPFN_CONNECTEX pConnectEx; static LPFN_ACCEPTEX pAcceptEx; @@ -293,10 +299,11 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) { int uv_tcp_init(uv_tcp_t* handle) { uv_stream_init((uv_stream_t*)handle); + handle->accept_reqs = NULL; + handle->pending_accepts = NULL; handle->socket = INVALID_SOCKET; handle->type = UV_TCP; handle->reqs_pending = 0; - handle->accept_socket = INVALID_SOCKET; uv_counters()->tcp_init++; @@ -308,7 +315,8 @@ void uv_tcp_endgame(uv_tcp_t* handle) { uv_err_t err; int status; - if (handle->flags & UV_HANDLE_SHUTTING && + if (handle->flags & UV_HANDLE_CONNECTION && + handle->flags & UV_HANDLE_SHUTTING && !(handle->flags & UV_HANDLE_SHUT) && handle->write_reqs_pending == 0) { @@ -333,6 +341,11 @@ void uv_tcp_endgame(uv_tcp_t* handle) { assert(!(handle->flags & UV_HANDLE_CLOSED)); handle->flags |= UV_HANDLE_CLOSED; + if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) { + free(handle->accept_reqs); + handle->accept_reqs = NULL; + } + if (handle->close_cb) { handle->close_cb((uv_handle_t*)handle); } @@ -407,8 +420,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) { } -static void uv_tcp_queue_accept(uv_tcp_t* handle) { - uv_req_t* req; +static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { BOOL success; DWORD bytes; SOCKET accept_socket; @@ -416,10 +428,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { LPFN_ACCEPTEX pAcceptExFamily; assert(handle->flags & UV_HANDLE_LISTENING); - assert(handle->accept_socket == INVALID_SOCKET); - - /* Prepare the uv_req structure. */ - req = &handle->accept_req; + assert(req->accept_socket == INVALID_SOCKET); /* choose family and extension function */ if ((handle->flags & UV_HANDLE_IPV6) != 0) { @@ -434,7 +443,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { accept_socket = socket(family, SOCK_STREAM, 0); if (accept_socket == INVALID_SOCKET) { req->error = uv_new_sys_error(WSAGetLastError()); - uv_insert_pending_req(req); + uv_insert_pending_req((uv_req_t*)req); handle->reqs_pending++; return; } @@ -444,7 +453,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { success = pAcceptExFamily(handle->socket, accept_socket, - (void*)&handle->accept_buffer, + (void*)req->accept_buffer, 0, sizeof(struct sockaddr_storage), sizeof(struct sockaddr_storage), @@ -453,17 +462,17 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) { if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { /* Process the req without IOCP. */ - handle->accept_socket = accept_socket; + req->accept_socket = accept_socket; handle->reqs_pending++; uv_insert_pending_req((uv_req_t*)req); } else if (UV_SUCCEEDED_WITH_IOCP(success)) { /* The req will be processed with IOCP. */ - handle->accept_socket = accept_socket; + req->accept_socket = accept_socket; handle->reqs_pending++; } else { /* Make this req pending reporting an error. */ req->error = uv_new_sys_error(WSAGetLastError()); - uv_insert_pending_req(req); + uv_insert_pending_req((uv_req_t*)req); handle->reqs_pending++; /* Destroy the preallocated client socket. */ closesocket(accept_socket); @@ -527,6 +536,9 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { + unsigned int i; + uv_tcp_accept_t* req; + assert(backlog > 0); if (handle->flags & UV_HANDLE_BIND_ERROR) { @@ -553,10 +565,21 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { handle->flags |= UV_HANDLE_LISTENING; handle->connection_cb = cb; - uv_req_init(&(handle->accept_req)); - handle->accept_req.type = UV_ACCEPT; - handle->accept_req.data = handle; - uv_tcp_queue_accept(handle); + assert(!handle->accept_reqs); + handle->accept_reqs = (uv_tcp_accept_t*) + malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t)); + if (!handle->accept_reqs) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + + for (i = 0; i < uv_simultaneous_server_accepts; i++) { + req = &handle->accept_reqs[i]; + uv_req_init((uv_req_t*)req); + req->type = UV_ACCEPT; + req->accept_socket = INVALID_SOCKET; + req->data = handle; + uv_tcp_queue_accept(handle, req); + } return 0; } @@ -565,22 +588,33 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { int rv = 0; - if (server->accept_socket == INVALID_SOCKET) { + uv_tcp_accept_t* req = server->pending_accepts; + + if (!req) { + /* No valid connections found, so we error out. */ + uv_set_sys_error(WSAEWOULDBLOCK); + return -1; + } + + if (req->accept_socket == INVALID_SOCKET) { uv_set_sys_error(WSAENOTCONN); return -1; } - if (uv_tcp_set_socket(client, server->accept_socket) == -1) { - closesocket(server->accept_socket); + if (uv_tcp_set_socket(client, req->accept_socket) == -1) { + closesocket(req->accept_socket); rv = -1; } else { uv_connection_init((uv_stream_t*)client); } - server->accept_socket = INVALID_SOCKET; + /* Prepare the req to pick up a new connection */ + server->pending_accepts = req->next_pending; + req->next_pending = NULL; + req->accept_socket = INVALID_SOCKET; if (!(server->flags & UV_HANDLE_CLOSING)) { - uv_tcp_queue_accept(server); + uv_tcp_queue_accept(server, req); } active_tcp_streams++; @@ -905,14 +939,16 @@ void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) { } -void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { +void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* raw_req) { + uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req; + assert(handle->type == UV_TCP); /* If handle->accepted_socket is not a valid socket, then */ /* uv_queue_accept must have failed. This is a serious error. We stop */ /* accepting connections and report this error to the connection */ /* callback. */ - if (handle->accept_socket == INVALID_SOCKET) { + if (req->accept_socket == INVALID_SOCKET) { if (handle->flags & UV_HANDLE_LISTENING) { handle->flags &= ~UV_HANDLE_LISTENING; if (handle->connection_cb) { @@ -921,11 +957,14 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { } } } else if (req->error.code == UV_OK && - setsockopt(handle->accept_socket, + setsockopt(req->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&handle->socket, sizeof(handle->socket)) == 0) { + req->next_pending = handle->pending_accepts; + handle->pending_accepts = req; + /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */ if (handle->connection_cb) { handle->connection_cb((uv_stream_t*)handle, 0); @@ -934,10 +973,10 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { /* Error related to accepted socket is ignored because the server */ /* socket may still be healthy. If the server socket is broken /* uv_queue_accept will detect it. */ - closesocket(handle->accept_socket); - handle->accept_socket = INVALID_SOCKET; + closesocket(req->accept_socket); + req->accept_socket = INVALID_SOCKET; if (handle->flags & UV_HANDLE_LISTENING) { - uv_tcp_queue_accept(handle); + uv_tcp_queue_accept(handle, req); } }