diff --git a/oio-win.c b/oio-win.c index e4065457..4cf81861 100644 --- a/oio-win.c +++ b/oio-win.c @@ -992,6 +992,154 @@ int oio_shutdown(oio_req_t* req) { } +static void oio_tcp_return_req(oio_handle_t* handle, oio_req_t* req) { + BOOL success; + DWORD bytes, flags, err; + oio_buf buf; + + assert(handle->type == OIO_TCP); + + /* Mark the request non-pending */ + req->flags &= ~OIO_REQ_PENDING; + + switch (req->type) { + case OIO_WRITE: + success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE); + if (!success) { + oio_set_sys_error(GetLastError()); + oio_close_error(handle, oio_last_error_); + } + if (req->cb) { + ((oio_write_cb)req->cb)(req, success ? 0 : -1); + } + handle->write_reqs_pending--; + if (success && + handle->write_reqs_pending == 0 && + handle->flags & OIO_HANDLE_SHUTTING) { + oio_want_endgame(handle); + } + break; + + case OIO_READ: + success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE); + if (!success) { + oio_set_sys_error(GetLastError()); + oio_close_error(handle, oio_last_error_); + } + while (handle->flags & OIO_HANDLE_READING) { + buf = oio_alloc_(handle, 65536); + assert(buf.len > 0); + flags = 0; + if (WSARecv(handle->socket, + (WSABUF*)&buf, + 1, + &bytes, + &flags, + NULL, + NULL) != SOCKET_ERROR) { + if (bytes > 0) { + /* Successful read */ + ((oio_read_cb)handle->read_cb)(handle, bytes, buf); + /* Read again only if bytes == buf.len */ + if (bytes < buf.len) { + break; + } + } else { + /* Connection closed */ + handle->flags &= ~OIO_HANDLE_READING; + handle->flags |= OIO_HANDLE_EOF; + oio_last_error_.code = OIO_EOF; + oio_last_error_.sys_errno_ = ERROR_SUCCESS; + ((oio_read_cb)handle->read_cb)(handle, -1, buf); + oio_want_endgame(handle); + break; + } + } else { + err = WSAGetLastError(); + if (err == WSAEWOULDBLOCK) { + /* 0-byte read */ + oio_set_sys_error(WSAEWOULDBLOCK); + ((oio_read_cb)handle->read_cb)(handle, 0, buf); + } else { + /* Ouch! serious error. */ + oio_set_sys_error(err); + oio_close_error(handle, oio_last_error_); + } + break; + } + } + /* Post another 0-read if still reading and not closing */ + if (!(handle->flags & OIO_HANDLE_CLOSING) && + !(handle->flags & OIO_HANDLE_EOF) && + handle->flags & OIO_HANDLE_READING) { + oio_queue_read(handle); + } + break; + + case OIO_ACCEPT: + assert(handle->accept_socket != INVALID_SOCKET); + + success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE); + success = success && (setsockopt(handle->accept_socket, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&handle->socket, + sizeof(handle->socket)) == 0); + + if (success) { + if (handle->accept_cb) { + ((oio_accept_cb)handle->accept_cb)(handle); + } + } else { + /* Errorneous accept is ignored if the listen socket is still healthy. */ + closesocket(handle->accept_socket); + if (!(handle->flags & OIO_HANDLE_CLOSING)) { + oio_queue_accept(handle); + } + } + break; + + case OIO_CONNECT: + if (req->cb) { + success = GetOverlappedResult(handle->handle, + &req->overlapped, + &bytes, + FALSE); + if (success) { + if (setsockopt(handle->socket, + SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + NULL, + 0) == 0) { + oio_tcp_init_connection(handle); + ((oio_connect_cb)req->cb)(req, 0); + } else { + oio_set_sys_error(WSAGetLastError()); + ((oio_connect_cb)req->cb)(req, -1); + } + } else { + oio_set_sys_error(WSAGetLastError()); + ((oio_connect_cb)req->cb)(req, -1); + } + } + break; + + default: + assert(0); + } + + /* The number of pending requests is now down by one */ + handle->reqs_pending--; + + /* Queue the handle's close callback if it is closing and there are no */ + /* more pending requests. */ + if (handle->flags & OIO_HANDLE_CLOSING && + handle->reqs_pending == 0) { + oio_want_endgame(handle); + } +} + + static int oio_timer_compare(oio_req_t* a, oio_req_t* b) { if (a->due < b->due) return -1; @@ -1216,6 +1364,20 @@ int oio_async_send(oio_handle_t* handle) { } +static void oio_async_return_req(oio_handle_t* handle, oio_req_t* req) { + assert(handle->type == OIO_ASYNC); + assert(req->type == OIO_WAKEUP); + + handle->async_sent = 0; + if (req->cb) { + ((oio_async_cb)req->cb)(handle, 0); + } + if (handle->flags & OIO_HANDLE_CLOSING) { + oio_want_endgame(handle); + } +} + + static void oio_poll() { BOOL success; DWORD bytes; @@ -1223,10 +1385,7 @@ static void oio_poll() { OVERLAPPED* overlapped; oio_req_t* req; oio_handle_t* handle; - oio_buf buf; DWORD timeout; - DWORD flags; - DWORD err; int64_t delta; /* Call all pending close callbacks. */ @@ -1284,157 +1443,18 @@ static void oio_poll() { req = oio_overlapped_to_req(overlapped); handle = req->handle; - /* Mark the request non-pending */ - req->flags &= ~OIO_REQ_PENDING; - - switch (req->type) { - case OIO_WRITE: - success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); - if (!success) { - oio_set_sys_error(GetLastError()); - oio_close_error(handle, oio_last_error_); - } - if (req->cb) { - ((oio_write_cb)req->cb)(req, success ? 0 : -1); - } - handle->write_reqs_pending--; - if (success && - handle->write_reqs_pending == 0 && - handle->flags & OIO_HANDLE_SHUTTING) { - oio_want_endgame(handle); - } + switch (handle->type) { + case OIO_TCP: + oio_tcp_return_req(handle, req); break; - case OIO_READ: - success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); - if (!success) { - oio_set_sys_error(GetLastError()); - oio_close_error(handle, oio_last_error_); - } - while (handle->flags & OIO_HANDLE_READING) { - buf = oio_alloc_(handle, 65536); - assert(buf.len > 0); - flags = 0; - if (WSARecv(handle->socket, - (WSABUF*)&buf, - 1, - &bytes, - &flags, - NULL, - NULL) != SOCKET_ERROR) { - if (bytes > 0) { - /* Successful read */ - ((oio_read_cb)handle->read_cb)(handle, bytes, buf); - /* Read again only if bytes == buf.len */ - if (bytes < buf.len) { - break; - } - } else { - /* Connection closed */ - handle->flags &= ~OIO_HANDLE_READING; - handle->flags |= OIO_HANDLE_EOF; - oio_last_error_.code = OIO_EOF; - oio_last_error_.sys_errno_ = ERROR_SUCCESS; - ((oio_read_cb)handle->read_cb)(handle, -1, buf); - oio_want_endgame(handle); - break; - } - } else { - err = WSAGetLastError(); - if (err == WSAEWOULDBLOCK) { - /* 0-byte read */ - oio_set_sys_error(WSAEWOULDBLOCK); - ((oio_read_cb)handle->read_cb)(handle, 0, buf); - } else { - /* Ouch! serious error. */ - oio_set_sys_error(err); - oio_close_error(handle, oio_last_error_); - } - break; - } - } - /* Post another 0-read if still reading and not closing */ - if (!(handle->flags & OIO_HANDLE_CLOSING) && - !(handle->flags & OIO_HANDLE_EOF) && - handle->flags & OIO_HANDLE_READING) { - oio_queue_read(handle); - } - break; - - case OIO_ACCEPT: - assert(handle->accept_socket != INVALID_SOCKET); - - success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); - success = success && (setsockopt(handle->accept_socket, - SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, - (char*)&handle->socket, - sizeof(handle->socket)) == 0); - - if (success) { - if (handle->accept_cb) { - ((oio_accept_cb)handle->accept_cb)(handle); - } - } else { - /* Errorneous accept is ignored if the listen socket is still healthy. */ - closesocket(handle->accept_socket); - if (!(handle->flags & OIO_HANDLE_CLOSING)) { - oio_queue_accept(handle); - } - } - break; - - case OIO_CONNECT: - if (req->cb) { - success = GetOverlappedResult(handle->handle, - overlapped, - &bytes, - FALSE); - if (success) { - if (setsockopt(handle->socket, - SOL_SOCKET, - SO_UPDATE_CONNECT_CONTEXT, - NULL, - 0) == 0) { - oio_tcp_init_connection(handle); - ((oio_connect_cb)req->cb)(req, 0); - } else { - oio_set_sys_error(WSAGetLastError()); - ((oio_connect_cb)req->cb)(req, -1); - } - } else { - oio_set_sys_error(WSAGetLastError()); - ((oio_connect_cb)req->cb)(req, -1); - } - } - break; - - case OIO_WAKEUP: - handle->async_sent = 0; - if (req->cb) { - ((oio_async_cb)req->cb)(handle, 0); - } - if (handle->flags & OIO_HANDLE_CLOSING) { - oio_want_endgame(handle); - } + case OIO_ASYNC: + oio_async_return_req(handle, req); break; default: assert(0); } - - /* TODO: reorganize this */ - if (handle->type == OIO_TCP) { - /* The number of pending requests is now down by one */ - handle->reqs_pending--; - - /* Queue the handle's close callback if it is closing and there are no */ - /* more pending requests. */ - if (handle->flags & OIO_HANDLE_CLOSING && - handle->reqs_pending == 0) { - oio_want_endgame(handle); - } - } } /* if (overlapped) */ /* Call idle callbacks */