From e02642cf3b768b2c58a41f97fa38507e032ae415 Mon Sep 17 00:00:00 2001 From: Trevor Norris Date: Mon, 17 Apr 2023 12:48:39 -0600 Subject: [PATCH] src: fix events/events_waiting metrics counter (#3957) The worker pool calls all callbacks locally within the queue. So the value of nevents doesn't properly reflect that case. Increase the number of events directly from the worker pool's callback to correct this. In order to properly determine if the events_waiting counter needs to be incremented, store the timeout value at the time the event provider was called. --- src/threadpool.c | 17 +++++ src/unix/aix.c | 11 ++- src/unix/kqueue.c | 10 ++- src/unix/linux.c | 13 +++- src/unix/os390.c | 11 ++- src/unix/posix-poll.c | 11 ++- src/uv-common.h | 1 + src/win/core.c | 20 +++++- test/test-list.h | 2 + test/test-metrics.c | 151 ++++++++++++++++++++++++++++++++++++++++++ 10 files changed, 240 insertions(+), 7 deletions(-) diff --git a/src/threadpool.c b/src/threadpool.c index 42322ebb..51962bf0 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -311,12 +311,15 @@ void uv__work_done(uv_async_t* handle) { QUEUE* q; QUEUE wq; int err; + int nevents; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); QUEUE_MOVE(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); + nevents = 0; + while (!QUEUE_EMPTY(&wq)) { q = QUEUE_HEAD(&wq); QUEUE_REMOVE(q); @@ -324,6 +327,20 @@ void uv__work_done(uv_async_t* handle) { w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; w->done(w, err); + nevents++; + } + + /* This check accomplishes 2 things: + * 1. Even if the queue was empty, the call to uv__work_done() should count + * as an event. Which will have been added by the event loop when + * calling this callback. + * 2. Prevents accidental wrap around in case nevents == 0 events == 0. + */ + if (nevents > 1) { + /* Subtract 1 to counter the call to uv__work_done(). */ + uv__metrics_inc_events(loop, nevents - 1); + if (uv__get_internal_fields(loop)->current_timeout == 0) + uv__metrics_inc_events_waiting(loop, nevents - 1); } } diff --git a/src/unix/aix.c b/src/unix/aix.c index b855282e..f1afbed4 100644 --- a/src/unix/aix.c +++ b/src/unix/aix.c @@ -131,6 +131,7 @@ int uv__io_check_fd(uv_loop_t* loop, int fd) { void uv__io_poll(uv_loop_t* loop, int timeout) { + uv__loop_internal_fields_t* lfields; struct pollfd events[1024]; struct pollfd pqry; struct pollfd* pe; @@ -154,6 +155,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); + while (!QUEUE_EMPTY(&loop->watcher_queue)) { q = QUEUE_HEAD(&loop->watcher_queue); QUEUE_REMOVE(q); @@ -217,7 +220,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { base = loop->time; count = 48; /* Benchmarks suggest this gives the best throughput. */ - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -232,6 +235,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = pollset_poll(loop->backend_fd, events, ARRAY_SIZE(events), diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index deb486ba..82916d65 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -127,6 +127,7 @@ static void uv__kqueue_delete(int kqfd, const struct kevent *ev) { void uv__io_poll(uv_loop_t* loop, int timeout) { + uv__loop_internal_fields_t* lfields; struct kevent events[1024]; struct kevent* ev; struct timespec spec; @@ -155,6 +156,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); nevents = 0; while (!QUEUE_EMPTY(&loop->watcher_queue)) { @@ -222,7 +224,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { base = loop->time; count = 48; /* Benchmarks suggest this gives the best throughput. */ - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -245,6 +247,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (pset != NULL) pthread_sigmask(SIG_BLOCK, pset, NULL); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = kevent(loop->backend_fd, events, nevents, diff --git a/src/unix/linux.c b/src/unix/linux.c index 59e22b74..27ed37b1 100644 --- a/src/unix/linux.c +++ b/src/unix/linux.c @@ -834,12 +834,14 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) { uint32_t tail; uint32_t mask; uint32_t i; + int nevents; head = *iou->cqhead; tail = atomic_load_explicit((_Atomic uint32_t*) iou->cqtail, memory_order_acquire); mask = iou->cqmask; cqe = iou->cqe; + nevents = 0; for (i = head; i != tail; i++) { e = &cqe[i & mask]; @@ -865,13 +867,16 @@ static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) { uv__metrics_update_idle_time(loop); req->cb(req); + nevents++; } atomic_store_explicit((_Atomic uint32_t*) iou->cqhead, tail, memory_order_release); - uv__metrics_inc_events(loop, 1); + uv__metrics_inc_events(loop, nevents); + if (uv__get_internal_fields(loop)->current_timeout == 0) + uv__metrics_inc_events_waiting(loop, nevents); } @@ -970,6 +975,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = epoll_pwait(loop->backend_fd, events, ARRAY_SIZE(events), diff --git a/src/unix/os390.c b/src/unix/os390.c index 3954b2c2..a87c2d77 100644 --- a/src/unix/os390.c +++ b/src/unix/os390.c @@ -808,6 +808,7 @@ static int os390_message_queue_handler(uv__os390_epoll* ep) { void uv__io_poll(uv_loop_t* loop, int timeout) { static const int max_safe_timeout = 1789569; + uv__loop_internal_fields_t* lfields; struct epoll_event events[1024]; struct epoll_event* pe; struct epoll_event e; @@ -830,6 +831,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); + while (!QUEUE_EMPTY(&loop->watcher_queue)) { uv_stream_t* stream; @@ -877,7 +880,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { int nevents = 0; have_signals = 0; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -896,6 +899,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (sizeof(int32_t) == sizeof(long) && timeout >= max_safe_timeout) timeout = max_safe_timeout; + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + nfds = epoll_wait(loop->ep, events, ARRAY_SIZE(events), timeout); diff --git a/src/unix/posix-poll.c b/src/unix/posix-poll.c index 711780ec..7e7de868 100644 --- a/src/unix/posix-poll.c +++ b/src/unix/posix-poll.c @@ -132,6 +132,7 @@ static void uv__pollfds_del(uv_loop_t* loop, int fd) { void uv__io_poll(uv_loop_t* loop, int timeout) { + uv__loop_internal_fields_t* lfields; sigset_t* pset; sigset_t set; uint64_t time_base; @@ -152,6 +153,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { return; } + lfields = uv__get_internal_fields(loop); + /* Take queued watchers and add their fds to our poll fds array. */ while (!QUEUE_EMPTY(&loop->watcher_queue)) { q = QUEUE_HEAD(&loop->watcher_queue); @@ -179,7 +182,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { assert(timeout >= -1); time_base = loop->time; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -198,6 +201,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + if (pset != NULL) if (pthread_sigmask(SIG_BLOCK, pset, NULL)) abort(); diff --git a/src/uv-common.h b/src/uv-common.h index df874f8a..b0d9c747 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -421,6 +421,7 @@ struct uv__iou { struct uv__loop_internal_fields_s { unsigned int flags; uv__loop_metrics_t loop_metrics; + int current_timeout; #ifdef __linux__ struct uv__iou iou; #endif /* __linux__ */ diff --git a/src/win/core.c b/src/win/core.c index a52af5a1..426edb18 100644 --- a/src/win/core.c +++ b/src/win/core.c @@ -424,6 +424,7 @@ int uv_backend_timeout(const uv_loop_t* loop) { static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { + uv__loop_internal_fields_t* lfields; DWORD bytes; ULONG_PTR key; OVERLAPPED* overlapped; @@ -433,9 +434,10 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { uint64_t user_timeout; int reset_timeout; + lfields = uv__get_internal_fields(loop); timeout_time = loop->time + timeout; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -450,6 +452,12 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + GetQueuedCompletionStatus(loop->iocp, &bytes, &key, @@ -507,6 +515,7 @@ static void uv__poll_wine(uv_loop_t* loop, DWORD timeout) { static void uv__poll(uv_loop_t* loop, DWORD timeout) { + uv__loop_internal_fields_t* lfields; BOOL success; uv_req_t* req; OVERLAPPED_ENTRY overlappeds[128]; @@ -518,9 +527,10 @@ static void uv__poll(uv_loop_t* loop, DWORD timeout) { uint64_t actual_timeout; int reset_timeout; + lfields = uv__get_internal_fields(loop); timeout_time = loop->time + timeout; - if (uv__get_internal_fields(loop)->flags & UV_METRICS_IDLE_TIME) { + if (lfields->flags & UV_METRICS_IDLE_TIME) { reset_timeout = 1; user_timeout = timeout; timeout = 0; @@ -537,6 +547,12 @@ static void uv__poll(uv_loop_t* loop, DWORD timeout) { if (timeout != 0) uv__metrics_set_provider_entry_time(loop); + /* Store the current timeout in a location that's globally accessible so + * other locations like uv__work_done() can determine whether the queue + * of events in the callback were waiting when poll was called. + */ + lfields->current_timeout = timeout; + success = pGetQueuedCompletionStatusEx(loop->iocp, overlappeds, ARRAY_SIZE(overlappeds), diff --git a/test/test-list.h b/test/test-list.h index 13c96d1d..e6332ed4 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -556,6 +556,7 @@ TEST_DECLARE (utf8_decode1_overrun) TEST_DECLARE (uname) TEST_DECLARE (metrics_info_check) +TEST_DECLARE (metrics_pool_events) TEST_DECLARE (metrics_idle_time) TEST_DECLARE (metrics_idle_time_thread) TEST_DECLARE (metrics_idle_time_zero) @@ -1192,6 +1193,7 @@ TASK_LIST_START TEST_HELPER (readable_on_eof, tcp4_echo_server) TEST_ENTRY (metrics_info_check) + TEST_ENTRY (metrics_pool_events) TEST_ENTRY (metrics_idle_time) TEST_ENTRY (metrics_idle_time_thread) TEST_ENTRY (metrics_idle_time_zero) diff --git a/test/test-metrics.c b/test/test-metrics.c index c6fa432a..d532f4ef 100644 --- a/test/test-metrics.c +++ b/test/test-metrics.c @@ -34,6 +34,7 @@ typedef struct { static uint64_t last_events_count; static char test_buf[] = "test-buffer\n"; static fs_reqs_t fs_reqs; +static int pool_events_counter; static void timer_spin_cb(uv_timer_t* handle) { @@ -239,3 +240,153 @@ TEST_IMPL(metrics_info_check) { MAKE_VALGRIND_HAPPY(uv_default_loop()); return 0; } + + +static void fs_prepare_cb(uv_prepare_t* handle) { + uv_metrics_t metrics; + + ASSERT_OK(uv_metrics_info(uv_default_loop(), &metrics)); + + if (pool_events_counter == 1) + ASSERT_EQ(metrics.events, metrics.events_waiting); + + if (pool_events_counter < 7) + return; + + uv_prepare_stop(handle); + pool_events_counter = -42; +} + + +static void fs_stat_cb(uv_fs_t* req) { + uv_fs_req_cleanup(req); + pool_events_counter++; +} + + +static void fs_work_cb(uv_work_t* req) { +} + + +static void fs_after_work_cb(uv_work_t* req, int status) { + free(req); + pool_events_counter++; +} + + +static void fs_write_cb(uv_fs_t* req) { + uv_work_t* work1 = malloc(sizeof(*work1)); + uv_work_t* work2 = malloc(sizeof(*work2)); + pool_events_counter++; + + uv_fs_req_cleanup(req); + + ASSERT_OK(uv_queue_work(uv_default_loop(), + work1, + fs_work_cb, + fs_after_work_cb)); + ASSERT_OK(uv_queue_work(uv_default_loop(), + work2, + fs_work_cb, + fs_after_work_cb)); +} + + +static void fs_random_cb(uv_random_t* req, int status, void* buf, size_t len) { + pool_events_counter++; +} + + +static void fs_addrinfo_cb(uv_getaddrinfo_t* req, + int status, + struct addrinfo* res) { + uv_freeaddrinfo(req->addrinfo); + pool_events_counter++; +} + + +TEST_IMPL(metrics_pool_events) { + uv_buf_t iov; + uv_fs_t open_req; + uv_fs_t stat1_req; + uv_fs_t stat2_req; + uv_fs_t unlink_req; + uv_fs_t write_req; + uv_getaddrinfo_t addrinfo_req; + uv_metrics_t metrics; + uv_prepare_t prepare; + uv_random_t random_req; + int fd; + char rdata; + + ASSERT_OK(uv_loop_configure(uv_default_loop(), UV_METRICS_IDLE_TIME)); + + uv_fs_unlink(NULL, &unlink_req, "test_file", NULL); + uv_fs_req_cleanup(&unlink_req); + + ASSERT_OK(uv_prepare_init(uv_default_loop(), &prepare)); + ASSERT_OK(uv_prepare_start(&prepare, fs_prepare_cb)); + + pool_events_counter = 0; + fd = uv_fs_open(NULL, + &open_req, + "test_file", + O_WRONLY | O_CREAT, + S_IRUSR | S_IWUSR, + NULL); + ASSERT_GT(fd, 0); + uv_fs_req_cleanup(&open_req); + + iov = uv_buf_init(test_buf, sizeof(test_buf)); + ASSERT_OK(uv_fs_write(uv_default_loop(), + &write_req, + fd, + &iov, + 1, + 0, + fs_write_cb)); + ASSERT_OK(uv_fs_stat(uv_default_loop(), + &stat1_req, + "test_file", + fs_stat_cb)); + ASSERT_OK(uv_fs_stat(uv_default_loop(), + &stat2_req, + "test_file", + fs_stat_cb)); + ASSERT_OK(uv_random(uv_default_loop(), + &random_req, + &rdata, + 1, + 0, + fs_random_cb)); + ASSERT_OK(uv_getaddrinfo(uv_default_loop(), + &addrinfo_req, + fs_addrinfo_cb, + "example.invalid", + NULL, + NULL)); + + /* Sleep for a moment to hopefully force the events to complete before + * entering the event loop. */ + uv_sleep(100); + + ASSERT_OK(uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + + ASSERT_OK(uv_metrics_info(uv_default_loop(), &metrics)); + /* It's possible for uv__work_done() to execute one extra time even though the + * QUEUE has already been cleared out. This has to do with the way we use an + * uv_async to tell the event loop thread to process the worker pool QUEUE. */ + ASSERT_GE(metrics.events, 7); + /* It's possible one of the other events also got stuck in the event queue, so + * check GE instead of EQ. Reason for 4 instead of 5 is because the call to + * uv_getaddrinfo() is racey and slow. So can't guarantee that it'll always + * execute before sleep completes. */ + ASSERT_GE(metrics.events_waiting, 4); + ASSERT_EQ(pool_events_counter, -42); + + uv_fs_unlink(NULL, &unlink_req, "test_file", NULL); + uv_fs_req_cleanup(&unlink_req); + + MAKE_VALGRIND_HAPPY(uv_default_loop()); + return 0; +}