diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index f756262a..8f1c5e5c 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -151,7 +151,6 @@ typedef struct { ngx_queue_t async_handles; \ uv__io_t async_watcher; \ int async_pipefd[2]; \ - volatile sig_atomic_t async_sweep_needed; \ /* RB_HEAD(uv__timers, uv_timer_s) */ \ struct uv__timers { \ struct uv_timer_s* rbh_root; \ diff --git a/src/unix/async.c b/src/unix/async.c index 6abc8125..4d3cd94f 100644 --- a/src/unix/async.c +++ b/src/unix/async.c @@ -24,7 +24,6 @@ #include #include #include -#include #include static int uv__async_init(uv_loop_t* loop); @@ -82,25 +81,11 @@ int uv_async_send(uv_async_t* handle) { return 0; /* already pending */ do - r = write(handle->loop->async_pipefd[1], &handle, sizeof(&handle)); + r = write(handle->loop->async_pipefd[1], "x", 1); while (r == -1 && errno == EINTR); - if (r == -1) { - /* If pipe is full, event loop should sweep all handles */ - if (errno == EAGAIN || errno == EWOULDBLOCK) { - handle->loop->async_sweep_needed = 1; - - /* Pipe might be drained at this stage, which means that - * async_sweep_needed flag wasn't seen yet. Try writing pointer - * once again to ensure that pipe isn't drained yet, or to invoke - * uv__async_io. - */ - do - r = write(handle->loop->async_pipefd[1], &handle, sizeof(&handle)); - while (r == -1 && errno == EINTR); - } else - return uv__set_sys_error(handle->loop, errno); - } + if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK) + return uv__set_sys_error(handle->loop, errno); return 0; } @@ -129,75 +114,34 @@ static int uv__async_init(uv_loop_t* loop) { } -#define ASYNC_CB(a) \ - if (((a)->flags & (UV_CLOSING | UV_CLOSED)) == 0 && (a)->pending) { \ - (a)->pending = 0; \ - (a)->async_cb((a), 0); \ - } - - static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) { char buf[1024]; ngx_queue_t* q; uv_async_t* h; ssize_t r; - ssize_t i; - ssize_t bytes; - ssize_t end; - bytes = 0; while (1) { - r = read(loop->async_pipefd[0], buf + bytes, sizeof(buf) - bytes); + r = read(loop->async_pipefd[0], buf, sizeof(buf)); - if (r == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - /* Spin if there're partial data in buffer already */ - if (bytes) - continue; - break; - } - - if (errno == EINTR) - continue; - - abort(); - } - - bytes += r; - - /* Round up end */ - end = (bytes / sizeof(h)) * sizeof(h); - - /* If we need to sweep all handles anyway - skip this loop */ - if (!loop->async_sweep_needed) { - for (i = 0; i < end; i += sizeof(h)) { - h = *((uv_async_t**) (buf + i)); - ASYNC_CB(h) - } - } - - bytes -= end; - - /* Partial read happened */ - if (bytes) { - memmove(buf, buf + end, bytes); - /* Spin */ + if (r == sizeof(buf)) continue; - } - if (r != sizeof(buf)) + if (r != -1) break; + + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + + if (errno == EINTR) + continue; + + abort(); } - /* Fast path: no sweep is required */ - if (!loop->async_sweep_needed) - return; - - /* Slow path: sweep all handles */ - loop->async_sweep_needed = 0; - ngx_queue_foreach(q, &loop->async_handles) { h = ngx_queue_data(q, uv_async_t, queue); - ASYNC_CB(h) + if (!h->pending) continue; + h->pending = 0; + h->async_cb(h, 0); } } diff --git a/src/unix/loop.c b/src/unix/loop.c index 7e90a0ab..c70513f3 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -52,7 +52,6 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { loop->time = uv_hrtime() / 1000000; loop->async_pipefd[0] = -1; loop->async_pipefd[1] = -1; - loop->async_sweep_needed = 0; loop->signal_pipefd[0] = -1; loop->signal_pipefd[1] = -1; loop->emfile_fd = -1;