diff --git a/src/threadpool.c b/src/threadpool.c index 42322ebb..51962bf0 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -311,12 +311,15 @@ void uv__work_done(uv_async_t* handle) { QUEUE* q; QUEUE wq; int err; + int nevents; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); QUEUE_MOVE(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); + nevents = 0; + while (!QUEUE_EMPTY(&wq)) { q = QUEUE_HEAD(&wq); QUEUE_REMOVE(q); @@ -324,6 +327,20 @@ void uv__work_done(uv_async_t* handle) { w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; w->done(w, err); + nevents++; + } + + /* This check accomplishes 2 things: + * 1. Even if the queue was empty, the call to uv__work_done() should count + * as an event. Which will have been added by the event loop when + * calling this callback. + * 2. Prevents accidental wrap around in case nevents == 0 events == 0. + */ + if (nevents > 1) { + /* Subtract 1 to counter the call to uv__work_done(). */ + uv__metrics_inc_events(loop, nevents - 1); + if (uv__get_internal_fields(loop)->current_timeout == 0) + uv__metrics_inc_events_waiting(loop, nevents - 1); } } diff --git a/src/unix/aix.c b/src/unix/aix.c index b855282e..f1afbed4 100644 --- a/src/unix/aix.c +++ b/src/unix/aix.c @@ -131,6 +131,7 @@ int uv__io_check_fd(uv_loop_t* loop, int fd) { void uv__io_poll(uv_loop_t* loop, int timeout) { + uv__loop_internal_fields_t* lfields; struct pollfd events[1024]; struct pollfd pqry; struct pollfd* pe; @@ -154,6 +155,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); + while (!QUEUE_EMPTY(&loop->watcher_queue)) { q = QUEUE_HEAD(&loop->watcher_queue); QUEUE_REMOVE(q); @@ -217,7 +220,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { base = loop->time; count = 48; /* Benchmarks suggest this gives the best throughput. */ - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -232,6 +235,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = pollset_poll(loop->backend_fd, events, ARRAY_SIZE(events), diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index deb486ba..82916d65 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -127,6 +127,7 @@ static void uv__kqueue_delete(int kqfd, const struct kevent *ev) { void uv__io_poll(uv_loop_t* loop, int timeout) { + uv__loop_internal_fields_t* lfields; struct kevent events[1024]; struct kevent* ev; struct timespec spec; @@ -155,6 +156,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); nevents = 0; while (!QUEUE_EMPTY(&loop->watcher_queue)) { @@ -222,7 +224,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { base = loop->time; count = 48; /* Benchmarks suggest this gives the best throughput. */ - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -245,6 +247,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (pset != NULL) pthread_sigmask(SIG_BLOCK, pset, NULL); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = kevent(loop->backend_fd, events, nevents, diff --git a/src/unix/linux.c b/src/unix/linux.c index 59e22b74..27ed37b1 100644 --- a/src/unix/linux.c +++ b/src/unix/linux.c @@ -834,12 +834,14 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) { uint32_t tail; uint32_t mask; uint32_t i; + int nevents; head = *iou->cqhead; tail = atomic_load_explicit((_Atomic uint32_t*) iou->cqtail, memory_order_acquire); mask = iou->cqmask; cqe = iou->cqe; + nevents = 0; for (i = head; i != tail; i++) { e = &cqe[i & mask]; @@ -865,13 +867,16 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) { uv__metrics_update_idle_time(loop); req->cb(req); + nevents++; } atomic_store_explicit((_Atomic uint32_t*) iou->cqhead, tail, memory_order_release); - uv__metrics_inc_events(loop, 1); + uv__metrics_inc_events(loop, nevents); + if (uv__get_internal_fields(loop)->current_timeout == 0) + uv__metrics_inc_events_waiting(loop, nevents); } @@ -970,6 +975,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = epoll_pwait(loop->backend_fd, events, ARRAY_SIZE(events), diff --git a/src/unix/os390.c b/src/unix/os390.c index 3954b2c2..a87c2d77 100644 --- a/src/unix/os390.c +++ b/src/unix/os390.c @@ -808,6 +808,7 @@ static int os390_message_queue_handler(uv__os390_epoll* ep) { void uv__io_poll(uv_loop_t* loop, int timeout) { static const int max_safe_timeout = 1789569; + uv__loop_internal_fields_t* lfields; struct epoll_event events[1024]; struct epoll_event* pe; struct epoll_event e; @@ -830,6 +831,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); + while (!QUEUE_EMPTY(&loop->watcher_queue)) { uv_stream_t* stream; @@ -877,7 +880,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { int nevents = 0; have_signals = 0; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -896,6 +899,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout) timeout = max_safe_timeout; + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = epoll_wait(loop->ep, events, ARRAY_SIZE(events), timeout); diff --git a/src/unix/posix-poll.c b/src/unix/posix-poll.c index 711780ec..7e7de868 100644 --- a/src/unix/posix-poll.c +++ b/src/unix/posix-poll.c @@ -132,6 +132,7 @@ static void uv__pollfds_del(uv_loop_t* loop, int fd) { void uv__io_poll(uv_loop_t* loop, int timeout) { + uv__loop_internal_fields_t* lfields; sigset_t* pset; sigset_t set; uint64_t time_base; @@ -152,6 +153,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); + /* Take queued watchers and add their fds to our poll fds array. */ while (!QUEUE_EMPTY(&loop->watcher_queue)) { q = QUEUE_HEAD(&loop->watcher_queue); @@ -179,7 +182,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { assert(timeout >= -1); time_base = loop->time; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -198,6 +201,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + if (pset != NULL) if (pthread_sigmask(SIG_BLOCK, pset, NULL)) abort(); diff --git a/src/uv-common.h b/src/uv-common.h index df874f8a..b0d9c747 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -421,6 +421,7 @@ struct uv__iou { struct uv__loop_internal_fields_s { unsigned int flags; uv__loop_metrics_t loop_metrics; + int current_timeout; #ifdef __linux__ struct uv__iou iou; #endif /* __linux__ */ diff --git a/src/win/core.c b/src/win/core.c index a52af5a1..426edb18 100644 --- a/src/win/core.c +++ b/src/win/core.c @@ -424,6 +424,7 @@ int uv_backend_timeout(const uv_loop_t* loop) { static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { + uv__loop_internal_fields_t* lfields; DWORD bytes; ULONG_PTR key; OVERLAPPED* overlapped; @@ -433,9 +434,10 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { uint64_t user_timeout; int reset_timeout; + lfields = uv__get_internal_fields(loop); timeout_time = loop->time + timeout; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -450,6 +452,12 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + GetQueuedCompletionStatus(loop->iocp, &bytes, &key, @@ -507,6 +515,7 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { static void uv__poll(uv_loop_t* loop, DWORD timeout) { + uv__loop_internal_fields_t* lfields; BOOL success; uv_req_t* req; OVERLAPPED_ENTRY overlappeds[128]; @@ -518,9 +527,10 @@ static void uv__poll(uv_loop_t* loop, DWORD timeout) { uint64_t actual_timeout; int reset_timeout; + lfields = uv__get_internal_fields(loop); timeout_time = loop->time + timeout; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -537,6 +547,12 @@ static void uv__poll(uv_loop_t* loop, DWORD timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + success = pGetQueuedCompletionStatusEx(loop->iocp, overlappeds, ARRAY_SIZE(overlappeds), diff --git a/test/test-list.h b/test/test-list.h index 13c96d1d..e6332ed4 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -556,6 +556,7 @@ TEST_DECLARE (utf8_decode1_overrun) TEST_DECLARE (uname) TEST_DECLARE (metrics_info_check) +TEST_DECLARE (metrics_pool_events) TEST_DECLARE (metrics_idle_time) TEST_DECLARE (metrics_idle_time_thread) TEST_DECLARE (metrics_idle_time_zero) @@ -1192,6 +1193,7 @@ TASK_LIST_START TEST_HELPER (readable_on_eof, tcp4_echo_server) TEST_ENTRY (metrics_info_check) + TEST_ENTRY (metrics_pool_events) TEST_ENTRY (metrics_idle_time) TEST_ENTRY (metrics_idle_time_thread) TEST_ENTRY (metrics_idle_time_zero) diff --git a/test/test-metrics.c b/test/test-metrics.c index c6fa432a..d532f4ef 100644 --- a/test/test-metrics.c +++ b/test/test-metrics.c @@ -34,6 +34,7 @@ typedef struct { static uint64_t last_events_count; static char test_buf[] = "test-buffer\n"; static fs_reqs_t fs_reqs; +static int pool_events_counter; static void timer_spin_cb(uv_timer_t* handle) { @@ -239,3 +240,153 @@ TEST_IMPL(metrics_info_check) { MAKE_VALGRIND_HAPPY(uv_default_loop()); return 0; } + + +static void fs_prepare_cb(uv_prepare_t* handle) { + uv_metrics_t metrics; + + ASSERT_OK(uv_metrics_info(uv_default_loop(), &metrics)); + + if (pool_events_counter == 1) + ASSERT_EQ(metrics.events, metrics.events_waiting); + + if (pool_events_counter < 7) + return; + + uv_prepare_stop(handle); + pool_events_counter = -42; +} + + +static void fs_stat_cb(uv_fs_t* req) { + uv_fs_req_cleanup(req); + pool_events_counter++; +} + + +static void fs_work_cb(uv_work_t* req) { +} + + +static void fs_after_work_cb(uv_work_t* req, int status) { + free(req); + pool_events_counter++; +} + + +static void fs_write_cb(uv_fs_t* req) { + uv_work_t* work1 = malloc(sizeof(*work1)); + uv_work_t* work2 = malloc(sizeof(*work2)); + pool_events_counter++; + + uv_fs_req_cleanup(req); + + ASSERT_OK(uv_queue_work(uv_default_loop(), + work1, + fs_work_cb, + fs_after_work_cb)); + ASSERT_OK(uv_queue_work(uv_default_loop(), + work2, + fs_work_cb, + fs_after_work_cb)); +} + + +static void fs_random_cb(uv_random_t* req, int status, void* buf, size_t len) { + pool_events_counter++; +} + + +static void fs_addrinfo_cb(uv_getaddrinfo_t* req, + int status, + struct addrinfo* res) { + uv_freeaddrinfo(req->addrinfo); + pool_events_counter++; +} + + +TEST_IMPL(metrics_pool_events) { + uv_buf_t iov; + uv_fs_t open_req; + uv_fs_t stat1_req; + uv_fs_t stat2_req; + uv_fs_t unlink_req; + uv_fs_t write_req; + uv_getaddrinfo_t addrinfo_req; + uv_metrics_t metrics; + uv_prepare_t prepare; + uv_random_t random_req; + int fd; + char rdata; + + ASSERT_OK(uv_loop_configure(uv_default_loop(), UV_METRICS_IDLE_TIME)); + + uv_fs_unlink(NULL, &unlink_req, "test_file", NULL); + uv_fs_req_cleanup(&unlink_req); + + ASSERT_OK(uv_prepare_init(uv_default_loop(), &prepare)); + ASSERT_OK(uv_prepare_start(&prepare, fs_prepare_cb)); + + pool_events_counter = 0; + fd = uv_fs_open(NULL, + &open_req, + "test_file", + O_WRONLY | O_CREAT, + S_IRUSR | S_IWUSR, + NULL); + ASSERT_GT(fd, 0); + uv_fs_req_cleanup(&open_req); + + iov = uv_buf_init(test_buf, sizeof(test_buf)); + ASSERT_OK(uv_fs_write(uv_default_loop(), + &write_req, + fd, + &iov, + 1, + 0, + fs_write_cb)); + ASSERT_OK(uv_fs_stat(uv_default_loop(), + &stat1_req, + "test_file", + fs_stat_cb)); + ASSERT_OK(uv_fs_stat(uv_default_loop(), + &stat2_req, + "test_file", + fs_stat_cb)); + ASSERT_OK(uv_random(uv_default_loop(), + &random_req, + &rdata, + 1, + 0, + fs_random_cb)); + ASSERT_OK(uv_getaddrinfo(uv_default_loop(), + &addrinfo_req, + fs_addrinfo_cb, + "example.invalid", + NULL, + NULL)); + + /* Sleep for a moment to hopefully force the events to complete before + * entering the event loop. */ + uv_sleep(100); + + ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + + ASSERT_OK(uv_metrics_info(uv_default_loop(), &metrics)); + /* It's possible for uv__work_done() to execute one extra time even though the + * QUEUE has already been cleared out. This has to do with the way we use an + * uv_async to tell the event loop thread to process the worker pool QUEUE. */ + ASSERT_GE(metrics.events, 7); + /* It's possible one of the other events also got stuck in the event queue, so + * check GE instead of EQ. Reason for 4 instead of 5 is because the call to + * uv_getaddrinfo() is racey and slow. So can't guarantee that it'll always + * execute before sleep completes. */ + ASSERT_GE(metrics.events_waiting, 4); + ASSERT_EQ(pool_events_counter, -42); + + uv_fs_unlink(NULL, &unlink_req, "test_file", NULL); + uv_fs_req_cleanup(&unlink_req); + + MAKE_VALGRIND_HAPPY(uv_default_loop()); + return 0; +}