From e58a1abff02d7bacf89a56de9050e27690a97bc5 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Tue, 7 Jun 2011 18:07:28 +0200 Subject: [PATCH] Refactor request handling on Windows - keep a list of completed reqs to be returned. - do not implicitly close a socket after error or eof+shutdown. --- uv-win.c | 313 ++++++++++++++++++++++++++++++++++--------------------- uv-win.h | 4 +- 2 files changed, 198 insertions(+), 119 deletions(-) diff --git a/uv-win.c b/uv-win.c index a8f1463b..38a44218 100644 --- a/uv-win.c +++ b/uv-win.c @@ -164,6 +164,11 @@ static uv_handle_t* uv_next_loop_handle_ = NULL; static uv_handle_t* uv_endgame_handles_ = NULL; +/* Tail of a single-linked circular queue of pending reqs. If the queue is */ +/* empty, tail_ is NULL. If there is only one item, tail_->next_req == tail_ */ +static uv_req_t* uv_pending_reqs_tail_ = NULL; + + /* The current time according to the event loop. in msecs. */ static int64_t uv_now_ = 0; static int64_t uv_ticks_per_msec_ = 0; @@ -417,6 +422,39 @@ static uv_req_t* uv_overlapped_to_req(OVERLAPPED* overlapped) { } +static void uv_insert_pending_req(uv_req_t* req) { + req->next_req = NULL; + if (uv_pending_reqs_tail_) { + req->next_req = uv_pending_reqs_tail_->next_req; + uv_pending_reqs_tail_ = req; + } else { + req->next_req = req; + uv_pending_reqs_tail_ = req; + } +} + + +static uv_req_t* uv_remove_pending_req() { + uv_req_t* req; + + if (uv_pending_reqs_tail_) { + req = uv_pending_reqs_tail_->next_req; + + if (req == uv_pending_reqs_tail_) { + uv_pending_reqs_tail_ = NULL; + } else { + uv_pending_reqs_tail_->next_req = req->next_req; + } + + return req; + + } else { + /* queue empty */ + return NULL; + } +} + + static int uv_tcp_init_socket(uv_tcp_t* handle, uv_close_cb close_cb, void* data, SOCKET socket) { DWORD yes = 1; @@ -509,15 +547,6 @@ static void uv_tcp_endgame(uv_tcp_t* handle) { handle->reqs_pending--; } - if (handle->flags & UV_HANDLE_EOF && - handle->flags & UV_HANDLE_SHUT && - !(handle->flags & UV_HANDLE_CLOSING)) { - /* Because uv_close will add the handle to the endgame_handles list, */ - /* return here and call the close cb the next time. */ - uv_close((uv_handle_t*)handle); - return; - } - if (handle->flags & UV_HANDLE_CLOSING && handle->reqs_pending == 0) { assert(!(handle->flags & UV_HANDLE_CLOSED)); @@ -576,7 +605,7 @@ static void uv_async_endgame(uv_async_t* handle) { } -static void uv_call_endgames() { +static void uv_process_endgames() { uv_handle_t* handle; while (uv_endgame_handles_) { @@ -731,17 +760,21 @@ static void uv_queue_accept(uv_tcp_t* handle) { assert(handle->flags & UV_HANDLE_LISTENING); assert(handle->accept_socket == INVALID_SOCKET); - accept_socket = socket(AF_INET, SOCK_STREAM, 0); - if (accept_socket == INVALID_SOCKET) { - uv_close_error((uv_handle_t*)handle, uv_new_sys_error(WSAGetLastError())); - return; - } - - /* Prepare the uv_req and OVERLAPPED structures. */ + /* Prepare the uv_req structure. */ req = &handle->accept_req; assert(!(req->flags & UV_REQ_PENDING)); req->type = UV_ACCEPT; req->flags |= UV_REQ_PENDING; + + /* Open a socket for the accepted connection. */ + accept_socket = socket(AF_INET, SOCK_STREAM, 0); + if (accept_socket == INVALID_SOCKET) { + req->error = uv_new_sys_error(WSAGetLastError()); + uv_insert_pending_req(req); + return; + } + + /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); success = pAcceptEx(handle->socket, @@ -754,11 +787,11 @@ static void uv_queue_accept(uv_tcp_t* handle) { &req->overlapped); if (!success && WSAGetLastError() != ERROR_IO_PENDING) { - uv_set_sys_error(WSAGetLastError()); - /* destroy the preallocated client handle */ + /* Make this req pending reporting an error. */ + req->error = uv_new_sys_error(WSAGetLastError()); + uv_insert_pending_req(req); + /* Destroy the preallocated client socket. */ closesocket(accept_socket); - /* destroy ourselves */ - uv_close_error((uv_handle_t*)handle, uv_last_error_); return; } @@ -794,8 +827,9 @@ static void uv_queue_read(uv_tcp_t* handle) { &req->overlapped, NULL); if (result != 0 && WSAGetLastError() != ERROR_IO_PENDING) { - uv_set_sys_error(WSAGetLastError()); - uv_close_error((uv_handle_t*)handle, uv_last_error_); + /* Make this req pending reporting an error. */ + req->error = uv_new_sys_error(WSAGetLastError()); + uv_insert_pending_req(req); return; } @@ -1034,7 +1068,6 @@ int uv_shutdown(uv_req_t* req) { static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { - BOOL success; DWORD bytes, flags, err; uv_buf_t buf; @@ -1045,29 +1078,31 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { switch (req->type) { case UV_WRITE: - success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE); handle->write_queue_size -= req->queued_bytes; - if (!success) { - uv_set_sys_error(GetLastError()); - uv_close_error((uv_handle_t*)handle, uv_last_error_); - } if (req->cb) { - ((uv_write_cb)req->cb)(req, success ? 0 : -1); + uv_last_error_ = req->error; + ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); } handle->write_reqs_pending--; - if (success && - handle->write_reqs_pending == 0 && + if (handle->write_reqs_pending == 0 && handle->flags & UV_HANDLE_SHUTTING) { uv_want_endgame((uv_handle_t*)handle); } break; case UV_READ: - success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE); - if (!success) { - uv_set_sys_error(GetLastError()); - uv_close_error((uv_handle_t*)handle, uv_last_error_); + if (req->error.code != UV_OK) { + /* An error occurred doing the 0-read. */ + /* Stop reading and report error. */ + handle->flags &= ~UV_HANDLE_READING; + uv_last_error_ = req->error; + buf.base = 0; + buf.len = 0; + ((uv_read_cb)handle->read_cb)(handle, -1, buf); + break; } + + /* Do nonblocking reads until the buffer is empty */ while (handle->flags & UV_HANDLE_READING) { buf = handle->alloc_cb(handle, 65536); assert(buf.len > 0); @@ -1093,47 +1128,57 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { uv_last_error_.code = UV_EOF; uv_last_error_.sys_errno_ = ERROR_SUCCESS; ((uv_read_cb)handle->read_cb)(handle, -1, buf); - uv_want_endgame((uv_handle_t*)handle); break; } } else { err = WSAGetLastError(); if (err == WSAEWOULDBLOCK) { - /* 0-byte read */ + /* Read buffer was completely empty, report a 0-byte read. */ uv_set_sys_error(WSAEWOULDBLOCK); ((uv_read_cb)handle->read_cb)(handle, 0, buf); } else { /* Ouch! serious error. */ uv_set_sys_error(err); - uv_close_error((uv_handle_t*)handle, uv_last_error_); + ((uv_read_cb)handle->read_cb)(handle, -1, buf); } break; } } - /* Post another 0-read if still reading and not closing */ + /* Post another 0-read if still reading and not closing. */ if (!(handle->flags & UV_HANDLE_CLOSING) && - !(handle->flags & UV_HANDLE_EOF) && handle->flags & UV_HANDLE_READING) { uv_queue_read(handle); } break; case UV_ACCEPT: - assert(handle->accept_socket != INVALID_SOCKET); - - success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE); - success = success && (setsockopt(handle->accept_socket, - SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, - (char*)&handle->socket, - sizeof(handle->socket)) == 0); - - if (success) { + /* If handle->accepted_socket is not a valid socket, then */ + /* uv_queue_accept must have failed. This is a serious error. We stop */ + /* accepting connections and report this error to the connection */ + /* callback. */ + if (handle->accept_socket == INVALID_SOCKET) { + handle->flags &= ~UV_HANDLE_LISTENING; if (handle->connection_cb) { - ((uv_connection_cb)handle->connection_cb)(handle); + uv_last_error_ = req->error; + ((uv_connection_cb)handle->connection_cb)(handle, -1); + } + break; + } + + if (req->error.code == UV_OK && + setsockopt(handle->accept_socket, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&handle->socket, + sizeof(handle->socket)) == 0) { + /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */ + if (handle->connection_cb) { + ((uv_connection_cb)handle->connection_cb)(handle, 0); } } else { - /* Errorneous accept is ignored if the listen socket is still healthy. */ + /* Error related to accepted socket is ignored because the server */ + /* socket may still be healthy. If the server socket is broken + /* uv_queue_accept will detect it. */ closesocket(handle->accept_socket); if (!(handle->flags & UV_HANDLE_CLOSING)) { uv_queue_accept(handle); @@ -1143,11 +1188,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { case UV_CONNECT: if (req->cb) { - success = GetOverlappedResult(handle->handle, - &req->overlapped, - &bytes, - FALSE); - if (success) { + if (req->error.code == UV_OK) { if (setsockopt(handle->socket, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, @@ -1160,7 +1201,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { ((uv_connect_cb)req->cb)(req, -1); } } else { - uv_set_sys_error(WSAGetLastError()); + uv_last_error_ = req->error; ((uv_connect_cb)req->cb)(req, -1); } } @@ -1500,56 +1541,31 @@ static void uv_async_return_req(uv_async_t* handle, uv_req_t* req) { } -static void uv_poll() { - BOOL success; - DWORD bytes; - ULONG_PTR key; - OVERLAPPED* overlapped; +static void uv_process_reqs() { uv_req_t* req; - uv_timer_t* timer; uv_handle_t* handle; - DWORD timeout; - int64_t delta; - /* Call all pending close callbacks. */ - /* TODO: ugly, fixme. */ - uv_call_endgames(); - if (uv_refs_ == 0) - return; + while (req = uv_remove_pending_req()) { + handle = req->handle; - uv_loop_invoke(uv_prepare_handles_); + switch (handle->type) { + case UV_TCP: + uv_tcp_return_req((uv_tcp_t*)handle, req); + break; - uv_update_time(); + case UV_ASYNC: + uv_async_return_req((uv_async_t*)handle, req); + break; - /* Check if there are any running timers */ - timer = RB_MIN(uv_timer_tree_s, &uv_timers_); - if (timer) { - delta = timer->due - uv_now_; - if (delta >= UINT_MAX) { - /* Can't have a timeout greater than UINT_MAX, and a timeout value of */ - /* UINT_MAX means infinite, so that's no good either. */ - timeout = UINT_MAX - 1; - } else if (delta < 0) { - /* Negative timeout values are not allowed */ - timeout = 0; - } else { - timeout = (DWORD)delta; + default: + assert(0); } - } else { - /* No timers */ - timeout = INFINITE; } +} - success = GetQueuedCompletionStatus(uv_iocp_, - &bytes, - &key, - &overlapped, - timeout); - uv_update_time(); - - /* Call check callbacks */ - uv_loop_invoke(uv_check_handles_); +static void uv_process_timers() { + uv_timer_t* timer; /* Call timer callbacks */ for (timer = RB_MIN(uv_timer_tree_s, &uv_timers_); @@ -1573,38 +1589,99 @@ static void uv_poll() { ((uv_loop_cb) timer->timer_cb)((uv_handle_t*)timer, 0); } +} - /* Only if a iocp package was dequeued... */ - if (overlapped) { - req = uv_overlapped_to_req(overlapped); - handle = req->handle; - switch (handle->type) { - case UV_TCP: - uv_tcp_return_req((uv_tcp_t*)handle, req); - break; +static DWORD uv_get_poll_timeout() { + uv_timer_t* timer; + int64_t delta; - case UV_ASYNC: - uv_async_return_req((uv_async_t*)handle, req); - break; + /* Check if there are any running timers */ + timer = RB_MIN(uv_timer_tree_s, &uv_timers_); + if (timer) { + uv_update_time(); - default: - assert(0); + delta = timer->due - uv_now_; + if (delta >= UINT_MAX) { + /* Can't have a timeout greater than UINT_MAX, and a timeout value of */ + /* UINT_MAX means infinite, so that's no good either. */ + return UINT_MAX - 1; + } else if (delta < 0) { + /* Negative timeout values are not allowed */ + return 0; + } else { + return (DWORD)delta; } - } /* if (overlapped) */ + } else { + /* No timers */ + return INFINITE; + } +} - /* Call idle callbacks */ - while (uv_idle_handles_) { - uv_call_endgames(); - uv_loop_invoke(uv_idle_handles_); + +static void uv_poll() { + BOOL success; + DWORD bytes; + ULONG_PTR key; + OVERLAPPED* overlapped; + uv_req_t* req; + + success = GetQueuedCompletionStatus(uv_iocp_, + &bytes, + &key, + &overlapped, + uv_get_poll_timeout()); + + uv_update_time(); + + if (overlapped) { + /* Package was dequeued */ + req = uv_overlapped_to_req(overlapped); + + if (success) { + req->error = uv_ok_; + } else { + req->error = uv_new_sys_error(GetLastError()); + } + + uv_insert_pending_req(req); + + } else if (GetLastError() != WAIT_TIMEOUT) { + /* Serious error */ + uv_fatal_error(GetLastError(), "GetQueuedCompletionStatus"); } } int uv_run() { - while (uv_refs_ > 0) { + while (1) { + uv_update_time(); + uv_process_timers(); + + /* Terrible: please fix me! */ + while (uv_refs_ > 0 && + (uv_idle_handles_ || uv_pending_reqs_tail_ || uv_endgame_handles_)) { + /* Terrible: please fix me! */ + while (uv_pending_reqs_tail_ || uv_endgame_handles_) { + uv_process_endgames(); + uv_process_reqs(); + } + + /* Call idle callbacks */ + uv_loop_invoke(uv_idle_handles_); + } + + if (uv_refs_ <= 0) { + break; + } + + uv_loop_invoke(uv_prepare_handles_); + uv_poll(); + + uv_loop_invoke(uv_check_handles_); } + assert(uv_refs_ == 0); return 0; } diff --git a/uv-win.h b/uv-win.h index 4418698a..e87c5828 100644 --- a/uv-win.h +++ b/uv-win.h @@ -49,7 +49,9 @@ typedef struct uv_buf_t { size_t queued_bytes; \ }; \ }; \ - int flags; + int flags; \ + uv_err_t error; \ + struct uv_req_s* next_req; #define uv_tcp_connection_fields \ uv_alloc_cb alloc_cb; \