From 78bc0d61342d9c0f78c0a53d25668c53cb52fa00 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Mon, 11 Jun 2012 04:13:57 +0200 Subject: [PATCH] unix: implement async handles in libuv Replace libev backed async handles with a pure libuv implementation. --- include/uv-private/uv-unix.h | 11 +++-- src/unix/async.c | 96 +++++++++++++++++++++++++++++------- src/unix/loop.c | 3 ++ 3 files changed, 89 insertions(+), 21 deletions(-) diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 3b9619c8..62bb0aa4 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -38,6 +38,7 @@ #include #include +#include #if __sun # include @@ -113,6 +114,9 @@ struct uv__io_s { ngx_queue_t prepare_handles; \ ngx_queue_t check_handles; \ ngx_queue_t idle_handles; \ + ngx_queue_t async_handles; \ + uv__io_t async_watcher; \ + int async_pipefd[2]; \ /* RB_HEAD(uv__timers, uv_timer_s) */ \ struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \ uint64_t time; \ @@ -211,9 +215,10 @@ struct uv__io_s { /* UV_ASYNC */ -#define UV_ASYNC_PRIVATE_FIELDS \ - ev_async async_watcher; \ - uv_async_cb async_cb; +#define UV_ASYNC_PRIVATE_FIELDS \ + volatile sig_atomic_t pending; \ + uv_async_cb async_cb; \ + ngx_queue_t queue; /* UV_TIMER */ diff --git a/src/unix/async.c b/src/unix/async.c index 8bbf1625..f782e158 100644 --- a/src/unix/async.c +++ b/src/unix/async.c @@ -21,39 +21,99 @@ #include "uv.h" #include "internal.h" +#include +#include +#include +#include -static void uv__async(EV_P_ ev_async* w, int revents) { - uv_async_t* async = container_of(w, uv_async_t, async_watcher); - - if (async->async_cb) { - async->async_cb(async, 0); - } -} +static int uv__async_init(uv_loop_t* loop); +static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events); -int uv_async_init(uv_loop_t* loop, uv_async_t* async, uv_async_cb async_cb) { - uv__handle_init(loop, (uv_handle_t*)async, UV_ASYNC); +int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { + if (uv__async_init(loop)) + return uv__set_sys_error(loop, errno); + + uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); loop->counters.async_init++; - ev_async_init(&async->async_watcher, uv__async); - async->async_cb = async_cb; + handle->async_cb = async_cb; + handle->pending = 0; - /* Note: This does not have symmetry with the other libev wrappers. */ - ev_async_start(loop->ev, &async->async_watcher); - uv__handle_start(async); + ngx_queue_insert_tail(&loop->async_handles, &handle->queue); + uv__handle_start(handle); return 0; } -int uv_async_send(uv_async_t* async) { - ev_async_send(async->loop->ev, &async->async_watcher); +int uv_async_send(uv_async_t* handle) { + int r; + + handle->pending = 1; /* XXX needs a memory barrier? */ + + do + r = write(handle->loop->async_pipefd[1], "x", 1); + while (r == -1 && errno == EINTR); + + if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK) + return uv__set_sys_error(handle->loop, errno); + return 0; } void uv__async_close(uv_async_t* handle) { - ev_async_stop(handle->loop->ev, &handle->async_watcher); - uv__handle_ref(handle); + ngx_queue_remove(&handle->queue); uv__handle_stop(handle); } + + +static int uv__async_init(uv_loop_t* loop) { + if (loop->async_pipefd[0] != -1) + return 0; + + if (uv__make_pipe(loop->async_pipefd, UV__F_NONBLOCK)) + return -1; + + uv__io_init(&loop->async_watcher, + uv__async_io, + loop->async_pipefd[0], + UV__IO_READ); + uv__io_start(loop, &loop->async_watcher); + + return 0; +} + + +static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) { + char buf[1024]; + ngx_queue_t* q; + uv_async_t* h; + ssize_t r; + + while (1) { + r = read(loop->async_pipefd[0], buf, sizeof(buf)); + + if (r == sizeof(buf)) + continue; + + if (r != -1) + break; + + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + + if (errno == EINTR) + continue; + + abort(); + } + + ngx_queue_foreach(q, &loop->async_handles) { + h = ngx_queue_data(q, uv_async_t, queue); + if (!h->pending) continue; + h->pending = 0; + h->async_cb(h, 0); + } +} diff --git a/src/unix/loop.c b/src/unix/loop.c index 08985d63..75fad436 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -40,12 +40,15 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { RB_INIT(&loop->timer_handles); ngx_queue_init(&loop->active_reqs); ngx_queue_init(&loop->idle_handles); + ngx_queue_init(&loop->async_handles); ngx_queue_init(&loop->check_handles); ngx_queue_init(&loop->prepare_handles); ngx_queue_init(&loop->handle_queue); loop->closing_handles = NULL; loop->channel = NULL; loop->time = uv_hrtime() / 1000000; + loop->async_pipefd[0] = -1; + loop->async_pipefd[1] = -1; loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags); ev_set_userdata(loop->ev, loop); eio_channel_init(&loop->uv_eio_channel, loop);