Windows: reorganize returned req processing

This commit is contained in:
Bert Belder 2011-05-09 16:57:12 +02:00
parent 8b09c88f1f
commit ad64eb4856

314
oio-win.c
View File

@ -992,6 +992,154 @@ int oio_shutdown(oio_req_t* req) {
}
static void oio_tcp_return_req(oio_handle_t* handle, oio_req_t* req) {
BOOL success;
DWORD bytes, flags, err;
oio_buf buf;
assert(handle->type == OIO_TCP);
/* Mark the request non-pending */
req->flags &= ~OIO_REQ_PENDING;
switch (req->type) {
case OIO_WRITE:
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
if (req->cb) {
((oio_write_cb)req->cb)(req, success ? 0 : -1);
}
handle->write_reqs_pending--;
if (success &&
handle->write_reqs_pending == 0 &&
handle->flags & OIO_HANDLE_SHUTTING) {
oio_want_endgame(handle);
}
break;
case OIO_READ:
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
while (handle->flags & OIO_HANDLE_READING) {
buf = oio_alloc_(handle, 65536);
assert(buf.len > 0);
flags = 0;
if (WSARecv(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
NULL,
NULL) != SOCKET_ERROR) {
if (bytes > 0) {
/* Successful read */
((oio_read_cb)handle->read_cb)(handle, bytes, buf);
/* Read again only if bytes == buf.len */
if (bytes < buf.len) {
break;
}
} else {
/* Connection closed */
handle->flags &= ~OIO_HANDLE_READING;
handle->flags |= OIO_HANDLE_EOF;
oio_last_error_.code = OIO_EOF;
oio_last_error_.sys_errno_ = ERROR_SUCCESS;
((oio_read_cb)handle->read_cb)(handle, -1, buf);
oio_want_endgame(handle);
break;
}
} else {
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
/* 0-byte read */
oio_set_sys_error(WSAEWOULDBLOCK);
((oio_read_cb)handle->read_cb)(handle, 0, buf);
} else {
/* Ouch! serious error. */
oio_set_sys_error(err);
oio_close_error(handle, oio_last_error_);
}
break;
}
}
/* Post another 0-read if still reading and not closing */
if (!(handle->flags & OIO_HANDLE_CLOSING) &&
!(handle->flags & OIO_HANDLE_EOF) &&
handle->flags & OIO_HANDLE_READING) {
oio_queue_read(handle);
}
break;
case OIO_ACCEPT:
assert(handle->accept_socket != INVALID_SOCKET);
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
success = success && (setsockopt(handle->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0);
if (success) {
if (handle->accept_cb) {
((oio_accept_cb)handle->accept_cb)(handle);
}
} else {
/* Errorneous accept is ignored if the listen socket is still healthy. */
closesocket(handle->accept_socket);
if (!(handle->flags & OIO_HANDLE_CLOSING)) {
oio_queue_accept(handle);
}
}
break;
case OIO_CONNECT:
if (req->cb) {
success = GetOverlappedResult(handle->handle,
&req->overlapped,
&bytes,
FALSE);
if (success) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
oio_tcp_init_connection(handle);
((oio_connect_cb)req->cb)(req, 0);
} else {
oio_set_sys_error(WSAGetLastError());
((oio_connect_cb)req->cb)(req, -1);
}
} else {
oio_set_sys_error(WSAGetLastError());
((oio_connect_cb)req->cb)(req, -1);
}
}
break;
default:
assert(0);
}
/* The number of pending requests is now down by one */
handle->reqs_pending--;
/* Queue the handle's close callback if it is closing and there are no */
/* more pending requests. */
if (handle->flags & OIO_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
oio_want_endgame(handle);
}
}
static int oio_timer_compare(oio_req_t* a, oio_req_t* b) {
if (a->due < b->due)
return -1;
@ -1216,6 +1364,20 @@ int oio_async_send(oio_handle_t* handle) {
}
static void oio_async_return_req(oio_handle_t* handle, oio_req_t* req) {
assert(handle->type == OIO_ASYNC);
assert(req->type == OIO_WAKEUP);
handle->async_sent = 0;
if (req->cb) {
((oio_async_cb)req->cb)(handle, 0);
}
if (handle->flags & OIO_HANDLE_CLOSING) {
oio_want_endgame(handle);
}
}
static void oio_poll() {
BOOL success;
DWORD bytes;
@ -1223,10 +1385,7 @@ static void oio_poll() {
OVERLAPPED* overlapped;
oio_req_t* req;
oio_handle_t* handle;
oio_buf buf;
DWORD timeout;
DWORD flags;
DWORD err;
int64_t delta;
/* Call all pending close callbacks. */
@ -1284,157 +1443,18 @@ static void oio_poll() {
req = oio_overlapped_to_req(overlapped);
handle = req->handle;
/* Mark the request non-pending */
req->flags &= ~OIO_REQ_PENDING;
switch (req->type) {
case OIO_WRITE:
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
if (req->cb) {
((oio_write_cb)req->cb)(req, success ? 0 : -1);
}
handle->write_reqs_pending--;
if (success &&
handle->write_reqs_pending == 0 &&
handle->flags & OIO_HANDLE_SHUTTING) {
oio_want_endgame(handle);
}
switch (handle->type) {
case OIO_TCP:
oio_tcp_return_req(handle, req);
break;
case OIO_READ:
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
while (handle->flags & OIO_HANDLE_READING) {
buf = oio_alloc_(handle, 65536);
assert(buf.len > 0);
flags = 0;
if (WSARecv(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
NULL,
NULL) != SOCKET_ERROR) {
if (bytes > 0) {
/* Successful read */
((oio_read_cb)handle->read_cb)(handle, bytes, buf);
/* Read again only if bytes == buf.len */
if (bytes < buf.len) {
break;
}
} else {
/* Connection closed */
handle->flags &= ~OIO_HANDLE_READING;
handle->flags |= OIO_HANDLE_EOF;
oio_last_error_.code = OIO_EOF;
oio_last_error_.sys_errno_ = ERROR_SUCCESS;
((oio_read_cb)handle->read_cb)(handle, -1, buf);
oio_want_endgame(handle);
break;
}
} else {
err = WSAGetLastError();
if (err == WSAEWOULDBLOCK) {
/* 0-byte read */
oio_set_sys_error(WSAEWOULDBLOCK);
((oio_read_cb)handle->read_cb)(handle, 0, buf);
} else {
/* Ouch! serious error. */
oio_set_sys_error(err);
oio_close_error(handle, oio_last_error_);
}
break;
}
}
/* Post another 0-read if still reading and not closing */
if (!(handle->flags & OIO_HANDLE_CLOSING) &&
!(handle->flags & OIO_HANDLE_EOF) &&
handle->flags & OIO_HANDLE_READING) {
oio_queue_read(handle);
}
break;
case OIO_ACCEPT:
assert(handle->accept_socket != INVALID_SOCKET);
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
success = success && (setsockopt(handle->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0);
if (success) {
if (handle->accept_cb) {
((oio_accept_cb)handle->accept_cb)(handle);
}
} else {
/* Errorneous accept is ignored if the listen socket is still healthy. */
closesocket(handle->accept_socket);
if (!(handle->flags & OIO_HANDLE_CLOSING)) {
oio_queue_accept(handle);
}
}
break;
case OIO_CONNECT:
if (req->cb) {
success = GetOverlappedResult(handle->handle,
overlapped,
&bytes,
FALSE);
if (success) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
oio_tcp_init_connection(handle);
((oio_connect_cb)req->cb)(req, 0);
} else {
oio_set_sys_error(WSAGetLastError());
((oio_connect_cb)req->cb)(req, -1);
}
} else {
oio_set_sys_error(WSAGetLastError());
((oio_connect_cb)req->cb)(req, -1);
}
}
break;
case OIO_WAKEUP:
handle->async_sent = 0;
if (req->cb) {
((oio_async_cb)req->cb)(handle, 0);
}
if (handle->flags & OIO_HANDLE_CLOSING) {
oio_want_endgame(handle);
}
case OIO_ASYNC:
oio_async_return_req(handle, req);
break;
default:
assert(0);
}
/* TODO: reorganize this */
if (handle->type == OIO_TCP) {
/* The number of pending requests is now down by one */
handle->reqs_pending--;
/* Queue the handle's close callback if it is closing and there are no */
/* more pending requests. */
if (handle->flags & OIO_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
oio_want_endgame(handle);
}
}
} /* if (overlapped) */
/* Call idle callbacks */