diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index fe32c4b8..729082e0 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -54,9 +54,6 @@ # include "uv-bsd.h" #endif -struct uv__io_s; -struct uv_loop_s; - #ifndef UV_IO_PRIVATE_PLATFORM_FIELDS # define UV_IO_PRIVATE_PLATFORM_FIELDS /* empty */ #endif @@ -64,6 +61,10 @@ struct uv_loop_s; #define UV_IO_PRIVATE_FIELDS \ UV_IO_PRIVATE_PLATFORM_FIELDS \ +struct uv__io_s; +struct uv__async; +struct uv_loop_s; + typedef void (*uv__io_cb)(struct uv_loop_s* loop, struct uv__io_s* w, unsigned int events); @@ -79,6 +80,16 @@ struct uv__io_s { UV_IO_PRIVATE_FIELDS }; +typedef void (*uv__async_cb)(struct uv_loop_s* loop, + struct uv__async* w, + unsigned int nevents); + +struct uv__async { + uv__async_cb cb; + uv__io_t io_watcher; + int wfd; +}; + struct uv__work { void (*work)(struct uv__work *w); void (*done)(struct uv__work *w, int status); @@ -167,8 +178,7 @@ typedef struct { ngx_queue_t check_handles; \ ngx_queue_t idle_handles; \ ngx_queue_t async_handles; \ - uv__io_t async_watcher; \ - int async_pipefd[2]; \ + struct uv__async async_watcher; \ /* RB_HEAD(uv__timers, uv_timer_s) */ \ struct uv__timers { \ struct uv_timer_s* rbh_root; \ @@ -252,9 +262,9 @@ typedef struct { ngx_queue_t queue; #define UV_ASYNC_PRIVATE_FIELDS \ - volatile sig_atomic_t pending; \ uv_async_cb async_cb; \ - ngx_queue_t queue; + ngx_queue_t queue; \ + int pending; \ #define UV_TIMER_PRIVATE_FIELDS \ /* RB_ENTRY(uv_timer_s) tree_entry; */ \ diff --git a/src/unix/async.c b/src/unix/async.c index 322167c6..36d94402 100644 --- a/src/unix/async.c +++ b/src/unix/async.c @@ -18,6 +18,10 @@ * IN THE SOFTWARE. */ +/* This file contains both the uv__async internal infrastructure and the + * user-facing uv_async_t functions. + */ + #include "uv.h" #include "internal.h" @@ -26,41 +30,14 @@ #include #include -static int uv__async_init(uv_loop_t* loop); -static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events); - - -static int uv__async_make_pending(volatile sig_atomic_t* ptr) { - /* Do a cheap read first. */ - if (*ptr) - return 1; - - /* Micro-optimization: use atomic memory operations to detect if we've been - * preempted by another thread and don't have to make an expensive syscall. - * This speeds up the heavily contended case by about 1-2% and has little - * if any impact on the non-contended case. - * - * Use XCHG instead of the CMPXCHG that __sync_val_compare_and_swap() emits - * on x86, it's about 4x faster. It probably makes zero difference in the - * grand scheme of things but I'm OCD enough not to let this one pass. - */ -#if defined(__i386__) || defined(__x86_64__) - { - unsigned int val = 1; - __asm__ __volatile__("xchgl %0, %1" : "+r" (val) : "m" (*ptr)); - return val != 0; - } -#elif defined(__GNUC__) && (__GNUC__ > 4 || __GNUC__ == 4 && __GNUC_MINOR__ > 0) - return __sync_val_compare_and_swap(ptr, 0, 1) != 0; -#else - *ptr = 1; - return 0; -#endif -} +static void uv__async_event(uv_loop_t* loop, + struct uv__async* w, + unsigned int nevents); +static int uv__async_make_pending(int* pending); int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { - if (uv__async_init(loop)) + if (uv__async_start(loop, &loop->async_watcher, uv__async_event)) return uv__set_sys_error(loop, errno); uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); @@ -75,19 +52,8 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { int uv_async_send(uv_async_t* handle) { - int r; - - if (uv__async_make_pending(&handle->pending)) - return 0; /* already pending */ - - do - r = write(handle->loop->async_pipefd[1], "x", 1); - while (r == -1 && errno == EINTR); - - assert(r == -1 || r == 1); - - if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK) - return uv__set_sys_error(handle->loop, errno); + if (uv__async_make_pending(&handle->pending) == 0) + uv__async_send(&handle->loop->async_watcher); return 0; } @@ -99,28 +65,64 @@ void uv__async_close(uv_async_t* handle) { } -static int uv__async_init(uv_loop_t* loop) { - if (loop->async_pipefd[0] != -1) - return 0; +static void uv__async_event(uv_loop_t* loop, + struct uv__async* w, + unsigned int nevents) { + ngx_queue_t* q; + uv_async_t* h; - if (uv__make_pipe(loop->async_pipefd, UV__F_NONBLOCK)) - return -1; + ngx_queue_foreach(q, &loop->async_handles) { + h = ngx_queue_data(q, uv_async_t, queue); + if (!h->pending) continue; + h->pending = 0; + h->async_cb(h, 0); + } +} - uv__io_init(&loop->async_watcher, uv__async_io, loop->async_pipefd[0]); - uv__io_start(loop, &loop->async_watcher, UV__POLLIN); +static int uv__async_make_pending(int* pending) { + /* Do a cheap read first. */ + if (ACCESS_ONCE(int, *pending) != 0) + return 1; + + /* Micro-optimization: use atomic memory operations to detect if we've been + * preempted by another thread and don't have to make an expensive syscall. + * This speeds up the heavily contended case by about 1-2% and has little + * if any impact on the non-contended case. + * + * Use XCHG instead of the CMPXCHG that __sync_val_compare_and_swap() emits + * on x86, it's about 4x faster. It probably makes zero difference in the + * grand scheme of things but I'm OCD enough not to let this one pass. + */ +#if defined(__i386__) || defined(__x86_64__) + { + unsigned int val = 1; + __asm__ __volatile__ ("xchgl %0, %1" + : "+r" (val) + : "m" (*pending)); + return val != 0; + } +#elif defined(__GNUC__) && (__GNUC__ > 4 || __GNUC__ == 4 && __GNUC_MINOR__ > 0) + return __sync_val_compare_and_swap(pending, 0, 1) != 0; +#else + ACCESS_ONCE(int, *pending) = 1; return 0; +#endif } static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { + struct uv__async* wa; char buf[1024]; - ngx_queue_t* q; - uv_async_t* h; + unsigned n; ssize_t r; - while (1) { - r = read(loop->async_pipefd[0], buf, sizeof(buf)); + n = 0; + for (;;) { + r = read(w->fd, buf, sizeof(buf)); + + if (r > 0) + n += r; if (r == sizeof(buf)) continue; @@ -137,10 +139,61 @@ static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { abort(); } - ngx_queue_foreach(q, &loop->async_handles) { - h = ngx_queue_data(q, uv_async_t, queue); - if (!h->pending) continue; - h->pending = 0; - h->async_cb(h, 0); - } + wa = container_of(w, struct uv__async, io_watcher); + wa->cb(loop, wa, n); +} + + +void uv__async_send(struct uv__async* wa) { + int r; + + do + r = write(wa->wfd, "", 1); + while (r == -1 && errno == EINTR); + + if (r == 1) + return; + + if (r == -1) + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + abort(); +} + + +void uv__async_init(struct uv__async* wa) { + wa->io_watcher.fd = -1; + wa->wfd = -1; +} + + +int uv__async_start(uv_loop_t* loop, struct uv__async* wa, uv__async_cb cb) { + int pipefd[2]; + + if (wa->io_watcher.fd != -1) + return 0; + + if (uv__make_pipe(pipefd, UV__F_NONBLOCK)) + return -1; + + uv__io_init(&wa->io_watcher, uv__async_io, pipefd[0]); + uv__io_start(loop, &wa->io_watcher, UV__POLLIN); + wa->wfd = pipefd[1]; + wa->cb = cb; + + return 0; +} + + +void uv__async_stop(uv_loop_t* loop, struct uv__async* wa) { + if (wa->io_watcher.fd == -1) + return; + + uv__io_stop(loop, &wa->io_watcher, UV__POLLIN); + close(wa->io_watcher.fd); + close(wa->wfd); + + wa->io_watcher.fd = -1; + wa->wfd = -1; } diff --git a/src/unix/internal.h b/src/unix/internal.h index d6d2e45f..7d08c1da 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -45,6 +45,9 @@ # include #endif +#define ACCESS_ONCE(type, var) \ + (*(volatile type*) &(var)) + #define UNREACHABLE() \ do { \ assert(0 && "unreachable code"); \ @@ -111,7 +114,6 @@ int uv__nonblock(int fd, int set); int uv__cloexec(int fd, int set); int uv__socket(int domain, int type, int protocol); int uv__dup(int fd); -int uv_async_stop(uv_async_t* handle); void uv__make_close_pending(uv_handle_t* handle); void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd); @@ -122,6 +124,12 @@ void uv__io_feed(uv_loop_t* loop, uv__io_t* w); int uv__io_active(const uv__io_t* w, unsigned int events); void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */ +/* async */ +void uv__async_send(struct uv__async* wa); +void uv__async_init(struct uv__async* wa); +int uv__async_start(uv_loop_t* loop, struct uv__async* wa, uv__async_cb cb); +void uv__async_stop(uv_loop_t* loop, struct uv__async* wa); + /* loop */ int uv__loop_init(uv_loop_t* loop, int default_loop); void uv__loop_delete(uv_loop_t* loop); diff --git a/src/unix/loop.c b/src/unix/loop.c index 99353107..61360727 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -50,8 +50,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { loop->closing_handles = NULL; loop->time = uv__hrtime() / 1000000; - loop->async_pipefd[0] = -1; - loop->async_pipefd[1] = -1; + uv__async_init(&loop->async_watcher); loop->signal_pipefd[0] = -1; loop->signal_pipefd[1] = -1; loop->backend_fd = -1; @@ -85,16 +84,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { void uv__loop_delete(uv_loop_t* loop) { uv__signal_loop_cleanup(loop); uv__platform_loop_delete(loop); - - if (loop->async_pipefd[0] != -1) { - close(loop->async_pipefd[0]); - loop->async_pipefd[0] = -1; - } - - if (loop->async_pipefd[1] != -1) { - close(loop->async_pipefd[1]); - loop->async_pipefd[1] = -1; - } + uv__async_stop(loop, &loop->async_watcher); if (loop->emfile_fd != -1) { close(loop->emfile_fd);