fsevents: report errors to user

This commit is contained in:
Fedor Indutny 2013-10-23 15:21:54 +04:00
parent b45e3cca6b
commit 43bef41031
2 changed files with 120 additions and 55 deletions

View File

@ -47,10 +47,10 @@
char* realpath; \
int realpath_len; \
int cf_flags; \
void* cf_event; \
uv_async_t* cf_cb; \
void* cf_events[2]; \
void* cf_member[2]; \
uv_sem_t _cf_reserved; \
int cf_error; \
uv_mutex_t cf_mutex; \
#define UV_STREAM_PRIVATE_PLATFORM_FIELDS \

View File

@ -73,6 +73,17 @@ typedef struct uv__fsevents_event_s uv__fsevents_event_t;
typedef struct uv__cf_loop_signal_s uv__cf_loop_signal_t;
typedef struct uv__cf_loop_state_s uv__cf_loop_state_t;
struct uv__cf_loop_signal_s {
QUEUE member;
uv_fs_event_t* handle;
};
struct uv__fsevents_event_s {
QUEUE member;
int events;
char path[1];
};
struct uv__cf_loop_state_s {
CFRunLoopRef loop;
CFRunLoopSourceRef signal_source;
@ -81,18 +92,7 @@ struct uv__cf_loop_state_s {
uv_sem_t fsevent_sem;
uv_mutex_t fsevent_mutex;
void* fsevent_handles[2];
int fsevent_handle_count;
};
struct uv__cf_loop_signal_s {
QUEUE member;
uv_fs_event_t* handle;
};
struct uv__fsevents_event_s {
int events;
void* next;
char path[1];
unsigned int fsevent_handle_count;
};
/* Forward declarations */
@ -143,22 +143,36 @@ static void (*pFSEventStreamStop)(FSEventStreamRef);
#define UV__FSEVENTS_PROCESS(handle, block) \
do { \
QUEUE events; \
QUEUE* q; \
uv__fsevents_event_t* event; \
uv__fsevents_event_t* next; \
int err; \
uv_mutex_lock(&(handle)->cf_mutex); \
event = (handle)->cf_event; \
(handle)->cf_event = NULL; \
/* Split-off all events and empty original queue */ \
QUEUE_INIT(&events); \
if (!QUEUE_EMPTY(&(handle)->cf_events)) { \
q = QUEUE_HEAD(&(handle)->cf_events); \
QUEUE_SPLIT(&(handle)->cf_events, q, &events); \
} \
/* Get error (if any) and zero original one */ \
err = (handle)->cf_error; \
(handle)->cf_error = 0; \
uv_mutex_unlock(&(handle)->cf_mutex); \
while (event != NULL) { \
/* Invoke callback */ \
/* Invoke block code, but only if handle wasn't closed */ \
if (!uv__is_closing((handle))) \
/* Loop through events, deallocating each after processing */ \
while (!QUEUE_EMPTY(&events)) { \
q = QUEUE_HEAD(&events); \
event = QUEUE_DATA(q, uv__fsevents_event_t, member); \
QUEUE_REMOVE(q); \
/* NOTE: Checking uv__is_active() is required here, because handle \
* callback may close handle and invoking it after it will lead to \
* incorrect behaviour */ \
if (!uv__is_closing((handle)) && uv__is_active((handle))) \
block \
/* Free allocated data */ \
next = event->next; \
free(event); \
event = next; \
} \
if (err != 0 && !uv__is_closing((handle)) && uv__is_active((handle))) \
(handle)->cb((handle), NULL, 0, err); \
} while (0)
@ -169,12 +183,28 @@ static void uv__fsevents_cb(uv_async_t* cb, int status) {
handle = cb->data;
UV__FSEVENTS_PROCESS(handle, {
if (handle->event_watcher.fd != -1)
handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0);
handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0);
});
}
if (!uv__is_closing(handle) && handle->event_watcher.fd == -1)
uv__fsevents_close(handle);
/* Runs in CF thread, pushed event into handle's event list */
static void uv__fsevents_push_event(uv_fs_event_t* handle,
QUEUE* events,
int err) {
assert(events != NULL || err != 0);
uv_mutex_lock(&handle->cf_mutex);
/* Concatenate two queues */
if (events != NULL)
QUEUE_ADD(&handle->cf_events, events);
/* Propagate error */
if (err != 0)
handle->cf_error = err;
uv_mutex_unlock(&handle->cf_mutex);
uv_async_send(handle->cf_cb);
}
@ -195,7 +225,7 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
uv_loop_t* loop;
uv__cf_loop_state_t* state;
uv__fsevents_event_t* event;
uv__fsevents_event_t* tail;
QUEUE head;
loop = info;
state = loop->cf_state;
@ -203,9 +233,10 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
paths = eventPaths;
/* For each handle */
uv_mutex_lock(&state->fsevent_mutex);
QUEUE_FOREACH(q, &state->fsevent_handles) {
handle = QUEUE_DATA(q, uv_fs_event_t, cf_member);
tail = NULL;
QUEUE_INIT(&head);
/* Process and filter out events */
for (i = 0; i < numEvents; i++) {
@ -260,25 +291,18 @@ static void uv__fsevents_event_cb(ConstFSEventStreamRef streamRef,
else
event->events = UV_RENAME;
if (tail != NULL)
tail->next = event;
tail = event;
QUEUE_INSERT_TAIL(&head, &event->member);
}
if (tail != NULL) {
uv_mutex_lock(&handle->cf_mutex);
tail->next = handle->cf_event;
handle->cf_event = tail;
uv_mutex_unlock(&handle->cf_mutex);
uv_async_send(handle->cf_cb);
}
if (!QUEUE_EMPTY(&head))
uv__fsevents_push_event(handle, &head, 0);
}
uv_mutex_unlock(&state->fsevent_mutex);
}
/* Runs in CF thread */
static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) {
static int uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) {
uv__cf_loop_state_t* state;
FSEventStreamContext ctx;
FSEventStreamRef ref;
@ -316,10 +340,14 @@ static void uv__fsevents_create_stream(uv_loop_t* loop, CFArrayRef paths) {
pFSEventStreamScheduleWithRunLoop(ref,
state->loop,
*pkCFRunLoopDefaultMode);
if (!pFSEventStreamStart(ref))
abort();
if (!pFSEventStreamStart(ref)) {
pFSEventStreamInvalidate(ref);
pFSEventStreamRelease(ref);
return -EMFILE;
}
state->fsevent_stream = ref;
return 0;
}
@ -352,10 +380,16 @@ static void uv__fsevents_reschedule(uv_fs_event_t* handle) {
uv_fs_event_t* curr;
CFArrayRef cf_paths;
CFStringRef* paths;
int i;
int path_count;
unsigned int i;
int err;
unsigned int path_count;
state = handle->loop->cf_state;
paths = NULL;
cf_paths = NULL;
err = 0;
/* NOTE: `i` is used in deallocation loop below */
i = 0;
/* Optimization to prevent O(n^2) time spent when starting to watch
* many files simultaneously
@ -371,16 +405,21 @@ static void uv__fsevents_reschedule(uv_fs_event_t* handle) {
/* Destroy previous FSEventStream */
uv__fsevents_destroy_stream(handle->loop);
/* Any failure below will be a memory failure */
err = -ENOMEM;
/* Create list of all watched paths */
uv_mutex_lock(&state->fsevent_mutex);
path_count = state->fsevent_handle_count;
if (path_count != 0) {
paths = malloc(sizeof(*paths) * path_count);
if (paths == NULL)
abort();
if (paths == NULL) {
uv_mutex_unlock(&state->fsevent_mutex);
goto final;
}
q = &state->fsevent_handles;
for (i = 0; i < path_count; i++) {
for (; i < path_count; i++) {
q = QUEUE_NEXT(q);
assert(q != &state->fsevent_handles);
curr = QUEUE_DATA(q, uv_fs_event_t, cf_member);
@ -389,21 +428,46 @@ static void uv__fsevents_reschedule(uv_fs_event_t* handle) {
paths[i] = pCFStringCreateWithCString(NULL,
curr->realpath,
pCFStringGetSystemEncoding());
if (paths[i] == NULL)
abort();
if (paths[i] == NULL) {
uv_mutex_unlock(&state->fsevent_mutex);
goto final;
}
}
}
uv_mutex_unlock(&state->fsevent_mutex);
err = 0;
if (path_count != 0) {
/* Create new FSEventStream */
cf_paths = pCFArrayCreate(NULL, (const void**) paths, path_count, NULL);
if (cf_paths == NULL)
abort();
uv__fsevents_create_stream(handle->loop, cf_paths);
if (cf_paths == NULL) {
err = -ENOMEM;
goto final;
}
err = uv__fsevents_create_stream(handle->loop, cf_paths);
}
final:
/* Deallocate all paths in case of failure */
if (err != 0) {
if (cf_paths == NULL) {
while (i != 0)
pCFRelease(paths[--i]);
free(paths);
} else {
/* CFArray takes ownership of both strings and original C-array */
pCFRelease(cf_paths);
}
/* Broadcast error to all handles */
uv_mutex_lock(&state->fsevent_mutex);
QUEUE_FOREACH(q, &state->fsevent_handles) {
curr = QUEUE_DATA(q, uv_fs_event_t, cf_member);
uv__fsevents_push_event(curr, NULL, err);
}
uv_mutex_unlock(&state->fsevent_mutex);
}
/*
* Main thread will block until the removal of handle from the list,
* we must tell it when we're ready.
@ -722,8 +786,9 @@ int uv__fsevents_init(uv_fs_event_t* handle) {
return -errno;
handle->realpath_len = strlen(handle->realpath);
/* Initialize singly-linked list */
handle->cf_event = NULL;
/* Initialize event queue */
QUEUE_INIT(&handle->cf_events);
handle->cf_error = 0;
/*
* Events will occur in other thread.