stream: use harder sync restrictions for osx-hack
Synchronize harder to avoid excessive spins, invokations of async callback and sporadic assertion failures on double-call of async callback.
This commit is contained in:
parent
4e5b8dc2ef
commit
67f9b91a8b
@ -46,8 +46,8 @@ typedef struct uv__stream_select_s uv__stream_select_t;
|
||||
struct uv__stream_select_s {
|
||||
uv_stream_t* stream;
|
||||
uv_thread_t thread;
|
||||
uv_sem_t sem;
|
||||
uv_mutex_t mutex;
|
||||
uv_sem_t close_sem;
|
||||
uv_sem_t async_sem;
|
||||
uv_async_t async;
|
||||
int events;
|
||||
int fake_fd;
|
||||
@ -148,7 +148,7 @@ static void uv__stream_osx_select(void* arg) {
|
||||
|
||||
while (1) {
|
||||
/* Terminate on semaphore */
|
||||
if (uv_sem_trywait(&s->sem) == 0)
|
||||
if (uv_sem_trywait(&s->close_sem) == 0)
|
||||
break;
|
||||
|
||||
/* Watch fd using select(2) */
|
||||
@ -204,11 +204,13 @@ static void uv__stream_osx_select(void* arg) {
|
||||
|
||||
assert(events != 0 || FD_ISSET(s->int_fd, &sread));
|
||||
if (events != 0) {
|
||||
uv_mutex_lock(&s->mutex);
|
||||
s->events |= events;
|
||||
ACCESS_ONCE(int, s->events) = events;
|
||||
|
||||
uv_async_send(&s->async);
|
||||
uv_mutex_unlock(&s->mutex);
|
||||
uv_sem_wait(&s->async_sem);
|
||||
|
||||
/* Should be processed at this stage */
|
||||
assert((s->events == 0) || (stream->flags & UV_CLOSING));
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -242,10 +244,9 @@ static void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
|
||||
stream = s->stream;
|
||||
|
||||
/* Get and reset stream's events */
|
||||
uv_mutex_lock(&s->mutex);
|
||||
events = s->events;
|
||||
s->events = 0;
|
||||
uv_mutex_unlock(&s->mutex);
|
||||
ACCESS_ONCE(int, s->events) = 0;
|
||||
uv_sem_post(&s->async_sem);
|
||||
|
||||
assert(events != 0);
|
||||
assert(events == (events & (UV__POLLIN | UV__POLLOUT)));
|
||||
@ -318,10 +319,10 @@ int uv__stream_try_select(uv_stream_t* stream, int* fd) {
|
||||
s->async.flags |= UV__HANDLE_INTERNAL;
|
||||
uv__handle_unref(&s->async);
|
||||
|
||||
if (uv_sem_init(&s->sem, 0))
|
||||
if (uv_sem_init(&s->close_sem, 0))
|
||||
goto fatal1;
|
||||
|
||||
if (uv_mutex_init(&s->mutex))
|
||||
if (uv_sem_init(&s->async_sem, 0))
|
||||
goto fatal2;
|
||||
|
||||
/* Create fds for io watcher and to interrupt the select() loop. */
|
||||
@ -346,9 +347,9 @@ fatal4:
|
||||
s->fake_fd = -1;
|
||||
s->int_fd = -1;
|
||||
fatal3:
|
||||
uv_mutex_destroy(&s->mutex);
|
||||
uv_sem_destroy(&s->async_sem);
|
||||
fatal2:
|
||||
uv_sem_destroy(&s->sem);
|
||||
uv_sem_destroy(&s->close_sem);
|
||||
fatal1:
|
||||
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
|
||||
return uv__set_sys_error(stream->loop, errno);
|
||||
@ -1360,11 +1361,12 @@ void uv__stream_close(uv_stream_t* handle) {
|
||||
|
||||
s = handle->select;
|
||||
|
||||
uv_sem_post(&s->sem);
|
||||
uv_sem_post(&s->close_sem);
|
||||
uv_sem_post(&s->async_sem);
|
||||
uv__stream_osx_interrupt_select(handle);
|
||||
uv_thread_join(&s->thread);
|
||||
uv_sem_destroy(&s->sem);
|
||||
uv_mutex_destroy(&s->mutex);
|
||||
uv_sem_destroy(&s->close_sem);
|
||||
uv_sem_destroy(&s->async_sem);
|
||||
close(s->fake_fd);
|
||||
close(s->int_fd);
|
||||
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
|
||||
|
||||
Loading…
Reference in New Issue
Block a user