fsevents: use shared FSEventStream

It seems that number of simultaneously opened FSEventStreams is
limited on OSX (i.e. you can have only fixed number of them on
one running system), getting past through this limit will cause
`FSEventStreamCreate` to return false and write following message
to stderr:

    (CarbonCore.framework) FSEventStreamStart: register_with_server:
    ERROR: f2d_register_rpc() => (null) (-21)

To prevent this, we must use only one shared FSEventStream with a
paths for all uv_fsevent_t handles, and then filter out events for
each handle using this paths again.

See https://github.com/joyent/node/issues/5463
This commit is contained in:
Fedor Indutny 2013-08-21 01:43:09 +04:00
parent abcad99e23
commit cd2794c01f
4 changed files with 342 additions and 155 deletions

View File

@ -36,8 +36,8 @@
#define UV_PLATFORM_LOOP_FIELDS \
uv_thread_t cf_thread; \
void* cf_cb; \
void* cf_loop; \
void* _cf_reserved; \
void* cf_state; \
uv_mutex_t cf_mutex; \
uv_sem_t cf_sem; \
void* cf_signals[2]; \
@ -47,10 +47,10 @@
char* realpath; \
int realpath_len; \
int cf_flags; \
void* cf_eventstream; \
void* cf_event; \
uv_async_t* cf_cb; \
void* cf_events[2]; \
uv_sem_t cf_sem; \
void* cf_member[2]; \
uv_sem_t _cf_reserved; \
uv_mutex_t cf_mutex; \
#define UV_STREAM_PRIVATE_PLATFORM_FIELDS \

View File

@ -38,7 +38,7 @@
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
loop->cf_loop = NULL;
loop->cf_state = NULL;
if (uv__kqueue_init(loop))
return -errno;

View File

@ -49,70 +49,89 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) {
typedef struct uv__fsevents_event_s uv__fsevents_event_t;
typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
typedef void (*cf_loop_signal_cb)(void* arg);
typedef struct uv__cf_loop_state_s uv__cf_loop_state_t;
struct uv__cf_loop_state_s {
CFRunLoopRef loop;
CFRunLoopSourceRef signal_source;
volatile int fsevent_need_reschedule;
FSEventStreamRef fsevent_stream;
uv_sem_t fsevent_sem;
uv_mutex_t fsevent_mutex;
void* fsevent_handles[2];
int fsevent_handle_count;
};
struct uv__cf_loop_signal_s {
cf_loop_signal_cb cb;
QUEUE member;
void* arg;
uv_fs_event_t* handle;
};
struct uv__fsevents_event_s {
int events;
QUEUE member;
void* next;
char path[1];
};
static const int kFSEventsModified = kFSEventStreamEventFlagItemFinderInfoMod |
kFSEventStreamEventFlagItemModified |
kFSEventStreamEventFlagItemInodeMetaMod |
kFSEventStreamEventFlagItemChangeOwner |
kFSEventStreamEventFlagItemXattrMod;
static const int kFSEventsRenamed = kFSEventStreamEventFlagItemCreated |
kFSEventStreamEventFlagItemRemoved |
kFSEventStreamEventFlagItemRenamed;
static const int kFSEventsSystem = kFSEventStreamEventFlagUserDropped |
kFSEventStreamEventFlagKernelDropped |
kFSEventStreamEventFlagEventIdsWrapped |
kFSEventStreamEventFlagHistoryDone |
kFSEventStreamEventFlagMount |
kFSEventStreamEventFlagUnmount |
kFSEventStreamEventFlagRootChanged;
/* Forward declarations */
static void uv__cf_loop_cb(void* arg);
static void* uv__cf_loop_runner(void* arg);
static void uv__cf_loop_signal(uv_loop_t* loop,
cf_loop_signal_cb cb,
void* arg);
static int uv__cf_loop_signal(uv_loop_t* loop, uv_fs_event_t* handle);
#define UV__FSEVENTS_WALK(handle, block) \
{ \
QUEUE* curr; \
QUEUE split_head; \
#define UV__FSEVENTS_PROCESS(handle, block) \
do { \
uv__fsevents_event_t* event; \
uv__fsevents_event_t* next; \
uv_mutex_lock(&(handle)->cf_mutex); \
QUEUE_INIT(&split_head); \
if (!QUEUE_EMPTY(&(handle)->cf_events)) { \
QUEUE* split_pos = QUEUE_HEAD(&(handle)->cf_events); \
QUEUE_SPLIT(&(handle)->cf_events, split_pos, &split_head); \
} \
event = (handle)->cf_event; \
(handle)->cf_event = NULL; \
uv_mutex_unlock(&(handle)->cf_mutex); \
while (!QUEUE_EMPTY(&split_head)) { \
curr = QUEUE_HEAD(&split_head); \
while (event != NULL) { \
/* Invoke callback */ \
event = QUEUE_DATA(curr, uv__fsevents_event_t, member); \
QUEUE_REMOVE(curr); \
/* Invoke block code, but only if handle wasn't closed */ \
if (((handle)->flags & (UV_CLOSING | UV_CLOSED)) == 0) \
if (!uv__is_closing((handle))) \
block \
/* Free allocated data */ \
next = event->next; \
free(event); \
event = next; \
} \
}
} while (0)
/* Runs in UV loop's thread, when there're events to report to handle */
static void uv__fsevents_cb(uv_async_t* cb, int status) {
uv_fs_event_t* handle;
handle = cb->data;
UV__FSEVENTS_WALK(handle, {
UV__FSEVENTS_PROCESS(handle, {
if (handle->event_watcher.fd != -1)
handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0);
});
if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 &&
handle->event_watcher.fd == -1) {
if (!uv__is_closing(handle) && handle->event_watcher.fd == -1)
uv__fsevents_close(handle);
}
}
/* Runs in CF thread, when there're events in FSEventStream */
static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
void* info,
size_t numEvents,
@ -125,42 +144,35 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
char* path;
char* pos;
uv_fs_event_t* handle;
QUEUE* q;
uv_loop_t* loop;
uv__cf_loop_state_t* state;
uv__fsevents_event_t* event;
QUEUE add_list;
int kFSEventsModified;
int kFSEventsRenamed;
uv__fsevents_event_t* tail;
kFSEventsModified = kFSEventStreamEventFlagItemFinderInfoMod |
kFSEventStreamEventFlagItemModified |
kFSEventStreamEventFlagItemInodeMetaMod |
kFSEventStreamEventFlagItemChangeOwner |
kFSEventStreamEventFlagItemXattrMod;
kFSEventsRenamed = kFSEventStreamEventFlagItemCreated |
kFSEventStreamEventFlagItemRemoved |
kFSEventStreamEventFlagItemRenamed;
handle = info;
loop = info;
state = loop->cf_state;
assert(state != NULL);
paths = eventPaths;
QUEUE_INIT(&add_list);
for (i = 0; i < numEvents; i++) {
/* Ignore system events */
if (eventFlags[i] & (kFSEventStreamEventFlagUserDropped |
kFSEventStreamEventFlagKernelDropped |
kFSEventStreamEventFlagEventIdsWrapped |
kFSEventStreamEventFlagHistoryDone |
kFSEventStreamEventFlagMount |
kFSEventStreamEventFlagUnmount |
kFSEventStreamEventFlagRootChanged)) {
continue;
}
/* For each handle */
QUEUE_FOREACH(q, &state->fsevent_handles) {
handle = QUEUE_DATA(q, uv_fs_event_t, cf_member);
tail = NULL;
/* TODO: Report errors */
path = paths[i];
len = strlen(path);
/* Process and filter out events */
for (i = 0; i < numEvents; i++) {
/* Ignore system events */
if (eventFlags[i] & kFSEventsSystem)
continue;
path = paths[i];
len = strlen(path);
/* Filter out paths that are outside handle's request */
if (strncmp(path, handle->realpath, handle->realpath_len) != 0)
continue;
/* Remove absolute path prefix */
if (strstr(path, handle->realpath) == path) {
path += handle->realpath_len;
len -= handle->realpath_len;
@ -169,79 +181,81 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
path++;
len--;
}
}
#ifdef MAC_OS_X_VERSION_10_7
/* Ignore events with path equal to directory itself */
if (len == 0)
continue;
/* Ignore events with path equal to directory itself */
if (len == 0)
continue;
#endif /* MAC_OS_X_VERSION_10_7 */
/* Do not emit events from subdirectories (without option set) */
pos = strchr(path, '/');
if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0 &&
pos != NULL &&
pos != path + 1)
continue;
/* Do not emit events from subdirectories (without option set) */
if ((handle->cf_flags & UV_FS_EVENT_RECURSIVE) == 0) {
pos = strchr(path, '/');
if (pos != NULL && pos != path + 1)
continue;
}
#ifndef MAC_OS_X_VERSION_10_7
path = "";
len = 0;
path = "";
len = 0;
#endif /* MAC_OS_X_VERSION_10_7 */
event = malloc(sizeof(*event) + len);
if (event == NULL)
break;
event = malloc(sizeof(*event) + len);
if (event == NULL)
break;
memcpy(event->path, path, len + 1);
memset(event, 0, sizeof(*event));
memcpy(event->path, path, len + 1);
if ((eventFlags[i] & kFSEventsModified) != 0 &&
(eventFlags[i] & kFSEventsRenamed) == 0)
event->events = UV_CHANGE;
else
event->events = UV_RENAME;
if ((eventFlags[i] & kFSEventsModified) != 0 &&
(eventFlags[i] & kFSEventsRenamed) == 0)
event->events = UV_CHANGE;
else
event->events = UV_RENAME;
QUEUE_INSERT_TAIL(&add_list, &event->member);
if (tail != NULL)
tail->next = event;
tail = event;
}
if (tail != NULL) {
uv_mutex_lock(&handle->cf_mutex);
tail->next = handle->cf_event;
handle->cf_event = tail;
uv_mutex_unlock(&handle->cf_mutex);
uv_async_send(handle->cf_cb);
}
}
uv_mutex_lock(&handle->cf_mutex);
QUEUE_ADD(&handle->cf_events, &add_list);
uv_mutex_unlock(&handle->cf_mutex);
uv_async_send(handle->cf_cb);
}
static void uv__fsevents_schedule(void* arg) {
uv_fs_event_t* handle;
/* Runs in CF thread */
static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) {
uv__cf_loop_state_t* state;
FSEventStreamContext ctx;
FSEventStreamRef ref;
CFStringRef path;
CFArrayRef paths;
CFAbsoluteTime latency;
FSEventStreamCreateFlags flags;
handle = arg;
/* Initialize context */
ctx.version = 0;
ctx.info = handle;
ctx.info = loop;
ctx.retain = NULL;
ctx.release = NULL;
ctx.copyDescription = NULL;
/* Initialize paths array */
path = CFStringCreateWithCString(NULL,
handle->filename,
CFStringGetSystemEncoding());
assert(path != NULL);
paths = CFArrayCreate(NULL, (const void**)&path, 1, NULL);
assert(paths != NULL);
latency = 0.15;
/* Set appropriate flags */
flags = kFSEventStreamCreateFlagFileEvents;
/*
* NOTE: It might sound like a good idea to remember last seen StreamEventId,
* but in reality one dir might have last StreamEventId less than, the other,
* that is being watched now. Which will cause FSEventStream API to report
* changes to files from the past.
*/
ref = FSEventStreamCreate(NULL,
&uv__fsevents_event_cb,
&ctx,
@ -250,43 +264,120 @@ static void uv__fsevents_schedule(void* arg) {
latency,
flags);
assert(ref != NULL);
handle->cf_eventstream = ref;
FSEventStreamScheduleWithRunLoop(handle->cf_eventstream,
handle->loop->cf_loop,
state = loop->cf_state;
FSEventStreamScheduleWithRunLoop(ref,
state->loop,
kCFRunLoopDefaultMode);
if (!FSEventStreamStart(handle->cf_eventstream))
if (!FSEventStreamStart(ref))
abort();
state->fsevent_stream = ref;
}
static void uv__fsevents_unschedule(void* arg) {
uv_fs_event_t* handle;
/* Runs in CF thread */
static void uv__fsevents_destroy_stream(uv_loop_t* loop) {
uv__cf_loop_state_t* state;
handle = arg;
state = loop->cf_state;
if (state->fsevent_stream == NULL)
return;
/* Flush all accumulated events */
FSEventStreamFlushSync(state->fsevent_stream);
/* Stop emitting events */
FSEventStreamStop(handle->cf_eventstream);
FSEventStreamStop(state->fsevent_stream);
/* Release stream */
FSEventStreamInvalidate(handle->cf_eventstream);
FSEventStreamRelease(handle->cf_eventstream);
handle->cf_eventstream = NULL;
/* Notify main thread that we're done here */
uv_sem_post(&handle->cf_sem);
FSEventStreamInvalidate(state->fsevent_stream);
FSEventStreamRelease(state->fsevent_stream);
state->fsevent_stream = NULL;
}
/* Runs in CF thread, when there're new fsevent handles to add to stream */
static void uv__fsevents_reschedule(uv_fs_event_t* handle) {
uv__cf_loop_state_t* state;
QUEUE* q;
uv_fs_event_t* curr;
CFArrayRef cf_paths;
CFStringRef* paths;
int i;
int path_count;
state = handle->loop->cf_state;
/* Optimization to prevent O(n^2) time spent when starting to watch
* many files simultaneously
*/
if (!state->fsevent_need_reschedule)
return;
state->fsevent_need_reschedule = 0;
/* Destroy previous FSEventStream */
uv__fsevents_destroy_stream(handle->loop);
/* Create list of all watched paths */
uv_mutex_lock(&state->fsevent_mutex);
path_count = state->fsevent_handle_count;
if (path_count != 0) {
paths = malloc(sizeof(*paths) * path_count);
if (paths == NULL)
abort();
q = &state->fsevent_handles;
for (i = 0; i < path_count; i++) {
q = QUEUE_NEXT(q);
assert(q != &state->fsevent_handles);
curr = QUEUE_DATA(q, uv_fs_event_t, cf_member);
assert(curr->realpath != NULL);
paths[i] = CFStringCreateWithCString(NULL,
curr->realpath,
CFStringGetSystemEncoding());
if (paths[i] == NULL)
abort();
}
}
uv_mutex_unlock(&state->fsevent_mutex);
if (path_count != 0) {
/* Create new FSEventStream */
cf_paths = CFArrayCreate(NULL, (const void**) paths, path_count, NULL);
if (cf_paths == NULL)
abort();
uv__fsevents_create_stream(handle->loop, cf_paths);
}
/*
* Main thread will block until the removal of handle from the list,
* we must tell it when we're ready.
*
* NOTE: This is coupled with `uv_sem_wait()` in `uv__fsevents_close`
*/
if (uv__is_closing(handle))
uv_sem_post(&state->fsevent_sem);
}
/* Runs in UV loop */
static int uv__fsevents_loop_init(uv_loop_t* loop) {
CFRunLoopSourceContext ctx;
uv__cf_loop_state_t* state;
pthread_attr_t attr_storage;
pthread_attr_t* attr;
int err;
if (loop->cf_loop != NULL)
if (loop->cf_state != NULL)
return 0;
state = calloc(1, sizeof(*state));
if (state == NULL)
return -ENOMEM;
err = uv_mutex_init(&loop->cf_mutex);
if (err)
return err;
@ -296,10 +387,27 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) {
goto fail_sem_init;
QUEUE_INIT(&loop->cf_signals);
err = uv_sem_init(&state->fsevent_sem, 0);
if (err)
goto fail_fsevent_sem_init;
err = uv_mutex_init(&state->fsevent_mutex);
if (err)
goto fail_fsevent_mutex_init;
QUEUE_INIT(&state->fsevent_handles);
state->fsevent_need_reschedule = 0;
state->fsevent_handle_count = 0;
memset(&ctx, 0, sizeof(ctx));
ctx.info = loop;
ctx.perform = uv__cf_loop_cb;
loop->cf_cb = CFRunLoopSourceCreate(NULL, 0, &ctx);
state->signal_source = CFRunLoopSourceCreate(NULL, 0, &ctx);
if (state->signal_source == NULL) {
err = -ENOMEM;
goto fail_signal_source_create;
}
/* In the unlikely event that pthread_attr_init() fails, create the thread
* with the default stack size. We'll use a little more address space but
@ -313,6 +421,8 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) {
if (pthread_attr_setstacksize(attr, 3 * PTHREAD_STACK_MIN))
abort();
loop->cf_state = state;
/* uv_thread_t is an alias for pthread_t. */
err = -pthread_create(&loop->cf_thread, attr, uv__cf_loop_runner, loop);
@ -324,26 +434,39 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) {
/* Synchronize threads */
uv_sem_wait(&loop->cf_sem);
assert(loop->cf_loop != NULL);
return 0;
fail_thread_create:
loop->cf_state = NULL;
fail_signal_source_create:
uv_mutex_destroy(&state->fsevent_mutex);
fail_fsevent_mutex_init:
uv_sem_destroy(&state->fsevent_sem);
fail_fsevent_sem_init:
uv_sem_destroy(&loop->cf_sem);
fail_sem_init:
uv_mutex_destroy(&loop->cf_mutex);
free(state);
return err;
}
/* Runs in UV loop */
void uv__fsevents_loop_delete(uv_loop_t* loop) {
uv__cf_loop_signal_t* s;
uv__cf_loop_state_t* state;
QUEUE* q;
if (loop->cf_loop == NULL)
if (loop->cf_state == NULL)
return;
uv__cf_loop_signal(loop, NULL, NULL);
if (uv__cf_loop_signal(loop, NULL) != 0)
abort();
uv_thread_join(&loop->cf_thread);
uv_sem_destroy(&loop->cf_sem);
uv_mutex_destroy(&loop->cf_mutex);
@ -355,40 +478,54 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) {
QUEUE_REMOVE(q);
free(s);
}
/* Destroy state */
state = loop->cf_state;
uv_sem_destroy(&state->fsevent_sem);
uv_mutex_destroy(&state->fsevent_mutex);
CFRelease(state->signal_source);
free(state);
loop->cf_state = NULL;
}
/* Runs in CF thread. This is the CF loop's body */
static void* uv__cf_loop_runner(void* arg) {
uv_loop_t* loop;
uv__cf_loop_state_t* state;
loop = arg;
loop->cf_loop = CFRunLoopGetCurrent();
state = loop->cf_state;
state->loop = CFRunLoopGetCurrent();
CFRunLoopAddSource(loop->cf_loop,
loop->cf_cb,
CFRunLoopAddSource(state->loop,
state->signal_source,
kCFRunLoopDefaultMode);
uv_sem_post(&loop->cf_sem);
CFRunLoopRun();
CFRunLoopRemoveSource(loop->cf_loop,
loop->cf_cb,
CFRunLoopRemoveSource(state->loop,
state->signal_source,
kCFRunLoopDefaultMode);
return NULL;
}
/* Runs in CF thread, executed after `uv__cf_loop_signal()` */
static void uv__cf_loop_cb(void* arg) {
uv_loop_t* loop;
uv__cf_loop_state_t* state;
QUEUE* item;
QUEUE split_head;
uv__cf_loop_signal_t* s;
loop = arg;
state = loop->cf_state;
QUEUE_INIT(&split_head);
uv_mutex_lock(&loop->cf_mutex);
QUEUE_INIT(&split_head);
if (!QUEUE_EMPTY(&loop->cf_signals)) {
QUEUE* split_pos = QUEUE_HEAD(&loop->cf_signals);
QUEUE_SPLIT(&loop->cf_signals, split_pos, &split_head);
@ -401,10 +538,10 @@ static void uv__cf_loop_cb(void* arg) {
s = QUEUE_DATA(item, uv__cf_loop_signal_t, member);
/* This was a termination signal */
if (s->cb == NULL)
CFRunLoopStop(loop->cf_loop);
if (s->handle == NULL)
CFRunLoopStop(state->loop);
else
s->cb(s->arg);
uv__fsevents_reschedule(s->handle);
QUEUE_REMOVE(item);
free(s);
@ -412,29 +549,34 @@ static void uv__cf_loop_cb(void* arg) {
}
void uv__cf_loop_signal(uv_loop_t* loop, cf_loop_signal_cb cb, void* arg) {
/* Runs in UV loop to notify CF thread */
int uv__cf_loop_signal(uv_loop_t* loop, uv_fs_event_t* handle) {
uv__cf_loop_signal_t* item;
uv__cf_loop_state_t* state;
item = malloc(sizeof(*item));
/* XXX: Fail */
if (item == NULL)
abort();
return -ENOMEM;
item->arg = arg;
item->cb = cb;
item->handle = handle;
uv_mutex_lock(&loop->cf_mutex);
QUEUE_INSERT_TAIL(&loop->cf_signals, &item->member);
uv_mutex_unlock(&loop->cf_mutex);
assert(loop->cf_loop != NULL);
CFRunLoopSourceSignal(loop->cf_cb);
CFRunLoopWakeUp(loop->cf_loop);
state = loop->cf_state;
assert(state != NULL);
CFRunLoopSourceSignal(state->signal_source);
CFRunLoopWakeUp(state->loop);
return 0;
}
/* Runs in UV loop to initialize handle */
int uv__fsevents_init(uv_fs_event_t* handle) {
int err;
uv__cf_loop_state_t* state;
err = uv__fsevents_loop_init(handle->loop);
if (err)
@ -442,52 +584,98 @@ int uv__fsevents_init(uv_fs_event_t* handle) {
/* Get absolute path to file */
handle->realpath = realpath(handle->filename, NULL);
if (handle->realpath != NULL)
handle->realpath_len = strlen(handle->realpath);
if (handle->realpath == NULL)
return -errno;
handle->realpath_len = strlen(handle->realpath);
/* Initialize singly-linked list */
handle->cf_event = NULL;
handle->cf_eventstream = NULL;
/*
* Events will occur in other thread.
* Initialize callback for getting them back into event loop's thread
*/
handle->cf_cb = malloc(sizeof(*handle->cf_cb));
if (handle->cf_cb == NULL)
return -ENOMEM;
if (handle->cf_cb == NULL) {
err = -ENOMEM;
goto fail_cf_cb_malloc;
}
handle->cf_cb->data = handle;
uv_async_init(handle->loop, handle->cf_cb, uv__fsevents_cb);
handle->cf_cb->flags |= UV__HANDLE_INTERNAL;
uv_unref((uv_handle_t*) handle->cf_cb);
uv_mutex_init(&handle->cf_mutex);
uv_sem_init(&handle->cf_sem, 0);
QUEUE_INIT(&handle->cf_events);
err = uv_mutex_init(&handle->cf_mutex);
if (err)
goto fail_cf_mutex_init;
uv__cf_loop_signal(handle->loop, uv__fsevents_schedule, handle);
/* Insert handle into the list */
state = handle->loop->cf_state;
uv_mutex_lock(&state->fsevent_mutex);
QUEUE_INSERT_TAIL(&state->fsevent_handles, &handle->cf_member);
state->fsevent_handle_count++;
state->fsevent_need_reschedule = 1;
uv_mutex_unlock(&state->fsevent_mutex);
/* Reschedule FSEventStream */
assert(handle != NULL);
err = uv__cf_loop_signal(handle->loop, handle);
if (err)
goto fail_loop_signal;
return 0;
fail_loop_signal:
uv_mutex_destroy(&handle->cf_mutex);
fail_cf_mutex_init:
free(handle->cf_cb);
handle->cf_cb = NULL;
fail_cf_cb_malloc:
free(handle->realpath);
handle->realpath = NULL;
handle->realpath_len = 0;
return err;
}
/* Runs in UV loop to de-initialize handle */
int uv__fsevents_close(uv_fs_event_t* handle) {
int err;
uv__cf_loop_state_t* state;
if (handle->cf_cb == NULL)
return -EINVAL;
uv__cf_loop_signal(handle->loop, uv__fsevents_unschedule, handle);
/* Remove handle from the list */
state = handle->loop->cf_state;
uv_mutex_lock(&state->fsevent_mutex);
QUEUE_REMOVE(&handle->cf_member);
state->fsevent_handle_count--;
state->fsevent_need_reschedule = 1;
uv_mutex_unlock(&state->fsevent_mutex);
/* Reschedule FSEventStream */
assert(handle != NULL);
err = uv__cf_loop_signal(handle->loop, handle);
if (err)
return -err;
/* Wait for deinitialization */
uv_sem_wait(&handle->cf_sem);
uv_sem_wait(&state->fsevent_sem);
uv_close((uv_handle_t*) handle->cf_cb, (uv_close_cb) free);
handle->cf_cb = NULL;
/* Free data in queue */
UV__FSEVENTS_WALK(handle, {
UV__FSEVENTS_PROCESS(handle, {
/* NOP */
})
});
uv_mutex_destroy(&handle->cf_mutex);
uv_sem_destroy(&handle->cf_sem);
free(handle->realpath);
handle->realpath = NULL;
handle->realpath_len = 0;

View File

@ -320,7 +320,6 @@ int uv_fs_event_init(uv_loop_t* loop,
#if defined(__APPLE__)
/* Nullify field to perform checks later */
handle->cf_cb = NULL;
handle->cf_eventstream = NULL;
handle->realpath = NULL;
handle->realpath_len = 0;
handle->cf_flags = flags;