diff --git a/oio-win.c b/oio-win.c index f307f8d3..013cabae 100644 --- a/oio-win.c +++ b/oio-win.c @@ -127,7 +127,7 @@ static LPFN_TRANSMITFILE pTransmitFile; * Private oio_handle flags */ #define OIO_HANDLE_CLOSING 0x01 -#define OIO_HANDLE_CLOSED 0x03 +#define OIO_HANDLE_CLOSED 0x02 #define OIO_HANDLE_BOUND 0x04 /* @@ -159,6 +159,10 @@ RB_PROTOTYPE_STATIC(oio_timer_s, oio_req_s, tree_entry, oio_timer_compare); static struct oio_timer_s oio_timers_ = RB_INITIALIZER(oio_timers_); +/* Head of a single-linked list of closed handles */ +static oio_handle* oio_closed_handles_ = NULL; + + /* The current time according to the event loop. in msecs. */ static int64_t oio_now_ = 0; static int64_t oio_ticks_per_msec_ = 0; @@ -433,9 +437,17 @@ int oio_accept(oio_handle* server, oio_handle* client, } -static int oio_close_error(oio_handle* handle, oio_err e) { - oio_req *req; +static void oio_close_ready(oio_handle* handle) { + assert(handle->flags & OIO_HANDLE_CLOSING); + assert(!(handle->flags & OIO_HANDLE_CLOSED)); + assert(handle->reqs_pending == 0); + handle->closed_next = oio_closed_handles_; + oio_closed_handles_ = handle; +} + + +static int oio_close_error(oio_handle* handle, oio_err e) { if (handle->flags & OIO_HANDLE_CLOSING) { return 0; } @@ -445,25 +457,14 @@ static int oio_close_error(oio_handle* handle, oio_err e) { switch (handle->type) { case OIO_TCP: closesocket(handle->socket); - if (handle->reqs_pending == 0) { - /* If there are no operations queued for this socket, queue one */ - /* manually, so oio_poll will call close_cb. */ - req = (oio_req*)malloc(sizeof(*req)); - req->handle = handle; - req->type = OIO_CLOSE; - req->flags = 0; - if (!PostQueuedCompletionStatus(oio_iocp_, - 0, - (ULONG_PTR)handle, - &req->overlapped)) { - oio_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); - } - req->flags |= OIO_REQ_PENDING; - handle->reqs_pending++; - } - - /* After all packets to come out, oio_poll will call close_cb. */ handle->flags |= OIO_HANDLE_CLOSING; + + /* If there are no pending requests for this handle, enqueue the close */ + /* callback immediately. Otherwise oio_poll will do it after the last */ + /* request returns. */ + if (handle->reqs_pending == 0) { + oio_close_ready(handle); + } return 0; default: @@ -479,6 +480,31 @@ int oio_close(oio_handle* handle) { } +static void oio_call_close_cbs() { + oio_handle *handle; + + while (oio_closed_handles_) { + handle = oio_closed_handles_; + oio_closed_handles_ = handle->closed_next; + + assert(handle->flags & OIO_HANDLE_CLOSING); + assert(!(handle->flags & OIO_HANDLE_CLOSED)); + assert(handle->reqs_pending == 0); + + handle->flags |= OIO_HANDLE_CLOSED; + oio_refs_--; + + if (handle->accept_reqs) { + free(handle->accept_reqs); + } + if (handle->close_cb) { + oio_last_error_ = handle->error; + handle->close_cb(handle, handle->error.code == OIO_OK ? 0 : 1); + } + } +} + + struct sockaddr_in oio_ip4_addr(char *ip, int port) { struct sockaddr_in addr; @@ -758,6 +784,12 @@ static void oio_poll() { DWORD timeout; int64_t delta; + /* Call all pending close callbacks. */ + /* TODO: ugly, fixme. */ + oio_call_close_cbs(); + if (oio_refs_ == 0) + return; + oio_update_time(); /* Check if there are any running timers */ @@ -796,125 +828,105 @@ static void oio_poll() { ((oio_timer_cb)req->cb)(req, req->due - oio_now_, 0); } - /* Quit if there were no io requests dequeued. */ - if (!success && !overlapped) - return; + /* Only if a iocp package was dequeued... */ + if (overlapped) { + req = oio_overlapped_to_req(overlapped); + handle = req->handle; - req = oio_overlapped_to_req(overlapped); - handle = req->handle; + /* Mark the request non-pending */ + req->flags &= ~OIO_REQ_PENDING; - /* Mark the request non-pending */ - req->flags &= ~OIO_REQ_PENDING; - handle->reqs_pending--; + switch (req->type) { + case OIO_WRITE: + success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); + if (!success) { + oio_set_sys_error(GetLastError()); + oio_close_error(handle, oio_last_error_); + } + if (req->cb) { + ((oio_write_cb)req->cb)(req, success ? 0 : -1); + } + break; - /* If the related socket got closed in the meantime, disregard this */ - /* result. If this is the last request pending, call the handle's close */ - /* callback. */ - if (handle->flags & OIO_HANDLE_CLOSING) { - /* If we reserved a socket handle to accept, free it. */ - if (req->type == OIO_ACCEPT) { - accept_req = (oio_accept_req*)req; - if (accept_req->socket != INVALID_SOCKET) { - closesocket(accept_req->socket); - } - } - if (handle->reqs_pending == 0) { - handle->flags |= OIO_HANDLE_CLOSED; - if (handle->accept_reqs) { - free(handle->accept_reqs); - } - if (handle->close_cb) { - oio_last_error_ = handle->error; - handle->close_cb(handle, handle->error.code == OIO_OK ? 0 : 1); - } - oio_refs_--; - } - return; - } + case OIO_READ: + success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); + if (!success) { + oio_set_sys_error(GetLastError()); + oio_close_error(handle, oio_last_error_); + } + if (req->cb) { + ((oio_read_cb)req->cb)(req, bytes, success ? 0 : -1); + } + break; - switch (req->type) { - case OIO_WRITE: - success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); - if (!success) { - oio_set_sys_error(GetLastError()); - oio_close_error(handle, oio_last_error_); - } - if (req->cb) { - ((oio_write_cb)req->cb)(req, success ? 0 : -1); - } - return; + case OIO_ACCEPT: + accept_req = (oio_accept_req*)req; + assert(accept_req->socket != INVALID_SOCKET); + assert(handle->accepted_socket == INVALID_SOCKET); - case OIO_READ: - success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); - if (!success) { - oio_set_sys_error(GetLastError()); - oio_close_error(handle, oio_last_error_); - } - if (req->cb) { - ((oio_read_cb)req->cb)(req, bytes, success ? 0 : -1); - } - return; + handle->accepted_socket = accept_req->socket; - case OIO_ACCEPT: - accept_req = (oio_accept_req*)req; - assert(accept_req->socket != INVALID_SOCKET); - assert(handle->accepted_socket == INVALID_SOCKET); - - handle->accepted_socket = accept_req->socket; - - success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); - if (success) { - if (setsockopt(handle->accepted_socket, - SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, - (char*)&handle->socket, - sizeof(handle->socket)) == 0) { - if (req->cb) { - ((oio_accept_cb)req->cb)(handle); + success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); + if (success) { + if (setsockopt(handle->accepted_socket, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&handle->socket, + sizeof(handle->socket)) == 0) { + if (req->cb) { + ((oio_accept_cb)req->cb)(handle); + } } } - } - /* accept_cb should call oio_accept_handle which sets data->socket */ - /* to INVALID_SOCKET. */ - /* Errorneous accept is ignored if the listen socket is still healthy. */ - if (handle->accepted_socket != INVALID_SOCKET) { - closesocket(handle->accepted_socket); - handle->accepted_socket = INVALID_SOCKET; - } + /* accept_cb should call oio_accept_handle which sets data->socket */ + /* to INVALID_SOCKET. */ + /* Errorneous accept is ignored if the listen socket is still healthy. */ + if (handle->accepted_socket != INVALID_SOCKET) { + closesocket(handle->accepted_socket); + handle->accepted_socket = INVALID_SOCKET; + } - /* Queue another accept */ - oio_queue_accept(accept_req, handle); - return; + /* Queue another accept */ + if (!handle->flags & OIO_HANDLE_CLOSING) + oio_queue_accept(accept_req, handle); + break; - case OIO_CONNECT: - if (req->cb) { - success = GetOverlappedResult(handle->handle, - overlapped, - &bytes, - FALSE); - if (success) { - if (setsockopt(handle->socket, - SOL_SOCKET, - SO_UPDATE_CONNECT_CONTEXT, - NULL, - 0) == 0) { - ((oio_connect_cb)req->cb)(req, 0); + case OIO_CONNECT: + if (req->cb) { + success = GetOverlappedResult(handle->handle, + overlapped, + &bytes, + FALSE); + if (success) { + if (setsockopt(handle->socket, + SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + NULL, + 0) == 0) { + ((oio_connect_cb)req->cb)(req, 0); + } else { + oio_set_sys_error(WSAGetLastError()); + ((oio_connect_cb)req->cb)(req, -1); + } } else { oio_set_sys_error(WSAGetLastError()); ((oio_connect_cb)req->cb)(req, -1); } - } else { - oio_set_sys_error(WSAGetLastError()); - ((oio_connect_cb)req->cb)(req, -1); } - } - return; + break; + } - case OIO_CLOSE: - /* Should never get here */ - assert(0); - } + /* The number of pending requests is now down by one */ + handle->reqs_pending--; + + /* Queue the handle's close callback if it is closing and there are no */ + /* more pending requests. */ + if (handle->flags & OIO_HANDLE_CLOSING && + handle->reqs_pending == 0) { + oio_close_ready(handle); + } + } /* if (overlapped) */ } diff --git a/oio-win.h b/oio-win.h index fe9d4e67..e5cf66d1 100644 --- a/oio-win.h +++ b/oio-win.h @@ -54,6 +54,7 @@ typedef struct oio_buf { int flags; #define oio_handle_private_fields \ + oio_handle* closed_next; \ union { \ SOCKET socket; \ HANDLE handle; \ diff --git a/oio.h b/oio.h index 2e696c2b..0e4fa4ae 100644 --- a/oio.h +++ b/oio.h @@ -105,7 +105,6 @@ typedef enum { OIO_READ, OIO_WRITE, OIO_SHUTDOWN, - OIO_CLOSE, OIO_TIMEOUT } oio_req_type;