From acea3028c53c6e8c5eeeb09f0fba9e4c031aa0ee Mon Sep 17 00:00:00 2001 From: Hiroaki Nakamura Date: Tue, 9 Oct 2012 21:44:20 +0900 Subject: [PATCH] unix, windows: add thread barrier support --- include/uv-private/uv-unix.h | 17 +++++++ include/uv-private/uv-win.h | 8 +++ include/uv.h | 4 ++ src/unix/thread.c | 80 +++++++++++++++++++++++++++++ src/win/thread.c | 54 ++++++++++++++++++++ test/test-barrier.c | 98 ++++++++++++++++++++++++++++++++++++ test/test-list.h | 6 +++ uv.gyp | 1 + 8 files changed, 268 insertions(+) create mode 100644 test/test-barrier.c diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index f50142c0..42e39fcc 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -108,6 +108,23 @@ typedef pthread_rwlock_t uv_rwlock_t; typedef UV_PLATFORM_SEM_T uv_sem_t; typedef pthread_cond_t uv_cond_t; + +#if defined(__APPLE__) && defined(__MACH__) + +typedef struct { + unsigned int n; + unsigned int count; + uv_mutex_t mutex; + uv_sem_t turnstile1; + uv_sem_t turnstile2; +} uv_barrier_t; + +#else /* defined(__APPLE__) && defined(__MACH__) */ + +typedef pthread_barrier_t uv_barrier_t; + +#endif /* defined(__APPLE__) && defined(__MACH__) */ + /* Platform-specific definitions for uv_spawn support. */ typedef gid_t uv_gid_t; typedef uid_t uv_uid_t; diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 78dbb96b..10bd17ea 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -240,6 +240,14 @@ typedef union { } fallback_; } uv_rwlock_t; +typedef struct { + unsigned int n; + unsigned int count; + uv_mutex_t mutex; + uv_sem_t turnstile1; + uv_sem_t turnstile2; +} uv_barrier_t; + #define UV_ONCE_INIT { 0, NULL } typedef struct uv_once_s { diff --git a/include/uv.h b/include/uv.h index 46eee26e..99b69884 100644 --- a/include/uv.h +++ b/include/uv.h @@ -1821,6 +1821,10 @@ UV_EXTERN void uv_cond_wait(uv_cond_t* cond, uv_mutex_t* mutex); UV_EXTERN int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex, uint64_t timeout); +UV_EXTERN int uv_barrier_init(uv_barrier_t* barrier, unsigned int count); +UV_EXTERN void uv_barrier_destroy(uv_barrier_t* barrier); +UV_EXTERN void uv_barrier_wait(uv_barrier_t* barrier); + /* Runs a function once and only once. Concurrent calls to uv_once() with the * same guard will block all callers except one (it's unspecified which one). * The guard should be initialized statically with the UV_ONCE_INIT macro. diff --git a/src/unix/thread.c b/src/unix/thread.c index f50961c9..0583cb47 100644 --- a/src/unix/thread.c +++ b/src/unix/thread.c @@ -370,3 +370,83 @@ int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex, uint64_t timeout) { } #endif /* defined(__APPLE__) && defined(__MACH__) */ + + +#if defined(__APPLE__) && defined(__MACH__) + +int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) { + barrier->n = count; + barrier->count = 0; + + if (uv_mutex_init(&barrier->mutex)) + return -1; + + if (uv_sem_init(&barrier->turnstile1, 0)) + goto error2; + + if (uv_sem_init(&barrier->turnstile2, 1)) + goto error; + + return 0; + +error: + uv_sem_destroy(&barrier->turnstile1); +error2: + uv_mutex_destroy(&barrier->mutex); + return -1; + +} + + +void uv_barrier_destroy(uv_barrier_t* barrier) { + uv_sem_destroy(&barrier->turnstile2); + uv_sem_destroy(&barrier->turnstile1); + uv_mutex_destroy(&barrier->mutex); +} + + +void uv_barrier_wait(uv_barrier_t* barrier) { + uv_mutex_lock(&barrier->mutex); + if (++barrier->count == barrier->n) { + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile1); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile1); + + uv_mutex_lock(&barrier->mutex); + if (--barrier->count == 0) { + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile2); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile2); +} + +#else /* !(defined(__APPLE__) && defined(__MACH__)) */ + +int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) { + if (pthread_barrier_init(barrier, NULL, count)) + return -1; + else + return 0; +} + + +void uv_barrier_destroy(uv_barrier_t* barrier) { + if (pthread_barrier_destroy(barrier)) + abort(); +} + + +void uv_barrier_wait(uv_barrier_t* barrier) { + int r = pthread_barrier_wait(barrier); + if (r && r != PTHREAD_BARRIER_SERIAL_THREAD) + abort(); +} + +#endif /* defined(__APPLE__) && defined(__MACH__) */ diff --git a/src/win/thread.c b/src/win/thread.c index da6138ad..e774fd9c 100644 --- a/src/win/thread.c +++ b/src/win/thread.c @@ -610,3 +610,57 @@ int uv_cond_timedwait(uv_cond_t* cond, uv_mutex_t* mutex, else return uv_cond_fallback_timedwait(cond, mutex, timeout); } + + +int uv_barrier_init(uv_barrier_t* barrier, unsigned int count) { + barrier->n = count; + barrier->count = 0; + + if (uv_mutex_init(&barrier->mutex)) + return -1; + + if (uv_sem_init(&barrier->turnstile1, 0)) + goto error2; + + if (uv_sem_init(&barrier->turnstile2, 1)) + goto error; + + return 0; + +error: + uv_sem_destroy(&barrier->turnstile1); +error2: + uv_mutex_destroy(&barrier->mutex); + return -1; + +} + + +void uv_barrier_destroy(uv_barrier_t* barrier) { + uv_sem_destroy(&barrier->turnstile2); + uv_sem_destroy(&barrier->turnstile1); + uv_mutex_destroy(&barrier->mutex); +} + + +void uv_barrier_wait(uv_barrier_t* barrier) { + uv_mutex_lock(&barrier->mutex); + if (++barrier->count == barrier->n) { + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile1); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile1); + + uv_mutex_lock(&barrier->mutex); + if (--barrier->count == 0) { + uv_sem_wait(&barrier->turnstile1); + uv_sem_post(&barrier->turnstile2); + } + uv_mutex_unlock(&barrier->mutex); + + uv_sem_wait(&barrier->turnstile2); + uv_sem_post(&barrier->turnstile2); +} diff --git a/test/test-barrier.c b/test/test-barrier.c new file mode 100644 index 00000000..97df704c --- /dev/null +++ b/test/test-barrier.c @@ -0,0 +1,98 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include + +typedef struct { + uv_barrier_t barrier; + int delay; + volatile int posted; +} worker_config; + + +static void worker(void* arg) { + worker_config* c = arg; + + if (c->delay) + uv_sleep(c->delay); + + uv_barrier_wait(&c->barrier); +} + + +TEST_IMPL(barrier_1) { + uv_thread_t thread; + worker_config wc; + + memset(&wc, 0, sizeof(wc)); + + ASSERT(0 == uv_barrier_init(&wc.barrier, 2)); + ASSERT(0 == uv_thread_create(&thread, worker, &wc)); + + uv_sleep(100); + uv_barrier_wait(&wc.barrier); + + ASSERT(0 == uv_thread_join(&thread)); + uv_barrier_destroy(&wc.barrier); + + return 0; +} + + +TEST_IMPL(barrier_2) { + uv_thread_t thread; + worker_config wc; + + memset(&wc, 0, sizeof(wc)); + wc.delay = 100; + + ASSERT(0 == uv_barrier_init(&wc.barrier, 2)); + ASSERT(0 == uv_thread_create(&thread, worker, &wc)); + + uv_barrier_wait(&wc.barrier); + + ASSERT(0 == uv_thread_join(&thread)); + uv_barrier_destroy(&wc.barrier); + + return 0; +} + + +TEST_IMPL(barrier_3) { + uv_thread_t thread; + worker_config wc; + + memset(&wc, 0, sizeof(wc)); + + ASSERT(0 == uv_barrier_init(&wc.barrier, 2)); + ASSERT(0 == uv_thread_create(&thread, worker, &wc)); + + uv_barrier_wait(&wc.barrier); + + ASSERT(0 == uv_thread_join(&thread)); + uv_barrier_destroy(&wc.barrier); + + return 0; +} diff --git a/test/test-list.h b/test/test-list.h index 20c12e3e..28a13b2e 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -22,6 +22,9 @@ TEST_DECLARE (platform_output) TEST_DECLARE (callback_order) TEST_DECLARE (run_once) +TEST_DECLARE (barrier_1) +TEST_DECLARE (barrier_2) +TEST_DECLARE (barrier_3) TEST_DECLARE (condvar_1) TEST_DECLARE (condvar_2) TEST_DECLARE (condvar_3) @@ -214,6 +217,9 @@ TASK_LIST_START TEST_ENTRY (callback_order) #endif TEST_ENTRY (run_once) + TEST_ENTRY (barrier_1) + TEST_ENTRY (barrier_2) + TEST_ENTRY (barrier_3) TEST_ENTRY (condvar_1) TEST_ENTRY (condvar_2) TEST_ENTRY (condvar_3) diff --git a/uv.gyp b/uv.gyp index fadbd29e..9962194d 100644 --- a/uv.gyp +++ b/uv.gyp @@ -305,6 +305,7 @@ 'test/test-mutexes.c', 'test/test-signal.c', 'test/test-thread.c', + 'test/test-barrier.c', 'test/test-condvar.c', 'test/test-condvar-consumer-producer.c', 'test/test-timer-again.c',