darwin: create fsevents thread on demand
* Move CF run loop code to fsevents.c. * Create the fsevents thread on demand rather than at startup. * Remove use of ACCESS_ONCE. All accesses to loop->cf_loop are protected by full memory barriers so no reordering can take place. Fixes #872. Conflicts: src/unix/darwin.c
This commit is contained in:
parent
24a42a406a
commit
9bae606d41
@ -28,8 +28,6 @@
|
||||
#include <ifaddrs.h>
|
||||
#include <net/if.h>
|
||||
|
||||
#include <CoreFoundation/CFRunLoop.h>
|
||||
|
||||
#include <mach/mach.h>
|
||||
#include <mach/mach_time.h>
|
||||
#include <mach-o/dyld.h> /* _NSGetExecutablePath */
|
||||
@ -37,144 +35,19 @@
|
||||
#include <sys/sysctl.h>
|
||||
#include <unistd.h> /* sysconf */
|
||||
|
||||
/* Forward declarations */
|
||||
static void uv__cf_loop_runner(void* arg);
|
||||
static void uv__cf_loop_cb(void* arg);
|
||||
|
||||
typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
|
||||
struct uv__cf_loop_signal_s {
|
||||
void* arg;
|
||||
cf_loop_signal_cb cb;
|
||||
ngx_queue_t member;
|
||||
};
|
||||
|
||||
|
||||
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
|
||||
CFRunLoopSourceContext ctx;
|
||||
int r;
|
||||
loop->cf_loop = NULL;
|
||||
|
||||
if (uv__kqueue_init(loop))
|
||||
return -1;
|
||||
|
||||
loop->cf_loop = NULL;
|
||||
if ((r = uv_mutex_init(&loop->cf_mutex)))
|
||||
return r;
|
||||
if ((r = uv_sem_init(&loop->cf_sem, 0)))
|
||||
return r;
|
||||
ngx_queue_init(&loop->cf_signals);
|
||||
|
||||
memset(&ctx, 0, sizeof(ctx));
|
||||
ctx.info = loop;
|
||||
ctx.perform = uv__cf_loop_cb;
|
||||
loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx);
|
||||
|
||||
if ((r = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop)))
|
||||
return r;
|
||||
|
||||
/* Synchronize threads */
|
||||
uv_sem_wait(&loop->cf_sem);
|
||||
assert(ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) != NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__platform_loop_delete(uv_loop_t* loop) {
|
||||
ngx_queue_t* item;
|
||||
uv__cf_loop_signal_t* s;
|
||||
|
||||
assert(loop->cf_loop != NULL);
|
||||
uv__cf_loop_signal(loop, NULL, NULL);
|
||||
uv_thread_join(&loop->cf_thread);
|
||||
|
||||
uv_sem_destroy(&loop->cf_sem);
|
||||
uv_mutex_destroy(&loop->cf_mutex);
|
||||
|
||||
/* Free any remaining data */
|
||||
while (!ngx_queue_empty(&loop->cf_signals)) {
|
||||
item = ngx_queue_head(&loop->cf_signals);
|
||||
|
||||
s = ngx_queue_data(item, uv__cf_loop_signal_t, member);
|
||||
|
||||
ngx_queue_remove(item);
|
||||
free(s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__cf_loop_runner(void* arg) {
|
||||
uv_loop_t* loop;
|
||||
|
||||
loop = arg;
|
||||
|
||||
/* Get thread's loop */
|
||||
ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) = CFRunLoopGetCurrent();
|
||||
|
||||
CFRunLoopAddSource(loop->cf_loop,
|
||||
loop->cf_cb,
|
||||
kCFRunLoopDefaultMode);
|
||||
|
||||
uv_sem_post(&loop->cf_sem);
|
||||
|
||||
CFRunLoopRun();
|
||||
|
||||
CFRunLoopRemoveSource(loop->cf_loop,
|
||||
loop->cf_cb,
|
||||
kCFRunLoopDefaultMode);
|
||||
}
|
||||
|
||||
|
||||
static void uv__cf_loop_cb(void* arg) {
|
||||
uv_loop_t* loop;
|
||||
ngx_queue_t* item;
|
||||
ngx_queue_t split_head;
|
||||
uv__cf_loop_signal_t* s;
|
||||
|
||||
loop = arg;
|
||||
|
||||
uv_mutex_lock(&loop->cf_mutex);
|
||||
ngx_queue_init(&split_head);
|
||||
if (!ngx_queue_empty(&loop->cf_signals)) {
|
||||
ngx_queue_t* split_pos = ngx_queue_next(&loop->cf_signals);
|
||||
ngx_queue_split(&loop->cf_signals, split_pos, &split_head);
|
||||
}
|
||||
uv_mutex_unlock(&loop->cf_mutex);
|
||||
|
||||
while (!ngx_queue_empty(&split_head)) {
|
||||
item = ngx_queue_head(&split_head);
|
||||
|
||||
s = ngx_queue_data(item, uv__cf_loop_signal_t, member);
|
||||
|
||||
/* This was a termination signal */
|
||||
if (s->cb == NULL)
|
||||
CFRunLoopStop(loop->cf_loop);
|
||||
else
|
||||
s->cb(s->arg);
|
||||
|
||||
ngx_queue_remove(item);
|
||||
free(s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) {
|
||||
uv__cf_loop_signal_t* item;
|
||||
|
||||
item = malloc(sizeof(*item));
|
||||
/* XXX: Fail */
|
||||
if (item == NULL)
|
||||
abort();
|
||||
|
||||
item->arg = arg;
|
||||
item->cb = cb;
|
||||
|
||||
uv_mutex_lock(&loop->cf_mutex);
|
||||
ngx_queue_insert_tail(&loop->cf_signals, &item->member);
|
||||
uv_mutex_unlock(&loop->cf_mutex);
|
||||
|
||||
assert(loop->cf_loop != NULL);
|
||||
CFRunLoopSourceSignal(loop->cf_cb);
|
||||
CFRunLoopWakeUp(loop->cf_loop);
|
||||
uv__fsevents_loop_delete(loop);
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -34,13 +34,27 @@ int uv__fsevents_close(uv_fs_event_t* handle) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__fsevents_loop_delete(uv_loop_t* loop) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
#else /* TARGET_OS_IPHONE */
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdlib.h>
|
||||
#include <CoreFoundation/CFRunLoop.h>
|
||||
#include <CoreServices/CoreServices.h>
|
||||
|
||||
typedef struct uv__fsevents_event_s uv__fsevents_event_t;
|
||||
typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
|
||||
typedef void (*cf_loop_signal_cb)(void* arg);
|
||||
|
||||
struct uv__cf_loop_signal_s {
|
||||
cf_loop_signal_cb cb;
|
||||
ngx_queue_t member;
|
||||
void* arg;
|
||||
};
|
||||
|
||||
struct uv__fsevents_event_s {
|
||||
int events;
|
||||
@ -48,6 +62,12 @@ struct uv__fsevents_event_s {
|
||||
char path[1];
|
||||
};
|
||||
|
||||
/* Forward declarations */
|
||||
static void uv__cf_loop_cb(void* arg);
|
||||
static void uv__cf_loop_runner(void* arg);
|
||||
static void uv__cf_loop_signal(uv_loop_t* loop,
|
||||
cf_loop_signal_cb cb,
|
||||
void* arg);
|
||||
|
||||
#define UV__FSEVENTS_WALK(handle, block) \
|
||||
{ \
|
||||
@ -75,7 +95,7 @@ struct uv__fsevents_event_s {
|
||||
}
|
||||
|
||||
|
||||
void uv__fsevents_cb(uv_async_t* cb, int status) {
|
||||
static void uv__fsevents_cb(uv_async_t* cb, int status) {
|
||||
uv_fs_event_t* handle;
|
||||
|
||||
handle = cb->data;
|
||||
@ -92,12 +112,12 @@ void uv__fsevents_cb(uv_async_t* cb, int status) {
|
||||
}
|
||||
|
||||
|
||||
void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
|
||||
void* info,
|
||||
size_t numEvents,
|
||||
void* eventPaths,
|
||||
const FSEventStreamEventFlags eventFlags[],
|
||||
const FSEventStreamEventId eventIds[]) {
|
||||
static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
|
||||
void* info,
|
||||
size_t numEvents,
|
||||
void* eventPaths,
|
||||
const FSEventStreamEventFlags eventFlags[],
|
||||
const FSEventStreamEventId eventIds[]) {
|
||||
size_t i;
|
||||
int len;
|
||||
char** paths;
|
||||
@ -190,7 +210,7 @@ void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
|
||||
}
|
||||
|
||||
|
||||
void uv__fsevents_schedule(void* arg) {
|
||||
static void uv__fsevents_schedule(void* arg) {
|
||||
uv_fs_event_t* handle;
|
||||
|
||||
handle = arg;
|
||||
@ -202,6 +222,140 @@ void uv__fsevents_schedule(void* arg) {
|
||||
}
|
||||
|
||||
|
||||
static int uv__fsevents_loop_init(uv_loop_t* loop) {
|
||||
CFRunLoopSourceContext ctx;
|
||||
int err;
|
||||
|
||||
if (loop->cf_loop != NULL)
|
||||
return 0;
|
||||
|
||||
err = uv_mutex_init(&loop->cf_mutex);
|
||||
if (err)
|
||||
return err;
|
||||
|
||||
err = uv_sem_init(&loop->cf_sem, 0);
|
||||
if (err)
|
||||
goto fail_sem_init;
|
||||
|
||||
ngx_queue_init(&loop->cf_signals);
|
||||
memset(&ctx, 0, sizeof(ctx));
|
||||
ctx.info = loop;
|
||||
ctx.perform = uv__cf_loop_cb;
|
||||
loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx);
|
||||
|
||||
err = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop);
|
||||
if (err)
|
||||
goto fail_thread_create;
|
||||
|
||||
/* Synchronize threads */
|
||||
uv_sem_wait(&loop->cf_sem);
|
||||
assert(loop->cf_loop != NULL);
|
||||
return 0;
|
||||
|
||||
fail_thread_create:
|
||||
uv_sem_destroy(&loop->cf_sem);
|
||||
|
||||
fail_sem_init:
|
||||
uv_mutex_destroy(&loop->cf_mutex);
|
||||
return err;
|
||||
}
|
||||
|
||||
|
||||
void uv__fsevents_loop_delete(uv_loop_t* loop) {
|
||||
uv__cf_loop_signal_t* s;
|
||||
ngx_queue_t* q;
|
||||
|
||||
if (loop->cf_loop == NULL)
|
||||
return;
|
||||
|
||||
uv__cf_loop_signal(loop, NULL, NULL);
|
||||
uv_thread_join(&loop->cf_thread);
|
||||
uv_sem_destroy(&loop->cf_sem);
|
||||
uv_mutex_destroy(&loop->cf_mutex);
|
||||
|
||||
/* Free any remaining data */
|
||||
while (!ngx_queue_empty(&loop->cf_signals)) {
|
||||
q = ngx_queue_head(&loop->cf_signals);
|
||||
s = ngx_queue_data(q, uv__cf_loop_signal_t, member);
|
||||
ngx_queue_remove(q);
|
||||
free(s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__cf_loop_runner(void* arg) {
|
||||
uv_loop_t* loop;
|
||||
|
||||
loop = arg;
|
||||
loop->cf_loop = CFRunLoopGetCurrent();
|
||||
|
||||
CFRunLoopAddSource(loop->cf_loop,
|
||||
loop->cf_cb,
|
||||
kCFRunLoopDefaultMode);
|
||||
|
||||
uv_sem_post(&loop->cf_sem);
|
||||
|
||||
CFRunLoopRun();
|
||||
CFRunLoopRemoveSource(loop->cf_loop,
|
||||
loop->cf_cb,
|
||||
kCFRunLoopDefaultMode);
|
||||
}
|
||||
|
||||
|
||||
static void uv__cf_loop_cb(void* arg) {
|
||||
uv_loop_t* loop;
|
||||
ngx_queue_t* item;
|
||||
ngx_queue_t split_head;
|
||||
uv__cf_loop_signal_t* s;
|
||||
|
||||
loop = arg;
|
||||
|
||||
uv_mutex_lock(&loop->cf_mutex);
|
||||
ngx_queue_init(&split_head);
|
||||
if (!ngx_queue_empty(&loop->cf_signals)) {
|
||||
ngx_queue_t* split_pos = ngx_queue_head(&loop->cf_signals);
|
||||
ngx_queue_split(&loop->cf_signals, split_pos, &split_head);
|
||||
}
|
||||
uv_mutex_unlock(&loop->cf_mutex);
|
||||
|
||||
while (!ngx_queue_empty(&split_head)) {
|
||||
item = ngx_queue_head(&split_head);
|
||||
|
||||
s = ngx_queue_data(item, uv__cf_loop_signal_t, member);
|
||||
|
||||
/* This was a termination signal */
|
||||
if (s->cb == NULL)
|
||||
CFRunLoopStop(loop->cf_loop);
|
||||
else
|
||||
s->cb(s->arg);
|
||||
|
||||
ngx_queue_remove(item);
|
||||
free(s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) {
|
||||
uv__cf_loop_signal_t* item;
|
||||
|
||||
item = malloc(sizeof(*item));
|
||||
/* XXX: Fail */
|
||||
if (item == NULL)
|
||||
abort();
|
||||
|
||||
item->arg = arg;
|
||||
item->cb = cb;
|
||||
|
||||
uv_mutex_lock(&loop->cf_mutex);
|
||||
ngx_queue_insert_tail(&loop->cf_signals, &item->member);
|
||||
uv_mutex_unlock(&loop->cf_mutex);
|
||||
|
||||
assert(loop->cf_loop != NULL);
|
||||
CFRunLoopSourceSignal(loop->cf_cb);
|
||||
CFRunLoopWakeUp(loop->cf_loop);
|
||||
}
|
||||
|
||||
|
||||
int uv__fsevents_init(uv_fs_event_t* handle) {
|
||||
FSEventStreamContext ctx;
|
||||
FSEventStreamRef ref;
|
||||
@ -209,6 +363,11 @@ int uv__fsevents_init(uv_fs_event_t* handle) {
|
||||
CFArrayRef paths;
|
||||
CFAbsoluteTime latency;
|
||||
FSEventStreamCreateFlags flags;
|
||||
int err;
|
||||
|
||||
err = uv__fsevents_loop_init(handle->loop);
|
||||
if (err)
|
||||
return err;
|
||||
|
||||
/* Initialize context */
|
||||
ctx.version = 0;
|
||||
|
||||
@ -216,12 +216,10 @@ int uv__make_socketpair(int fds[2], int flags);
|
||||
int uv__make_pipe(int fds[2], int flags);
|
||||
|
||||
#if defined(__APPLE__)
|
||||
typedef void (*cf_loop_signal_cb)(void*);
|
||||
|
||||
void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg);
|
||||
|
||||
int uv__fsevents_init(uv_fs_event_t* handle);
|
||||
int uv__fsevents_close(uv_fs_event_t* handle);
|
||||
void uv__fsevents_loop_delete(uv_loop_t* loop);
|
||||
|
||||
/* OSX < 10.7 has no file events, polyfill them */
|
||||
#ifndef MAC_OS_X_VERSION_10_7
|
||||
|
||||
Loading…
Reference in New Issue
Block a user