diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 8f1c5e5c..f756262a 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -151,6 +151,7 @@ typedef struct { ngx_queue_t async_handles; \ uv__io_t async_watcher; \ int async_pipefd[2]; \ + volatile sig_atomic_t async_sweep_needed; \ /* RB_HEAD(uv__timers, uv_timer_s) */ \ struct uv__timers { \ struct uv_timer_s* rbh_root; \ diff --git a/src/unix/async.c b/src/unix/async.c index 4d3cd94f..8ab3c903 100644 --- a/src/unix/async.c +++ b/src/unix/async.c @@ -81,11 +81,25 @@ int uv_async_send(uv_async_t* handle) { return 0; /* already pending */ do - r = write(handle->loop->async_pipefd[1], "x", 1); + r = write(handle->loop->async_pipefd[1], &handle, sizeof(&handle)); while (r == -1 && errno == EINTR); - if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK) - return uv__set_sys_error(handle->loop, errno); + if (r == -1) { + /* If pipe is full, event loop should sweep all handles */ + if (errno == EAGAIN || errno == EWOULDBLOCK) { + handle->loop->async_sweep_needed = 1; + + /* Pipe might be drained at this stage, which means that + * async_sweep_needed flag wasn't seen yet. Try writing pointer + * once again to ensure that pipe isn't drained yet, or to invoke + * uv__async_io. + */ + do + r = write(handle->loop->async_pipefd[1], &handle, sizeof(&handle)); + while (r == -1 && errno == EINTR); + } else + return uv__set_sys_error(handle->loop, errno); + } return 0; } @@ -114,34 +128,75 @@ static int uv__async_init(uv_loop_t* loop) { } +#define ASYNC_CB(a) \ + if (((a)->flags & (UV_CLOSING | UV_CLOSED)) == 0 && (a)->pending) { \ + (a)->pending = 0; \ + (a)->async_cb((a), 0); \ + } + + static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) { char buf[1024]; ngx_queue_t* q; uv_async_t* h; ssize_t r; + ssize_t i; + ssize_t bytes; + ssize_t end; + bytes = 0; while (1) { - r = read(loop->async_pipefd[0], buf, sizeof(buf)); + r = read(loop->async_pipefd[0], buf + bytes, sizeof(buf) - bytes); - if (r == sizeof(buf)) + if (r == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + /* Spin if there're partial data in buffer already */ + if (bytes) + continue; + break; + } + + if (errno == EINTR) + continue; + + abort(); + } + + bytes += r; + + /* Round up end */ + end = (bytes / sizeof(h)) * sizeof(h); + + /* If we need to sweep all handles anyway - skip this loop */ + if (!loop->async_sweep_needed) { + for (i = 0; i < end; i += sizeof(h)) { + h = *((uv_async_t**) (buf + i)); + ASYNC_CB(h) + } + } + + bytes -= end; + + /* Partial read happened */ + if (bytes) { + memmove(buf, buf + end, bytes); + /* Spin */ continue; + } - if (r != -1) + if (r != sizeof(buf)) break; - - if (errno == EAGAIN || errno == EWOULDBLOCK) - break; - - if (errno == EINTR) - continue; - - abort(); } + /* Fast path: no sweep is required */ + if (!loop->async_sweep_needed) + return; + + /* Slow path: sweep all handles */ + loop->async_sweep_needed = 0; + ngx_queue_foreach(q, &loop->async_handles) { h = ngx_queue_data(q, uv_async_t, queue); - if (!h->pending) continue; - h->pending = 0; - h->async_cb(h, 0); + ASYNC_CB(h) } } diff --git a/src/unix/loop.c b/src/unix/loop.c index c70513f3..7e90a0ab 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -52,6 +52,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { loop->time = uv_hrtime() / 1000000; loop->async_pipefd[0] = -1; loop->async_pipefd[1] = -1; + loop->async_sweep_needed = 0; loop->signal_pipefd[0] = -1; loop->signal_pipefd[1] = -1; loop->emfile_fd = -1;