Windows: better infrastructure for closing handles.

This commit is contained in:
Bert Belder 2011-04-21 17:19:25 +02:00
parent 9468c6570a
commit 2fdf161e61
3 changed files with 136 additions and 124 deletions

258
oio-win.c
View File

@ -127,7 +127,7 @@ static LPFN_TRANSMITFILE pTransmitFile;
* Private oio_handle flags
*/
#define OIO_HANDLE_CLOSING 0x01
#define OIO_HANDLE_CLOSED 0x03
#define OIO_HANDLE_CLOSED 0x02
#define OIO_HANDLE_BOUND 0x04
/*
@ -159,6 +159,10 @@ RB_PROTOTYPE_STATIC(oio_timer_s, oio_req_s, tree_entry, oio_timer_compare);
static struct oio_timer_s oio_timers_ = RB_INITIALIZER(oio_timers_);
/* Head of a single-linked list of closed handles */
static oio_handle* oio_closed_handles_ = NULL;
/* The current time according to the event loop. in msecs. */
static int64_t oio_now_ = 0;
static int64_t oio_ticks_per_msec_ = 0;
@ -433,9 +437,17 @@ int oio_accept(oio_handle* server, oio_handle* client,
}
static int oio_close_error(oio_handle* handle, oio_err e) {
oio_req *req;
static void oio_close_ready(oio_handle* handle) {
assert(handle->flags & OIO_HANDLE_CLOSING);
assert(!(handle->flags & OIO_HANDLE_CLOSED));
assert(handle->reqs_pending == 0);
handle->closed_next = oio_closed_handles_;
oio_closed_handles_ = handle;
}
static int oio_close_error(oio_handle* handle, oio_err e) {
if (handle->flags & OIO_HANDLE_CLOSING) {
return 0;
}
@ -445,25 +457,14 @@ static int oio_close_error(oio_handle* handle, oio_err e) {
switch (handle->type) {
case OIO_TCP:
closesocket(handle->socket);
if (handle->reqs_pending == 0) {
/* If there are no operations queued for this socket, queue one */
/* manually, so oio_poll will call close_cb. */
req = (oio_req*)malloc(sizeof(*req));
req->handle = handle;
req->type = OIO_CLOSE;
req->flags = 0;
if (!PostQueuedCompletionStatus(oio_iocp_,
0,
(ULONG_PTR)handle,
&req->overlapped)) {
oio_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
req->flags |= OIO_REQ_PENDING;
handle->reqs_pending++;
}
/* After all packets to come out, oio_poll will call close_cb. */
handle->flags |= OIO_HANDLE_CLOSING;
/* If there are no pending requests for this handle, enqueue the close */
/* callback immediately. Otherwise oio_poll will do it after the last */
/* request returns. */
if (handle->reqs_pending == 0) {
oio_close_ready(handle);
}
return 0;
default:
@ -479,6 +480,31 @@ int oio_close(oio_handle* handle) {
}
static void oio_call_close_cbs() {
oio_handle *handle;
while (oio_closed_handles_) {
handle = oio_closed_handles_;
oio_closed_handles_ = handle->closed_next;
assert(handle->flags & OIO_HANDLE_CLOSING);
assert(!(handle->flags & OIO_HANDLE_CLOSED));
assert(handle->reqs_pending == 0);
handle->flags |= OIO_HANDLE_CLOSED;
oio_refs_--;
if (handle->accept_reqs) {
free(handle->accept_reqs);
}
if (handle->close_cb) {
oio_last_error_ = handle->error;
handle->close_cb(handle, handle->error.code == OIO_OK ? 0 : 1);
}
}
}
struct sockaddr_in oio_ip4_addr(char *ip, int port) {
struct sockaddr_in addr;
@ -758,6 +784,12 @@ static void oio_poll() {
DWORD timeout;
int64_t delta;
/* Call all pending close callbacks. */
/* TODO: ugly, fixme. */
oio_call_close_cbs();
if (oio_refs_ == 0)
return;
oio_update_time();
/* Check if there are any running timers */
@ -796,125 +828,105 @@ static void oio_poll() {
((oio_timer_cb)req->cb)(req, req->due - oio_now_, 0);
}
/* Quit if there were no io requests dequeued. */
if (!success && !overlapped)
return;
/* Only if a iocp package was dequeued... */
if (overlapped) {
req = oio_overlapped_to_req(overlapped);
handle = req->handle;
req = oio_overlapped_to_req(overlapped);
handle = req->handle;
/* Mark the request non-pending */
req->flags &= ~OIO_REQ_PENDING;
/* Mark the request non-pending */
req->flags &= ~OIO_REQ_PENDING;
handle->reqs_pending--;
switch (req->type) {
case OIO_WRITE:
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
if (req->cb) {
((oio_write_cb)req->cb)(req, success ? 0 : -1);
}
break;
/* If the related socket got closed in the meantime, disregard this */
/* result. If this is the last request pending, call the handle's close */
/* callback. */
if (handle->flags & OIO_HANDLE_CLOSING) {
/* If we reserved a socket handle to accept, free it. */
if (req->type == OIO_ACCEPT) {
accept_req = (oio_accept_req*)req;
if (accept_req->socket != INVALID_SOCKET) {
closesocket(accept_req->socket);
}
}
if (handle->reqs_pending == 0) {
handle->flags |= OIO_HANDLE_CLOSED;
if (handle->accept_reqs) {
free(handle->accept_reqs);
}
if (handle->close_cb) {
oio_last_error_ = handle->error;
handle->close_cb(handle, handle->error.code == OIO_OK ? 0 : 1);
}
oio_refs_--;
}
return;
}
case OIO_READ:
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
if (req->cb) {
((oio_read_cb)req->cb)(req, bytes, success ? 0 : -1);
}
break;
switch (req->type) {
case OIO_WRITE:
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
if (req->cb) {
((oio_write_cb)req->cb)(req, success ? 0 : -1);
}
return;
case OIO_ACCEPT:
accept_req = (oio_accept_req*)req;
assert(accept_req->socket != INVALID_SOCKET);
assert(handle->accepted_socket == INVALID_SOCKET);
case OIO_READ:
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (!success) {
oio_set_sys_error(GetLastError());
oio_close_error(handle, oio_last_error_);
}
if (req->cb) {
((oio_read_cb)req->cb)(req, bytes, success ? 0 : -1);
}
return;
handle->accepted_socket = accept_req->socket;
case OIO_ACCEPT:
accept_req = (oio_accept_req*)req;
assert(accept_req->socket != INVALID_SOCKET);
assert(handle->accepted_socket == INVALID_SOCKET);
handle->accepted_socket = accept_req->socket;
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (success) {
if (setsockopt(handle->accepted_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
if (req->cb) {
((oio_accept_cb)req->cb)(handle);
success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE);
if (success) {
if (setsockopt(handle->accepted_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
if (req->cb) {
((oio_accept_cb)req->cb)(handle);
}
}
}
}
/* accept_cb should call oio_accept_handle which sets data->socket */
/* to INVALID_SOCKET. */
/* Errorneous accept is ignored if the listen socket is still healthy. */
if (handle->accepted_socket != INVALID_SOCKET) {
closesocket(handle->accepted_socket);
handle->accepted_socket = INVALID_SOCKET;
}
/* accept_cb should call oio_accept_handle which sets data->socket */
/* to INVALID_SOCKET. */
/* Errorneous accept is ignored if the listen socket is still healthy. */
if (handle->accepted_socket != INVALID_SOCKET) {
closesocket(handle->accepted_socket);
handle->accepted_socket = INVALID_SOCKET;
}
/* Queue another accept */
oio_queue_accept(accept_req, handle);
return;
/* Queue another accept */
if (!handle->flags & OIO_HANDLE_CLOSING)
oio_queue_accept(accept_req, handle);
break;
case OIO_CONNECT:
if (req->cb) {
success = GetOverlappedResult(handle->handle,
overlapped,
&bytes,
FALSE);
if (success) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
((oio_connect_cb)req->cb)(req, 0);
case OIO_CONNECT:
if (req->cb) {
success = GetOverlappedResult(handle->handle,
overlapped,
&bytes,
FALSE);
if (success) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
((oio_connect_cb)req->cb)(req, 0);
} else {
oio_set_sys_error(WSAGetLastError());
((oio_connect_cb)req->cb)(req, -1);
}
} else {
oio_set_sys_error(WSAGetLastError());
((oio_connect_cb)req->cb)(req, -1);
}
} else {
oio_set_sys_error(WSAGetLastError());
((oio_connect_cb)req->cb)(req, -1);
}
}
return;
break;
}
case OIO_CLOSE:
/* Should never get here */
assert(0);
}
/* The number of pending requests is now down by one */
handle->reqs_pending--;
/* Queue the handle's close callback if it is closing and there are no */
/* more pending requests. */
if (handle->flags & OIO_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
oio_close_ready(handle);
}
} /* if (overlapped) */
}

View File

@ -54,6 +54,7 @@ typedef struct oio_buf {
int flags;
#define oio_handle_private_fields \
oio_handle* closed_next; \
union { \
SOCKET socket; \
HANDLE handle; \

1
oio.h
View File

@ -105,7 +105,6 @@ typedef enum {
OIO_READ,
OIO_WRITE,
OIO_SHUTDOWN,
OIO_CLOSE,
OIO_TIMEOUT
} oio_req_type;