From 9bae606d413327187828155b61babcd52b2d2517 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Tue, 13 Aug 2013 02:02:12 +0200 Subject: [PATCH] darwin: create fsevents thread on demand * Move CF run loop code to fsevents.c. * Create the fsevents thread on demand rather than at startup. * Remove use of ACCESS_ONCE. All accesses to loop->cf_loop are protected by full memory barriers so no reordering can take place. Fixes #872. Conflicts: src/unix/darwin.c --- src/unix/darwin.c | 131 +-------------------------------- src/unix/fsevents.c | 175 ++++++++++++++++++++++++++++++++++++++++++-- src/unix/internal.h | 4 +- 3 files changed, 170 insertions(+), 140 deletions(-) diff --git a/src/unix/darwin.c b/src/unix/darwin.c index 77e662f4..2358e9f6 100644 --- a/src/unix/darwin.c +++ b/src/unix/darwin.c @@ -28,8 +28,6 @@ #include #include -#include - #include #include #include /* _NSGetExecutablePath */ @@ -37,144 +35,19 @@ #include #include /* sysconf */ -/* Forward declarations */ -static void uv__cf_loop_runner(void* arg); -static void uv__cf_loop_cb(void* arg); - -typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t; -struct uv__cf_loop_signal_s { - void* arg; - cf_loop_signal_cb cb; - ngx_queue_t member; -}; - int uv__platform_loop_init(uv_loop_t* loop, int default_loop) { - CFRunLoopSourceContext ctx; - int r; + loop->cf_loop = NULL; if (uv__kqueue_init(loop)) return -1; - loop->cf_loop = NULL; - if ((r = uv_mutex_init(&loop->cf_mutex))) - return r; - if ((r = uv_sem_init(&loop->cf_sem, 0))) - return r; - ngx_queue_init(&loop->cf_signals); - - memset(&ctx, 0, sizeof(ctx)); - ctx.info = loop; - ctx.perform = uv__cf_loop_cb; - loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx); - - if ((r = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop))) - return r; - - /* Synchronize threads */ - uv_sem_wait(&loop->cf_sem); - assert(ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) != NULL); - return 0; } void uv__platform_loop_delete(uv_loop_t* loop) { - ngx_queue_t* item; - uv__cf_loop_signal_t* s; - - assert(loop->cf_loop != NULL); - uv__cf_loop_signal(loop, NULL, NULL); - uv_thread_join(&loop->cf_thread); - - uv_sem_destroy(&loop->cf_sem); - uv_mutex_destroy(&loop->cf_mutex); - - /* Free any remaining data */ - while (!ngx_queue_empty(&loop->cf_signals)) { - item = ngx_queue_head(&loop->cf_signals); - - s = ngx_queue_data(item, uv__cf_loop_signal_t, member); - - ngx_queue_remove(item); - free(s); - } -} - - -static void uv__cf_loop_runner(void* arg) { - uv_loop_t* loop; - - loop = arg; - - /* Get thread's loop */ - ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) = CFRunLoopGetCurrent(); - - CFRunLoopAddSource(loop->cf_loop, - loop->cf_cb, - kCFRunLoopDefaultMode); - - uv_sem_post(&loop->cf_sem); - - CFRunLoopRun(); - - CFRunLoopRemoveSource(loop->cf_loop, - loop->cf_cb, - kCFRunLoopDefaultMode); -} - - -static void uv__cf_loop_cb(void* arg) { - uv_loop_t* loop; - ngx_queue_t* item; - ngx_queue_t split_head; - uv__cf_loop_signal_t* s; - - loop = arg; - - uv_mutex_lock(&loop->cf_mutex); - ngx_queue_init(&split_head); - if (!ngx_queue_empty(&loop->cf_signals)) { - ngx_queue_t* split_pos = ngx_queue_next(&loop->cf_signals); - ngx_queue_split(&loop->cf_signals, split_pos, &split_head); - } - uv_mutex_unlock(&loop->cf_mutex); - - while (!ngx_queue_empty(&split_head)) { - item = ngx_queue_head(&split_head); - - s = ngx_queue_data(item, uv__cf_loop_signal_t, member); - - /* This was a termination signal */ - if (s->cb == NULL) - CFRunLoopStop(loop->cf_loop); - else - s->cb(s->arg); - - ngx_queue_remove(item); - free(s); - } -} - - -void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) { - uv__cf_loop_signal_t* item; - - item = malloc(sizeof(*item)); - /* XXX: Fail */ - if (item == NULL) - abort(); - - item->arg = arg; - item->cb = cb; - - uv_mutex_lock(&loop->cf_mutex); - ngx_queue_insert_tail(&loop->cf_signals, &item->member); - uv_mutex_unlock(&loop->cf_mutex); - - assert(loop->cf_loop != NULL); - CFRunLoopSourceSignal(loop->cf_cb); - CFRunLoopWakeUp(loop->cf_loop); + uv__fsevents_loop_delete(loop); } diff --git a/src/unix/fsevents.c b/src/unix/fsevents.c index b6d27467..25b10903 100644 --- a/src/unix/fsevents.c +++ b/src/unix/fsevents.c @@ -34,13 +34,27 @@ int uv__fsevents_close(uv_fs_event_t* handle) { return 0; } + +void uv__fsevents_loop_delete(uv_loop_t* loop) { + return 0; +} + #else /* TARGET_OS_IPHONE */ #include #include +#include #include typedef struct uv__fsevents_event_s uv__fsevents_event_t; +typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t; +typedef void (*cf_loop_signal_cb)(void* arg); + +struct uv__cf_loop_signal_s { + cf_loop_signal_cb cb; + ngx_queue_t member; + void* arg; +}; struct uv__fsevents_event_s { int events; @@ -48,6 +62,12 @@ struct uv__fsevents_event_s { char path[1]; }; +/* Forward declarations */ +static void uv__cf_loop_cb(void* arg); +static void uv__cf_loop_runner(void* arg); +static void uv__cf_loop_signal(uv_loop_t* loop, + cf_loop_signal_cb cb, + void* arg); #define UV__FSEVENTS_WALK(handle, block) \ { \ @@ -75,7 +95,7 @@ struct uv__fsevents_event_s { } -void uv__fsevents_cb(uv_async_t* cb, int status) { +static void uv__fsevents_cb(uv_async_t* cb, int status) { uv_fs_event_t* handle; handle = cb->data; @@ -92,12 +112,12 @@ void uv__fsevents_cb(uv_async_t* cb, int status) { } -void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, - void* info, - size_t numEvents, - void* eventPaths, - const FSEventStreamEventFlags eventFlags[], - const FSEventStreamEventId eventIds[]) { +static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, + void* info, + size_t numEvents, + void* eventPaths, + const FSEventStreamEventFlags eventFlags[], + const FSEventStreamEventId eventIds[]) { size_t i; int len; char** paths; @@ -190,7 +210,7 @@ void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef, } -void uv__fsevents_schedule(void* arg) { +static void uv__fsevents_schedule(void* arg) { uv_fs_event_t* handle; handle = arg; @@ -202,6 +222,140 @@ void uv__fsevents_schedule(void* arg) { } +static int uv__fsevents_loop_init(uv_loop_t* loop) { + CFRunLoopSourceContext ctx; + int err; + + if (loop->cf_loop != NULL) + return 0; + + err = uv_mutex_init(&loop->cf_mutex); + if (err) + return err; + + err = uv_sem_init(&loop->cf_sem, 0); + if (err) + goto fail_sem_init; + + ngx_queue_init(&loop->cf_signals); + memset(&ctx, 0, sizeof(ctx)); + ctx.info = loop; + ctx.perform = uv__cf_loop_cb; + loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx); + + err = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop); + if (err) + goto fail_thread_create; + + /* Synchronize threads */ + uv_sem_wait(&loop->cf_sem); + assert(loop->cf_loop != NULL); + return 0; + +fail_thread_create: + uv_sem_destroy(&loop->cf_sem); + +fail_sem_init: + uv_mutex_destroy(&loop->cf_mutex); + return err; +} + + +void uv__fsevents_loop_delete(uv_loop_t* loop) { + uv__cf_loop_signal_t* s; + ngx_queue_t* q; + + if (loop->cf_loop == NULL) + return; + + uv__cf_loop_signal(loop, NULL, NULL); + uv_thread_join(&loop->cf_thread); + uv_sem_destroy(&loop->cf_sem); + uv_mutex_destroy(&loop->cf_mutex); + + /* Free any remaining data */ + while (!ngx_queue_empty(&loop->cf_signals)) { + q = ngx_queue_head(&loop->cf_signals); + s = ngx_queue_data(q, uv__cf_loop_signal_t, member); + ngx_queue_remove(q); + free(s); + } +} + + +static void uv__cf_loop_runner(void* arg) { + uv_loop_t* loop; + + loop = arg; + loop->cf_loop = CFRunLoopGetCurrent(); + + CFRunLoopAddSource(loop->cf_loop, + loop->cf_cb, + kCFRunLoopDefaultMode); + + uv_sem_post(&loop->cf_sem); + + CFRunLoopRun(); + CFRunLoopRemoveSource(loop->cf_loop, + loop->cf_cb, + kCFRunLoopDefaultMode); +} + + +static void uv__cf_loop_cb(void* arg) { + uv_loop_t* loop; + ngx_queue_t* item; + ngx_queue_t split_head; + uv__cf_loop_signal_t* s; + + loop = arg; + + uv_mutex_lock(&loop->cf_mutex); + ngx_queue_init(&split_head); + if (!ngx_queue_empty(&loop->cf_signals)) { + ngx_queue_t* split_pos = ngx_queue_head(&loop->cf_signals); + ngx_queue_split(&loop->cf_signals, split_pos, &split_head); + } + uv_mutex_unlock(&loop->cf_mutex); + + while (!ngx_queue_empty(&split_head)) { + item = ngx_queue_head(&split_head); + + s = ngx_queue_data(item, uv__cf_loop_signal_t, member); + + /* This was a termination signal */ + if (s->cb == NULL) + CFRunLoopStop(loop->cf_loop); + else + s->cb(s->arg); + + ngx_queue_remove(item); + free(s); + } +} + + +void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) { + uv__cf_loop_signal_t* item; + + item = malloc(sizeof(*item)); + /* XXX: Fail */ + if (item == NULL) + abort(); + + item->arg = arg; + item->cb = cb; + + uv_mutex_lock(&loop->cf_mutex); + ngx_queue_insert_tail(&loop->cf_signals, &item->member); + uv_mutex_unlock(&loop->cf_mutex); + + assert(loop->cf_loop != NULL); + CFRunLoopSourceSignal(loop->cf_cb); + CFRunLoopWakeUp(loop->cf_loop); +} + + int uv__fsevents_init(uv_fs_event_t* handle) { FSEventStreamContext ctx; FSEventStreamRef ref; @@ -209,6 +363,11 @@ int uv__fsevents_init(uv_fs_event_t* handle) { CFArrayRef paths; CFAbsoluteTime latency; FSEventStreamCreateFlags flags; + int err; + + err = uv__fsevents_loop_init(handle->loop); + if (err) + return err; /* Initialize context */ ctx.version = 0; diff --git a/src/unix/internal.h b/src/unix/internal.h index 61cb1ec1..2bb4dc1b 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -216,12 +216,10 @@ int uv__make_socketpair(int fds[2], int flags); int uv__make_pipe(int fds[2], int flags); #if defined(__APPLE__) -typedef void (*cf_loop_signal_cb)(void*); - -void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg); int uv__fsevents_init(uv_fs_event_t* handle); int uv__fsevents_close(uv_fs_event_t* handle); +void uv__fsevents_loop_delete(uv_loop_t* loop); /* OSX < 10.7 has no file events, polyfill them */ #ifndef MAC_OS_X_VERSION_10_7