unix: implement timers in libuv

* replace libev backed timers with a pure libuv implementation
* gut ev_run() and make it take a timeout instead of flags

Incidentally speeds up the loop_count_timed benchmark by about 100%.
This commit is contained in:
Ben Noordhuis 2012-05-31 00:47:04 +02:00
parent 3f37ba859f
commit c9396dd57e
7 changed files with 128 additions and 167 deletions

View File

@ -623,7 +623,7 @@ enum {
}; };
#if EV_PROTOTYPES #if EV_PROTOTYPES
void ev_run (EV_P_ int flags EV_CPP (= 0)); void ev_run (EV_P_ ev_tstamp waittime);
void ev_break (EV_P_ int how EV_CPP (= EVBREAK_ONE)); /* break out of the loop */ void ev_break (EV_P_ int how EV_CPP (= EVBREAK_ONE)); /* break out of the loop */
/* /*

View File

@ -98,18 +98,21 @@ struct uv__io_s {
# define UV_LOOP_PRIVATE_PLATFORM_FIELDS # define UV_LOOP_PRIVATE_PLATFORM_FIELDS
#endif #endif
#define UV_LOOP_PRIVATE_FIELDS \ #define UV_LOOP_PRIVATE_FIELDS \
/* Poll result queue */ \ /* Poll result queue */ \
eio_channel uv_eio_channel; \ eio_channel uv_eio_channel; \
struct ev_loop* ev; \ struct ev_loop* ev; \
/* Various thing for libeio. */ \ /* Various thing for libeio. */ \
uv_async_t uv_eio_want_poll_notifier; \ uv_async_t uv_eio_want_poll_notifier; \
uv_async_t uv_eio_done_poll_notifier; \ uv_async_t uv_eio_done_poll_notifier; \
uv_idle_t uv_eio_poller; \ uv_idle_t uv_eio_poller; \
uv_handle_t* closing_handles; \ uv_handle_t* closing_handles; \
ngx_queue_t prepare_handles; \ ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \ ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \ ngx_queue_t idle_handles; \
/* RB_HEAD(uv__timers, uv_timer_s) */ \
struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \
uint64_t time; \
UV_LOOP_PRIVATE_PLATFORM_FIELDS UV_LOOP_PRIVATE_PLATFORM_FIELDS
#define UV_REQ_BUFSML_SIZE (4) #define UV_REQ_BUFSML_SIZE (4)
@ -186,7 +189,7 @@ struct uv__io_s {
uv__io_t io_watcher; uv__io_t io_watcher;
/* UV_PREPARE */ \ /* UV_PREPARE */
#define UV_PREPARE_PRIVATE_FIELDS \ #define UV_PREPARE_PRIVATE_FIELDS \
uv_prepare_cb prepare_cb; \ uv_prepare_cb prepare_cb; \
ngx_queue_t queue; ngx_queue_t queue;
@ -211,9 +214,17 @@ struct uv__io_s {
/* UV_TIMER */ /* UV_TIMER */
#define UV_TIMER_PRIVATE_FIELDS \ #define UV_TIMER_PRIVATE_FIELDS \
ev_timer timer_watcher; \ /* RB_ENTRY(uv_timer_s) node; */ \
uv_timer_cb timer_cb; struct { \
struct uv_timer_s* rbe_left; \
struct uv_timer_s* rbe_right; \
struct uv_timer_s* rbe_parent; \
int rbe_color; \
} tree_entry; \
uv_timer_cb timer_cb; \
uint64_t timeout; \
uint64_t repeat;
#define UV_GETADDRINFO_PRIVATE_FIELDS \ #define UV_GETADDRINFO_PRIVATE_FIELDS \
uv_getaddrinfo_cb cb; \ uv_getaddrinfo_cb cb; \

View File

@ -226,22 +226,30 @@ void uv_loop_delete(uv_loop_t* loop) {
} }
static void uv__poll(uv_loop_t* loop, int block) { static void uv__poll(uv_loop_t* loop, unsigned int timeout) {
/* bump the loop's refcount, otherwise libev does /* bump the loop's refcount, otherwise libev does
* a zero timeout poll and we end up busy looping * a zero timeout poll and we end up busy looping
*/ */
ev_ref(loop->ev); ev_ref(loop->ev);
ev_run(loop->ev, block ? EVRUN_ONCE : EVRUN_NOWAIT); ev_run(loop->ev, timeout / 1000.);
ev_unref(loop->ev); ev_unref(loop->ev);
} }
static int uv__should_block(uv_loop_t* loop) { static unsigned int uv__poll_timeout(uv_loop_t* loop) {
return loop->active_handles && ngx_queue_empty(&loop->idle_handles); if (!uv__has_active_handles(loop))
return 0;
if (!ngx_queue_empty(&loop->idle_handles))
return 0;
return uv__next_timeout(loop);
} }
static int uv__run(uv_loop_t* loop) { static int uv__run(uv_loop_t* loop) {
uv_update_time(loop);
uv__run_timers(loop);
uv__run_idle(loop); uv__run_idle(loop);
if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) { if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) {
@ -249,7 +257,7 @@ static int uv__run(uv_loop_t* loop) {
/* Need to poll even if there are no active handles left, otherwise /* Need to poll even if there are no active handles left, otherwise
* uv_work_t reqs won't complete... * uv_work_t reqs won't complete...
*/ */
uv__poll(loop, uv__should_block(loop)); uv__poll(loop, uv__poll_timeout(loop));
uv__run_check(loop); uv__run_check(loop);
} }
@ -283,12 +291,12 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
void uv_update_time(uv_loop_t* loop) { void uv_update_time(uv_loop_t* loop) {
ev_now_update(loop->ev); loop->time = uv_hrtime() / 1000000;
} }
int64_t uv_now(uv_loop_t* loop) { int64_t uv_now(uv_loop_t* loop) {
return (int64_t)(ev_now(loop->ev) * 1000); return loop->time;
} }

View File

@ -2389,7 +2389,7 @@ time_update (EV_P_ ev_tstamp max_block)
} }
void void
ev_run (EV_P_ int flags) ev_run (EV_P_ ev_tstamp waittime)
{ {
#if EV_FEATURE_API #if EV_FEATURE_API
++loop_depth; ++loop_depth;
@ -2426,15 +2426,6 @@ ev_run (EV_P_ int flags)
} }
#endif #endif
#if EV_PREPARE_ENABLE
/* queue prepare watchers (and execute them) */
if (expect_false (preparecnt))
{
queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
EV_INVOKE_PENDING;
}
#endif
if (expect_false (loop_done)) if (expect_false (loop_done))
break; break;
@ -2445,90 +2436,16 @@ ev_run (EV_P_ int flags)
/* update fd-related kernel structures */ /* update fd-related kernel structures */
fd_reify (EV_A); fd_reify (EV_A);
/* calculate blocking time */
{
ev_tstamp waittime = 0.;
ev_tstamp sleeptime = 0.;
/* remember old timestamp for io_blocktime calculation */
ev_tstamp prev_mn_now = mn_now;
/* update time to cancel out callback processing overhead */
time_update (EV_A_ 1e100);
if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt)))
{
waittime = MAX_BLOCKTIME;
if (timercnt)
{
ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now + backend_fudge;
if (waittime > to) waittime = to;
}
#if EV_PERIODIC_ENABLE
if (periodiccnt)
{
ev_tstamp to = ANHE_at (periodics [HEAP0]) - ev_rt_now + backend_fudge;
if (waittime > to) waittime = to;
}
#endif
/* don't let timeouts decrease the waittime below timeout_blocktime */
if (expect_false (waittime < timeout_blocktime))
waittime = timeout_blocktime;
/* extra check because io_blocktime is commonly 0 */
if (expect_false (io_blocktime))
{
sleeptime = io_blocktime - (mn_now - prev_mn_now);
if (sleeptime > waittime - backend_fudge)
sleeptime = waittime - backend_fudge;
if (expect_true (sleeptime > 0.))
{
ev_sleep (sleeptime);
waittime -= sleeptime;
}
}
}
#if EV_FEATURE_API #if EV_FEATURE_API
++loop_count; ++loop_count;
#endif
assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
backend_poll (EV_A_ waittime);
assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */
/* update ev_rt_now, do magic */
time_update (EV_A_ waittime + sleeptime);
}
/* queue pending timers and reschedule them */
timers_reify (EV_A); /* relative timers called last */
#if EV_PERIODIC_ENABLE
periodics_reify (EV_A); /* absolute timers called first */
#endif
#if EV_IDLE_ENABLE
/* queue idle watchers unless other events are pending */
idle_reify (EV_A);
#endif
#if EV_CHECK_ENABLE
/* queue check watchers, to be executed first */
if (expect_false (checkcnt))
queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
#endif #endif
assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
backend_poll (EV_A_ waittime);
assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */
EV_INVOKE_PENDING; EV_INVOKE_PENDING;
} }
while (expect_true ( while (0);
activecnt
&& !loop_done
&& !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
));
if (loop_done == EVBREAK_ONE) if (loop_done == EVBREAK_ONE)
loop_done = EVBREAK_CANCEL; loop_done = EVBREAK_CANCEL;

View File

@ -93,8 +93,7 @@ enum {
UV_STREAM_WRITABLE = 0x40, /* The stream is writable */ UV_STREAM_WRITABLE = 0x40, /* The stream is writable */
UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */ UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */
UV_TCP_NODELAY = 0x100, /* Disable Nagle. */ UV_TCP_NODELAY = 0x100, /* Disable Nagle. */
UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */ UV_TCP_KEEPALIVE = 0x200 /* Turn on keep-alive. */
UV_TIMER_REPEAT = 0x100
}; };
inline static void uv__req_init(uv_loop_t* loop, inline static void uv__req_init(uv_loop_t* loop,
@ -151,8 +150,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
/* pipe */ /* pipe */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
/* poll */ /* timer */
void uv__poll_close(uv_poll_t* handle); void uv__run_timers(uv_loop_t* loop);
unsigned int uv__next_timeout(uv_loop_t* loop);
/* various */ /* various */
void uv__async_close(uv_async_t* handle); void uv__async_close(uv_async_t* handle);
@ -160,6 +160,7 @@ void uv__check_close(uv_check_t* handle);
void uv__fs_event_close(uv_fs_event_t* handle); void uv__fs_event_close(uv_fs_event_t* handle);
void uv__idle_close(uv_idle_t* handle); void uv__idle_close(uv_idle_t* handle);
void uv__pipe_close(uv_pipe_t* handle); void uv__pipe_close(uv_pipe_t* handle);
void uv__poll_close(uv_poll_t* handle);
void uv__prepare_close(uv_prepare_t* handle); void uv__prepare_close(uv_prepare_t* handle);
void uv__process_close(uv_process_t* handle); void uv__process_close(uv_process_t* handle);
void uv__stream_close(uv_stream_t* handle); void uv__stream_close(uv_stream_t* handle);

View File

@ -37,6 +37,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
memset(loop, 0, sizeof(*loop)); memset(loop, 0, sizeof(*loop));
RB_INIT(&loop->ares_handles); RB_INIT(&loop->ares_handles);
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->active_reqs); ngx_queue_init(&loop->active_reqs);
ngx_queue_init(&loop->idle_handles); ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->check_handles); ngx_queue_init(&loop->check_handles);
@ -44,6 +45,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
ngx_queue_init(&loop->handle_queue); ngx_queue_init(&loop->handle_queue);
loop->closing_handles = NULL; loop->closing_handles = NULL;
loop->channel = NULL; loop->channel = NULL;
loop->time = uv_hrtime() / 1000000;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags); loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop); ev_set_userdata(loop->ev, loop);
eio_channel_init(&loop->uv_eio_channel, loop); eio_channel_init(&loop->uv_eio_channel, loop);

View File

@ -22,92 +22,114 @@
#include "internal.h" #include "internal.h"
#include <assert.h> #include <assert.h>
static int uv__timer_cmp(const uv_timer_t* a, const uv_timer_t* b) {
static int uv__timer_repeating(const uv_timer_t* timer) { if (a->timeout < b->timeout)
return timer->flags & UV_TIMER_REPEAT; return -1;
if (a->timeout > b->timeout)
return 1;
if (a < b)
return -1;
if (a > b)
return 1;
return 0;
} }
static void uv__timer_cb(EV_P_ ev_timer* w, int revents) { RB_GENERATE_STATIC(uv__timers, uv_timer_s, tree_entry, uv__timer_cmp)
uv_timer_t* timer = container_of(w, uv_timer_t, timer_watcher);
if (!uv__is_active(timer))
return;
if (!uv__timer_repeating(timer))
uv__handle_stop(timer);
if (timer->timer_cb)
timer->timer_cb(timer, 0);
}
int uv_timer_init(uv_loop_t* loop, uv_timer_t* timer) { int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle) {
uv__handle_init(loop, (uv_handle_t*)timer, UV_TIMER);
loop->counters.timer_init++; loop->counters.timer_init++;
ev_init(&timer->timer_watcher, uv__timer_cb); uv__handle_init(loop, (uv_handle_t*)handle, UV_TIMER);
handle->timer_cb = NULL;
return 0; return 0;
} }
int uv_timer_start(uv_timer_t* timer, int uv_timer_start(uv_timer_t* handle,
uv_timer_cb cb, uv_timer_cb cb,
int64_t timeout, int64_t timeout,
int64_t repeat) { int64_t repeat) {
if (uv__is_active(timer)) assert(timeout >= 0);
uv_timer_stop(timer); assert(repeat >= 0);
timer->timer_cb = cb; if (uv__is_active(handle))
uv_timer_stop(handle);
if (repeat) handle->timer_cb = cb;
timer->flags |= UV_TIMER_REPEAT; handle->timeout = handle->loop->time + timeout;
else handle->repeat = repeat;
timer->flags &= ~UV_TIMER_REPEAT;
ev_timer_set(&timer->timer_watcher, timeout / 1000.0, repeat / 1000.0); RB_INSERT(uv__timers, &handle->loop->timer_handles, handle);
ev_timer_start(timer->loop->ev, &timer->timer_watcher); uv__handle_start(handle);
uv__handle_start(timer);
return 0; return 0;
} }
int uv_timer_stop(uv_timer_t* timer) { int uv_timer_stop(uv_timer_t* handle) {
timer->flags &= ~UV_TIMER_REPEAT; if (!uv__is_active(handle))
ev_timer_stop(timer->loop->ev, &timer->timer_watcher); return 0;
uv__handle_stop(timer);
RB_REMOVE(uv__timers, &handle->loop->timer_handles, handle);
uv__handle_stop(handle);
return 0; return 0;
} }
int uv_timer_again(uv_timer_t* timer) { int uv_timer_again(uv_timer_t* handle) {
if (!uv__is_active(timer)) { if (handle->timer_cb == NULL)
uv__set_artificial_error(timer->loop, UV_EINVAL); return uv__set_artificial_error(handle->loop, UV_EINVAL);
return -1;
if (handle->repeat) {
uv_timer_stop(handle);
uv_timer_start(handle, handle->timer_cb, handle->repeat, handle->repeat);
} }
assert(uv__timer_repeating(timer));
ev_timer_again(timer->loop->ev, &timer->timer_watcher);
return 0; return 0;
} }
void uv_timer_set_repeat(uv_timer_t* timer, int64_t repeat) { void uv_timer_set_repeat(uv_timer_t* handle, int64_t repeat) {
assert(timer->type == UV_TIMER); assert(repeat >= 0);
timer->timer_watcher.repeat = repeat / 1000.0; handle->repeat = repeat;
if (repeat)
timer->flags |= UV_TIMER_REPEAT;
else
timer->flags &= ~UV_TIMER_REPEAT;
} }
int64_t uv_timer_get_repeat(uv_timer_t* timer) { int64_t uv_timer_get_repeat(uv_timer_t* handle) {
assert(timer->type == UV_TIMER); return handle->repeat;
return (int64_t)(1000 * timer->timer_watcher.repeat); }
unsigned int uv__next_timeout(uv_loop_t* loop) {
uv_timer_t* handle;
handle = RB_MIN(uv__timers, &loop->timer_handles);
if (handle == NULL)
return (unsigned int) -1; /* block indefinitely */
if (handle->timeout <= loop->time)
return 0;
return handle->timeout - loop->time;
}
void uv__run_timers(uv_loop_t* loop) {
uv_timer_t* handle;
while ((handle = RB_MIN(uv__timers, &loop->timer_handles))) {
if (handle->timeout > loop->time)
break;
uv_timer_stop(handle);
uv_timer_again(handle);
handle->timer_cb(handle, 0);
}
} }