From e900006642a5e8d4ab27a8760afcc03136f0dd8f Mon Sep 17 00:00:00 2001 From: daomingq Date: Fri, 21 Oct 2022 21:18:10 +0800 Subject: [PATCH] thread: add support for affinity (#3774) Backported thread affinity feature and related dependency commits from master. It will add support for those APIs: uv_cpumask_size, uv_thread_setaffinity, uv_thread_getaffinity. The supported platforms are Linux, Freebsd, and Windows. Empty implementations (returning UV_ENOTSUP) on non-supported platforms (such as OS X and AIX). --- CMakeLists.txt | 1 + Makefile.am | 1 + docs/src/misc.rst | 7 +++ docs/src/threading.rst | 31 ++++++++++ include/uv.h | 8 +++ src/unix/core.c | 9 +++ src/unix/thread.c | 98 ++++++++++++++++++++++++++++++ src/win/thread.c | 72 ++++++++++++++++++++++ test/task.h | 7 +++ test/test-list.h | 2 + test/test-thread-affinity.c | 116 ++++++++++++++++++++++++++++++++++++ 11 files changed, 352 insertions(+) create mode 100644 test/test-thread-affinity.c diff --git a/CMakeLists.txt b/CMakeLists.txt index 74ee7936..d2dfdc5f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -615,6 +615,7 @@ if(LIBUV_BUILD_TESTS) test/test-tcp-write-to-half-open-connection.c test/test-tcp-writealot.c test/test-test-macros.c + test/test-thread-affinity.c test/test-thread-equal.c test/test-thread.c test/test-threadpool-cancel.c diff --git a/Makefile.am b/Makefile.am index b0b4f252..0e45172e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -283,6 +283,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-test-macros.c \ test/test-thread-equal.c \ test/test-thread.c \ + test/test-thread-affinity.c \ test/test-threadpool-cancel.c \ test/test-threadpool.c \ test/test-timer-again.c \ diff --git a/docs/src/misc.rst b/docs/src/misc.rst index f6d26efc..423ef84c 100644 --- a/docs/src/misc.rst +++ b/docs/src/misc.rst @@ -365,6 +365,13 @@ API Frees the `cpu_infos` array previously allocated with :c:func:`uv_cpu_info`. +.. c:function:: int uv_cpumask_size(void) + + Returns the maximum size of the mask used for process/thread affinities, + or ``UV_ENOTSUP`` if affinities are not supported on the current platform. + + .. versionadded:: 1.45.0 + .. c:function:: int uv_interface_addresses(uv_interface_address_t** addresses, int* count) Gets address information about the network interfaces on the system. An diff --git a/docs/src/threading.rst b/docs/src/threading.rst index 7ca1d4b7..ca9fb0ce 100644 --- a/docs/src/threading.rst +++ b/docs/src/threading.rst @@ -88,6 +88,37 @@ Threads .. versionadded:: 1.26.0 +.. c:function:: int uv_thread_setaffinity(uv_thread_t* tid, char* cpumask, char* oldmask, size_t mask_size) + + Sets the specified thread's affinity to cpumask, which is specified in + bytes. Optionally returning the previous affinity setting in oldmask. + On Unix, uses :man:`pthread_getaffinity_np(3)` to get the affinity setting + and maps the cpu_set_t to bytes in oldmask. Then maps the bytes in cpumask + to a cpu_set_t and uses :man:`pthread_setaffinity_np(3)`. On Windows, maps + the bytes in cpumask to a bitmask and uses SetThreadAffinityMask() which + returns the previous affinity setting. + + The mask_size specifies the number of entries (bytes) in cpumask / oldmask, + and must be greater-than-or-equal-to :c:func:`uv_cpumask_size`. + + .. note:: + Thread affinity setting is not atomic on Windows. Unsupported on macOS. + + .. versionadded:: 1.45.0 + +.. c:function:: int uv_thread_getaffinity(uv_thread_t* tid, char* cpumask, size_t mask_size) + + Gets the specified thread's affinity setting. On Unix, this maps the + cpu_set_t returned by :man:`pthread_getaffinity_np(3)` to bytes in cpumask. + + The mask_size specifies the number of entries (bytes) in cpumask, + and must be greater-than-or-equal-to :c:func:`uv_cpumask_size`. + + .. note:: + Thread affinity getting is not atomic on Windows. Unsupported on macOS. + + .. versionadded:: 1.45.0 + .. c:function:: uv_thread_t uv_thread_self(void) .. c:function:: int uv_thread_join(uv_thread_t *tid) .. c:function:: int uv_thread_equal(const uv_thread_t* t1, const uv_thread_t* t2) diff --git a/include/uv.h b/include/uv.h index d68c5c84..38397424 100644 --- a/include/uv.h +++ b/include/uv.h @@ -1240,6 +1240,7 @@ UV_EXTERN int uv_os_setpriority(uv_pid_t pid, int priority); UV_EXTERN unsigned int uv_available_parallelism(void); UV_EXTERN int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count); UV_EXTERN void uv_free_cpu_info(uv_cpu_info_t* cpu_infos, int count); +UV_EXTERN int uv_cpumask_size(void); UV_EXTERN int uv_interface_addresses(uv_interface_address_t** addresses, int* count); @@ -1782,6 +1783,13 @@ UV_EXTERN int uv_thread_create_ex(uv_thread_t* tid, const uv_thread_options_t* params, uv_thread_cb entry, void* arg); +UV_EXTERN int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size); +UV_EXTERN int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size); UV_EXTERN uv_thread_t uv_thread_self(void); UV_EXTERN int uv_thread_join(uv_thread_t *tid); UV_EXTERN int uv_thread_equal(const uv_thread_t* t1, const uv_thread_t* t2); diff --git a/src/unix/core.c b/src/unix/core.c index aaa980b8..fc8559ba 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -72,6 +72,8 @@ extern char** environ; # include # include # include +# include +# include # if defined(__FreeBSD__) # define uv__accept4 accept4 # endif @@ -1420,6 +1422,13 @@ uv_pid_t uv_os_getppid(void) { return getppid(); } +int uv_cpumask_size(void) { +#if defined(__linux__) || defined(__FreeBSD__) + return CPU_SETSIZE; +#else + return UV_ENOTSUP; +#endif +} int uv_os_getpriority(uv_pid_t pid, int* priority) { int r; diff --git a/src/unix/thread.c b/src/unix/thread.c index d89e5cd1..77159511 100644 --- a/src/unix/thread.c +++ b/src/unix/thread.c @@ -41,6 +41,17 @@ #include /* gnu_get_libc_version() */ #endif +#if defined(__linux__) +# include +# define uv__cpu_set_t cpu_set_t +#elif defined(__FreeBSD__) +# include +# include +# include +# define uv__cpu_set_t cpuset_t +#endif + + #undef NANOSEC #define NANOSEC ((uint64_t) 1e9) @@ -284,6 +295,93 @@ int uv_thread_create_ex(uv_thread_t* tid, return UV__ERR(err); } +#if defined(__linux__) || defined(__FreeBSD__) + +int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size) { + int i; + int r; + uv__cpu_set_t cpuset; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + if (cpumasksize < 0) + return cpumasksize; + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + if (oldmask != NULL) { + r = uv_thread_getaffinity(tid, oldmask, mask_size); + if (r < 0) + return r; + } + + CPU_ZERO(&cpuset); + for (i = 0; i < cpumasksize; i++) + if (cpumask[i]) + CPU_SET(i, &cpuset); + +#if defined(__ANDROID__) + if (sched_setaffinity(pthread_gettid_np(*tid), sizeof(cpuset), &cpuset)) + r = errno; + else + r = 0; +#else + r = pthread_setaffinity_np(*tid, sizeof(cpuset), &cpuset); +#endif + + return UV__ERR(r); +} + + +int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size) { + int r; + int i; + uv__cpu_set_t cpuset; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + if (cpumasksize < 0) + return cpumasksize; + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + CPU_ZERO(&cpuset); +#if defined(__ANDROID__) + if (sched_getaffinity(pthread_gettid_np(*tid), sizeof(cpuset), &cpuset)) + r = errno; + else + r = 0; +#else + r = pthread_getaffinity_np(*tid, sizeof(cpuset), &cpuset); +#endif + if (r) + return UV__ERR(r); + for (i = 0; i < cpumasksize; i++) + cpumask[i] = !!CPU_ISSET(i, &cpuset); + + return 0; +} +#else +int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size) { + return UV_ENOTSUP; +} + + +int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size) { + return UV_ENOTSUP; +} +#endif /* defined(__linux__) || defined(UV_BSD_H) */ + uv_thread_t uv_thread_self(void) { return pthread_self(); diff --git a/src/win/thread.c b/src/win/thread.c index d3b1c96b..da150e45 100644 --- a/src/win/thread.c +++ b/src/win/thread.c @@ -180,6 +180,78 @@ int uv_thread_create_ex(uv_thread_t* tid, return UV_EIO; } +int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size) { + int i; + HANDLE hproc; + DWORD_PTR procmask; + DWORD_PTR sysmask; + DWORD_PTR threadmask; + DWORD_PTR oldthreadmask; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + assert(cpumasksize > 0); + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + hproc = GetCurrentProcess(); + if (!GetProcessAffinityMask(hproc, &procmask, &sysmask)) + return uv_translate_sys_error(GetLastError()); + + threadmask = 0; + for (i = 0; i < cpumasksize; i++) { + if (cpumask[i]) { + if (procmask & (1 << i)) + threadmask |= 1 << i; + else + return UV_EINVAL; + } + } + + oldthreadmask = SetThreadAffinityMask(*tid, threadmask); + if (oldthreadmask == 0) + return uv_translate_sys_error(GetLastError()); + + if (oldmask != NULL) { + for (i = 0; i < cpumasksize; i++) + oldmask[i] = (oldthreadmask >> i) & 1; + } + + return 0; +} + +int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size) { + int i; + HANDLE hproc; + DWORD_PTR procmask; + DWORD_PTR sysmask; + DWORD_PTR threadmask; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + assert(cpumasksize > 0); + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + hproc = GetCurrentProcess(); + if (!GetProcessAffinityMask(hproc, &procmask, &sysmask)) + return uv_translate_sys_error(GetLastError()); + + threadmask = SetThreadAffinityMask(*tid, procmask); + if (threadmask == 0 || SetThreadAffinityMask(*tid, threadmask) == 0) + return uv_translate_sys_error(GetLastError()); + + for (i = 0; i < cpumasksize; i++) + cpumask[i] = (threadmask >> i) & 1; + + return 0; +} + uv_thread_t uv_thread_self(void) { uv_thread_t key; diff --git a/test/task.h b/test/task.h index 44fa25bf..91ccb59a 100644 --- a/test/task.h +++ b/test/task.h @@ -370,4 +370,11 @@ UNUSED static int can_ipv6(void) { "Cygwin runtime hangs on listen+connect in same process." #endif +#if !defined(__linux__) && \ + !defined(__FreeBSD__) && \ + !defined(_WIN32) +# define NO_CPU_AFFINITY \ + "affinity not supported on this platform." +#endif + #endif /* TASK_H_ */ diff --git a/test/test-list.h b/test/test-list.h index 8983cbc3..b2e8a75f 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -450,6 +450,7 @@ TEST_DECLARE (thread_rwlock) TEST_DECLARE (thread_rwlock_trylock) TEST_DECLARE (thread_create) TEST_DECLARE (thread_equal) +TEST_DECLARE (thread_affinity) TEST_DECLARE (dlerror) #if (defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))) && \ !defined(__sun) @@ -1122,6 +1123,7 @@ TASK_LIST_START TEST_ENTRY (thread_rwlock_trylock) TEST_ENTRY (thread_create) TEST_ENTRY (thread_equal) + TEST_ENTRY (thread_affinity) TEST_ENTRY (dlerror) TEST_ENTRY (ip4_addr) TEST_ENTRY (ip6_addr_link_local) diff --git a/test/test-thread-affinity.c b/test/test-thread-affinity.c new file mode 100644 index 00000000..43783bc4 --- /dev/null +++ b/test/test-thread-affinity.c @@ -0,0 +1,116 @@ +/* Copyright libuv project contributors. All rights reserved. + */ + +#include "uv.h" +#include "task.h" + +#include + +#ifndef NO_CPU_AFFINITY + +static void check_affinity(void* arg) { + int r; + char* cpumask; + int cpumasksize; + uv_thread_t tid; + + cpumask = (char*)arg; + cpumasksize = uv_cpumask_size(); + ASSERT(cpumasksize > 0); + tid = uv_thread_self(); + r = uv_thread_setaffinity(&tid, cpumask, NULL, cpumasksize); + ASSERT(r == 0); + r = uv_thread_setaffinity(&tid, cpumask + cpumasksize, cpumask, cpumasksize); + ASSERT(r == 0); +} + + +TEST_IMPL(thread_affinity) { + int t1first; + int t1second; + int t2first; + int t2second; + int cpumasksize; + char* cpumask; + int ncpus; + int r; + uv_thread_t threads[3]; + +#ifdef _WIN32 + /* uv_thread_self isn't defined for the main thread on Windows */ + threads[0] = GetCurrentThread(); +#else + threads[0] = uv_thread_self(); +#endif + cpumasksize = uv_cpumask_size(); + ASSERT(cpumasksize > 0); + + cpumask = calloc(4 * cpumasksize, 1); + ASSERT(cpumask); + + r = uv_thread_getaffinity(&threads[0], cpumask, cpumasksize); + ASSERT(r == 0); + ASSERT(cpumask[0] && "test must be run with cpu 0 affinity"); + ncpus = 0; + while (cpumask[++ncpus]) { } + memset(cpumask, 0, 4 * cpumasksize); + + t1first = cpumasksize * 0; + t1second = cpumasksize * 1; + t2first = cpumasksize * 2; + t2second = cpumasksize * 3; + + cpumask[t1second + 0] = 1; + cpumask[t2first + 0] = 1; + cpumask[t1first + (ncpus >= 2)] = 1; + cpumask[t2second + (ncpus >= 2)] = 1; +#ifdef __linux__ + cpumask[t1second + 2] = 1; + cpumask[t2first + 2] = 1; + cpumask[t1first + 3] = 1; + cpumask[t2second + 3] = 1; +#else + if (ncpus >= 3) { + cpumask[t1second + 2] = 1; + cpumask[t2first + 2] = 1; + } + if (ncpus >= 4) { + cpumask[t1first + 3] = 1; + cpumask[t2second + 3] = 1; + } +#endif + + ASSERT(0 == uv_thread_create(threads + 1, + check_affinity, + &cpumask[t1first])); + ASSERT(0 == uv_thread_create(threads + 2, + check_affinity, + &cpumask[t2first])); + ASSERT(0 == uv_thread_join(threads + 1)); + ASSERT(0 == uv_thread_join(threads + 2)); + + ASSERT(cpumask[t1first + 0] == (ncpus == 1)); + ASSERT(cpumask[t1first + 1] == (ncpus >= 2)); + ASSERT(cpumask[t1first + 2] == 0); + ASSERT(cpumask[t1first + 3] == (ncpus >= 4)); + + ASSERT(cpumask[t2first + 0] == 1); + ASSERT(cpumask[t2first + 1] == 0); + ASSERT(cpumask[t2first + 2] == (ncpus >= 3)); + ASSERT(cpumask[t2first + 3] == 0); + + free(cpumask); + + return 0; +} + +#else + +TEST_IMPL(thread_affinity) { + int cpumasksize; + cpumasksize = uv_cpumask_size(); + ASSERT(cpumasksize == UV_ENOTSUP); + return 0; +} + +#endif