unix: implement timers in libuv
* replace libev backed timers with a pure libuv implementation * gut ev_run() and make it take a timeout instead of flags Incidentally speeds up the loop_count_timed benchmark by about 100%.
This commit is contained in:
parent
3f37ba859f
commit
c9396dd57e
@ -623,7 +623,7 @@ enum {
|
||||
};
|
||||
|
||||
#if EV_PROTOTYPES
|
||||
void ev_run (EV_P_ int flags EV_CPP (= 0));
|
||||
void ev_run (EV_P_ ev_tstamp waittime);
|
||||
void ev_break (EV_P_ int how EV_CPP (= EVBREAK_ONE)); /* break out of the loop */
|
||||
|
||||
/*
|
||||
|
||||
@ -98,18 +98,21 @@ struct uv__io_s {
|
||||
# define UV_LOOP_PRIVATE_PLATFORM_FIELDS
|
||||
#endif
|
||||
|
||||
#define UV_LOOP_PRIVATE_FIELDS \
|
||||
/* Poll result queue */ \
|
||||
eio_channel uv_eio_channel; \
|
||||
struct ev_loop* ev; \
|
||||
/* Various thing for libeio. */ \
|
||||
uv_async_t uv_eio_want_poll_notifier; \
|
||||
uv_async_t uv_eio_done_poll_notifier; \
|
||||
uv_idle_t uv_eio_poller; \
|
||||
uv_handle_t* closing_handles; \
|
||||
ngx_queue_t prepare_handles; \
|
||||
ngx_queue_t check_handles; \
|
||||
ngx_queue_t idle_handles; \
|
||||
#define UV_LOOP_PRIVATE_FIELDS \
|
||||
/* Poll result queue */ \
|
||||
eio_channel uv_eio_channel; \
|
||||
struct ev_loop* ev; \
|
||||
/* Various thing for libeio. */ \
|
||||
uv_async_t uv_eio_want_poll_notifier; \
|
||||
uv_async_t uv_eio_done_poll_notifier; \
|
||||
uv_idle_t uv_eio_poller; \
|
||||
uv_handle_t* closing_handles; \
|
||||
ngx_queue_t prepare_handles; \
|
||||
ngx_queue_t check_handles; \
|
||||
ngx_queue_t idle_handles; \
|
||||
/* RB_HEAD(uv__timers, uv_timer_s) */ \
|
||||
struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \
|
||||
uint64_t time; \
|
||||
UV_LOOP_PRIVATE_PLATFORM_FIELDS
|
||||
|
||||
#define UV_REQ_BUFSML_SIZE (4)
|
||||
@ -186,7 +189,7 @@ struct uv__io_s {
|
||||
uv__io_t io_watcher;
|
||||
|
||||
|
||||
/* UV_PREPARE */ \
|
||||
/* UV_PREPARE */
|
||||
#define UV_PREPARE_PRIVATE_FIELDS \
|
||||
uv_prepare_cb prepare_cb; \
|
||||
ngx_queue_t queue;
|
||||
@ -211,9 +214,17 @@ struct uv__io_s {
|
||||
|
||||
|
||||
/* UV_TIMER */
|
||||
#define UV_TIMER_PRIVATE_FIELDS \
|
||||
ev_timer timer_watcher; \
|
||||
uv_timer_cb timer_cb;
|
||||
#define UV_TIMER_PRIVATE_FIELDS \
|
||||
/* RB_ENTRY(uv_timer_s) node; */ \
|
||||
struct { \
|
||||
struct uv_timer_s* rbe_left; \
|
||||
struct uv_timer_s* rbe_right; \
|
||||
struct uv_timer_s* rbe_parent; \
|
||||
int rbe_color; \
|
||||
} tree_entry; \
|
||||
uv_timer_cb timer_cb; \
|
||||
uint64_t timeout; \
|
||||
uint64_t repeat;
|
||||
|
||||
#define UV_GETADDRINFO_PRIVATE_FIELDS \
|
||||
uv_getaddrinfo_cb cb; \
|
||||
|
||||
@ -226,22 +226,30 @@ void uv_loop_delete(uv_loop_t* loop) {
|
||||
}
|
||||
|
||||
|
||||
static void uv__poll(uv_loop_t* loop, int block) {
|
||||
static void uv__poll(uv_loop_t* loop, unsigned int timeout) {
|
||||
/* bump the loop's refcount, otherwise libev does
|
||||
* a zero timeout poll and we end up busy looping
|
||||
*/
|
||||
ev_ref(loop->ev);
|
||||
ev_run(loop->ev, block ? EVRUN_ONCE : EVRUN_NOWAIT);
|
||||
ev_run(loop->ev, timeout / 1000.);
|
||||
ev_unref(loop->ev);
|
||||
}
|
||||
|
||||
|
||||
static int uv__should_block(uv_loop_t* loop) {
|
||||
return loop->active_handles && ngx_queue_empty(&loop->idle_handles);
|
||||
static unsigned int uv__poll_timeout(uv_loop_t* loop) {
|
||||
if (!uv__has_active_handles(loop))
|
||||
return 0;
|
||||
|
||||
if (!ngx_queue_empty(&loop->idle_handles))
|
||||
return 0;
|
||||
|
||||
return uv__next_timeout(loop);
|
||||
}
|
||||
|
||||
|
||||
static int uv__run(uv_loop_t* loop) {
|
||||
uv_update_time(loop);
|
||||
uv__run_timers(loop);
|
||||
uv__run_idle(loop);
|
||||
|
||||
if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) {
|
||||
@ -249,7 +257,7 @@ static int uv__run(uv_loop_t* loop) {
|
||||
/* Need to poll even if there are no active handles left, otherwise
|
||||
* uv_work_t reqs won't complete...
|
||||
*/
|
||||
uv__poll(loop, uv__should_block(loop));
|
||||
uv__poll(loop, uv__poll_timeout(loop));
|
||||
uv__run_check(loop);
|
||||
}
|
||||
|
||||
@ -283,12 +291,12 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
|
||||
|
||||
|
||||
void uv_update_time(uv_loop_t* loop) {
|
||||
ev_now_update(loop->ev);
|
||||
loop->time = uv_hrtime() / 1000000;
|
||||
}
|
||||
|
||||
|
||||
int64_t uv_now(uv_loop_t* loop) {
|
||||
return (int64_t)(ev_now(loop->ev) * 1000);
|
||||
return loop->time;
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -2389,7 +2389,7 @@ time_update (EV_P_ ev_tstamp max_block)
|
||||
}
|
||||
|
||||
void
|
||||
ev_run (EV_P_ int flags)
|
||||
ev_run (EV_P_ ev_tstamp waittime)
|
||||
{
|
||||
#if EV_FEATURE_API
|
||||
++loop_depth;
|
||||
@ -2426,15 +2426,6 @@ ev_run (EV_P_ int flags)
|
||||
}
|
||||
#endif
|
||||
|
||||
#if EV_PREPARE_ENABLE
|
||||
/* queue prepare watchers (and execute them) */
|
||||
if (expect_false (preparecnt))
|
||||
{
|
||||
queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE);
|
||||
EV_INVOKE_PENDING;
|
||||
}
|
||||
#endif
|
||||
|
||||
if (expect_false (loop_done))
|
||||
break;
|
||||
|
||||
@ -2445,90 +2436,16 @@ ev_run (EV_P_ int flags)
|
||||
/* update fd-related kernel structures */
|
||||
fd_reify (EV_A);
|
||||
|
||||
/* calculate blocking time */
|
||||
{
|
||||
ev_tstamp waittime = 0.;
|
||||
ev_tstamp sleeptime = 0.;
|
||||
|
||||
/* remember old timestamp for io_blocktime calculation */
|
||||
ev_tstamp prev_mn_now = mn_now;
|
||||
|
||||
/* update time to cancel out callback processing overhead */
|
||||
time_update (EV_A_ 1e100);
|
||||
|
||||
if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt)))
|
||||
{
|
||||
waittime = MAX_BLOCKTIME;
|
||||
|
||||
if (timercnt)
|
||||
{
|
||||
ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now + backend_fudge;
|
||||
if (waittime > to) waittime = to;
|
||||
}
|
||||
|
||||
#if EV_PERIODIC_ENABLE
|
||||
if (periodiccnt)
|
||||
{
|
||||
ev_tstamp to = ANHE_at (periodics [HEAP0]) - ev_rt_now + backend_fudge;
|
||||
if (waittime > to) waittime = to;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* don't let timeouts decrease the waittime below timeout_blocktime */
|
||||
if (expect_false (waittime < timeout_blocktime))
|
||||
waittime = timeout_blocktime;
|
||||
|
||||
/* extra check because io_blocktime is commonly 0 */
|
||||
if (expect_false (io_blocktime))
|
||||
{
|
||||
sleeptime = io_blocktime - (mn_now - prev_mn_now);
|
||||
|
||||
if (sleeptime > waittime - backend_fudge)
|
||||
sleeptime = waittime - backend_fudge;
|
||||
|
||||
if (expect_true (sleeptime > 0.))
|
||||
{
|
||||
ev_sleep (sleeptime);
|
||||
waittime -= sleeptime;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#if EV_FEATURE_API
|
||||
++loop_count;
|
||||
#endif
|
||||
assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
|
||||
backend_poll (EV_A_ waittime);
|
||||
assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */
|
||||
|
||||
/* update ev_rt_now, do magic */
|
||||
time_update (EV_A_ waittime + sleeptime);
|
||||
}
|
||||
|
||||
/* queue pending timers and reschedule them */
|
||||
timers_reify (EV_A); /* relative timers called last */
|
||||
#if EV_PERIODIC_ENABLE
|
||||
periodics_reify (EV_A); /* absolute timers called first */
|
||||
#endif
|
||||
|
||||
#if EV_IDLE_ENABLE
|
||||
/* queue idle watchers unless other events are pending */
|
||||
idle_reify (EV_A);
|
||||
#endif
|
||||
|
||||
#if EV_CHECK_ENABLE
|
||||
/* queue check watchers, to be executed first */
|
||||
if (expect_false (checkcnt))
|
||||
queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK);
|
||||
++loop_count;
|
||||
#endif
|
||||
assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */
|
||||
backend_poll (EV_A_ waittime);
|
||||
assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */
|
||||
|
||||
EV_INVOKE_PENDING;
|
||||
}
|
||||
while (expect_true (
|
||||
activecnt
|
||||
&& !loop_done
|
||||
&& !(flags & (EVRUN_ONCE | EVRUN_NOWAIT))
|
||||
));
|
||||
while (0);
|
||||
|
||||
if (loop_done == EVBREAK_ONE)
|
||||
loop_done = EVBREAK_CANCEL;
|
||||
|
||||
@ -93,8 +93,7 @@ enum {
|
||||
UV_STREAM_WRITABLE = 0x40, /* The stream is writable */
|
||||
UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */
|
||||
UV_TCP_NODELAY = 0x100, /* Disable Nagle. */
|
||||
UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */
|
||||
UV_TIMER_REPEAT = 0x100
|
||||
UV_TCP_KEEPALIVE = 0x200 /* Turn on keep-alive. */
|
||||
};
|
||||
|
||||
inline static void uv__req_init(uv_loop_t* loop,
|
||||
@ -151,8 +150,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
|
||||
/* pipe */
|
||||
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
|
||||
|
||||
/* poll */
|
||||
void uv__poll_close(uv_poll_t* handle);
|
||||
/* timer */
|
||||
void uv__run_timers(uv_loop_t* loop);
|
||||
unsigned int uv__next_timeout(uv_loop_t* loop);
|
||||
|
||||
/* various */
|
||||
void uv__async_close(uv_async_t* handle);
|
||||
@ -160,6 +160,7 @@ void uv__check_close(uv_check_t* handle);
|
||||
void uv__fs_event_close(uv_fs_event_t* handle);
|
||||
void uv__idle_close(uv_idle_t* handle);
|
||||
void uv__pipe_close(uv_pipe_t* handle);
|
||||
void uv__poll_close(uv_poll_t* handle);
|
||||
void uv__prepare_close(uv_prepare_t* handle);
|
||||
void uv__process_close(uv_process_t* handle);
|
||||
void uv__stream_close(uv_stream_t* handle);
|
||||
|
||||
@ -37,6 +37,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
|
||||
memset(loop, 0, sizeof(*loop));
|
||||
|
||||
RB_INIT(&loop->ares_handles);
|
||||
RB_INIT(&loop->timer_handles);
|
||||
ngx_queue_init(&loop->active_reqs);
|
||||
ngx_queue_init(&loop->idle_handles);
|
||||
ngx_queue_init(&loop->check_handles);
|
||||
@ -44,6 +45,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
|
||||
ngx_queue_init(&loop->handle_queue);
|
||||
loop->closing_handles = NULL;
|
||||
loop->channel = NULL;
|
||||
loop->time = uv_hrtime() / 1000000;
|
||||
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
|
||||
ev_set_userdata(loop->ev, loop);
|
||||
eio_channel_init(&loop->uv_eio_channel, loop);
|
||||
|
||||
122
src/unix/timer.c
122
src/unix/timer.c
@ -22,92 +22,114 @@
|
||||
#include "internal.h"
|
||||
#include <assert.h>
|
||||
|
||||
|
||||
static int uv__timer_repeating(const uv_timer_t* timer) {
|
||||
return timer->flags & UV_TIMER_REPEAT;
|
||||
static int uv__timer_cmp(const uv_timer_t* a, const uv_timer_t* b) {
|
||||
if (a->timeout < b->timeout)
|
||||
return -1;
|
||||
if (a->timeout > b->timeout)
|
||||
return 1;
|
||||
if (a < b)
|
||||
return -1;
|
||||
if (a > b)
|
||||
return 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static void uv__timer_cb(EV_P_ ev_timer* w, int revents) {
|
||||
uv_timer_t* timer = container_of(w, uv_timer_t, timer_watcher);
|
||||
|
||||
if (!uv__is_active(timer))
|
||||
return;
|
||||
|
||||
if (!uv__timer_repeating(timer))
|
||||
uv__handle_stop(timer);
|
||||
|
||||
if (timer->timer_cb)
|
||||
timer->timer_cb(timer, 0);
|
||||
}
|
||||
RB_GENERATE_STATIC(uv__timers, uv_timer_s, tree_entry, uv__timer_cmp)
|
||||
|
||||
|
||||
int uv_timer_init(uv_loop_t* loop, uv_timer_t* timer) {
|
||||
uv__handle_init(loop, (uv_handle_t*)timer, UV_TIMER);
|
||||
int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle) {
|
||||
loop->counters.timer_init++;
|
||||
|
||||
ev_init(&timer->timer_watcher, uv__timer_cb);
|
||||
uv__handle_init(loop, (uv_handle_t*)handle, UV_TIMER);
|
||||
handle->timer_cb = NULL;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_timer_start(uv_timer_t* timer,
|
||||
int uv_timer_start(uv_timer_t* handle,
|
||||
uv_timer_cb cb,
|
||||
int64_t timeout,
|
||||
int64_t repeat) {
|
||||
if (uv__is_active(timer))
|
||||
uv_timer_stop(timer);
|
||||
assert(timeout >= 0);
|
||||
assert(repeat >= 0);
|
||||
|
||||
timer->timer_cb = cb;
|
||||
if (uv__is_active(handle))
|
||||
uv_timer_stop(handle);
|
||||
|
||||
if (repeat)
|
||||
timer->flags |= UV_TIMER_REPEAT;
|
||||
else
|
||||
timer->flags &= ~UV_TIMER_REPEAT;
|
||||
handle->timer_cb = cb;
|
||||
handle->timeout = handle->loop->time + timeout;
|
||||
handle->repeat = repeat;
|
||||
|
||||
ev_timer_set(&timer->timer_watcher, timeout / 1000.0, repeat / 1000.0);
|
||||
ev_timer_start(timer->loop->ev, &timer->timer_watcher);
|
||||
uv__handle_start(timer);
|
||||
RB_INSERT(uv__timers, &handle->loop->timer_handles, handle);
|
||||
uv__handle_start(handle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_timer_stop(uv_timer_t* timer) {
|
||||
timer->flags &= ~UV_TIMER_REPEAT;
|
||||
ev_timer_stop(timer->loop->ev, &timer->timer_watcher);
|
||||
uv__handle_stop(timer);
|
||||
int uv_timer_stop(uv_timer_t* handle) {
|
||||
if (!uv__is_active(handle))
|
||||
return 0;
|
||||
|
||||
RB_REMOVE(uv__timers, &handle->loop->timer_handles, handle);
|
||||
uv__handle_stop(handle);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_timer_again(uv_timer_t* timer) {
|
||||
if (!uv__is_active(timer)) {
|
||||
uv__set_artificial_error(timer->loop, UV_EINVAL);
|
||||
return -1;
|
||||
int uv_timer_again(uv_timer_t* handle) {
|
||||
if (handle->timer_cb == NULL)
|
||||
return uv__set_artificial_error(handle->loop, UV_EINVAL);
|
||||
|
||||
if (handle->repeat) {
|
||||
uv_timer_stop(handle);
|
||||
uv_timer_start(handle, handle->timer_cb, handle->repeat, handle->repeat);
|
||||
}
|
||||
|
||||
assert(uv__timer_repeating(timer));
|
||||
ev_timer_again(timer->loop->ev, &timer->timer_watcher);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv_timer_set_repeat(uv_timer_t* timer, int64_t repeat) {
|
||||
assert(timer->type == UV_TIMER);
|
||||
timer->timer_watcher.repeat = repeat / 1000.0;
|
||||
|
||||
if (repeat)
|
||||
timer->flags |= UV_TIMER_REPEAT;
|
||||
else
|
||||
timer->flags &= ~UV_TIMER_REPEAT;
|
||||
void uv_timer_set_repeat(uv_timer_t* handle, int64_t repeat) {
|
||||
assert(repeat >= 0);
|
||||
handle->repeat = repeat;
|
||||
}
|
||||
|
||||
|
||||
int64_t uv_timer_get_repeat(uv_timer_t* timer) {
|
||||
assert(timer->type == UV_TIMER);
|
||||
return (int64_t)(1000 * timer->timer_watcher.repeat);
|
||||
int64_t uv_timer_get_repeat(uv_timer_t* handle) {
|
||||
return handle->repeat;
|
||||
}
|
||||
|
||||
|
||||
unsigned int uv__next_timeout(uv_loop_t* loop) {
|
||||
uv_timer_t* handle;
|
||||
|
||||
handle = RB_MIN(uv__timers, &loop->timer_handles);
|
||||
|
||||
if (handle == NULL)
|
||||
return (unsigned int) -1; /* block indefinitely */
|
||||
|
||||
if (handle->timeout <= loop->time)
|
||||
return 0;
|
||||
|
||||
return handle->timeout - loop->time;
|
||||
}
|
||||
|
||||
|
||||
void uv__run_timers(uv_loop_t* loop) {
|
||||
uv_timer_t* handle;
|
||||
|
||||
while ((handle = RB_MIN(uv__timers, &loop->timer_handles))) {
|
||||
if (handle->timeout > loop->time)
|
||||
break;
|
||||
|
||||
uv_timer_stop(handle);
|
||||
uv_timer_again(handle);
|
||||
handle->timer_cb(handle, 0);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user