Windows: Do simultaneous pending AcceptEx calls.

This commit is contained in:
Igor Zinkovsky 2011-08-11 13:18:07 -07:00
parent 123119342f
commit 8448ee4bf6
2 changed files with 76 additions and 32 deletions

View File

@ -75,7 +75,13 @@ typedef struct uv_buf_t {
UV_REQ_FIELDS \
HANDLE pipeHandle; \
struct uv_pipe_accept_s* next_pending; \
} uv_pipe_accept_t;
} uv_pipe_accept_t; \
typedef struct uv_tcp_accept_s { \
UV_REQ_FIELDS \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
struct uv_tcp_accept_s* next_pending; \
} uv_tcp_accept_t;
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
@ -95,9 +101,8 @@ typedef struct uv_buf_t {
};
#define uv_tcp_server_fields \
SOCKET accept_socket; \
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32]; \
struct uv_req_s accept_req;
uv_tcp_accept_t* accept_reqs; \
uv_tcp_accept_t* pending_accepts;
#define uv_tcp_connection_fields \
uv_buf_t read_buffer;

View File

@ -109,6 +109,12 @@
const unsigned int uv_active_tcp_streams_threshold = 50;
/*
* Number of simultaneous pending AcceptEx calls.
*/
const unsigned int uv_simultaneous_server_accepts = 32;
/* Pointers to winsock extension functions to be retrieved dynamically */
static LPFN_CONNECTEX pConnectEx;
static LPFN_ACCEPTEX pAcceptEx;
@ -293,10 +299,11 @@ static int uv_tcp_set_socket(uv_tcp_t* handle, SOCKET socket) {
int uv_tcp_init(uv_tcp_t* handle) {
uv_stream_init((uv_stream_t*)handle);
handle->accept_reqs = NULL;
handle->pending_accepts = NULL;
handle->socket = INVALID_SOCKET;
handle->type = UV_TCP;
handle->reqs_pending = 0;
handle->accept_socket = INVALID_SOCKET;
uv_counters()->tcp_init++;
@ -308,7 +315,8 @@ void uv_tcp_endgame(uv_tcp_t* handle) {
uv_err_t err;
int status;
if (handle->flags & UV_HANDLE_SHUTTING &&
if (handle->flags & UV_HANDLE_CONNECTION &&
handle->flags & UV_HANDLE_SHUTTING &&
!(handle->flags & UV_HANDLE_SHUT) &&
handle->write_reqs_pending == 0) {
@ -333,6 +341,11 @@ void uv_tcp_endgame(uv_tcp_t* handle) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
handle->flags |= UV_HANDLE_CLOSED;
if (!(handle->flags & UV_HANDLE_CONNECTION) && handle->accept_reqs) {
free(handle->accept_reqs);
handle->accept_reqs = NULL;
}
if (handle->close_cb) {
handle->close_cb((uv_handle_t*)handle);
}
@ -407,8 +420,7 @@ int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
}
static void uv_tcp_queue_accept(uv_tcp_t* handle) {
uv_req_t* req;
static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
BOOL success;
DWORD bytes;
SOCKET accept_socket;
@ -416,10 +428,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
LPFN_ACCEPTEX pAcceptExFamily;
assert(handle->flags & UV_HANDLE_LISTENING);
assert(handle->accept_socket == INVALID_SOCKET);
/* Prepare the uv_req structure. */
req = &handle->accept_req;
assert(req->accept_socket == INVALID_SOCKET);
/* choose family and extension function */
if ((handle->flags & UV_HANDLE_IPV6) != 0) {
@ -434,7 +443,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
accept_socket = socket(family, SOCK_STREAM, 0);
if (accept_socket == INVALID_SOCKET) {
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
return;
}
@ -444,7 +453,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
success = pAcceptExFamily(handle->socket,
accept_socket,
(void*)&handle->accept_buffer,
(void*)req->accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
@ -453,17 +462,17 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle) {
if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
/* Process the req without IOCP. */
handle->accept_socket = accept_socket;
req->accept_socket = accept_socket;
handle->reqs_pending++;
uv_insert_pending_req((uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(success)) {
/* The req will be processed with IOCP. */
handle->accept_socket = accept_socket;
req->accept_socket = accept_socket;
handle->reqs_pending++;
} else {
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
uv_insert_pending_req((uv_req_t*)req);
handle->reqs_pending++;
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
@ -527,6 +536,9 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
unsigned int i;
uv_tcp_accept_t* req;
assert(backlog > 0);
if (handle->flags & UV_HANDLE_BIND_ERROR) {
@ -553,10 +565,21 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
handle->flags |= UV_HANDLE_LISTENING;
handle->connection_cb = cb;
uv_req_init(&(handle->accept_req));
handle->accept_req.type = UV_ACCEPT;
handle->accept_req.data = handle;
uv_tcp_queue_accept(handle);
assert(!handle->accept_reqs);
handle->accept_reqs = (uv_tcp_accept_t*)
malloc(uv_simultaneous_server_accepts * sizeof(uv_tcp_accept_t));
if (!handle->accept_reqs) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
for (i = 0; i < uv_simultaneous_server_accepts; i++) {
req = &handle->accept_reqs[i];
uv_req_init((uv_req_t*)req);
req->type = UV_ACCEPT;
req->accept_socket = INVALID_SOCKET;
req->data = handle;
uv_tcp_queue_accept(handle, req);
}
return 0;
}
@ -565,22 +588,33 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
int rv = 0;
if (server->accept_socket == INVALID_SOCKET) {
uv_tcp_accept_t* req = server->pending_accepts;
if (!req) {
/* No valid connections found, so we error out. */
uv_set_sys_error(WSAEWOULDBLOCK);
return -1;
}
if (req->accept_socket == INVALID_SOCKET) {
uv_set_sys_error(WSAENOTCONN);
return -1;
}
if (uv_tcp_set_socket(client, server->accept_socket) == -1) {
closesocket(server->accept_socket);
if (uv_tcp_set_socket(client, req->accept_socket) == -1) {
closesocket(req->accept_socket);
rv = -1;
} else {
uv_connection_init((uv_stream_t*)client);
}
server->accept_socket = INVALID_SOCKET;
/* Prepare the req to pick up a new connection */
server->pending_accepts = req->next_pending;
req->next_pending = NULL;
req->accept_socket = INVALID_SOCKET;
if (!(server->flags & UV_HANDLE_CLOSING)) {
uv_tcp_queue_accept(server);
uv_tcp_queue_accept(server, req);
}
active_tcp_streams++;
@ -905,14 +939,16 @@ void uv_process_tcp_write_req(uv_tcp_t* handle, uv_write_t* req) {
}
void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* raw_req) {
uv_tcp_accept_t* req = (uv_tcp_accept_t*) raw_req;
assert(handle->type == UV_TCP);
/* If handle->accepted_socket is not a valid socket, then */
/* uv_queue_accept must have failed. This is a serious error. We stop */
/* accepting connections and report this error to the connection */
/* callback. */
if (handle->accept_socket == INVALID_SOCKET) {
if (req->accept_socket == INVALID_SOCKET) {
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
if (handle->connection_cb) {
@ -921,11 +957,14 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
}
}
} else if (req->error.code == UV_OK &&
setsockopt(handle->accept_socket,
setsockopt(req->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
req->next_pending = handle->pending_accepts;
handle->pending_accepts = req;
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->connection_cb) {
handle->connection_cb((uv_stream_t*)handle, 0);
@ -934,10 +973,10 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
/* Error related to accepted socket is ignored because the server */
/* socket may still be healthy. If the server socket is broken
/* uv_queue_accept will detect it. */
closesocket(handle->accept_socket);
handle->accept_socket = INVALID_SOCKET;
closesocket(req->accept_socket);
req->accept_socket = INVALID_SOCKET;
if (handle->flags & UV_HANDLE_LISTENING) {
uv_tcp_queue_accept(handle);
uv_tcp_queue_accept(handle, req);
}
}