unix: rework pending handle/req logic

This commit is contained in:
Ben Noordhuis 2012-05-30 00:08:22 +02:00
parent 12ee388cd9
commit 58a272e556
6 changed files with 75 additions and 110 deletions

View File

@ -106,7 +106,7 @@ struct uv__io_s {
uv_async_t uv_eio_want_poll_notifier; \
uv_async_t uv_eio_done_poll_notifier; \
uv_idle_t uv_eio_poller; \
uv_handle_t* pending_handles; \
uv_handle_t* closing_handles; \
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
@ -144,7 +144,7 @@ struct uv__io_s {
/* TODO: union or classes please! */
#define UV_HANDLE_PRIVATE_FIELDS \
int flags; \
uv_handle_t* next_pending; \
uv_handle_t* next_closing; \
#define UV_STREAM_PRIVATE_FIELDS \

View File

@ -59,8 +59,6 @@
static uv_loop_t default_loop_struct;
static uv_loop_t* default_loop_ptr;
static void uv__finish_close(uv_handle_t* handle);
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
handle->close_cb = close_cb;
@ -116,7 +114,72 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
}
handle->flags |= UV_CLOSING;
uv__make_pending(handle);
handle->next_closing = handle->loop->closing_handles;
handle->loop->closing_handles = handle;
}
static void uv__finish_close(uv_handle_t* handle) {
assert(!uv__is_active(handle));
assert(handle->flags & UV_CLOSING);
assert(!(handle->flags & UV_CLOSED));
handle->flags |= UV_CLOSED;
switch (handle->type) {
case UV_PREPARE:
case UV_CHECK:
case UV_IDLE:
case UV_ASYNC:
case UV_TIMER:
case UV_PROCESS:
break;
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;
case UV_UDP:
uv__udp_finish_close((uv_udp_t*)handle);
break;
case UV_FS_EVENT:
break;
case UV_POLL:
break;
default:
assert(0);
break;
}
if (handle->close_cb) {
handle->close_cb(handle);
}
uv__handle_unref(handle);
}
static void uv__run_closing_handles(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;
p = loop->closing_handles;
loop->closing_handles = NULL;
while (p) {
q = p->next_closing;
uv__finish_close(p);
p = q;
}
}
@ -163,36 +226,6 @@ void uv_loop_delete(uv_loop_t* loop) {
}
static void uv__run_pending(uv_loop_t* loop) {
uv_handle_t* p;
uv_handle_t* q;
if (!loop->pending_handles)
return;
for (p = loop->pending_handles, loop->pending_handles = NULL; p; p = q) {
q = p->next_pending;
p->next_pending = NULL;
p->flags &= ~UV__PENDING;
if (p->flags & UV_CLOSING) {
uv__finish_close(p);
continue;
}
switch (p->type) {
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
uv__stream_pending((uv_stream_t*)p);
break;
default:
abort();
}
}
}
static void uv__poll(uv_loop_t* loop, int block) {
/* bump the loop's refcount, otherwise libev does
* a zero timeout poll and we end up busy looping
@ -210,7 +243,6 @@ static int uv__should_block(uv_loop_t* loop) {
static int uv__run(uv_loop_t* loop) {
uv__run_idle(loop);
uv__run_pending(loop);
if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) {
uv__run_prepare(loop);
@ -221,9 +253,9 @@ static int uv__run(uv_loop_t* loop) {
uv__run_check(loop);
}
return uv__has_pending_handles(loop)
|| uv__has_active_handles(loop)
|| uv__has_active_reqs(loop);
uv__run_closing_handles(loop);
return uv__has_active_handles(loop) || uv__has_active_reqs(loop);
}
@ -245,55 +277,7 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
handle->loop = loop;
handle->type = type;
handle->flags = UV__REF; /* ref the loop when active */
handle->next_pending = NULL;
}
void uv__finish_close(uv_handle_t* handle) {
assert(!uv__is_active(handle));
assert(handle->flags & UV_CLOSING);
assert(!(handle->flags & UV_CLOSED));
handle->flags |= UV_CLOSED;
switch (handle->type) {
case UV_PREPARE:
case UV_CHECK:
case UV_IDLE:
case UV_ASYNC:
case UV_TIMER:
case UV_PROCESS:
break;
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;
case UV_UDP:
uv__udp_finish_close((uv_udp_t*)handle);
break;
case UV_FS_EVENT:
break;
case UV_POLL:
break;
default:
assert(0);
break;
}
if (handle->close_cb) {
handle->close_cb(handle);
}
uv__handle_unref(handle);
handle->next_closing = NULL;
}

View File

@ -98,18 +98,6 @@ enum {
UV__PENDING = 0x800
};
inline static int uv__has_pending_handles(const uv_loop_t* loop) {
return loop->pending_handles != NULL;
}
inline static void uv__make_pending(uv_handle_t* h) {
if (h->flags & UV__PENDING) return;
h->next_pending = h->loop->pending_handles;
h->loop->pending_handles = h;
h->flags |= UV__PENDING;
}
#define uv__make_pending(h) uv__make_pending((uv_handle_t*)(h))
inline static void uv__req_init(uv_loop_t* loop,
uv_req_t* req,
uv_req_type type) {
@ -180,8 +168,6 @@ void uv__timer_close(uv_timer_t* handle);
void uv__udp_close(uv_udp_t* handle);
void uv__udp_finish_close(uv_udp_t* handle);
void uv__stream_pending(uv_stream_t* handle);
#define UV__F_IPC (1 << 0)
#define UV__F_NONBLOCK (1 << 1)
int uv__make_socketpair(int fds[2], int flags);

View File

@ -41,7 +41,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
loop->pending_handles = NULL;
loop->closing_handles = NULL;
loop->channel = NULL;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);

View File

@ -218,7 +218,7 @@ out:
ngx_queue_init(&req->queue);
/* Run callback on next tick. */
uv__make_pending(handle);
uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);
/* Mimic the Windows pipe implementation, always
* return 0 and let the callback handle errors.

View File

@ -718,11 +718,6 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
}
void uv__stream_pending(uv_stream_t* handle) {
uv__stream_io(handle->loop, &handle->write_watcher, UV__IO_WRITE);
}
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) {
uv_stream_t* stream;
@ -859,7 +854,7 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
uv__io_start(stream->loop, &stream->write_watcher);
if (stream->delayed_error)
uv__make_pending(stream);
uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE);
return 0;
}