unix: simplify async watcher dispatch logic

Remove the unused `nevents` parameter from `uv__async_event()`
and remove the indirection of having a separate `uv__async`
type.  There is only one instance per event loop these days.

This incidentally removes the `assert(n == sizeof(val))` in a
Linux-specific code path that some users seem to hit from time
to time.  The cause is not well-understood and I've never been
able to reproduce it myself.  Presumably libuv gets an EAGAIN
when trying to read from the eventfd but when and why that
happens is unclear.

Since the byte count is unused, removing the assert seems safe.
Worst case, libuv sometimes iterates over the async watcher list
when it doesn't have to.

Fixes: https://github.com/libuv/libuv/issues/1171
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Santiago Gimeno <santiago.gimeno@gmail.com>
Reviewed-By: Saúl Ibarra Corretgé <saghul@gmail.com>
Supersedes: https://github.com/libuv/libuv/pull/1214
This commit is contained in:
Ben Noordhuis 2017-03-14 10:01:35 +01:00
parent d7cf771072
commit 5fc8aecd81
4 changed files with 46 additions and 88 deletions

View File

@ -79,7 +79,6 @@
#endif
struct uv__io_s;
struct uv__async;
struct uv_loop_s;
typedef void (*uv__io_cb)(struct uv_loop_s* loop,
@ -97,16 +96,6 @@ struct uv__io_s {
UV_IO_PRIVATE_PLATFORM_FIELDS
};
typedef void (*uv__async_cb)(struct uv_loop_s* loop,
struct uv__async* w,
unsigned int nevents);
struct uv__async {
uv__async_cb cb;
uv__io_t io_watcher;
int wfd;
};
#ifndef UV_PLATFORM_SEM_T
# define UV_PLATFORM_SEM_T sem_t
#endif
@ -216,7 +205,9 @@ typedef struct {
void* check_handles[2]; \
void* idle_handles[2]; \
void* async_handles[2]; \
struct uv__async async_watcher; \
void (*async_unused)(void); /* TODO(bnoordhuis) Remove in libuv v2. */ \
uv__io_t async_io_watcher; \
int async_wfd; \
struct { \
void* min; \
unsigned int nelts; \

View File

@ -33,16 +33,15 @@
#include <string.h>
#include <unistd.h>
static void uv__async_event(uv_loop_t* loop,
struct uv__async* w,
unsigned int nevents);
static void uv__async_send(uv_loop_t* loop);
static int uv__async_start(uv_loop_t* loop);
static int uv__async_eventfd(void);
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int err;
err = uv__async_start(loop, &loop->async_watcher, uv__async_event);
err = uv__async_start(loop);
if (err)
return err;
@ -63,7 +62,7 @@ int uv_async_send(uv_async_t* handle) {
return 0;
if (cmpxchgi(&handle->pending, 0, 1) == 0)
uv__async_send(&handle->loop->async_watcher);
uv__async_send(handle->loop);
return 0;
}
@ -75,44 +74,18 @@ void uv__async_close(uv_async_t* handle) {
}
static void uv__async_event(uv_loop_t* loop,
struct uv__async* w,
unsigned int nevents) {
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ssize_t r;
QUEUE queue;
QUEUE* q;
uv_async_t* h;
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
assert(w == &loop->async_io_watcher);
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
if (cmpxchgi(&h->pending, 1, 0) == 0)
continue;
if (h->async_cb == NULL)
continue;
h->async_cb(h);
}
}
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
struct uv__async* wa;
char buf[1024];
unsigned n;
ssize_t r;
n = 0;
for (;;) {
r = read(w->fd, buf, sizeof(buf));
if (r > 0)
n += r;
if (r == sizeof(buf))
continue;
@ -128,23 +101,26 @@ static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
abort();
}
wa = container_of(w, struct uv__async, io_watcher);
QUEUE_MOVE(&loop->async_handles, &queue);
while (!QUEUE_EMPTY(&queue)) {
q = QUEUE_HEAD(&queue);
h = QUEUE_DATA(q, uv_async_t, queue);
#if defined(__linux__)
if (wa->wfd == -1) {
uint64_t val;
assert(n == sizeof(val));
memcpy(&val, buf, sizeof(val)); /* Avoid alignment issues. */
wa->cb(loop, wa, val);
return;
QUEUE_REMOVE(q);
QUEUE_INSERT_TAIL(&loop->async_handles, q);
if (cmpxchgi(&h->pending, 1, 0) == 0)
continue;
if (h->async_cb == NULL)
continue;
h->async_cb(h);
}
#endif
wa->cb(loop, wa, n);
}
void uv__async_send(struct uv__async* wa) {
static void uv__async_send(uv_loop_t* loop) {
const void* buf;
ssize_t len;
int fd;
@ -152,14 +128,14 @@ void uv__async_send(struct uv__async* wa) {
buf = "";
len = 1;
fd = wa->wfd;
fd = loop->async_wfd;
#if defined(__linux__)
if (fd == -1) {
static const uint64_t val = 1;
buf = &val;
len = sizeof(val);
fd = wa->io_watcher.fd; /* eventfd */
fd = loop->async_io_watcher.fd; /* eventfd */
}
#endif
@ -178,17 +154,11 @@ void uv__async_send(struct uv__async* wa) {
}
void uv__async_init(struct uv__async* wa) {
wa->io_watcher.fd = -1;
wa->wfd = -1;
}
int uv__async_start(uv_loop_t* loop, struct uv__async* wa, uv__async_cb cb) {
static int uv__async_start(uv_loop_t* loop) {
int pipefd[2];
int err;
if (wa->io_watcher.fd != -1)
if (loop->async_io_watcher.fd != -1)
return 0;
err = uv__async_eventfd();
@ -222,28 +192,27 @@ int uv__async_start(uv_loop_t* loop, struct uv__async* wa, uv__async_cb cb) {
if (err < 0)
return err;
uv__io_init(&wa->io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &wa->io_watcher, POLLIN);
wa->wfd = pipefd[1];
wa->cb = cb;
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &loop->async_io_watcher, POLLIN);
loop->async_wfd = pipefd[1];
return 0;
}
void uv__async_stop(uv_loop_t* loop, struct uv__async* wa) {
if (wa->io_watcher.fd == -1)
void uv__async_stop(uv_loop_t* loop) {
if (loop->async_io_watcher.fd == -1)
return;
if (wa->wfd != -1) {
if (wa->wfd != wa->io_watcher.fd)
uv__close(wa->wfd);
wa->wfd = -1;
if (loop->async_wfd != -1) {
if (loop->async_wfd != loop->async_io_watcher.fd)
uv__close(loop->async_wfd);
loop->async_wfd = -1;
}
uv__io_stop(loop, &wa->io_watcher, POLLIN);
uv__close(wa->io_watcher.fd);
wa->io_watcher.fd = -1;
uv__io_stop(loop, &loop->async_io_watcher, POLLIN);
uv__close(loop->async_io_watcher.fd);
loop->async_io_watcher.fd = -1;
}

View File

@ -194,10 +194,7 @@ int uv__io_check_fd(uv_loop_t* loop, int fd);
void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */
/* async */
void uv__async_send(struct uv__async* wa);
void uv__async_init(struct uv__async* wa);
int uv__async_start(uv_loop_t* loop, struct uv__async* wa, uv__async_cb cb);
void uv__async_stop(uv_loop_t* loop, struct uv__async* wa);
void uv__async_stop(uv_loop_t* loop);
/* loop */
void uv__run_idle(uv_loop_t* loop);

View File

@ -54,7 +54,8 @@ int uv_loop_init(uv_loop_t* loop) {
loop->closing_handles = NULL;
uv__update_time(loop);
uv__async_init(&loop->async_watcher);
loop->async_io_watcher.fd = -1;
loop->async_wfd = -1;
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
loop->backend_fd = -1;
@ -111,7 +112,7 @@ fail_signal_init:
void uv__loop_close(uv_loop_t* loop) {
uv__signal_loop_cleanup(loop);
uv__platform_loop_delete(loop);
uv__async_stop(loop, &loop->async_watcher);
uv__async_stop(loop);
if (loop->emfile_fd != -1) {
uv__close(loop->emfile_fd);