unix: implement timers in libuv

* replace libev backed timers with a pure libuv implementation
* gut ev_run() and make it take a timeout instead of flags

Incidentally speeds up the loop_count_timed benchmark by about 100%.
This commit is contained in:
Ben Noordhuis 2012-05-31 00:47:04 +02:00
parent 3f37ba859f
commit c9396dd57e
7 changed files with 128 additions and 167 deletions

View File

@ -623,7 +623,7 @@ enum {
};
#if EV_PROTOTYPES
void ev_run (EV_P_ int flags EV_CPP (= 0));
void ev_run (EV_P_ ev_tstamp waittime);
void ev_break (EV_P_ int how EV_CPP (= EVBREAK_ONE)); /* break out of the loop */
/*

View File

@ -110,6 +110,9 @@ struct uv__io_s {
ngx_queue_t prepare_handles; \
ngx_queue_t check_handles; \
ngx_queue_t idle_handles; \
/* RB_HEAD(uv__timers, uv_timer_s) */ \
struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \
uint64_t time; \
UV_LOOP_PRIVATE_PLATFORM_FIELDS
#define UV_REQ_BUFSML_SIZE (4)
@ -186,7 +189,7 @@ struct uv__io_s {
uv__io_t io_watcher;
/* UV_PREPARE */ \
/* UV_PREPARE */
#define UV_PREPARE_PRIVATE_FIELDS \
uv_prepare_cb prepare_cb; \
ngx_queue_t queue;
@ -212,8 +215,16 @@ struct uv__io_s {
/* UV_TIMER */
#define UV_TIMER_PRIVATE_FIELDS \
ev_timer timer_watcher; \
uv_timer_cb timer_cb;
/* RB_ENTRY(uv_timer_s) node; */ \
struct { \
struct uv_timer_s* rbe_left; \
struct uv_timer_s* rbe_right; \
struct uv_timer_s* rbe_parent; \
int rbe_color; \
} tree_entry; \
uv_timer_cb timer_cb; \
uint64_t timeout; \
uint64_t repeat;
#define UV_GETADDRINFO_PRIVATE_FIELDS \
uv_getaddrinfo_cb cb; \

View File

@ -226,22 +226,30 @@ void uv_loop_delete(uv_loop_t* loop) {
}
static void uv__poll(uv_loop_t* loop, int block) {
static void uv__poll(uv_loop_t* loop, unsigned int timeout) {
/* bump the loop's refcount, otherwise libev does
* a zero timeout poll and we end up busy looping
*/
ev_ref(loop->ev);
ev_run(loop->ev, block ? EVRUN_ONCE : EVRUN_NOWAIT);
ev_run(loop->ev, timeout / 1000.);
ev_unref(loop->ev);
}
static int uv__should_block(uv_loop_t* loop) {
return loop->active_handles && ngx_queue_empty(&loop->idle_handles);
static unsigned int uv__poll_timeout(uv_loop_t* loop) {
if (!uv__has_active_handles(loop))
return 0;
if (!ngx_queue_empty(&loop->idle_handles))
return 0;
return uv__next_timeout(loop);
}
static int uv__run(uv_loop_t* loop) {
uv_update_time(loop);
uv__run_timers(loop);
uv__run_idle(loop);
if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) {
@ -249,7 +257,7 @@ static int uv__run(uv_loop_t* loop) {
/* Need to poll even if there are no active handles left, otherwise
* uv_work_t reqs won't complete...
*/
uv__poll(loop, uv__should_block(loop));
uv__poll(loop, uv__poll_timeout(loop));
uv__run_check(loop);
}
@ -283,12 +291,12 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
void uv_update_time(uv_loop_t* loop) {
ev_now_update(loop->ev);
loop->time = uv_hrtime() / 1000000;
}
int64_t uv_now(uv_loop_t* loop) {
return (int64_t)(ev_now(loop->ev) * 1000);
return loop->time;
}

View File

@ -2389,7 +2389,7 @@ time_update (EV_P_ ev_tstamp max_block)
}
void
ev_run (EV_P_ int flags)
ev_run (EV_P_ ev_tstamp waittime)
{
#if EV_FEATURE_API
++loop_depth;
@ -2426,15 +2426,6 @@ ev_run (EV_P_ int flags)
}
#endif
#if EV_PREPARE_ENABLE
/* queue prepare watchers (and execute them) */
if (expect_false (preparecnt))
{
queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
EV_INVOKE_PENDING;
}
#endif
if (expect_false (loop_done))
break;
@ -2445,55 +2436,6 @@ ev_run (EV_P_ int flags)
/* update fd-related kernel structures */
fd_reify (EV_A);
/* calculate blocking time */
{
ev_tstamp waittime = 0.;
ev_tstamp sleeptime = 0.;
/* remember old timestamp for io_blocktime calculation */
ev_tstamp prev_mn_now = mn_now;
/* update time to cancel out callback processing overhead */
time_update (EV_A_ 1e100);
if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt)))
{
waittime = MAX_BLOCKTIME;
if (timercnt)
{
ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now + backend_fudge;
if (waittime > to) waittime = to;
}
#if EV_PERIODIC_ENABLE
if (periodiccnt)
{
ev_tstamp to = ANHE_at (periodics [HEAP0]) - ev_rt_now + backend_fudge;
if (waittime > to) waittime = to;
}
#endif
/* don't let timeouts decrease the waittime below timeout_blocktime */
if (expect_false (waittime < timeout_blocktime))
waittime = timeout_blocktime;
/* extra check because io_blocktime is commonly 0 */
if (expect_false (io_blocktime))
{
sleeptime = io_blocktime - (mn_now - prev_mn_now);
if (sleeptime > waittime - backend_fudge)
sleeptime = waittime - backend_fudge;
if (expect_true (sleeptime > 0.))
{
ev_sleep (sleeptime);
waittime -= sleeptime;
}
}
}
#if EV_FEATURE_API
++loop_count;
#endif
@ -2501,34 +2443,9 @@ ev_run (EV_P_ int flags)
backend_poll (EV_A_ waittime);
assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */
/* update ev_rt_now, do magic */
time_update (EV_A_ waittime + sleeptime);
}
/* queue pending timers and reschedule them */
timers_reify (EV_A); /* relative timers called last */
#if EV_PERIODIC_ENABLE
periodics_reify (EV_A); /* absolute timers called first */
#endif
#if EV_IDLE_ENABLE
/* queue idle watchers unless other events are pending */
idle_reify (EV_A);
#endif
#if EV_CHECK_ENABLE
/* queue check watchers, to be executed first */
if (expect_false (checkcnt))
queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
#endif
EV_INVOKE_PENDING;
}
while (expect_true (
activecnt
&& !loop_done
&& !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
));
while (0);
if (loop_done == EVBREAK_ONE)
loop_done = EVBREAK_CANCEL;

View File

@ -93,8 +93,7 @@ enum {
UV_STREAM_WRITABLE = 0x40, /* The stream is writable */
UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */
UV_TCP_NODELAY = 0x100, /* Disable Nagle. */
UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */
UV_TIMER_REPEAT = 0x100
UV_TCP_KEEPALIVE = 0x200 /* Turn on keep-alive. */
};
inline static void uv__req_init(uv_loop_t* loop,
@ -151,8 +150,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
/* pipe */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
/* poll */
void uv__poll_close(uv_poll_t* handle);
/* timer */
void uv__run_timers(uv_loop_t* loop);
unsigned int uv__next_timeout(uv_loop_t* loop);
/* various */
void uv__async_close(uv_async_t* handle);
@ -160,6 +160,7 @@ void uv__check_close(uv_check_t* handle);
void uv__fs_event_close(uv_fs_event_t* handle);
void uv__idle_close(uv_idle_t* handle);
void uv__pipe_close(uv_pipe_t* handle);
void uv__poll_close(uv_poll_t* handle);
void uv__prepare_close(uv_prepare_t* handle);
void uv__process_close(uv_process_t* handle);
void uv__stream_close(uv_stream_t* handle);

View File

@ -37,6 +37,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
memset(loop, 0, sizeof(*loop));
RB_INIT(&loop->ares_handles);
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->active_reqs);
ngx_queue_init(&loop->idle_handles);
ngx_queue_init(&loop->check_handles);
@ -44,6 +45,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
ngx_queue_init(&loop->handle_queue);
loop->closing_handles = NULL;
loop->channel = NULL;
loop->time = uv_hrtime() / 1000000;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);
eio_channel_init(&loop->uv_eio_channel, loop);

View File

@ -22,92 +22,114 @@
#include "internal.h"
#include <assert.h>
static int uv__timer_repeating(const uv_timer_t* timer) {
return timer->flags & UV_TIMER_REPEAT;
static int uv__timer_cmp(const uv_timer_t* a, const uv_timer_t* b) {
if (a->timeout < b->timeout)
return -1;
if (a->timeout > b->timeout)
return 1;
if (a < b)
return -1;
if (a > b)
return 1;
return 0;
}
static void uv__timer_cb(EV_P_ ev_timer* w, int revents) {
uv_timer_t* timer = container_of(w, uv_timer_t, timer_watcher);
if (!uv__is_active(timer))
return;
if (!uv__timer_repeating(timer))
uv__handle_stop(timer);
if (timer->timer_cb)
timer->timer_cb(timer, 0);
}
RB_GENERATE_STATIC(uv__timers, uv_timer_s, tree_entry, uv__timer_cmp)
int uv_timer_init(uv_loop_t* loop, uv_timer_t* timer) {
uv__handle_init(loop, (uv_handle_t*)timer, UV_TIMER);
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle) {
loop->counters.timer_init++;
ev_init(&timer->timer_watcher, uv__timer_cb);
uv__handle_init(loop, (uv_handle_t*)handle, UV_TIMER);
handle->timer_cb = NULL;
return 0;
}
int uv_timer_start(uv_timer_t* timer,
int uv_timer_start(uv_timer_t* handle,
uv_timer_cb cb,
int64_t timeout,
int64_t repeat) {
if (uv__is_active(timer))
uv_timer_stop(timer);
assert(timeout >= 0);
assert(repeat >= 0);
timer->timer_cb = cb;
if (uv__is_active(handle))
uv_timer_stop(handle);
if (repeat)
timer->flags |= UV_TIMER_REPEAT;
else
timer->flags &= ~UV_TIMER_REPEAT;
handle->timer_cb = cb;
handle->timeout = handle->loop->time + timeout;
handle->repeat = repeat;
ev_timer_set(&timer->timer_watcher, timeout / 1000.0, repeat / 1000.0);
ev_timer_start(timer->loop->ev, &timer->timer_watcher);
uv__handle_start(timer);
RB_INSERT(uv__timers, &handle->loop->timer_handles, handle);
uv__handle_start(handle);
return 0;
}
int uv_timer_stop(uv_timer_t* timer) {
timer->flags &= ~UV_TIMER_REPEAT;
ev_timer_stop(timer->loop->ev, &timer->timer_watcher);
uv__handle_stop(timer);
int uv_timer_stop(uv_timer_t* handle) {
if (!uv__is_active(handle))
return 0;
RB_REMOVE(uv__timers, &handle->loop->timer_handles, handle);
uv__handle_stop(handle);
return 0;
}
int uv_timer_again(uv_timer_t* timer) {
if (!uv__is_active(timer)) {
uv__set_artificial_error(timer->loop, UV_EINVAL);
return -1;
int uv_timer_again(uv_timer_t* handle) {
if (handle->timer_cb == NULL)
return uv__set_artificial_error(handle->loop, UV_EINVAL);
if (handle->repeat) {
uv_timer_stop(handle);
uv_timer_start(handle, handle->timer_cb, handle->repeat, handle->repeat);
}
assert(uv__timer_repeating(timer));
ev_timer_again(timer->loop->ev, &timer->timer_watcher);
return 0;
}
void uv_timer_set_repeat(uv_timer_t* timer, int64_t repeat) {
assert(timer->type == UV_TIMER);
timer->timer_watcher.repeat = repeat / 1000.0;
if (repeat)
timer->flags |= UV_TIMER_REPEAT;
else
timer->flags &= ~UV_TIMER_REPEAT;
void uv_timer_set_repeat(uv_timer_t* handle, int64_t repeat) {
assert(repeat >= 0);
handle->repeat = repeat;
}
int64_t uv_timer_get_repeat(uv_timer_t* timer) {
assert(timer->type == UV_TIMER);
return (int64_t)(1000 * timer->timer_watcher.repeat);
int64_t uv_timer_get_repeat(uv_timer_t* handle) {
return handle->repeat;
}
unsigned int uv__next_timeout(uv_loop_t* loop) {
uv_timer_t* handle;
handle = RB_MIN(uv__timers, &loop->timer_handles);
if (handle == NULL)
return (unsigned int) -1; /* block indefinitely */
if (handle->timeout <= loop->time)
return 0;
return handle->timeout - loop->time;
}
void uv__run_timers(uv_loop_t* loop) {
uv_timer_t* handle;
while ((handle = RB_MIN(uv__timers, &loop->timer_handles))) {
if (handle->timeout > loop->time)
break;
uv_timer_stop(handle);
uv_timer_again(handle);
handle->timer_cb(handle, 0);
}
}