unix: abstract away async pipe infrastructure

This commit lays the groundwork for the switch to eventfds on Linux.
This commit is contained in:
Ben Noordhuis 2013-02-25 01:32:19 +01:00
parent e89aced8d6
commit 92151658eb
4 changed files with 143 additions and 82 deletions

View File

@ -54,9 +54,6 @@
# include "uv-bsd.h"
#endif
struct uv__io_s;
struct uv_loop_s;
#ifndef UV_IO_PRIVATE_PLATFORM_FIELDS
# define UV_IO_PRIVATE_PLATFORM_FIELDS /* empty */
#endif
@ -64,6 +61,10 @@ struct uv_loop_s;
#define UV_IO_PRIVATE_FIELDS \
UV_IO_PRIVATE_PLATFORM_FIELDS \
struct uv__io_s;
struct uv__async;
struct uv_loop_s;
typedef void (*uv__io_cb)(struct uv_loop_s* loop,
struct uv__io_s* w,
unsigned int events);
@ -79,6 +80,16 @@ struct uv__io_s {
UV_IO_PRIVATE_FIELDS
};
typedef void (*uv__async_cb)(struct uv_loop_s* loop,
struct uv__async* w,
unsigned int nevents);
struct uv__async {
uv__async_cb cb;
uv__io_t io_watcher;
int wfd;
};
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
@ -167,8 +178,7 @@ typedef struct {
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
ngx_queue_t async_handles; \
uv__io_t async_watcher; \
int async_pipefd[2]; \
struct uv__async async_watcher; \
/* RB_HEAD(uv__timers, uv_timer_s) */ \
struct uv__timers { \
struct uv_timer_s* rbh_root; \
@ -252,9 +262,9 @@ typedef struct {
ngx_queue_t queue;
#define UV_ASYNC_PRIVATE_FIELDS \
volatile sig_atomic_t pending; \
uv_async_cb async_cb; \
ngx_queue_t queue;
ngx_queue_t queue; \
int pending; \
#define UV_TIMER_PRIVATE_FIELDS \
/* RB_ENTRY(uv_timer_s) tree_entry; */ \

View File

@ -18,6 +18,10 @@
* IN THE SOFTWARE.
*/
/* This file contains both the uv__async internal infrastructure and the
* user-facing uv_async_t functions.
*/
#include "uv.h"
#include "internal.h"
@ -26,41 +30,14 @@
#include <stdlib.h>
#include <unistd.h>
static int uv__async_init(uv_loop_t* loop);
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static int uv__async_make_pending(volatile sig_atomic_t* ptr) {
/* Do a cheap read first. */
if (*ptr)
return 1;
/* Micro-optimization: use atomic memory operations to detect if we've been
* preempted by another thread and don't have to make an expensive syscall.
* This speeds up the heavily contended case by about 1-2% and has little
* if any impact on the non-contended case.
*
* Use XCHG instead of the CMPXCHG that __sync_val_compare_and_swap() emits
* on x86, it's about 4x faster. It probably makes zero difference in the
* grand scheme of things but I'm OCD enough not to let this one pass.
*/
#if defined(__i386__) || defined(__x86_64__)
{
unsigned int val = 1;
__asm__ __volatile__("xchgl %0, %1" : "+r" (val) : "m" (*ptr));
return val != 0;
}
#elif defined(__GNUC__) && (__GNUC__ > 4 || __GNUC__ == 4 && __GNUC_MINOR__ > 0)
return __sync_val_compare_and_swap(ptr, 0, 1) != 0;
#else
*ptr = 1;
return 0;
#endif
}
static void uv__async_event(uv_loop_t* loop,
struct uv__async* w,
unsigned int nevents);
static int uv__async_make_pending(int* pending);
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
if (uv__async_init(loop))
if (uv__async_start(loop, &loop->async_watcher, uv__async_event))
return uv__set_sys_error(loop, errno);
uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
@ -75,19 +52,8 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
int uv_async_send(uv_async_t* handle) {
int r;
if (uv__async_make_pending(&handle->pending))
return 0; /* already pending */
do
r = write(handle->loop->async_pipefd[1], "x", 1);
while (r == -1 && errno == EINTR);
assert(r == -1 || r == 1);
if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
return uv__set_sys_error(handle->loop, errno);
if (uv__async_make_pending(&handle->pending) == 0)
uv__async_send(&handle->loop->async_watcher);
return 0;
}
@ -99,28 +65,64 @@ void uv__async_close(uv_async_t* handle) {
}
static int uv__async_init(uv_loop_t* loop) {
if (loop->async_pipefd[0] != -1)
return 0;
static void uv__async_event(uv_loop_t* loop,
struct uv__async* w,
unsigned int nevents) {
ngx_queue_t* q;
uv_async_t* h;
if (uv__make_pipe(loop->async_pipefd, UV__F_NONBLOCK))
return -1;
ngx_queue_foreach(q, &loop->async_handles) {
h = ngx_queue_data(q, uv_async_t, queue);
if (!h->pending) continue;
h->pending = 0;
h->async_cb(h, 0);
}
}
uv__io_init(&loop->async_watcher, uv__async_io, loop->async_pipefd[0]);
uv__io_start(loop, &loop->async_watcher, UV__POLLIN);
static int uv__async_make_pending(int* pending) {
/* Do a cheap read first. */
if (ACCESS_ONCE(int, *pending) != 0)
return 1;
/* Micro-optimization: use atomic memory operations to detect if we've been
* preempted by another thread and don't have to make an expensive syscall.
* This speeds up the heavily contended case by about 1-2% and has little
* if any impact on the non-contended case.
*
* Use XCHG instead of the CMPXCHG that __sync_val_compare_and_swap() emits
* on x86, it's about 4x faster. It probably makes zero difference in the
* grand scheme of things but I'm OCD enough not to let this one pass.
*/
#if defined(__i386__) || defined(__x86_64__)
{
unsigned int val = 1;
__asm__ __volatile__ ("xchgl %0, %1"
: "+r" (val)
: "m" (*pending));
return val != 0;
}
#elif defined(__GNUC__) && (__GNUC__ > 4 || __GNUC__ == 4 && __GNUC_MINOR__ > 0)
return __sync_val_compare_and_swap(pending, 0, 1) != 0;
#else
ACCESS_ONCE(int, *pending) = 1;
return 0;
#endif
}
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
struct uv__async* wa;
char buf[1024];
ngx_queue_t* q;
uv_async_t* h;
unsigned n;
ssize_t r;
while (1) {
r = read(loop->async_pipefd[0], buf, sizeof(buf));
n = 0;
for (;;) {
r = read(w->fd, buf, sizeof(buf));
if (r > 0)
n += r;
if (r == sizeof(buf))
continue;
@ -137,10 +139,61 @@ static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
abort();
}
ngx_queue_foreach(q, &loop->async_handles) {
h = ngx_queue_data(q, uv_async_t, queue);
if (!h->pending) continue;
h->pending = 0;
h->async_cb(h, 0);
}
wa = container_of(w, struct uv__async, io_watcher);
wa->cb(loop, wa, n);
}
void uv__async_send(struct uv__async* wa) {
int r;
do
r = write(wa->wfd, "", 1);
while (r == -1 && errno == EINTR);
if (r == 1)
return;
if (r == -1)
if (errno == EAGAIN || errno == EWOULDBLOCK)
return;
abort();
}
void uv__async_init(struct uv__async* wa) {
wa->io_watcher.fd = -1;
wa->wfd = -1;
}
int uv__async_start(uv_loop_t* loop, struct uv__async* wa, uv__async_cb cb) {
int pipefd[2];
if (wa->io_watcher.fd != -1)
return 0;
if (uv__make_pipe(pipefd, UV__F_NONBLOCK))
return -1;
uv__io_init(&wa->io_watcher, uv__async_io, pipefd[0]);
uv__io_start(loop, &wa->io_watcher, UV__POLLIN);
wa->wfd = pipefd[1];
wa->cb = cb;
return 0;
}
void uv__async_stop(uv_loop_t* loop, struct uv__async* wa) {
if (wa->io_watcher.fd == -1)
return;
uv__io_stop(loop, &wa->io_watcher, UV__POLLIN);
close(wa->io_watcher.fd);
close(wa->wfd);
wa->io_watcher.fd = -1;
wa->wfd = -1;
}

View File

@ -45,6 +45,9 @@
# include <CoreServices/CoreServices.h>
#endif
#define ACCESS_ONCE(type, var) \
(*(volatile type*) &(var))
#define UNREACHABLE() \
do { \
assert(0 && "unreachable code"); \
@ -111,7 +114,6 @@ int uv__nonblock(int fd, int set);
int uv__cloexec(int fd, int set);
int uv__socket(int domain, int type, int protocol);
int uv__dup(int fd);
int uv_async_stop(uv_async_t* handle);
void uv__make_close_pending(uv_handle_t* handle);
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
@ -122,6 +124,12 @@ void uv__io_feed(uv_loop_t* loop, uv__io_t* w);
int uv__io_active(const uv__io_t* w, unsigned int events);
void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */
/* async */
void uv__async_send(struct uv__async* wa);
void uv__async_init(struct uv__async* wa);
int uv__async_start(uv_loop_t* loop, struct uv__async* wa, uv__async_cb cb);
void uv__async_stop(uv_loop_t* loop, struct uv__async* wa);
/* loop */
int uv__loop_init(uv_loop_t* loop, int default_loop);
void uv__loop_delete(uv_loop_t* loop);

View File

@ -50,8 +50,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
loop->closing_handles = NULL;
loop->time = uv__hrtime() / 1000000;
loop->async_pipefd[0] = -1;
loop->async_pipefd[1] = -1;
uv__async_init(&loop->async_watcher);
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
loop->backend_fd = -1;
@ -85,16 +84,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
void uv__loop_delete(uv_loop_t* loop) {
uv__signal_loop_cleanup(loop);
uv__platform_loop_delete(loop);
if (loop->async_pipefd[0] != -1) {
close(loop->async_pipefd[0]);
loop->async_pipefd[0] = -1;
}
if (loop->async_pipefd[1] != -1) {
close(loop->async_pipefd[1]);
loop->async_pipefd[1] = -1;
}
uv__async_stop(loop, &loop->async_watcher);
if (loop->emfile_fd != -1) {
close(loop->emfile_fd);