Refactor request handling on Windows

- keep a list of completed reqs to be returned.
- do not implicitly close a socket after error or eof+shutdown.
This commit is contained in:
Bert Belder 2011-06-07 18:07:28 +02:00
parent cfca30433f
commit e58a1abff0
2 changed files with 198 additions and 119 deletions

313
uv-win.c
View File

@ -164,6 +164,11 @@ static uv_handle_t* uv_next_loop_handle_ = NULL;
static uv_handle_t* uv_endgame_handles_ = NULL;
/* Tail of a single-linked circular queue of pending reqs. If the queue is */
/* empty, tail_ is NULL. If there is only one item, tail_->next_req == tail_ */
static uv_req_t* uv_pending_reqs_tail_ = NULL;
/* The current time according to the event loop. in msecs. */
static int64_t uv_now_ = 0;
static int64_t uv_ticks_per_msec_ = 0;
@ -417,6 +422,39 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) {
}
static void uv_insert_pending_req(uv_req_t* req) {
req->next_req = NULL;
if (uv_pending_reqs_tail_) {
req->next_req = uv_pending_reqs_tail_->next_req;
uv_pending_reqs_tail_ = req;
} else {
req->next_req = req;
uv_pending_reqs_tail_ = req;
}
}
static uv_req_t* uv_remove_pending_req() {
uv_req_t* req;
if (uv_pending_reqs_tail_) {
req = uv_pending_reqs_tail_->next_req;
if (req == uv_pending_reqs_tail_) {
uv_pending_reqs_tail_ = NULL;
} else {
uv_pending_reqs_tail_->next_req = req->next_req;
}
return req;
} else {
/* queue empty */
return NULL;
}
}
static int uv_tcp_init_socket(uv_tcp_t* handle, uv_close_cb close_cb,
void* data, SOCKET socket) {
DWORD yes = 1;
@ -509,15 +547,6 @@ static void uv_tcp_endgame(uv_tcp_t* handle) {
handle->reqs_pending--;
}
if (handle->flags & UV_HANDLE_EOF &&
handle->flags & UV_HANDLE_SHUT &&
!(handle->flags & UV_HANDLE_CLOSING)) {
/* Because uv_close will add the handle to the endgame_handles list, */
/* return here and call the close cb the next time. */
uv_close((uv_handle_t*)handle);
return;
}
if (handle->flags & UV_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
assert(!(handle->flags & UV_HANDLE_CLOSED));
@ -576,7 +605,7 @@ static void uv_async_endgame(uv_async_t* handle) {
}
static void uv_call_endgames() {
static void uv_process_endgames() {
uv_handle_t* handle;
while (uv_endgame_handles_) {
@ -731,17 +760,21 @@ static void uv_queue_accept(uv_tcp_t* handle) {
assert(handle->flags & UV_HANDLE_LISTENING);
assert(handle->accept_socket == INVALID_SOCKET);
accept_socket = socket(AF_INET, SOCK_STREAM, 0);
if (accept_socket == INVALID_SOCKET) {
uv_close_error((uv_handle_t*)handle, uv_new_sys_error(WSAGetLastError()));
return;
}
/* Prepare the uv_req and OVERLAPPED structures. */
/* Prepare the uv_req structure. */
req = &handle->accept_req;
assert(!(req->flags & UV_REQ_PENDING));
req->type = UV_ACCEPT;
req->flags |= UV_REQ_PENDING;
/* Open a socket for the accepted connection. */
accept_socket = socket(AF_INET, SOCK_STREAM, 0);
if (accept_socket == INVALID_SOCKET) {
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
return;
}
/* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped));
success = pAcceptEx(handle->socket,
@ -754,11 +787,11 @@ static void uv_queue_accept(uv_tcp_t* handle) {
&req->overlapped);
if (!success && WSAGetLastError() != ERROR_IO_PENDING) {
uv_set_sys_error(WSAGetLastError());
/* destroy the preallocated client handle */
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
/* Destroy the preallocated client socket. */
closesocket(accept_socket);
/* destroy ourselves */
uv_close_error((uv_handle_t*)handle, uv_last_error_);
return;
}
@ -794,8 +827,9 @@ static void uv_queue_read(uv_tcp_t* handle) {
&req->overlapped,
NULL);
if (result != 0 && WSAGetLastError() != ERROR_IO_PENDING) {
uv_set_sys_error(WSAGetLastError());
uv_close_error((uv_handle_t*)handle, uv_last_error_);
/* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError());
uv_insert_pending_req(req);
return;
}
@ -1034,7 +1068,6 @@ int uv_shutdown(uv_req_t* req) {
static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
BOOL success;
DWORD bytes, flags, err;
uv_buf_t buf;
@ -1045,29 +1078,31 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
switch (req->type) {
case UV_WRITE:
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
handle->write_queue_size -= req->queued_bytes;
if (!success) {
uv_set_sys_error(GetLastError());
uv_close_error((uv_handle_t*)handle, uv_last_error_);
}
if (req->cb) {
((uv_write_cb)req->cb)(req, success ? 0 : -1);
uv_last_error_ = req->error;
((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1);
}
handle->write_reqs_pending--;
if (success &&
handle->write_reqs_pending == 0 &&
if (handle->write_reqs_pending == 0 &&
handle->flags & UV_HANDLE_SHUTTING) {
uv_want_endgame((uv_handle_t*)handle);
}
break;
case UV_READ:
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
if (!success) {
uv_set_sys_error(GetLastError());
uv_close_error((uv_handle_t*)handle, uv_last_error_);
if (req->error.code != UV_OK) {
/* An error occurred doing the 0-read. */
/* Stop reading and report error. */
handle->flags &= ~UV_HANDLE_READING;
uv_last_error_ = req->error;
buf.base = 0;
buf.len = 0;
((uv_read_cb)handle->read_cb)(handle, -1, buf);
break;
}
/* Do nonblocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
buf = handle->alloc_cb(handle, 65536);
assert(buf.len > 0);
@ -1093,47 +1128,57 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
uv_last_error_.code = UV_EOF;
uv_last_error_.sys_errno_ = ERROR_SUCCESS;
((uv_read_cb)handle->read_cb)(handle, -1, buf);
uv_want_endgame((uv_handle_t*)handle);
break;
}
} else {
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
/* 0-byte read */
/* Read buffer was completely empty, report a 0-byte read. */
uv_set_sys_error(WSAEWOULDBLOCK);
((uv_read_cb)handle->read_cb)(handle, 0, buf);
} else {
/* Ouch! serious error. */
uv_set_sys_error(err);
uv_close_error((uv_handle_t*)handle, uv_last_error_);
((uv_read_cb)handle->read_cb)(handle, -1, buf);
}
break;
}
}
/* Post another 0-read if still reading and not closing */
/* Post another 0-read if still reading and not closing. */
if (!(handle->flags & UV_HANDLE_CLOSING) &&
!(handle->flags & UV_HANDLE_EOF) &&
handle->flags & UV_HANDLE_READING) {
uv_queue_read(handle);
}
break;
case UV_ACCEPT:
assert(handle->accept_socket != INVALID_SOCKET);
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
success = success && (setsockopt(handle->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0);
if (success) {
/* If handle->accepted_socket is not a valid socket, then */
/* uv_queue_accept must have failed. This is a serious error. We stop */
/* accepting connections and report this error to the connection */
/* callback. */
if (handle->accept_socket == INVALID_SOCKET) {
handle->flags &= ~UV_HANDLE_LISTENING;
if (handle->connection_cb) {
((uv_connection_cb)handle->connection_cb)(handle);
uv_last_error_ = req->error;
((uv_connection_cb)handle->connection_cb)(handle, -1);
}
break;
}
if (req->error.code == UV_OK &&
setsockopt(handle->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->connection_cb) {
((uv_connection_cb)handle->connection_cb)(handle, 0);
}
} else {
/* Errorneous accept is ignored if the listen socket is still healthy. */
/* Error related to accepted socket is ignored because the server */
/* socket may still be healthy. If the server socket is broken
/* uv_queue_accept will detect it. */
closesocket(handle->accept_socket);
if (!(handle->flags & UV_HANDLE_CLOSING)) {
uv_queue_accept(handle);
@ -1143,11 +1188,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
case UV_CONNECT:
if (req->cb) {
success = GetOverlappedResult(handle->handle,
&req->overlapped,
&bytes,
FALSE);
if (success) {
if (req->error.code == UV_OK) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
@ -1160,7 +1201,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
((uv_connect_cb)req->cb)(req, -1);
}
} else {
uv_set_sys_error(WSAGetLastError());
uv_last_error_ = req->error;
((uv_connect_cb)req->cb)(req, -1);
}
}
@ -1500,56 +1541,31 @@ static void uv_async_return_req(uv_async_t* handle, uv_req_t* req) {
}
static void uv_poll() {
BOOL success;
DWORD bytes;
ULONG_PTR key;
OVERLAPPED* overlapped;
static void uv_process_reqs() {
uv_req_t* req;
uv_timer_t* timer;
uv_handle_t* handle;
DWORD timeout;
int64_t delta;
/* Call all pending close callbacks. */
/* TODO: ugly, fixme. */
uv_call_endgames();
if (uv_refs_ == 0)
return;
while (req = uv_remove_pending_req()) {
handle = req->handle;
uv_loop_invoke(uv_prepare_handles_);
switch (handle->type) {
case UV_TCP:
uv_tcp_return_req((uv_tcp_t*)handle, req);
break;
uv_update_time();
case UV_ASYNC:
uv_async_return_req((uv_async_t*)handle, req);
break;
/* Check if there are any running timers */
timer = RB_MIN(uv_timer_tree_s, &uv_timers_);
if (timer) {
delta = timer->due - uv_now_;
if (delta >= UINT_MAX) {
/* Can't have a timeout greater than UINT_MAX, and a timeout value of */
/* UINT_MAX means infinite, so that's no good either. */
timeout = UINT_MAX - 1;
} else if (delta < 0) {
/* Negative timeout values are not allowed */
timeout = 0;
} else {
timeout = (DWORD)delta;
default:
assert(0);
}
} else {
/* No timers */
timeout = INFINITE;
}
}
success = GetQueuedCompletionStatus(uv_iocp_,
&bytes,
&key,
&overlapped,
timeout);
uv_update_time();
/* Call check callbacks */
uv_loop_invoke(uv_check_handles_);
static void uv_process_timers() {
uv_timer_t* timer;
/* Call timer callbacks */
for (timer = RB_MIN(uv_timer_tree_s, &uv_timers_);
@ -1573,38 +1589,99 @@ static void uv_poll() {
((uv_loop_cb) timer->timer_cb)((uv_handle_t*)timer, 0);
}
}
/* Only if a iocp package was dequeued... */
if (overlapped) {
req = uv_overlapped_to_req(overlapped);
handle = req->handle;
switch (handle->type) {
case UV_TCP:
uv_tcp_return_req((uv_tcp_t*)handle, req);
break;
static DWORD uv_get_poll_timeout() {
uv_timer_t* timer;
int64_t delta;
case UV_ASYNC:
uv_async_return_req((uv_async_t*)handle, req);
break;
/* Check if there are any running timers */
timer = RB_MIN(uv_timer_tree_s, &uv_timers_);
if (timer) {
uv_update_time();
default:
assert(0);
delta = timer->due - uv_now_;
if (delta >= UINT_MAX) {
/* Can't have a timeout greater than UINT_MAX, and a timeout value of */
/* UINT_MAX means infinite, so that's no good either. */
return UINT_MAX - 1;
} else if (delta < 0) {
/* Negative timeout values are not allowed */
return 0;
} else {
return (DWORD)delta;
}
} /* if (overlapped) */
} else {
/* No timers */
return INFINITE;
}
}
/* Call idle callbacks */
while (uv_idle_handles_) {
uv_call_endgames();
uv_loop_invoke(uv_idle_handles_);
static void uv_poll() {
BOOL success;
DWORD bytes;
ULONG_PTR key;
OVERLAPPED* overlapped;
uv_req_t* req;
success = GetQueuedCompletionStatus(uv_iocp_,
&bytes,
&key,
&overlapped,
uv_get_poll_timeout());
uv_update_time();
if (overlapped) {
/* Package was dequeued */
req = uv_overlapped_to_req(overlapped);
if (success) {
req->error = uv_ok_;
} else {
req->error = uv_new_sys_error(GetLastError());
}
uv_insert_pending_req(req);
} else if (GetLastError() != WAIT_TIMEOUT) {
/* Serious error */
uv_fatal_error(GetLastError(), "GetQueuedCompletionStatus");
}
}
int uv_run() {
while (uv_refs_ > 0) {
while (1) {
uv_update_time();
uv_process_timers();
/* Terrible: please fix me! */
while (uv_refs_ > 0 &&
(uv_idle_handles_ || uv_pending_reqs_tail_ || uv_endgame_handles_)) {
/* Terrible: please fix me! */
while (uv_pending_reqs_tail_ || uv_endgame_handles_) {
uv_process_endgames();
uv_process_reqs();
}
/* Call idle callbacks */
uv_loop_invoke(uv_idle_handles_);
}
if (uv_refs_ <= 0) {
break;
}
uv_loop_invoke(uv_prepare_handles_);
uv_poll();
uv_loop_invoke(uv_check_handles_);
}
assert(uv_refs_ == 0);
return 0;
}

View File

@ -49,7 +49,9 @@ typedef struct uv_buf_t {
size_t queued_bytes; \
}; \
}; \
int flags;
int flags; \
uv_err_t error; \
struct uv_req_s* next_req;
#define uv_tcp_connection_fields \
uv_alloc_cb alloc_cb; \