misc: introduce uv_get|set_threadpool_size

This patch makes it possible to query and modify the threadpool size at runtime.

Fixes: https://github.com/libuv/libuv/issues/4401
Signed-off-by: Juan José Arboleda <soyjuanarbol@gmail.com>
Juan José Arboleda 2024-12-16 23:09:59 -05:00
parent e59e2a9e49
commit e5471e5bd3
6 changed files with 210 additions and 1 deletion
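For context, a minimal usage sketch of the new API (not part of this commit; the loop setup and callback names are illustrative): grow the pool before queuing a burst of blocking work, then shrink it back to the default afterwards.

#include <stdio.h>
#include <uv.h>

static void work_cb(uv_work_t* req) {
  (void) req;  /* Runs on one of the threadpool workers. */
}

static void after_work_cb(uv_work_t* req, int status) {
  (void) req;
  (void) status;  /* Runs on the loop thread once the work item is done. */
}

int main(void) {
  uv_work_t req;

  printf("current pool size: %u\n", uv_get_threadpool_size());

  /* Grow the pool from the default of 4 (assuming UV_THREADPOOL_SIZE is unset). */
  if (uv_set_threadpool_size(8) != 0)
    return 1;

  uv_queue_work(uv_default_loop(), &req, work_cb, after_work_cb);
  uv_run(uv_default_loop(), UV_RUN_DEFAULT);

  /* Shrink back once the burst is over. */
  uv_set_threadpool_size(4);
  return 0;
}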


@@ -670,6 +670,7 @@ if(LIBUV_BUILD_TESTS)
test/test-thread-name.c
test/test-thread-priority.c
test/test-threadpool-cancel.c
test/test-threadpool-size.c
test/test-threadpool.c
test/test-timer-again.c
test/test-timer-from-check.c


@@ -297,6 +297,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-thread-name.c \
test/test-thread-priority.c \
test/test-threadpool-cancel.c \
test/test-threadpool-size.c \
test/test-threadpool.c \
test/test-timer-again.c \
test/test-timer-from-check.c \


@@ -1337,6 +1337,9 @@ enum {
UV_EXTERN int uv_thread_getpriority(uv_thread_t tid, int* priority);
UV_EXTERN int uv_thread_setpriority(uv_thread_t tid, int priority);
UV_EXTERN unsigned int uv_get_threadpool_size(void);
UV_EXTERN int uv_set_threadpool_size(unsigned int n);
UV_EXTERN unsigned int uv_available_parallelism(void);
UV_EXTERN int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count);
UV_EXTERN void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count);


@@ -31,16 +31,20 @@
static uv_once_t once = UV_ONCE_INIT;
static uv_cond_t cond;
static uv_cond_t cond_kill;
static uv_mutex_t mutex;
static unsigned int idle_threads;
static unsigned int slow_io_work_running;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
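/* Thread id recorded by an exiting worker so uv__threads_join() knows which
 * handle to pass to uv_thread_join(). Guarded by mutex. */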
static uv_thread_t tid_to_join;
static struct uv__queue exit_message;
static struct uv__queue kill_thread_message;
static struct uv__queue wq;
static struct uv__queue run_slow_work_message;
static struct uv__queue slow_io_pending_wq;
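/* Startup semaphore: each newly created worker posts it once it is running.
 * File scope so that init_threads() and uv__threads_spin() can share it. */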
static uv_sem_t sem;
static unsigned int slow_work_thread_threshold(void) {
return (nthreads + 1) / 2;
@@ -85,6 +89,16 @@ static void worker(void* arg) {
break;
}
    /* Shrink request: record this thread's id so uv__threads_join() can join
     * it, then exit the worker loop. */
if (q == &kill_thread_message) {
tid_to_join = uv_thread_self();
uv__queue_remove(q); /* Remove the kill message. */
uv__queue_init(q);
uv_cond_signal(&cond_kill);
uv_mutex_unlock(&mutex);
break;
}
uv__queue_remove(q);
uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */
@@ -185,6 +199,7 @@ void uv__threadpool_cleanup(void) {
uv_mutex_destroy(&mutex);
uv_cond_destroy(&cond);
uv_cond_destroy(&cond_kill);
threads = NULL;
nthreads = 0;
@@ -195,7 +210,6 @@ static void init_threads(void) {
uv_thread_options_t config;
unsigned int i;
const char* val;
uv_sem_t sem;
nthreads = ARRAY_SIZE(default_threads);
val = getenv("UV_THREADPOOL_SIZE");
@@ -218,6 +232,9 @@
if (uv_cond_init(&cond))
abort();
if (uv_cond_init(&cond_kill))
abort();
if (uv_mutex_init(&mutex))
abort();
@@ -263,6 +280,146 @@ static void init_once(void) {
}
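/* Grow the pool to n worker threads. Called by uv_set_threadpool_size() with
 * the global mutex held, and only when n is larger than the current nthreads. */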
int uv__threads_spin(unsigned int n) {
uv_thread_options_t config;
int r;
size_t i;
uv_thread_t* threads_tmp;
assert(n > nthreads);
config.flags = UV_THREAD_HAS_STACK_SIZE;
config.stack_size = 8u << 20; /* 8 MB */
threads_tmp = uv__malloc(n * sizeof(threads[0]));
if (threads_tmp == NULL)
return UV_ENOMEM;
/* Copy all threads into the new list. */
for (i = 0; i < nthreads; i++)
threads_tmp[i] = threads[i];
  /* Free the old list (if it was malloc'd) and make threads point to the new one. */
if (threads != default_threads)
uv__free(threads);
threads = threads_tmp;
r = uv_sem_init(&sem, 0);
  if (r)
    return r;  /* sem was never initialized, so do not destroy it. */
while (nthreads < n) {
    /* Stop and return the error if creating a worker fails. */
r = uv_thread_create_ex(threads + nthreads, &config, worker, &sem);
if (r)
goto out;
uv_sem_wait(&sem);
nthreads++;
}
assert(nthreads == n);
out:
uv_sem_destroy(&sem);
return r;
}
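/* Shrink the pool to n worker threads. Called by uv_set_threadpool_size() with
 * the global mutex held, and only when n is smaller than the current nthreads;
 * workers are asked to exit one at a time and then joined. */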
int uv__threads_join(unsigned int n) {
int r;
size_t i;
size_t j;
uv_thread_t* threads_tmp;
assert(nthreads > n);
while (nthreads != n) {
    /* Ask any one worker thread to exit so it can be joined. */
uv__queue_insert_tail(&wq, &kill_thread_message);
if (idle_threads > 0)
uv_cond_signal(&cond);
uv_cond_wait(&cond_kill, &mutex);
r = uv_thread_join(&tid_to_join);
if (r)
return r;
    /* Allocate a list with room for one fewer thread. */
threads_tmp = uv__malloc((nthreads - 1) * sizeof(threads[0]));
if (threads_tmp == NULL)
return UV_ENOMEM;
/* Copy all threads into the new list except the joined one. */
j = 0;
for (i = 0; i < nthreads; i++) {
      if (!uv_thread_equal(&tid_to_join, &threads[i])) {
        if (j >= nthreads - 1) abort();  /* tid_to_join missing from the list. */
threads_tmp[j] = threads[i];
j++;
}
}
/* Free the old list (if malloc'd) */
if (threads != default_threads)
uv__free(threads);
/* Decrease the number of threads */
nthreads--;
threads = threads_tmp;
}
assert(nthreads == n);
return 0;
}
unsigned int uv_get_threadpool_size(void) {
  unsigned int r;
uv_once(&once, init_once);
uv_mutex_lock(&mutex);
r = nthreads;
uv_mutex_unlock(&mutex);
return r;
}
int uv_set_threadpool_size(unsigned int n) {
  int r;
  uv_once(&once, init_once);
  /* A pool size of zero (i.e. no threadpool) is not supported for now. */
  if (n > MAX_THREADPOOL_SIZE || n < 1)
    return UV_EINVAL;
uv_mutex_lock(&mutex);
/* No-op */
if (n == nthreads) {
uv_mutex_unlock(&mutex);
return 0;
}
  /* Shrink the threadpool. */
if (n < nthreads) {
r = uv__threads_join(n);
uv_mutex_unlock(&mutex);
return r;
}
/* Grow the threadpool. */
r = uv__threads_spin(n);
uv_mutex_unlock(&mutex);
return r;
}
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
enum uv__work_kind kind,


@@ -456,6 +456,7 @@ TEST_DECLARE (fs_wtf)
TEST_DECLARE (fs_get_system_error)
TEST_DECLARE (strscpy)
TEST_DECLARE (strtok)
TEST_DECLARE (threadpool_size)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_multiple_event_loops)
@@ -1175,6 +1176,7 @@ TASK_LIST_START
TEST_ENTRY (open_osfhandle_valid_handle)
TEST_ENTRY (strscpy)
TEST_ENTRY (strtok)
TEST_ENTRY (threadpool_size)
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY_CUSTOM (threadpool_multiple_event_loops, 0, 0, 60000)


@@ -0,0 +1,45 @@
/* Copyright libuv project contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
TEST_IMPL(threadpool_size) {
  /* The default threadpool size is 4 (assumes UV_THREADPOOL_SIZE is not set). */
ASSERT_EQ(uv_get_threadpool_size(), 4);
/* Normal use case (increase the size of the thread pool) */
ASSERT_OK(uv_set_threadpool_size(5));
ASSERT_EQ(uv_get_threadpool_size(), 5);
/* Shrink the thread pool */
ASSERT_OK(uv_set_threadpool_size(4));
ASSERT_EQ(uv_get_threadpool_size(), 4);
/* Grow the thread pool again */
ASSERT_OK(uv_set_threadpool_size(5));
ASSERT_EQ(uv_get_threadpool_size(), 5);
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
MAKE_VALGRIND_HAPPY(uv_default_loop());
return 0;
}