darwin: create fsevents thread on demand
* Move CF run loop code to fsevents.c. * Create the fsevents thread on demand rather than at startup. * Remove use of ACCESS_ONCE. All accesses to loop->cf_loop are protected by full memory barriers so no reordering can take place. Fixes #872. Conflicts: src/unix/darwin.c
This commit is contained in:
parent
24a42a406a
commit
9bae606d41
@ -28,8 +28,6 @@
|
|||||||
#include <ifaddrs.h>
|
#include <ifaddrs.h>
|
||||||
#include <net/if.h>
|
#include <net/if.h>
|
||||||
|
|
||||||
#include <CoreFoundation/CFRunLoop.h>
|
|
||||||
|
|
||||||
#include <mach/mach.h>
|
#include <mach/mach.h>
|
||||||
#include <mach/mach_time.h>
|
#include <mach/mach_time.h>
|
||||||
#include <mach-o/dyld.h> /* _NSGetExecutablePath */
|
#include <mach-o/dyld.h> /* _NSGetExecutablePath */
|
||||||
@ -37,144 +35,19 @@
|
|||||||
#include <sys/sysctl.h>
|
#include <sys/sysctl.h>
|
||||||
#include <unistd.h> /* sysconf */
|
#include <unistd.h> /* sysconf */
|
||||||
|
|
||||||
/* Forward declarations */
|
|
||||||
static void uv__cf_loop_runner(void* arg);
|
|
||||||
static void uv__cf_loop_cb(void* arg);
|
|
||||||
|
|
||||||
typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
|
|
||||||
struct uv__cf_loop_signal_s {
|
|
||||||
void* arg;
|
|
||||||
cf_loop_signal_cb cb;
|
|
||||||
ngx_queue_t member;
|
|
||||||
};
|
|
||||||
|
|
||||||
|
|
||||||
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
|
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
|
||||||
CFRunLoopSourceContext ctx;
|
loop->cf_loop = NULL;
|
||||||
int r;
|
|
||||||
|
|
||||||
if (uv__kqueue_init(loop))
|
if (uv__kqueue_init(loop))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
loop->cf_loop = NULL;
|
|
||||||
if ((r = uv_mutex_init(&loop->cf_mutex)))
|
|
||||||
return r;
|
|
||||||
if ((r = uv_sem_init(&loop->cf_sem, 0)))
|
|
||||||
return r;
|
|
||||||
ngx_queue_init(&loop->cf_signals);
|
|
||||||
|
|
||||||
memset(&ctx, 0, sizeof(ctx));
|
|
||||||
ctx.info = loop;
|
|
||||||
ctx.perform = uv__cf_loop_cb;
|
|
||||||
loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx);
|
|
||||||
|
|
||||||
if ((r = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop)))
|
|
||||||
return r;
|
|
||||||
|
|
||||||
/* Synchronize threads */
|
|
||||||
uv_sem_wait(&loop->cf_sem);
|
|
||||||
assert(ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) != NULL);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void uv__platform_loop_delete(uv_loop_t* loop) {
|
void uv__platform_loop_delete(uv_loop_t* loop) {
|
||||||
ngx_queue_t* item;
|
uv__fsevents_loop_delete(loop);
|
||||||
uv__cf_loop_signal_t* s;
|
|
||||||
|
|
||||||
assert(loop->cf_loop != NULL);
|
|
||||||
uv__cf_loop_signal(loop, NULL, NULL);
|
|
||||||
uv_thread_join(&loop->cf_thread);
|
|
||||||
|
|
||||||
uv_sem_destroy(&loop->cf_sem);
|
|
||||||
uv_mutex_destroy(&loop->cf_mutex);
|
|
||||||
|
|
||||||
/* Free any remaining data */
|
|
||||||
while (!ngx_queue_empty(&loop->cf_signals)) {
|
|
||||||
item = ngx_queue_head(&loop->cf_signals);
|
|
||||||
|
|
||||||
s = ngx_queue_data(item, uv__cf_loop_signal_t, member);
|
|
||||||
|
|
||||||
ngx_queue_remove(item);
|
|
||||||
free(s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void uv__cf_loop_runner(void* arg) {
|
|
||||||
uv_loop_t* loop;
|
|
||||||
|
|
||||||
loop = arg;
|
|
||||||
|
|
||||||
/* Get thread's loop */
|
|
||||||
ACCESS_ONCE(CFRunLoopRef, loop->cf_loop) = CFRunLoopGetCurrent();
|
|
||||||
|
|
||||||
CFRunLoopAddSource(loop->cf_loop,
|
|
||||||
loop->cf_cb,
|
|
||||||
kCFRunLoopDefaultMode);
|
|
||||||
|
|
||||||
uv_sem_post(&loop->cf_sem);
|
|
||||||
|
|
||||||
CFRunLoopRun();
|
|
||||||
|
|
||||||
CFRunLoopRemoveSource(loop->cf_loop,
|
|
||||||
loop->cf_cb,
|
|
||||||
kCFRunLoopDefaultMode);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void uv__cf_loop_cb(void* arg) {
|
|
||||||
uv_loop_t* loop;
|
|
||||||
ngx_queue_t* item;
|
|
||||||
ngx_queue_t split_head;
|
|
||||||
uv__cf_loop_signal_t* s;
|
|
||||||
|
|
||||||
loop = arg;
|
|
||||||
|
|
||||||
uv_mutex_lock(&loop->cf_mutex);
|
|
||||||
ngx_queue_init(&split_head);
|
|
||||||
if (!ngx_queue_empty(&loop->cf_signals)) {
|
|
||||||
ngx_queue_t* split_pos = ngx_queue_next(&loop->cf_signals);
|
|
||||||
ngx_queue_split(&loop->cf_signals, split_pos, &split_head);
|
|
||||||
}
|
|
||||||
uv_mutex_unlock(&loop->cf_mutex);
|
|
||||||
|
|
||||||
while (!ngx_queue_empty(&split_head)) {
|
|
||||||
item = ngx_queue_head(&split_head);
|
|
||||||
|
|
||||||
s = ngx_queue_data(item, uv__cf_loop_signal_t, member);
|
|
||||||
|
|
||||||
/* This was a termination signal */
|
|
||||||
if (s->cb == NULL)
|
|
||||||
CFRunLoopStop(loop->cf_loop);
|
|
||||||
else
|
|
||||||
s->cb(s->arg);
|
|
||||||
|
|
||||||
ngx_queue_remove(item);
|
|
||||||
free(s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) {
|
|
||||||
uv__cf_loop_signal_t* item;
|
|
||||||
|
|
||||||
item = malloc(sizeof(*item));
|
|
||||||
/* XXX: Fail */
|
|
||||||
if (item == NULL)
|
|
||||||
abort();
|
|
||||||
|
|
||||||
item->arg = arg;
|
|
||||||
item->cb = cb;
|
|
||||||
|
|
||||||
uv_mutex_lock(&loop->cf_mutex);
|
|
||||||
ngx_queue_insert_tail(&loop->cf_signals, &item->member);
|
|
||||||
uv_mutex_unlock(&loop->cf_mutex);
|
|
||||||
|
|
||||||
assert(loop->cf_loop != NULL);
|
|
||||||
CFRunLoopSourceSignal(loop->cf_cb);
|
|
||||||
CFRunLoopWakeUp(loop->cf_loop);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -34,13 +34,27 @@ int uv__fsevents_close(uv_fs_event_t* handle) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__fsevents_loop_delete(uv_loop_t* loop) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
#else /* TARGET_OS_IPHONE */
|
#else /* TARGET_OS_IPHONE */
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <CoreFoundation/CFRunLoop.h>
|
||||||
#include <CoreServices/CoreServices.h>
|
#include <CoreServices/CoreServices.h>
|
||||||
|
|
||||||
typedef struct uv__fsevents_event_s uv__fsevents_event_t;
|
typedef struct uv__fsevents_event_s uv__fsevents_event_t;
|
||||||
|
typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
|
||||||
|
typedef void (*cf_loop_signal_cb)(void* arg);
|
||||||
|
|
||||||
|
struct uv__cf_loop_signal_s {
|
||||||
|
cf_loop_signal_cb cb;
|
||||||
|
ngx_queue_t member;
|
||||||
|
void* arg;
|
||||||
|
};
|
||||||
|
|
||||||
struct uv__fsevents_event_s {
|
struct uv__fsevents_event_s {
|
||||||
int events;
|
int events;
|
||||||
@ -48,6 +62,12 @@ struct uv__fsevents_event_s {
|
|||||||
char path[1];
|
char path[1];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* Forward declarations */
|
||||||
|
static void uv__cf_loop_cb(void* arg);
|
||||||
|
static void uv__cf_loop_runner(void* arg);
|
||||||
|
static void uv__cf_loop_signal(uv_loop_t* loop,
|
||||||
|
cf_loop_signal_cb cb,
|
||||||
|
void* arg);
|
||||||
|
|
||||||
#define UV__FSEVENTS_WALK(handle, block) \
|
#define UV__FSEVENTS_WALK(handle, block) \
|
||||||
{ \
|
{ \
|
||||||
@ -75,7 +95,7 @@ struct uv__fsevents_event_s {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void uv__fsevents_cb(uv_async_t* cb, int status) {
|
static void uv__fsevents_cb(uv_async_t* cb, int status) {
|
||||||
uv_fs_event_t* handle;
|
uv_fs_event_t* handle;
|
||||||
|
|
||||||
handle = cb->data;
|
handle = cb->data;
|
||||||
@ -92,7 +112,7 @@ void uv__fsevents_cb(uv_async_t* cb, int status) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
|
static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
|
||||||
void* info,
|
void* info,
|
||||||
size_t numEvents,
|
size_t numEvents,
|
||||||
void* eventPaths,
|
void* eventPaths,
|
||||||
@ -190,7 +210,7 @@ void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void uv__fsevents_schedule(void* arg) {
|
static void uv__fsevents_schedule(void* arg) {
|
||||||
uv_fs_event_t* handle;
|
uv_fs_event_t* handle;
|
||||||
|
|
||||||
handle = arg;
|
handle = arg;
|
||||||
@ -202,6 +222,140 @@ void uv__fsevents_schedule(void* arg) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int uv__fsevents_loop_init(uv_loop_t* loop) {
|
||||||
|
CFRunLoopSourceContext ctx;
|
||||||
|
int err;
|
||||||
|
|
||||||
|
if (loop->cf_loop != NULL)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
err = uv_mutex_init(&loop->cf_mutex);
|
||||||
|
if (err)
|
||||||
|
return err;
|
||||||
|
|
||||||
|
err = uv_sem_init(&loop->cf_sem, 0);
|
||||||
|
if (err)
|
||||||
|
goto fail_sem_init;
|
||||||
|
|
||||||
|
ngx_queue_init(&loop->cf_signals);
|
||||||
|
memset(&ctx, 0, sizeof(ctx));
|
||||||
|
ctx.info = loop;
|
||||||
|
ctx.perform = uv__cf_loop_cb;
|
||||||
|
loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx);
|
||||||
|
|
||||||
|
err = uv_thread_create(&loop->cf_thread, uv__cf_loop_runner, loop);
|
||||||
|
if (err)
|
||||||
|
goto fail_thread_create;
|
||||||
|
|
||||||
|
/* Synchronize threads */
|
||||||
|
uv_sem_wait(&loop->cf_sem);
|
||||||
|
assert(loop->cf_loop != NULL);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
fail_thread_create:
|
||||||
|
uv_sem_destroy(&loop->cf_sem);
|
||||||
|
|
||||||
|
fail_sem_init:
|
||||||
|
uv_mutex_destroy(&loop->cf_mutex);
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__fsevents_loop_delete(uv_loop_t* loop) {
|
||||||
|
uv__cf_loop_signal_t* s;
|
||||||
|
ngx_queue_t* q;
|
||||||
|
|
||||||
|
if (loop->cf_loop == NULL)
|
||||||
|
return;
|
||||||
|
|
||||||
|
uv__cf_loop_signal(loop, NULL, NULL);
|
||||||
|
uv_thread_join(&loop->cf_thread);
|
||||||
|
uv_sem_destroy(&loop->cf_sem);
|
||||||
|
uv_mutex_destroy(&loop->cf_mutex);
|
||||||
|
|
||||||
|
/* Free any remaining data */
|
||||||
|
while (!ngx_queue_empty(&loop->cf_signals)) {
|
||||||
|
q = ngx_queue_head(&loop->cf_signals);
|
||||||
|
s = ngx_queue_data(q, uv__cf_loop_signal_t, member);
|
||||||
|
ngx_queue_remove(q);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void uv__cf_loop_runner(void* arg) {
|
||||||
|
uv_loop_t* loop;
|
||||||
|
|
||||||
|
loop = arg;
|
||||||
|
loop->cf_loop = CFRunLoopGetCurrent();
|
||||||
|
|
||||||
|
CFRunLoopAddSource(loop->cf_loop,
|
||||||
|
loop->cf_cb,
|
||||||
|
kCFRunLoopDefaultMode);
|
||||||
|
|
||||||
|
uv_sem_post(&loop->cf_sem);
|
||||||
|
|
||||||
|
CFRunLoopRun();
|
||||||
|
CFRunLoopRemoveSource(loop->cf_loop,
|
||||||
|
loop->cf_cb,
|
||||||
|
kCFRunLoopDefaultMode);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void uv__cf_loop_cb(void* arg) {
|
||||||
|
uv_loop_t* loop;
|
||||||
|
ngx_queue_t* item;
|
||||||
|
ngx_queue_t split_head;
|
||||||
|
uv__cf_loop_signal_t* s;
|
||||||
|
|
||||||
|
loop = arg;
|
||||||
|
|
||||||
|
uv_mutex_lock(&loop->cf_mutex);
|
||||||
|
ngx_queue_init(&split_head);
|
||||||
|
if (!ngx_queue_empty(&loop->cf_signals)) {
|
||||||
|
ngx_queue_t* split_pos = ngx_queue_head(&loop->cf_signals);
|
||||||
|
ngx_queue_split(&loop->cf_signals, split_pos, &split_head);
|
||||||
|
}
|
||||||
|
uv_mutex_unlock(&loop->cf_mutex);
|
||||||
|
|
||||||
|
while (!ngx_queue_empty(&split_head)) {
|
||||||
|
item = ngx_queue_head(&split_head);
|
||||||
|
|
||||||
|
s = ngx_queue_data(item, uv__cf_loop_signal_t, member);
|
||||||
|
|
||||||
|
/* This was a termination signal */
|
||||||
|
if (s->cb == NULL)
|
||||||
|
CFRunLoopStop(loop->cf_loop);
|
||||||
|
else
|
||||||
|
s->cb(s->arg);
|
||||||
|
|
||||||
|
ngx_queue_remove(item);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) {
|
||||||
|
uv__cf_loop_signal_t* item;
|
||||||
|
|
||||||
|
item = malloc(sizeof(*item));
|
||||||
|
/* XXX: Fail */
|
||||||
|
if (item == NULL)
|
||||||
|
abort();
|
||||||
|
|
||||||
|
item->arg = arg;
|
||||||
|
item->cb = cb;
|
||||||
|
|
||||||
|
uv_mutex_lock(&loop->cf_mutex);
|
||||||
|
ngx_queue_insert_tail(&loop->cf_signals, &item->member);
|
||||||
|
uv_mutex_unlock(&loop->cf_mutex);
|
||||||
|
|
||||||
|
assert(loop->cf_loop != NULL);
|
||||||
|
CFRunLoopSourceSignal(loop->cf_cb);
|
||||||
|
CFRunLoopWakeUp(loop->cf_loop);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int uv__fsevents_init(uv_fs_event_t* handle) {
|
int uv__fsevents_init(uv_fs_event_t* handle) {
|
||||||
FSEventStreamContext ctx;
|
FSEventStreamContext ctx;
|
||||||
FSEventStreamRef ref;
|
FSEventStreamRef ref;
|
||||||
@ -209,6 +363,11 @@ int uv__fsevents_init(uv_fs_event_t* handle) {
|
|||||||
CFArrayRef paths;
|
CFArrayRef paths;
|
||||||
CFAbsoluteTime latency;
|
CFAbsoluteTime latency;
|
||||||
FSEventStreamCreateFlags flags;
|
FSEventStreamCreateFlags flags;
|
||||||
|
int err;
|
||||||
|
|
||||||
|
err = uv__fsevents_loop_init(handle->loop);
|
||||||
|
if (err)
|
||||||
|
return err;
|
||||||
|
|
||||||
/* Initialize context */
|
/* Initialize context */
|
||||||
ctx.version = 0;
|
ctx.version = 0;
|
||||||
|
|||||||
@ -216,12 +216,10 @@ int uv__make_socketpair(int fds[2], int flags);
|
|||||||
int uv__make_pipe(int fds[2], int flags);
|
int uv__make_pipe(int fds[2], int flags);
|
||||||
|
|
||||||
#if defined(__APPLE__)
|
#if defined(__APPLE__)
|
||||||
typedef void (*cf_loop_signal_cb)(void*);
|
|
||||||
|
|
||||||
void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg);
|
|
||||||
|
|
||||||
int uv__fsevents_init(uv_fs_event_t* handle);
|
int uv__fsevents_init(uv_fs_event_t* handle);
|
||||||
int uv__fsevents_close(uv_fs_event_t* handle);
|
int uv__fsevents_close(uv_fs_event_t* handle);
|
||||||
|
void uv__fsevents_loop_delete(uv_loop_t* loop);
|
||||||
|
|
||||||
/* OSX < 10.7 has no file events, polyfill them */
|
/* OSX < 10.7 has no file events, polyfill them */
|
||||||
#ifndef MAC_OS_X_VERSION_10_7
|
#ifndef MAC_OS_X_VERSION_10_7
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user