diff --git a/CMakeLists.txt b/CMakeLists.txt index af89db2d..26915b41 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -670,6 +670,7 @@ if(LIBUV_BUILD_TESTS) test/test-thread-name.c test/test-thread-priority.c test/test-threadpool-cancel.c + test/test-threadpool-size.c test/test-threadpool.c test/test-timer-again.c test/test-timer-from-check.c diff --git a/Makefile.am b/Makefile.am index 9b9e6be7..636931b4 100644 --- a/Makefile.am +++ b/Makefile.am @@ -297,6 +297,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-thread-name.c \ test/test-thread-priority.c \ test/test-threadpool-cancel.c \ + test/test-threadpool-size.c \ test/test-threadpool.c \ test/test-timer-again.c \ test/test-timer-from-check.c \ diff --git a/include/uv.h b/include/uv.h index b1689e96..e8742cb9 100644 --- a/include/uv.h +++ b/include/uv.h @@ -1337,6 +1337,9 @@ enum { UV_EXTERN int uv_thread_getpriority(uv_thread_t tid, int* priority); UV_EXTERN int uv_thread_setpriority(uv_thread_t tid, int priority); +UV_EXTERN unsigned int uv_get_threadpool_size(void); +UV_EXTERN unsigned int uv_set_threadpool_size(unsigned int n); + UV_EXTERN unsigned int uv_available_parallelism(void); UV_EXTERN int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count); UV_EXTERN void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count); diff --git a/src/threadpool.c b/src/threadpool.c index 98d81cc7..9f9dc554 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -31,16 +31,20 @@ static uv_once_t once = UV_ONCE_INIT; static uv_cond_t cond; +static uv_cond_t cond_kill; static uv_mutex_t mutex; static unsigned int idle_threads; static unsigned int slow_io_work_running; static unsigned int nthreads; static uv_thread_t* threads; static uv_thread_t default_threads[4]; +static uv_thread_t tid_to_join; static struct uv__queue exit_message; +static struct uv__queue kill_thread_message; static struct uv__queue wq; static struct uv__queue run_slow_work_message; static struct uv__queue slow_io_pending_wq; +static uv_sem_t sem; static unsigned int slow_work_thread_threshold(void) { return (nthreads + 1) / 2; @@ -85,6 +89,16 @@ static void worker(void* arg) { break; } + /* Allow the main thread to kill targeted thread. */ + if (q == &kill_thread_message) { + tid_to_join = uv_thread_self(); + uv__queue_remove(q); /* Remove the kill message. */ + uv__queue_init(q); + uv_cond_signal(&cond_kill); + uv_mutex_unlock(&mutex); + break; + } + uv__queue_remove(q); uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */ @@ -185,6 +199,7 @@ void uv__threadpool_cleanup(void) { uv_mutex_destroy(&mutex); uv_cond_destroy(&cond); + uv_cond_destroy(&cond_kill); threads = NULL; nthreads = 0; @@ -195,7 +210,6 @@ static void init_threads(void) { uv_thread_options_t config; unsigned int i; const char* val; - uv_sem_t sem; nthreads = ARRAY_SIZE(default_threads); val = getenv("UV_THREADPOOL_SIZE"); @@ -218,6 +232,9 @@ static void init_threads(void) { if (uv_cond_init(&cond)) abort(); + if (uv_cond_init(&cond_kill)) + abort(); + if (uv_mutex_init(&mutex)) abort(); @@ -263,6 +280,146 @@ static void init_once(void) { } +int uv__threads_spin(unsigned int n) { + uv_thread_options_t config; + int r; + size_t i; + uv_thread_t* threads_tmp; + + assert(n > nthreads); + + config.flags = UV_THREAD_HAS_STACK_SIZE; + config.stack_size = 8u << 20; /* 8 MB */ + + threads_tmp = uv__malloc(n * sizeof(threads[0])); + if (threads_tmp == NULL) + return UV_ENOMEM; + + + /* Copy all threads into the new list. */ + for (i = 0; i < nthreads; i++) + threads_tmp[i] = threads[i]; + + /* Free the old list. And make it points to the new one */ + if (threads != default_threads) + uv__free(threads); + + threads = threads_tmp; + + r = uv_sem_init(&sem, 0); + if (r) + goto out; + + while (nthreads < n) { + /* Wire up the error code to the return value. */ + r = uv_thread_create_ex(threads + nthreads, &config, worker, &sem); + if (r) + goto out; + + uv_sem_wait(&sem); + nthreads++; + } + + assert(nthreads == n); + +out: + uv_sem_destroy(&sem); + return r; +} + + +int uv__threads_join(unsigned int n) { + int r; + size_t i; + size_t j; + uv_thread_t* threads_tmp; + + assert(nthreads > n); + while (nthreads != n) { + /* Request any thread to kill */ + uv__queue_insert_tail(&wq, &kill_thread_message); + + if (idle_threads > 0) + uv_cond_signal(&cond); + + uv_cond_wait(&cond_kill, &mutex); + r = uv_thread_join(&tid_to_join); + if (r) + return r; + + /* Alloc for one thread less */ + threads_tmp = uv__malloc((nthreads - 1) * sizeof(threads[0])); + if (threads_tmp == NULL) + return UV_ENOMEM; + + + /* Copy all threads into the new list except the joined one. */ + j = 0; + for (i = 0; i < nthreads; i++) { + if (tid_to_join != threads[i]) { + if (j >= nthreads) abort(); + threads_tmp[j] = threads[i]; + j++; + } + } + + /* Free the old list (if malloc'd) */ + if (threads != default_threads) + uv__free(threads); + + /* Decrease the number of threads */ + nthreads--; + threads = threads_tmp; + } + + assert(nthreads == n); + + return 0; +} + + +unsigned int uv_get_threadpool_size(void) { + int r; + + uv_once(&once, init_once); + + uv_mutex_lock(&mutex); + r = nthreads; + uv_mutex_unlock(&mutex); + return r; +} + + +unsigned int uv_set_threadpool_size(unsigned int n) { + int r; + uv_once(&once, init_once); + + /* For now, NO-threadpool is not supported. */ + if (n > MAX_THREADPOOL_SIZE || n < 1) + return UV_EINVAL; + + uv_mutex_lock(&mutex); + + /* No-op */ + if (n == nthreads) { + uv_mutex_unlock(&mutex); + return 0; + } + + /* Shrink the threadpool request. */ + if (n < nthreads) { + r = uv__threads_join(n); + uv_mutex_unlock(&mutex); + return r; + } + + /* Grow the threadpool. */ + r = uv__threads_spin(n); + uv_mutex_unlock(&mutex); + return r; +} + + void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, diff --git a/test/test-list.h b/test/test-list.h index c6651299..3de2cd47 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -456,6 +456,7 @@ TEST_DECLARE (fs_wtf) TEST_DECLARE (fs_get_system_error) TEST_DECLARE (strscpy) TEST_DECLARE (strtok) +TEST_DECLARE (threadpool_size) TEST_DECLARE (threadpool_queue_work_simple) TEST_DECLARE (threadpool_queue_work_einval) TEST_DECLARE (threadpool_multiple_event_loops) @@ -1175,6 +1176,7 @@ TASK_LIST_START TEST_ENTRY (open_osfhandle_valid_handle) TEST_ENTRY (strscpy) TEST_ENTRY (strtok) + TEST_ENTRY (threadpool_size) TEST_ENTRY (threadpool_queue_work_simple) TEST_ENTRY (threadpool_queue_work_einval) TEST_ENTRY_CUSTOM (threadpool_multiple_event_loops, 0, 0, 60000) diff --git a/test/test-threadpool-size.c b/test/test-threadpool-size.c new file mode 100644 index 00000000..c7eb8681 --- /dev/null +++ b/test/test-threadpool-size.c @@ -0,0 +1,45 @@ +/* Copyright libuv project contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +TEST_IMPL(threadpool_size) { + /* The default thread pool size is 4 */ + ASSERT_EQ(uv_get_threadpool_size(), 4); + + /* Normal use case (increase the size of the thread pool) */ + ASSERT_OK(uv_set_threadpool_size(5)); + ASSERT_EQ(uv_get_threadpool_size(), 5); + + /* Shrink the thread pool */ + ASSERT_OK(uv_set_threadpool_size(4)); + ASSERT_EQ(uv_get_threadpool_size(), 4); + + /* Grow the thread pool again */ + ASSERT_OK(uv_set_threadpool_size(5)); + ASSERT_EQ(uv_get_threadpool_size(), 5); + + uv_run(uv_default_loop(), UV_RUN_DEFAULT); + + MAKE_VALGRIND_HAPPY(uv_default_loop()); + return 0; +}