diff --git a/include/uv-private/ev.h b/include/uv-private/ev.h index 11e81cda..7709bc34 100644 --- a/include/uv-private/ev.h +++ b/include/uv-private/ev.h @@ -623,7 +623,7 @@ enum { }; #if EV_PROTOTYPES -void ev_run (EV_P_ int flags EV_CPP (= 0)); +void ev_run (EV_P_ ev_tstamp waittime); void ev_break (EV_P_ int how EV_CPP (= EVBREAK_ONE)); /* break out of the loop */ /* diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 69352fe1..ae9f5fb3 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -98,18 +98,21 @@ struct uv__io_s { # define UV_LOOP_PRIVATE_PLATFORM_FIELDS #endif -#define UV_LOOP_PRIVATE_FIELDS \ - /* Poll result queue */ \ - eio_channel uv_eio_channel; \ - struct ev_loop* ev; \ - /* Various thing for libeio. */ \ - uv_async_t uv_eio_want_poll_notifier; \ - uv_async_t uv_eio_done_poll_notifier; \ - uv_idle_t uv_eio_poller; \ - uv_handle_t* closing_handles; \ - ngx_queue_t prepare_handles; \ - ngx_queue_t check_handles; \ - ngx_queue_t idle_handles; \ +#define UV_LOOP_PRIVATE_FIELDS \ + /* Poll result queue */ \ + eio_channel uv_eio_channel; \ + struct ev_loop* ev; \ + /* Various thing for libeio. */ \ + uv_async_t uv_eio_want_poll_notifier; \ + uv_async_t uv_eio_done_poll_notifier; \ + uv_idle_t uv_eio_poller; \ + uv_handle_t* closing_handles; \ + ngx_queue_t prepare_handles; \ + ngx_queue_t check_handles; \ + ngx_queue_t idle_handles; \ + /* RB_HEAD(uv__timers, uv_timer_s) */ \ + struct uv__timers { struct uv_timer_s* rbh_root; } timer_handles; \ + uint64_t time; \ UV_LOOP_PRIVATE_PLATFORM_FIELDS #define UV_REQ_BUFSML_SIZE (4) @@ -186,7 +189,7 @@ struct uv__io_s { uv__io_t io_watcher; -/* UV_PREPARE */ \ +/* UV_PREPARE */ #define UV_PREPARE_PRIVATE_FIELDS \ uv_prepare_cb prepare_cb; \ ngx_queue_t queue; @@ -211,9 +214,17 @@ struct uv__io_s { /* UV_TIMER */ -#define UV_TIMER_PRIVATE_FIELDS \ - ev_timer timer_watcher; \ - uv_timer_cb timer_cb; +#define UV_TIMER_PRIVATE_FIELDS \ + /* RB_ENTRY(uv_timer_s) node; */ \ + struct { \ + struct uv_timer_s* rbe_left; \ + struct uv_timer_s* rbe_right; \ + struct uv_timer_s* rbe_parent; \ + int rbe_color; \ + } tree_entry; \ + uv_timer_cb timer_cb; \ + uint64_t timeout; \ + uint64_t repeat; #define UV_GETADDRINFO_PRIVATE_FIELDS \ uv_getaddrinfo_cb cb; \ diff --git a/src/unix/core.c b/src/unix/core.c index e4e909db..1802d2f1 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -226,22 +226,30 @@ void uv_loop_delete(uv_loop_t* loop) { } -static void uv__poll(uv_loop_t* loop, int block) { +static void uv__poll(uv_loop_t* loop, unsigned int timeout) { /* bump the loop's refcount, otherwise libev does * a zero timeout poll and we end up busy looping */ ev_ref(loop->ev); - ev_run(loop->ev, block ? EVRUN_ONCE : EVRUN_NOWAIT); + ev_run(loop->ev, timeout / 1000.); ev_unref(loop->ev); } -static int uv__should_block(uv_loop_t* loop) { - return loop->active_handles && ngx_queue_empty(&loop->idle_handles); +static unsigned int uv__poll_timeout(uv_loop_t* loop) { + if (!uv__has_active_handles(loop)) + return 0; + + if (!ngx_queue_empty(&loop->idle_handles)) + return 0; + + return uv__next_timeout(loop); } static int uv__run(uv_loop_t* loop) { + uv_update_time(loop); + uv__run_timers(loop); uv__run_idle(loop); if (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) { @@ -249,7 +257,7 @@ static int uv__run(uv_loop_t* loop) { /* Need to poll even if there are no active handles left, otherwise * uv_work_t reqs won't complete... */ - uv__poll(loop, uv__should_block(loop)); + uv__poll(loop, uv__poll_timeout(loop)); uv__run_check(loop); } @@ -283,12 +291,12 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle, void uv_update_time(uv_loop_t* loop) { - ev_now_update(loop->ev); + loop->time = uv_hrtime() / 1000000; } int64_t uv_now(uv_loop_t* loop) { - return (int64_t)(ev_now(loop->ev) * 1000); + return loop->time; } diff --git a/src/unix/ev/ev.c b/src/unix/ev/ev.c index a432bfbf..a722981b 100644 --- a/src/unix/ev/ev.c +++ b/src/unix/ev/ev.c @@ -2389,7 +2389,7 @@ time_update (EV_P_ ev_tstamp max_block) } void -ev_run (EV_P_ int flags) +ev_run (EV_P_ ev_tstamp waittime) { #if EV_FEATURE_API ++loop_depth; @@ -2426,15 +2426,6 @@ ev_run (EV_P_ int flags) } #endif -#if EV_PREPARE_ENABLE - /* queue prepare watchers (and execute them) */ - if (expect_false (preparecnt)) - { - queue_events (EV_A_ (W *)prepares, preparecnt, EV_PREPARE); - EV_INVOKE_PENDING; - } -#endif - if (expect_false (loop_done)) break; @@ -2445,90 +2436,16 @@ ev_run (EV_P_ int flags) /* update fd-related kernel structures */ fd_reify (EV_A); - /* calculate blocking time */ - { - ev_tstamp waittime = 0.; - ev_tstamp sleeptime = 0.; - - /* remember old timestamp for io_blocktime calculation */ - ev_tstamp prev_mn_now = mn_now; - - /* update time to cancel out callback processing overhead */ - time_update (EV_A_ 1e100); - - if (expect_true (!(flags & EVRUN_NOWAIT || idleall || !activecnt))) - { - waittime = MAX_BLOCKTIME; - - if (timercnt) - { - ev_tstamp to = ANHE_at (timers [HEAP0]) - mn_now + backend_fudge; - if (waittime > to) waittime = to; - } - -#if EV_PERIODIC_ENABLE - if (periodiccnt) - { - ev_tstamp to = ANHE_at (periodics [HEAP0]) - ev_rt_now + backend_fudge; - if (waittime > to) waittime = to; - } -#endif - - /* don't let timeouts decrease the waittime below timeout_blocktime */ - if (expect_false (waittime < timeout_blocktime)) - waittime = timeout_blocktime; - - /* extra check because io_blocktime is commonly 0 */ - if (expect_false (io_blocktime)) - { - sleeptime = io_blocktime - (mn_now - prev_mn_now); - - if (sleeptime > waittime - backend_fudge) - sleeptime = waittime - backend_fudge; - - if (expect_true (sleeptime > 0.)) - { - ev_sleep (sleeptime); - waittime -= sleeptime; - } - } - } - #if EV_FEATURE_API - ++loop_count; -#endif - assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */ - backend_poll (EV_A_ waittime); - assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */ - - /* update ev_rt_now, do magic */ - time_update (EV_A_ waittime + sleeptime); - } - - /* queue pending timers and reschedule them */ - timers_reify (EV_A); /* relative timers called last */ -#if EV_PERIODIC_ENABLE - periodics_reify (EV_A); /* absolute timers called first */ -#endif - -#if EV_IDLE_ENABLE - /* queue idle watchers unless other events are pending */ - idle_reify (EV_A); -#endif - -#if EV_CHECK_ENABLE - /* queue check watchers, to be executed first */ - if (expect_false (checkcnt)) - queue_events (EV_A_ (W *)checks, checkcnt, EV_CHECK); + ++loop_count; #endif + assert ((loop_done = EVBREAK_RECURSE, 1)); /* assert for side effect */ + backend_poll (EV_A_ waittime); + assert ((loop_done = EVBREAK_CANCEL, 1)); /* assert for side effect */ EV_INVOKE_PENDING; } - while (expect_true ( - activecnt - && !loop_done - && !(flags & (EVRUN_ONCE | EVRUN_NOWAIT)) - )); + while (0); if (loop_done == EVBREAK_ONE) loop_done = EVBREAK_CANCEL; diff --git a/src/unix/internal.h b/src/unix/internal.h index 0eb6d82b..8a28b2aa 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -93,8 +93,7 @@ enum { UV_STREAM_WRITABLE = 0x40, /* The stream is writable */ UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */ UV_TCP_NODELAY = 0x100, /* Disable Nagle. */ - UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */ - UV_TIMER_REPEAT = 0x100 + UV_TCP_KEEPALIVE = 0x200 /* Turn on keep-alive. */ }; inline static void uv__req_init(uv_loop_t* loop, @@ -151,8 +150,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay); /* pipe */ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); -/* poll */ -void uv__poll_close(uv_poll_t* handle); +/* timer */ +void uv__run_timers(uv_loop_t* loop); +unsigned int uv__next_timeout(uv_loop_t* loop); /* various */ void uv__async_close(uv_async_t* handle); @@ -160,6 +160,7 @@ void uv__check_close(uv_check_t* handle); void uv__fs_event_close(uv_fs_event_t* handle); void uv__idle_close(uv_idle_t* handle); void uv__pipe_close(uv_pipe_t* handle); +void uv__poll_close(uv_poll_t* handle); void uv__prepare_close(uv_prepare_t* handle); void uv__process_close(uv_process_t* handle); void uv__stream_close(uv_stream_t* handle); diff --git a/src/unix/loop.c b/src/unix/loop.c index 43f7e810..08985d63 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -37,6 +37,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { memset(loop, 0, sizeof(*loop)); RB_INIT(&loop->ares_handles); + RB_INIT(&loop->timer_handles); ngx_queue_init(&loop->active_reqs); ngx_queue_init(&loop->idle_handles); ngx_queue_init(&loop->check_handles); @@ -44,6 +45,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) { ngx_queue_init(&loop->handle_queue); loop->closing_handles = NULL; loop->channel = NULL; + loop->time = uv_hrtime() / 1000000; loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags); ev_set_userdata(loop->ev, loop); eio_channel_init(&loop->uv_eio_channel, loop); diff --git a/src/unix/timer.c b/src/unix/timer.c index e50a35ab..0d81997b 100644 --- a/src/unix/timer.c +++ b/src/unix/timer.c @@ -22,92 +22,114 @@ #include "internal.h" #include - -static int uv__timer_repeating(const uv_timer_t* timer) { - return timer->flags & UV_TIMER_REPEAT; +static int uv__timer_cmp(const uv_timer_t* a, const uv_timer_t* b) { + if (a->timeout < b->timeout) + return -1; + if (a->timeout > b->timeout) + return 1; + if (a < b) + return -1; + if (a > b) + return 1; + return 0; } -static void uv__timer_cb(EV_P_ ev_timer* w, int revents) { - uv_timer_t* timer = container_of(w, uv_timer_t, timer_watcher); - - if (!uv__is_active(timer)) - return; - - if (!uv__timer_repeating(timer)) - uv__handle_stop(timer); - - if (timer->timer_cb) - timer->timer_cb(timer, 0); -} +RB_GENERATE_STATIC(uv__timers, uv_timer_s, tree_entry, uv__timer_cmp) -int uv_timer_init(uv_loop_t* loop, uv_timer_t* timer) { - uv__handle_init(loop, (uv_handle_t*)timer, UV_TIMER); +int uv_timer_init(uv_loop_t* loop, uv_timer_t* handle) { loop->counters.timer_init++; - ev_init(&timer->timer_watcher, uv__timer_cb); + uv__handle_init(loop, (uv_handle_t*)handle, UV_TIMER); + handle->timer_cb = NULL; return 0; } -int uv_timer_start(uv_timer_t* timer, +int uv_timer_start(uv_timer_t* handle, uv_timer_cb cb, int64_t timeout, int64_t repeat) { - if (uv__is_active(timer)) - uv_timer_stop(timer); + assert(timeout >= 0); + assert(repeat >= 0); - timer->timer_cb = cb; + if (uv__is_active(handle)) + uv_timer_stop(handle); - if (repeat) - timer->flags |= UV_TIMER_REPEAT; - else - timer->flags &= ~UV_TIMER_REPEAT; + handle->timer_cb = cb; + handle->timeout = handle->loop->time + timeout; + handle->repeat = repeat; - ev_timer_set(&timer->timer_watcher, timeout / 1000.0, repeat / 1000.0); - ev_timer_start(timer->loop->ev, &timer->timer_watcher); - uv__handle_start(timer); + RB_INSERT(uv__timers, &handle->loop->timer_handles, handle); + uv__handle_start(handle); return 0; } -int uv_timer_stop(uv_timer_t* timer) { - timer->flags &= ~UV_TIMER_REPEAT; - ev_timer_stop(timer->loop->ev, &timer->timer_watcher); - uv__handle_stop(timer); +int uv_timer_stop(uv_timer_t* handle) { + if (!uv__is_active(handle)) + return 0; + + RB_REMOVE(uv__timers, &handle->loop->timer_handles, handle); + uv__handle_stop(handle); + return 0; } -int uv_timer_again(uv_timer_t* timer) { - if (!uv__is_active(timer)) { - uv__set_artificial_error(timer->loop, UV_EINVAL); - return -1; +int uv_timer_again(uv_timer_t* handle) { + if (handle->timer_cb == NULL) + return uv__set_artificial_error(handle->loop, UV_EINVAL); + + if (handle->repeat) { + uv_timer_stop(handle); + uv_timer_start(handle, handle->timer_cb, handle->repeat, handle->repeat); } - assert(uv__timer_repeating(timer)); - ev_timer_again(timer->loop->ev, &timer->timer_watcher); return 0; } -void uv_timer_set_repeat(uv_timer_t* timer, int64_t repeat) { - assert(timer->type == UV_TIMER); - timer->timer_watcher.repeat = repeat / 1000.0; - - if (repeat) - timer->flags |= UV_TIMER_REPEAT; - else - timer->flags &= ~UV_TIMER_REPEAT; +void uv_timer_set_repeat(uv_timer_t* handle, int64_t repeat) { + assert(repeat >= 0); + handle->repeat = repeat; } -int64_t uv_timer_get_repeat(uv_timer_t* timer) { - assert(timer->type == UV_TIMER); - return (int64_t)(1000 * timer->timer_watcher.repeat); +int64_t uv_timer_get_repeat(uv_timer_t* handle) { + return handle->repeat; +} + + +unsigned int uv__next_timeout(uv_loop_t* loop) { + uv_timer_t* handle; + + handle = RB_MIN(uv__timers, &loop->timer_handles); + + if (handle == NULL) + return (unsigned int) -1; /* block indefinitely */ + + if (handle->timeout <= loop->time) + return 0; + + return handle->timeout - loop->time; +} + + +void uv__run_timers(uv_loop_t* loop) { + uv_timer_t* handle; + + while ((handle = RB_MIN(uv__timers, &loop->timer_handles))) { + if (handle->timeout > loop->time) + break; + + uv_timer_stop(handle); + uv_timer_again(handle); + handle->timer_cb(handle, 0); + } }