diff --git a/CMakeLists.txt b/CMakeLists.txt index aaa9d673..8464e60c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -141,6 +141,7 @@ set(uv_test_sources test/test-tcp-writealot.c test/test-thread-equal.c test/test-thread.c + test/test-thread-affinity.c test/test-threadpool-cancel.c test/test-threadpool.c test/test-timer-again.c diff --git a/Makefile.am b/Makefile.am index 697d4599..225f5482 100644 --- a/Makefile.am +++ b/Makefile.am @@ -263,6 +263,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-tcp-write-queue-order.c \ test/test-thread-equal.c \ test/test-thread.c \ + test/test-thread-affinity.c \ test/test-threadpool-cancel.c \ test/test-threadpool.c \ test/test-timer-again.c \ diff --git a/docs/src/threading.rst b/docs/src/threading.rst index 89bb4a6f..1f957db9 100644 --- a/docs/src/threading.rst +++ b/docs/src/threading.rst @@ -59,6 +59,45 @@ Threads .. versionchanged:: 1.4.1 returns a UV_E* error code on failure +.. c:function:: int uv_thread_setaffinity(uv_thread_t* tid, char* cpumask, char* oldmask, size_t mask_size) + + Sets the specified thread's affinity to cpumask, which is specified in + bytes. Optionally returning the previous affinity setting in oldmask. + On Unix, uses :man:`pthread_getaffinity_np(3)` to get the affinity setting + and maps the cpu_set_t to bytes in oldmask. Then maps the bytes in cpumask + to a cpu_set_t and uses :man:`pthread_setaffinity_np(3)`. On Windows, maps + the bytes in cpumask to a bitmask and uses SetThreadAffinityMask() which + returns the previous affinity setting. + + The mask_size specifies the number of entries (bytes) in cpumask / oldmask, + and must be greater-than-or-equal-to :c:func:`uv_cpumask_size`. + + .. note:: + Thread affinity setting is not atomic on Windows. Unsupported on macOS. + + .. versionadded:: 2.0.0 + +.. c:function:: int uv_thread_getaffinity(uv_thread_t* tid, char* cpumask, size_t mask_size) + + Gets the specified thread's affinity setting. On Unix, this maps the + cpu_set_t returned by :man:`pthread_getaffinity_np(3)` to bytes in cpumask. + + The mask_size specifies the number of entries (bytes) in cpumask, + and must be greater-than-or-equal-to :c:func:`uv_cpumask_size`. + + .. note:: + Thread affinity getting is not atomic on Windows. Unsupported on macOS. + + .. versionadded:: 2.0.0 + +.. c:function:: int uv_thread_detach(uv_thread_t* tid) + + Detaches the specified thread so it will be cleaned up on exit automatically; + joining it is no longer necessary (or possible). + Uses :man:`pthread_detach(3)` on Unix and CloseHandle() on Windows. + + .. versionadded:: 2.0.0 + .. c:function:: uv_thread_t uv_thread_self(void) .. c:function:: int uv_thread_join(uv_thread_t *tid) .. c:function:: int uv_thread_equal(const uv_thread_t* t1, const uv_thread_t* t2) diff --git a/include/uv.h b/include/uv.h index 7a2a2f59..8e22ef5b 100644 --- a/include/uv.h +++ b/include/uv.h @@ -1640,8 +1640,16 @@ UV_EXTERN void uv_key_set(uv_key_t* key, void* value); typedef void (*uv_thread_cb)(void* arg); UV_EXTERN int uv_thread_create(uv_thread_t* tid, uv_thread_cb entry, void* arg); +UV_EXTERN int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size); +UV_EXTERN int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size); +UV_EXTERN int uv_thread_detach(uv_thread_t* tid); UV_EXTERN uv_thread_t uv_thread_self(void); -UV_EXTERN int uv_thread_join(uv_thread_t *tid); +UV_EXTERN int uv_thread_join(uv_thread_t* tid); UV_EXTERN int uv_thread_equal(const uv_thread_t* t1, const uv_thread_t* t2); /* The presence of these unions force similar struct layout. */ diff --git a/src/unix/thread.c b/src/unix/thread.c index 53ded376..91cbccc3 100644 --- a/src/unix/thread.c +++ b/src/unix/thread.c @@ -41,6 +41,17 @@ #include /* gnu_get_libc_version() */ #endif +#if defined(__linux__) +# include +# define uv__cpu_set_t cpu_set_t +#elif defined(__FreeBSD__) +# include +# include +# include +# define uv__cpu_set_t cpuset_t +#endif + + #undef NANOSEC #define NANOSEC ((uint64_t) 1e9) @@ -199,6 +210,82 @@ int uv_thread_create(uv_thread_t *tid, void (*entry)(void *arg), void *arg) { } +#if defined(__linux__) || defined(__FreeBSD__) + +int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size) { + int i; + int r; + uv__cpu_set_t cpuset; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + if (cpumasksize < 0) + return cpumasksize; + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + if (oldmask != NULL) { + r = uv_thread_getaffinity(tid, oldmask, mask_size); + if (r < 0) + return r; + } + + CPU_ZERO(&cpuset); + for (i = 0; i < cpumasksize; i++) + if (cpumask[i]) + CPU_SET(i, &cpuset); + + return UV__ERR(pthread_setaffinity_np(*tid, sizeof(cpuset), &cpuset)); +} + + +int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size) { + int r; + int i; + uv__cpu_set_t cpuset; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + if (cpumasksize < 0) + return cpumasksize; + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + CPU_ZERO(&cpuset); + r = pthread_getaffinity_np(*tid, sizeof(cpuset), &cpuset); + if (r) + return UV__ERR(r); + for (i = 0; i < cpumasksize; i++) + cpumask[i] = !!CPU_ISSET(i, &cpuset); + + return 0; +} +#else +int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size) { + return UV_ENOTSUP; +} + + +int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size) { + return UV_ENOTSUP; +} +#endif /* defined(__linux__) || defined(UV_BSD_H) */ + +int uv_thread_detach(uv_thread_t* tid) { + return UV__ERR(pthread_detach(*tid)); +} + + uv_thread_t uv_thread_self(void) { return pthread_self(); } diff --git a/src/win/thread.c b/src/win/thread.c index a05fc5c5..d20bfb4e 100644 --- a/src/win/thread.c +++ b/src/win/thread.c @@ -153,6 +153,87 @@ int uv_thread_create(uv_thread_t *tid, void (*entry)(void *arg), void *arg) { } +int uv_thread_setaffinity(uv_thread_t* tid, + char* cpumask, + char* oldmask, + size_t mask_size) { + int i; + HANDLE hproc; + DWORD_PTR procmask; + DWORD_PTR sysmask; + DWORD_PTR threadmask; + DWORD_PTR oldthreadmask; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + assert(cpumasksize > 0); + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + hproc = GetCurrentProcess(); + if (!GetProcessAffinityMask(hproc, &procmask, &sysmask)) + return uv_translate_sys_error(GetLastError()); + + threadmask = 0; + for (i = 0; i < cpumasksize; i++) { + if (cpumask[i]) { + if (procmask & (1 << i)) + threadmask |= 1 << i; + else + return UV_EINVAL; + } + } + + oldthreadmask = SetThreadAffinityMask(*tid, threadmask); + if (oldthreadmask == 0) + return uv_translate_sys_error(GetLastError()); + + if (oldmask != NULL) { + for (i = 0; i < cpumasksize; i++) + oldmask[i] = (oldthreadmask >> i) & 1; + } + + return 0; +} + + +int uv_thread_getaffinity(uv_thread_t* tid, + char* cpumask, + size_t mask_size) { + int i; + HANDLE hproc; + DWORD_PTR procmask; + DWORD_PTR sysmask; + DWORD_PTR threadmask; + int cpumasksize; + + cpumasksize = uv_cpumask_size(); + assert(cpumasksize > 0); + if (mask_size < (size_t)cpumasksize) + return UV_EINVAL; + + hproc = GetCurrentProcess(); + if (!GetProcessAffinityMask(hproc, &procmask, &sysmask)) + return uv_translate_sys_error(GetLastError()); + + threadmask = SetThreadAffinityMask(*tid, procmask); + if (threadmask == 0 || SetThreadAffinityMask(*tid, threadmask) == 0) + return uv_translate_sys_error(GetLastError()); + + for (i = 0; i < cpumasksize; i++) + cpumask[i] = (threadmask >> i) & 1; + + return 0; +} + + +int uv_thread_detach(uv_thread_t* tid) { + CloseHandle(*tid); + *tid = 0; + return 0; +} + + uv_thread_t uv_thread_self(void) { uv_once(&uv__current_thread_init_guard, uv__init_current_thread_key); return (uv_thread_t) uv_key_get(&uv__current_thread_key); diff --git a/test/test-list.h b/test/test-list.h index dbcc622e..ff689216 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -385,6 +385,7 @@ TEST_DECLARE (thread_rwlock) TEST_DECLARE (thread_rwlock_trylock) TEST_DECLARE (thread_create) TEST_DECLARE (thread_equal) +TEST_DECLARE (thread_affinity) TEST_DECLARE (dlerror) #if (defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))) && \ !defined(__sun) @@ -957,6 +958,7 @@ TASK_LIST_START TEST_ENTRY (thread_rwlock_trylock) TEST_ENTRY (thread_create) TEST_ENTRY (thread_equal) + TEST_ENTRY (thread_affinity) TEST_ENTRY (dlerror) TEST_ENTRY (ip4_addr) TEST_ENTRY (ip6_addr_link_local) diff --git a/test/test-thread-affinity.c b/test/test-thread-affinity.c new file mode 100644 index 00000000..43783bc4 --- /dev/null +++ b/test/test-thread-affinity.c @@ -0,0 +1,116 @@ +/* Copyright libuv project contributors. All rights reserved. + */ + +#include "uv.h" +#include "task.h" + +#include + +#ifndef NO_CPU_AFFINITY + +static void check_affinity(void* arg) { + int r; + char* cpumask; + int cpumasksize; + uv_thread_t tid; + + cpumask = (char*)arg; + cpumasksize = uv_cpumask_size(); + ASSERT(cpumasksize > 0); + tid = uv_thread_self(); + r = uv_thread_setaffinity(&tid, cpumask, NULL, cpumasksize); + ASSERT(r == 0); + r = uv_thread_setaffinity(&tid, cpumask + cpumasksize, cpumask, cpumasksize); + ASSERT(r == 0); +} + + +TEST_IMPL(thread_affinity) { + int t1first; + int t1second; + int t2first; + int t2second; + int cpumasksize; + char* cpumask; + int ncpus; + int r; + uv_thread_t threads[3]; + +#ifdef _WIN32 + /* uv_thread_self isn't defined for the main thread on Windows */ + threads[0] = GetCurrentThread(); +#else + threads[0] = uv_thread_self(); +#endif + cpumasksize = uv_cpumask_size(); + ASSERT(cpumasksize > 0); + + cpumask = calloc(4 * cpumasksize, 1); + ASSERT(cpumask); + + r = uv_thread_getaffinity(&threads[0], cpumask, cpumasksize); + ASSERT(r == 0); + ASSERT(cpumask[0] && "test must be run with cpu 0 affinity"); + ncpus = 0; + while (cpumask[++ncpus]) { } + memset(cpumask, 0, 4 * cpumasksize); + + t1first = cpumasksize * 0; + t1second = cpumasksize * 1; + t2first = cpumasksize * 2; + t2second = cpumasksize * 3; + + cpumask[t1second + 0] = 1; + cpumask[t2first + 0] = 1; + cpumask[t1first + (ncpus >= 2)] = 1; + cpumask[t2second + (ncpus >= 2)] = 1; +#ifdef __linux__ + cpumask[t1second + 2] = 1; + cpumask[t2first + 2] = 1; + cpumask[t1first + 3] = 1; + cpumask[t2second + 3] = 1; +#else + if (ncpus >= 3) { + cpumask[t1second + 2] = 1; + cpumask[t2first + 2] = 1; + } + if (ncpus >= 4) { + cpumask[t1first + 3] = 1; + cpumask[t2second + 3] = 1; + } +#endif + + ASSERT(0 == uv_thread_create(threads + 1, + check_affinity, + &cpumask[t1first])); + ASSERT(0 == uv_thread_create(threads + 2, + check_affinity, + &cpumask[t2first])); + ASSERT(0 == uv_thread_join(threads + 1)); + ASSERT(0 == uv_thread_join(threads + 2)); + + ASSERT(cpumask[t1first + 0] == (ncpus == 1)); + ASSERT(cpumask[t1first + 1] == (ncpus >= 2)); + ASSERT(cpumask[t1first + 2] == 0); + ASSERT(cpumask[t1first + 3] == (ncpus >= 4)); + + ASSERT(cpumask[t2first + 0] == 1); + ASSERT(cpumask[t2first + 1] == 0); + ASSERT(cpumask[t2first + 2] == (ncpus >= 3)); + ASSERT(cpumask[t2first + 3] == 0); + + free(cpumask); + + return 0; +} + +#else + +TEST_IMPL(thread_affinity) { + int cpumasksize; + cpumasksize = uv_cpumask_size(); + ASSERT(cpumasksize == UV_ENOTSUP); + return 0; +} + +#endif diff --git a/test/test.gyp b/test/test.gyp index 6a06542c..e5f49474 100644 --- a/test/test.gyp +++ b/test/test.gyp @@ -126,6 +126,7 @@ 'test-tmpdir.c', 'test-mutexes.c', 'test-thread.c', + 'test-thread-affinity.c', 'test-barrier.c', 'test-buf.c', 'test-condvar.c',