unix: use select() for specific fds on OS X
kqueue(2) on osx doesn't work (emits EINVAL error) with specific fds (i.e. /dev/tty, /dev/null, etc). When given such descriptors - start select(2) watcher thread that will emit io events.
This commit is contained in:
parent
8073a2637f
commit
5da380a5ca
@ -34,4 +34,7 @@
|
|||||||
int fflags; \
|
int fflags; \
|
||||||
int fd; \
|
int fd; \
|
||||||
|
|
||||||
|
#define UV_STREAM_PRIVATE_PLATFORM_FIELDS \
|
||||||
|
void* select; \
|
||||||
|
|
||||||
#endif /* UV_DARWIN_H */
|
#endif /* UV_DARWIN_H */
|
||||||
|
|||||||
@ -79,6 +79,10 @@ struct uv__io_s {
|
|||||||
# define UV_PLATFORM_FS_EVENT_FIELDS /* empty */
|
# define UV_PLATFORM_FS_EVENT_FIELDS /* empty */
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#ifndef UV_STREAM_PRIVATE_PLATFORM_FIELDS
|
||||||
|
# define UV_STREAM_PRIVATE_PLATFORM_FIELDS /* empty */
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Note: May be cast to struct iovec. See writev(2). */
|
/* Note: May be cast to struct iovec. See writev(2). */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char* base;
|
char* base;
|
||||||
@ -178,6 +182,7 @@ typedef struct {
|
|||||||
int delayed_error; \
|
int delayed_error; \
|
||||||
int accepted_fd; \
|
int accepted_fd; \
|
||||||
int fd; \
|
int fd; \
|
||||||
|
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
|
||||||
|
|
||||||
#define UV_TCP_PRIVATE_FIELDS \
|
#define UV_TCP_PRIVATE_FIELDS \
|
||||||
uv_idle_t* idle_handle; /* for UV_TCP_SINGLE_ACCEPT handles */ \
|
uv_idle_t* idle_handle; /* for UV_TCP_SINGLE_ACCEPT handles */ \
|
||||||
|
|||||||
@ -34,6 +34,27 @@
|
|||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
# include <sys/event.h>
|
||||||
|
# include <sys/time.h>
|
||||||
|
# include <sys/select.h>
|
||||||
|
|
||||||
|
/* ev.h is overwriting EV_ERROR from sys/event.h */
|
||||||
|
#define EV_ERROR_ORIG 0x4000
|
||||||
|
|
||||||
|
/* Forward declaration */
|
||||||
|
typedef struct uv__stream_select_s uv__stream_select_t;
|
||||||
|
|
||||||
|
struct uv__stream_select_s {
|
||||||
|
uv_stream_t* stream;
|
||||||
|
uv_thread_t thread;
|
||||||
|
uv_sem_t sem;
|
||||||
|
uv_mutex_t mutex;
|
||||||
|
uv_async_t async;
|
||||||
|
int events;
|
||||||
|
int fake_fd;
|
||||||
|
};
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
static void uv__stream_connect(uv_stream_t*);
|
static void uv__stream_connect(uv_stream_t*);
|
||||||
static void uv__write(uv_stream_t* stream);
|
static void uv__write(uv_stream_t* stream);
|
||||||
@ -69,11 +90,182 @@ void uv__stream_init(uv_loop_t* loop,
|
|||||||
ngx_queue_init(&stream->write_completed_queue);
|
ngx_queue_init(&stream->write_completed_queue);
|
||||||
stream->write_queue_size = 0;
|
stream->write_queue_size = 0;
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
stream->select = NULL;
|
||||||
|
#endif /* defined(__APPLE_) */
|
||||||
|
|
||||||
uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0);
|
uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0);
|
||||||
uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0);
|
uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
void uv__stream_osx_select(void* arg) {
|
||||||
|
uv_stream_t* stream;
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
fd_set read;
|
||||||
|
fd_set write;
|
||||||
|
fd_set error;
|
||||||
|
struct timeval timeout;
|
||||||
|
int events;
|
||||||
|
int fd;
|
||||||
|
int r;
|
||||||
|
|
||||||
|
stream = arg;
|
||||||
|
s = stream->select;
|
||||||
|
fd = stream->fd;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
/* Terminate on semaphore */
|
||||||
|
if (uv_sem_trywait(&s->sem) == 0) break;
|
||||||
|
|
||||||
|
/* Watch fd using select(2) */
|
||||||
|
FD_ZERO(&read);
|
||||||
|
FD_ZERO(&write);
|
||||||
|
FD_ZERO(&error);
|
||||||
|
FD_SET(fd, &read);
|
||||||
|
FD_SET(fd, &write);
|
||||||
|
FD_SET(fd, &error);
|
||||||
|
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_usec = 250000; /* 250 ms timeout */
|
||||||
|
r = select(fd + 1, &read, &write, &error, &timeout);
|
||||||
|
if (r == -1) {
|
||||||
|
if (errno == EINTR) continue;
|
||||||
|
/* XXX: Possible?! */
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Ignore timeouts */
|
||||||
|
if (r == 0) continue;
|
||||||
|
|
||||||
|
/* Handle events */
|
||||||
|
events = 0;
|
||||||
|
if (FD_ISSET(fd, &read)) events |= UV__IO_READ;
|
||||||
|
if (FD_ISSET(fd, &write)) events |= UV__IO_WRITE;
|
||||||
|
if (FD_ISSET(fd, &error)) events |= UV__IO_ERROR;
|
||||||
|
|
||||||
|
uv_mutex_lock(&s->mutex);
|
||||||
|
s->events |= events;
|
||||||
|
uv_mutex_unlock(&s->mutex);
|
||||||
|
|
||||||
|
if (events != 0) uv_async_send(&s->async);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
|
||||||
|
uv_stream_t* stream;
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
int events;
|
||||||
|
|
||||||
|
s = container_of(handle, uv__stream_select_t, async);
|
||||||
|
stream = s->stream;
|
||||||
|
|
||||||
|
/* Get and reset stream's events */
|
||||||
|
uv_mutex_lock(&s->mutex);
|
||||||
|
events = s->events;
|
||||||
|
s->events = 0;
|
||||||
|
uv_mutex_unlock(&s->mutex);
|
||||||
|
|
||||||
|
/* Invoke callback on event-loop */
|
||||||
|
if ((events & UV__IO_READ) && uv__io_active(&stream->read_watcher)) {
|
||||||
|
uv__stream_io(stream->loop, &stream->read_watcher, UV__IO_READ);
|
||||||
|
}
|
||||||
|
if ((events & UV__IO_WRITE) && uv__io_active(&stream->write_watcher)) {
|
||||||
|
uv__stream_io(stream->loop, &stream->write_watcher, UV__IO_WRITE);
|
||||||
|
}
|
||||||
|
if (events & UV__IO_ERROR) {
|
||||||
|
/* XXX: Handle it! */
|
||||||
|
uv__stream_io(stream->loop, NULL, UV__IO_ERROR);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__stream_osx_cb_close(uv_handle_t* async) {
|
||||||
|
/* Free container */
|
||||||
|
free(container_of(async, uv__stream_select_t, async));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int uv__stream_try_select(uv_stream_t* stream, int fd) {
|
||||||
|
/*
|
||||||
|
* kqueue doesn't work with some files from /dev mount on osx.
|
||||||
|
* select(2) in separate thread for those fds
|
||||||
|
*/
|
||||||
|
|
||||||
|
int kq;
|
||||||
|
int ret;
|
||||||
|
struct kevent filter[1];
|
||||||
|
struct kevent events[1];
|
||||||
|
struct timespec timeout;
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
|
||||||
|
kq = kqueue();
|
||||||
|
if (kq < 0) {
|
||||||
|
fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", errno);
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
EV_SET(&filter[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
|
||||||
|
|
||||||
|
/* Use small timeout, because we only want to capture EINVALs */
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_nsec = 1;
|
||||||
|
|
||||||
|
ret = kevent(kq, filter, 1, events, 1, &timeout);
|
||||||
|
close(kq);
|
||||||
|
if (ret < 1) return -1;
|
||||||
|
if ((events[0].flags & EV_ERROR_ORIG) == 0 || events[0].data != EINVAL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* At this point we definitely know that this fd won't work with kqueue */
|
||||||
|
s = malloc(sizeof(*s));
|
||||||
|
if (s == NULL) {
|
||||||
|
/* TODO: Return error */
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (uv_async_init(stream->loop,
|
||||||
|
&s->async,
|
||||||
|
uv__stream_osx_select_cb)) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
s->async.flags |= UV__HANDLE_INTERNAL;
|
||||||
|
uv__handle_unref((uv_handle_t*) &s->async);
|
||||||
|
|
||||||
|
if (uv_sem_init(&s->sem, 0)) goto fatal1;
|
||||||
|
if (uv_mutex_init(&s->mutex)) goto fatal2;
|
||||||
|
|
||||||
|
/* Create fake fd for io watcher */
|
||||||
|
s->fake_fd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
if (s->fake_fd == -1) goto fatal3;
|
||||||
|
|
||||||
|
if (uv_thread_create(&s->thread, uv__stream_osx_select, stream)) {
|
||||||
|
goto fatal4;
|
||||||
|
}
|
||||||
|
|
||||||
|
s->stream = stream;
|
||||||
|
stream->select = s;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
fatal4:
|
||||||
|
close(s->fake_fd);
|
||||||
|
fatal3:
|
||||||
|
uv_mutex_destroy(&s->mutex);
|
||||||
|
fatal2:
|
||||||
|
uv_sem_destroy(&s->sem);
|
||||||
|
fatal1:
|
||||||
|
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
|
||||||
|
|
||||||
|
free(s);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
|
|
||||||
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
||||||
socklen_t yes;
|
socklen_t yes;
|
||||||
|
|
||||||
@ -102,6 +294,13 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
if (uv__stream_try_select(stream, fd) == 0) {
|
||||||
|
/* Use fake fd */
|
||||||
|
fd = ((uv__stream_select_t*) stream->select)->fake_fd;
|
||||||
|
}
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
/* Associate the fd with each watcher. */
|
/* Associate the fd with each watcher. */
|
||||||
uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ);
|
uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ);
|
||||||
uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE);
|
uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE);
|
||||||
@ -980,6 +1179,24 @@ int uv_is_writable(const uv_stream_t* stream) {
|
|||||||
|
|
||||||
|
|
||||||
void uv__stream_close(uv_stream_t* handle) {
|
void uv__stream_close(uv_stream_t* handle) {
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
/* Terminate select loop first */
|
||||||
|
if (handle->select != NULL) {
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
|
||||||
|
s = handle->select;
|
||||||
|
|
||||||
|
uv_sem_post(&s->sem);
|
||||||
|
uv_thread_join(&s->thread);
|
||||||
|
uv_sem_destroy(&s->sem);
|
||||||
|
uv_mutex_destroy(&s->mutex);
|
||||||
|
close(s->fake_fd);
|
||||||
|
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
|
||||||
|
|
||||||
|
handle->select = NULL;
|
||||||
|
}
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
uv_read_stop(handle);
|
uv_read_stop(handle);
|
||||||
uv__io_stop(handle->loop, &handle->write_watcher);
|
uv__io_stop(handle->loop, &handle->write_watcher);
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user