unix: use select() for specific fds on OS X
kqueue(2) on osx doesn't work (emits EINVAL error) with specific fds (i.e. /dev/tty, /dev/null, etc). When given such descriptors - start select(2) watcher thread that will emit io events.
This commit is contained in:
parent
52c8a8617d
commit
731adacad2
@ -49,4 +49,7 @@
|
|||||||
uv_sem_t cf_sem; \
|
uv_sem_t cf_sem; \
|
||||||
uv_mutex_t cf_mutex; \
|
uv_mutex_t cf_mutex; \
|
||||||
|
|
||||||
|
#define UV_STREAM_PRIVATE_PLATFORM_FIELDS \
|
||||||
|
void* select; \
|
||||||
|
|
||||||
#endif /* UV_DARWIN_H */
|
#endif /* UV_DARWIN_H */
|
||||||
|
|||||||
@ -90,6 +90,10 @@ struct uv__work {
|
|||||||
# define UV_PLATFORM_FS_EVENT_FIELDS /* empty */
|
# define UV_PLATFORM_FS_EVENT_FIELDS /* empty */
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#ifndef UV_STREAM_PRIVATE_PLATFORM_FIELDS
|
||||||
|
# define UV_STREAM_PRIVATE_PLATFORM_FIELDS /* empty */
|
||||||
|
#endif
|
||||||
|
|
||||||
/* Note: May be cast to struct iovec. See writev(2). */
|
/* Note: May be cast to struct iovec. See writev(2). */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char* base;
|
char* base;
|
||||||
@ -209,6 +213,7 @@ typedef struct {
|
|||||||
uv_connection_cb connection_cb; \
|
uv_connection_cb connection_cb; \
|
||||||
int delayed_error; \
|
int delayed_error; \
|
||||||
int accepted_fd; \
|
int accepted_fd; \
|
||||||
|
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
|
||||||
|
|
||||||
#define UV_TCP_PRIVATE_FIELDS /* empty */
|
#define UV_TCP_PRIVATE_FIELDS /* empty */
|
||||||
|
|
||||||
|
|||||||
@ -84,6 +84,8 @@ int uv_async_send(uv_async_t* handle) {
|
|||||||
r = write(handle->loop->async_pipefd[1], "x", 1);
|
r = write(handle->loop->async_pipefd[1], "x", 1);
|
||||||
while (r == -1 && errno == EINTR);
|
while (r == -1 && errno == EINTR);
|
||||||
|
|
||||||
|
assert(r == -1 || r == 1);
|
||||||
|
|
||||||
if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
|
if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
|
||||||
return uv__set_sys_error(handle->loop, errno);
|
return uv__set_sys_error(handle->loop, errno);
|
||||||
|
|
||||||
|
|||||||
@ -197,6 +197,13 @@ void uv__timer_close(uv_timer_t* handle);
|
|||||||
void uv__udp_close(uv_udp_t* handle);
|
void uv__udp_close(uv_udp_t* handle);
|
||||||
void uv__udp_finish_close(uv_udp_t* handle);
|
void uv__udp_finish_close(uv_udp_t* handle);
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
int uv___stream_fd(uv_stream_t* handle);
|
||||||
|
#define uv__stream_fd(handle) (uv___stream_fd((uv_stream_t*) (handle)))
|
||||||
|
#else
|
||||||
|
#define uv__stream_fd(handle) ((handle)->io_watcher.fd)
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
#ifdef UV__O_NONBLOCK
|
#ifdef UV__O_NONBLOCK
|
||||||
# define UV__F_NONBLOCK UV__O_NONBLOCK
|
# define UV__F_NONBLOCK UV__O_NONBLOCK
|
||||||
#else
|
#else
|
||||||
|
|||||||
@ -57,7 +57,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
|
|||||||
bound = 0;
|
bound = 0;
|
||||||
|
|
||||||
/* Already bound? */
|
/* Already bound? */
|
||||||
if (handle->io_watcher.fd >= 0) {
|
if (uv__stream_fd(handle) >= 0) {
|
||||||
uv__set_artificial_error(handle->loop, UV_EINVAL);
|
uv__set_artificial_error(handle->loop, UV_EINVAL);
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
@ -117,13 +117,13 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
|
|||||||
saved_errno = errno;
|
saved_errno = errno;
|
||||||
status = -1;
|
status = -1;
|
||||||
|
|
||||||
if (handle->io_watcher.fd == -1) {
|
if (uv__stream_fd(handle) == -1) {
|
||||||
uv__set_artificial_error(handle->loop, UV_EINVAL);
|
uv__set_artificial_error(handle->loop, UV_EINVAL);
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
assert(handle->io_watcher.fd >= 0);
|
assert(uv__stream_fd(handle) >= 0);
|
||||||
|
|
||||||
if ((status = listen(handle->io_watcher.fd, backlog)) == -1) {
|
if ((status = listen(uv__stream_fd(handle), backlog)) == -1) {
|
||||||
uv__set_sys_error(handle->loop, errno);
|
uv__set_sys_error(handle->loop, errno);
|
||||||
} else {
|
} else {
|
||||||
handle->connection_cb = cb;
|
handle->connection_cb = cb;
|
||||||
@ -172,7 +172,7 @@ void uv_pipe_connect(uv_connect_t* req,
|
|||||||
int r;
|
int r;
|
||||||
|
|
||||||
saved_errno = errno;
|
saved_errno = errno;
|
||||||
new_sock = (handle->io_watcher.fd == -1);
|
new_sock = (uv__stream_fd(handle) == -1);
|
||||||
err = -1;
|
err = -1;
|
||||||
|
|
||||||
if (new_sock)
|
if (new_sock)
|
||||||
@ -187,7 +187,8 @@ void uv_pipe_connect(uv_connect_t* req,
|
|||||||
* is either there or not.
|
* is either there or not.
|
||||||
*/
|
*/
|
||||||
do {
|
do {
|
||||||
r = connect(handle->io_watcher.fd, (struct sockaddr*)&saddr, sizeof saddr);
|
r = connect(uv__stream_fd(handle),
|
||||||
|
(struct sockaddr*)&saddr, sizeof saddr);
|
||||||
}
|
}
|
||||||
while (r == -1 && errno == EINTR);
|
while (r == -1 && errno == EINTR);
|
||||||
|
|
||||||
@ -196,7 +197,7 @@ void uv_pipe_connect(uv_connect_t* req,
|
|||||||
|
|
||||||
if (new_sock)
|
if (new_sock)
|
||||||
if (uv__stream_open((uv_stream_t*)handle,
|
if (uv__stream_open((uv_stream_t*)handle,
|
||||||
handle->io_watcher.fd,
|
uv__stream_fd(handle),
|
||||||
UV_STREAM_READABLE | UV_STREAM_WRITABLE))
|
UV_STREAM_READABLE | UV_STREAM_WRITABLE))
|
||||||
goto out;
|
goto out;
|
||||||
|
|
||||||
@ -233,7 +234,7 @@ static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
|
|||||||
|
|
||||||
assert(pipe->type == UV_NAMED_PIPE);
|
assert(pipe->type == UV_NAMED_PIPE);
|
||||||
|
|
||||||
sockfd = uv__accept(pipe->io_watcher.fd);
|
sockfd = uv__accept(uv__stream_fd(pipe));
|
||||||
if (sockfd == -1) {
|
if (sockfd == -1) {
|
||||||
if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
if (errno != EAGAIN && errno != EWOULDBLOCK) {
|
||||||
uv__set_sys_error(pipe->loop, errno);
|
uv__set_sys_error(pipe->loop, errno);
|
||||||
|
|||||||
@ -204,7 +204,7 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) {
|
|||||||
if (container->flags & UV_INHERIT_FD) {
|
if (container->flags & UV_INHERIT_FD) {
|
||||||
fd = container->data.fd;
|
fd = container->data.fd;
|
||||||
} else {
|
} else {
|
||||||
fd = container->data.stream->io_watcher.fd;
|
fd = uv__stream_fd(container->data.stream);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fd == -1) {
|
if (fd == -1) {
|
||||||
|
|||||||
@ -34,6 +34,26 @@
|
|||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
# include <sys/event.h>
|
||||||
|
# include <sys/time.h>
|
||||||
|
# include <sys/select.h>
|
||||||
|
|
||||||
|
/* Forward declaration */
|
||||||
|
typedef struct uv__stream_select_s uv__stream_select_t;
|
||||||
|
|
||||||
|
struct uv__stream_select_s {
|
||||||
|
uv_stream_t* stream;
|
||||||
|
uv_thread_t thread;
|
||||||
|
uv_sem_t sem;
|
||||||
|
uv_mutex_t mutex;
|
||||||
|
uv_async_t async;
|
||||||
|
int events;
|
||||||
|
int fake_fd;
|
||||||
|
int int_fd;
|
||||||
|
int fd;
|
||||||
|
};
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
static void uv__stream_connect(uv_stream_t*);
|
static void uv__stream_connect(uv_stream_t*);
|
||||||
static void uv__write(uv_stream_t* stream);
|
static void uv__write(uv_stream_t* stream);
|
||||||
@ -96,10 +116,226 @@ void uv__stream_init(uv_loop_t* loop,
|
|||||||
if (loop->emfile_fd == -1)
|
if (loop->emfile_fd == -1)
|
||||||
loop->emfile_fd = uv__open_cloexec("/", O_RDONLY);
|
loop->emfile_fd = uv__open_cloexec("/", O_RDONLY);
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
stream->select = NULL;
|
||||||
|
#endif /* defined(__APPLE_) */
|
||||||
|
|
||||||
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
|
uv__io_init(&stream->io_watcher, uv__stream_io, -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
void uv__stream_osx_select(void* arg) {
|
||||||
|
uv_stream_t* stream;
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
fd_set read;
|
||||||
|
fd_set write;
|
||||||
|
fd_set error;
|
||||||
|
struct timeval timeout;
|
||||||
|
int events;
|
||||||
|
int fd;
|
||||||
|
int r;
|
||||||
|
int max_fd;
|
||||||
|
|
||||||
|
stream = arg;
|
||||||
|
s = stream->select;
|
||||||
|
fd = stream->io_watcher.fd;
|
||||||
|
|
||||||
|
if (fd > s->int_fd)
|
||||||
|
max_fd = fd;
|
||||||
|
else
|
||||||
|
max_fd = s->int_fd;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
/* Terminate on semaphore */
|
||||||
|
if (uv_sem_trywait(&s->sem) == 0)
|
||||||
|
break;
|
||||||
|
|
||||||
|
/* Watch fd using select(2) */
|
||||||
|
FD_ZERO(&read);
|
||||||
|
FD_ZERO(&write);
|
||||||
|
FD_ZERO(&error);
|
||||||
|
|
||||||
|
if (uv_is_readable(stream))
|
||||||
|
FD_SET(fd, &read);
|
||||||
|
if (uv_is_writable(stream))
|
||||||
|
FD_SET(fd, &write);
|
||||||
|
FD_SET(fd, &error);
|
||||||
|
FD_SET(s->int_fd, &read);
|
||||||
|
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_usec = 250000; /* 250 ms timeout */
|
||||||
|
r = select(max_fd + 1, &read, &write, &error, &timeout);
|
||||||
|
if (r == -1) {
|
||||||
|
if (errno == EINTR)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
/* XXX: Possible?! */
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Ignore timeouts */
|
||||||
|
if (r == 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
/* Handle events */
|
||||||
|
events = 0;
|
||||||
|
if (FD_ISSET(fd, &read))
|
||||||
|
events |= UV__POLLIN;
|
||||||
|
if (FD_ISSET(fd, &write))
|
||||||
|
events |= UV__POLLOUT;
|
||||||
|
if (FD_ISSET(fd, &error))
|
||||||
|
events |= UV__POLLERR;
|
||||||
|
|
||||||
|
uv_mutex_lock(&s->mutex);
|
||||||
|
s->events |= events;
|
||||||
|
uv_mutex_unlock(&s->mutex);
|
||||||
|
|
||||||
|
if (events != 0)
|
||||||
|
uv_async_send(&s->async);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
|
||||||
|
/* Notify select() thread about state change */
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
int r;
|
||||||
|
|
||||||
|
s = stream->select;
|
||||||
|
|
||||||
|
/* Interrupt select() loop
|
||||||
|
* NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
|
||||||
|
* emit read event on other side
|
||||||
|
*/
|
||||||
|
do
|
||||||
|
r = write(s->fake_fd, "x", 1);
|
||||||
|
while (r == -1 && errno == EINTR);
|
||||||
|
|
||||||
|
assert(r == 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__stream_osx_select_cb(uv_async_t* handle, int status) {
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
uv_stream_t* stream;
|
||||||
|
int events;
|
||||||
|
|
||||||
|
s = container_of(handle, uv__stream_select_t, async);
|
||||||
|
stream = s->stream;
|
||||||
|
|
||||||
|
/* Get and reset stream's events */
|
||||||
|
uv_mutex_lock(&s->mutex);
|
||||||
|
events = s->events;
|
||||||
|
s->events = 0;
|
||||||
|
uv_mutex_unlock(&s->mutex);
|
||||||
|
|
||||||
|
assert(0 == (events & UV__POLLERR));
|
||||||
|
|
||||||
|
/* Invoke callback on event-loop */
|
||||||
|
if ((events & UV__POLLIN) && uv__io_active(&stream->io_watcher, UV__POLLIN))
|
||||||
|
uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLIN);
|
||||||
|
|
||||||
|
if ((events & UV__POLLOUT) && uv__io_active(&stream->io_watcher, UV__POLLOUT))
|
||||||
|
uv__stream_io(stream->loop, &stream->io_watcher, UV__POLLOUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__stream_osx_cb_close(uv_handle_t* async) {
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
|
||||||
|
s = container_of(async, uv__stream_select_t, async);
|
||||||
|
free(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int uv__stream_try_select(uv_stream_t* stream, int fd) {
|
||||||
|
/*
|
||||||
|
* kqueue doesn't work with some files from /dev mount on osx.
|
||||||
|
* select(2) in separate thread for those fds
|
||||||
|
*/
|
||||||
|
|
||||||
|
struct kevent filter[1];
|
||||||
|
struct kevent events[1];
|
||||||
|
struct timespec timeout;
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
int fds[2];
|
||||||
|
int ret;
|
||||||
|
int kq;
|
||||||
|
|
||||||
|
kq = kqueue();
|
||||||
|
if (kq == -1) {
|
||||||
|
fprintf(stderr, "(libuv) Failed to create kqueue (%d)\n", errno);
|
||||||
|
return uv__set_sys_error(stream->loop, errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
EV_SET(&filter[0], fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
|
||||||
|
|
||||||
|
/* Use small timeout, because we only want to capture EINVALs */
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_nsec = 1;
|
||||||
|
|
||||||
|
ret = kevent(kq, filter, 1, events, 1, &timeout);
|
||||||
|
SAVE_ERRNO(close(kq));
|
||||||
|
|
||||||
|
if (ret == -1)
|
||||||
|
return uv__set_sys_error(stream->loop, errno);
|
||||||
|
|
||||||
|
if ((events[0].flags & EV_ERROR) == 0 || events[0].data != EINVAL)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
/* At this point we definitely know that this fd won't work with kqueue */
|
||||||
|
s = malloc(sizeof(*s));
|
||||||
|
if (s == NULL)
|
||||||
|
return uv__set_artificial_error(stream->loop, UV_ENOMEM);
|
||||||
|
|
||||||
|
s->fd = fd;
|
||||||
|
|
||||||
|
if (uv_async_init(stream->loop, &s->async, uv__stream_osx_select_cb)) {
|
||||||
|
SAVE_ERRNO(free(s));
|
||||||
|
return uv__set_sys_error(stream->loop, errno);
|
||||||
|
}
|
||||||
|
|
||||||
|
s->async.flags |= UV__HANDLE_INTERNAL;
|
||||||
|
uv__handle_unref(&s->async);
|
||||||
|
|
||||||
|
if (uv_sem_init(&s->sem, 0))
|
||||||
|
goto fatal1;
|
||||||
|
|
||||||
|
if (uv_mutex_init(&s->mutex))
|
||||||
|
goto fatal2;
|
||||||
|
|
||||||
|
/* Create fds for io watcher and to interrupt the select() loop. */
|
||||||
|
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
|
||||||
|
goto fatal3;
|
||||||
|
|
||||||
|
s->fake_fd = fds[0];
|
||||||
|
s->int_fd = fds[1];
|
||||||
|
|
||||||
|
if (uv_thread_create(&s->thread, uv__stream_osx_select, stream))
|
||||||
|
goto fatal4;
|
||||||
|
|
||||||
|
s->stream = stream;
|
||||||
|
stream->select = s;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
fatal4:
|
||||||
|
close(s->fake_fd);
|
||||||
|
close(s->int_fd);
|
||||||
|
s->fake_fd = -1;
|
||||||
|
s->int_fd = -1;
|
||||||
|
fatal3:
|
||||||
|
uv_mutex_destroy(&s->mutex);
|
||||||
|
fatal2:
|
||||||
|
uv_sem_destroy(&s->sem);
|
||||||
|
fatal1:
|
||||||
|
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
|
||||||
|
return uv__set_sys_error(stream->loop, errno);
|
||||||
|
}
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
|
|
||||||
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
||||||
assert(fd >= 0);
|
assert(fd >= 0);
|
||||||
stream->flags |= flags;
|
stream->flags |= flags;
|
||||||
@ -113,6 +349,21 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
|||||||
return uv__set_sys_error(stream->loop, errno);
|
return uv__set_sys_error(stream->loop, errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
{
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
int r;
|
||||||
|
|
||||||
|
r = uv__stream_try_select(stream, fd);
|
||||||
|
if (r == -1)
|
||||||
|
return r;
|
||||||
|
|
||||||
|
s = stream->select;
|
||||||
|
if (s != NULL)
|
||||||
|
fd = s->fake_fd;
|
||||||
|
}
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
stream->io_watcher.fd = fd;
|
stream->io_watcher.fd = fd;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@ -231,9 +482,9 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
|
|||||||
/* connection_cb can close the server socket while we're
|
/* connection_cb can close the server socket while we're
|
||||||
* in the loop so check it on each iteration.
|
* in the loop so check it on each iteration.
|
||||||
*/
|
*/
|
||||||
while (stream->io_watcher.fd != -1) {
|
while (uv__stream_fd(stream) != -1) {
|
||||||
assert(stream->accepted_fd == -1);
|
assert(stream->accepted_fd == -1);
|
||||||
fd = uv__accept(stream->io_watcher.fd);
|
fd = uv__accept(uv__stream_fd(stream));
|
||||||
|
|
||||||
if (fd == -1) {
|
if (fd == -1) {
|
||||||
switch (errno) {
|
switch (errno) {
|
||||||
@ -254,7 +505,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (use_emfile_trick) {
|
if (use_emfile_trick) {
|
||||||
SAVE_ERRNO(r = uv__emfile_trick(loop, stream->io_watcher.fd));
|
SAVE_ERRNO(r = uv__emfile_trick(loop, uv__stream_fd(stream)));
|
||||||
if (r == 0)
|
if (r == 0)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -386,7 +637,7 @@ static void uv__drain(uv_stream_t* stream) {
|
|||||||
stream->shutdown_req = NULL;
|
stream->shutdown_req = NULL;
|
||||||
uv__req_unregister(stream->loop, req);
|
uv__req_unregister(stream->loop, req);
|
||||||
|
|
||||||
if (shutdown(stream->io_watcher.fd, SHUT_WR)) {
|
if (shutdown(uv__stream_fd(stream), SHUT_WR)) {
|
||||||
/* Error. Report it. User should call uv_close(). */
|
/* Error. Report it. User should call uv_close(). */
|
||||||
uv__set_sys_error(stream->loop, errno);
|
uv__set_sys_error(stream->loop, errno);
|
||||||
if (req->cb) {
|
if (req->cb) {
|
||||||
@ -450,7 +701,7 @@ static void uv__write(uv_stream_t* stream) {
|
|||||||
|
|
||||||
start:
|
start:
|
||||||
|
|
||||||
assert(stream->io_watcher.fd >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
|
|
||||||
/* Get the request at the head of the queue. */
|
/* Get the request at the head of the queue. */
|
||||||
req = uv_write_queue_head(stream);
|
req = uv_write_queue_head(stream);
|
||||||
@ -504,15 +755,15 @@ start:
|
|||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
n = sendmsg(stream->io_watcher.fd, &msg, 0);
|
n = sendmsg(uv__stream_fd(stream), &msg, 0);
|
||||||
}
|
}
|
||||||
while (n == -1 && errno == EINTR);
|
while (n == -1 && errno == EINTR);
|
||||||
} else {
|
} else {
|
||||||
do {
|
do {
|
||||||
if (iovcnt == 1) {
|
if (iovcnt == 1) {
|
||||||
n = write(stream->io_watcher.fd, iov[0].iov_base, iov[0].iov_len);
|
n = write(uv__stream_fd(stream), iov[0].iov_base, iov[0].iov_len);
|
||||||
} else {
|
} else {
|
||||||
n = writev(stream->io_watcher.fd, iov, iovcnt);
|
n = writev(uv__stream_fd(stream), iov, iovcnt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
while (n == -1 && errno == EINTR);
|
while (n == -1 && errno == EINTR);
|
||||||
@ -661,11 +912,11 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
|
|
||||||
assert(buf.len > 0);
|
assert(buf.len > 0);
|
||||||
assert(buf.base);
|
assert(buf.base);
|
||||||
assert(stream->io_watcher.fd >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
|
|
||||||
if (stream->read_cb) {
|
if (stream->read_cb) {
|
||||||
do {
|
do {
|
||||||
nread = read(stream->io_watcher.fd, buf.base, buf.len);
|
nread = read(uv__stream_fd(stream), buf.base, buf.len);
|
||||||
}
|
}
|
||||||
while (nread < 0 && errno == EINTR);
|
while (nread < 0 && errno == EINTR);
|
||||||
} else {
|
} else {
|
||||||
@ -681,7 +932,7 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
msg.msg_control = (void *) cmsg_space;
|
msg.msg_control = (void *) cmsg_space;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
nread = recvmsg(stream->io_watcher.fd, &msg, 0);
|
nread = recvmsg(uv__stream_fd(stream), &msg, 0);
|
||||||
}
|
}
|
||||||
while (nread < 0 && errno == EINTR);
|
while (nread < 0 && errno == EINTR);
|
||||||
}
|
}
|
||||||
@ -790,7 +1041,7 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
|
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
|
||||||
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
|
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
|
||||||
"uv_shutdown (unix) only supports uv_handle_t right now");
|
"uv_shutdown (unix) only supports uv_handle_t right now");
|
||||||
assert(stream->io_watcher.fd >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
|
|
||||||
if (!(stream->flags & UV_STREAM_WRITABLE) ||
|
if (!(stream->flags & UV_STREAM_WRITABLE) ||
|
||||||
stream->flags & UV_STREAM_SHUT ||
|
stream->flags & UV_STREAM_SHUT ||
|
||||||
@ -829,16 +1080,16 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (events & UV__POLLIN) {
|
if (events & UV__POLLIN) {
|
||||||
assert(stream->io_watcher.fd >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
|
|
||||||
uv__read(stream);
|
uv__read(stream);
|
||||||
|
|
||||||
if (stream->io_watcher.fd == -1)
|
if (uv__stream_fd(stream) == -1)
|
||||||
return; /* read_cb closed stream. */
|
return; /* read_cb closed stream. */
|
||||||
}
|
}
|
||||||
|
|
||||||
if (events & UV__POLLOUT) {
|
if (events & UV__POLLOUT) {
|
||||||
assert(stream->io_watcher.fd >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
uv__write(stream);
|
uv__write(stream);
|
||||||
uv__write_callbacks(stream);
|
uv__write_callbacks(stream);
|
||||||
}
|
}
|
||||||
@ -867,8 +1118,12 @@ static void uv__stream_connect(uv_stream_t* stream) {
|
|||||||
stream->delayed_error = 0;
|
stream->delayed_error = 0;
|
||||||
} else {
|
} else {
|
||||||
/* Normal situation: we need to get the socket error from the kernel. */
|
/* Normal situation: we need to get the socket error from the kernel. */
|
||||||
assert(stream->io_watcher.fd >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
getsockopt(stream->io_watcher.fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
|
getsockopt(uv__stream_fd(stream),
|
||||||
|
SOL_SOCKET,
|
||||||
|
SO_ERROR,
|
||||||
|
&error,
|
||||||
|
&errorsize);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (error == EINPROGRESS)
|
if (error == EINPROGRESS)
|
||||||
@ -898,7 +1153,7 @@ int uv_write2(uv_write_t* req,
|
|||||||
stream->type == UV_TTY) &&
|
stream->type == UV_TTY) &&
|
||||||
"uv_write (unix) does not yet support other types of streams");
|
"uv_write (unix) does not yet support other types of streams");
|
||||||
|
|
||||||
if (stream->io_watcher.fd < 0) {
|
if (uv__stream_fd(stream) < 0) {
|
||||||
uv__set_sys_error(stream->loop, EBADF);
|
uv__set_sys_error(stream->loop, EBADF);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -981,11 +1236,17 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
|||||||
*/
|
*/
|
||||||
stream->flags |= UV_STREAM_READING;
|
stream->flags |= UV_STREAM_READING;
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
/* Notify select() thread about state change */
|
||||||
|
if (stream->select != NULL)
|
||||||
|
uv__stream_osx_interrupt_select(stream);
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
/* TODO: try to do the read inline? */
|
/* TODO: try to do the read inline? */
|
||||||
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
|
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
|
||||||
* not start the IO watcher.
|
* not start the IO watcher.
|
||||||
*/
|
*/
|
||||||
assert(stream->io_watcher.fd >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
assert(alloc_cb);
|
assert(alloc_cb);
|
||||||
|
|
||||||
stream->read_cb = read_cb;
|
stream->read_cb = read_cb;
|
||||||
@ -1015,6 +1276,13 @@ int uv_read_stop(uv_stream_t* stream) {
|
|||||||
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
|
||||||
uv__handle_stop(stream);
|
uv__handle_stop(stream);
|
||||||
stream->flags &= ~UV_STREAM_READING;
|
stream->flags &= ~UV_STREAM_READING;
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
/* Notify select() thread about state change */
|
||||||
|
if (stream->select != NULL)
|
||||||
|
uv__stream_osx_interrupt_select(stream);
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
stream->read_cb = NULL;
|
stream->read_cb = NULL;
|
||||||
stream->read2_cb = NULL;
|
stream->read2_cb = NULL;
|
||||||
stream->alloc_cb = NULL;
|
stream->alloc_cb = NULL;
|
||||||
@ -1032,7 +1300,40 @@ int uv_is_writable(const uv_stream_t* stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
int uv___stream_fd(uv_stream_t* handle) {
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
|
||||||
|
s = handle->select;
|
||||||
|
if (s != NULL)
|
||||||
|
return s->fd;
|
||||||
|
|
||||||
|
return handle->io_watcher.fd;
|
||||||
|
}
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
|
|
||||||
void uv__stream_close(uv_stream_t* handle) {
|
void uv__stream_close(uv_stream_t* handle) {
|
||||||
|
#if defined(__APPLE__)
|
||||||
|
/* Terminate select loop first */
|
||||||
|
if (handle->select != NULL) {
|
||||||
|
uv__stream_select_t* s;
|
||||||
|
|
||||||
|
s = handle->select;
|
||||||
|
|
||||||
|
uv_sem_post(&s->sem);
|
||||||
|
uv__stream_osx_interrupt_select(handle);
|
||||||
|
uv_thread_join(&s->thread);
|
||||||
|
uv_sem_destroy(&s->sem);
|
||||||
|
uv_mutex_destroy(&s->mutex);
|
||||||
|
close(s->fake_fd);
|
||||||
|
close(s->int_fd);
|
||||||
|
uv_close((uv_handle_t*) &s->async, uv__stream_osx_cb_close);
|
||||||
|
|
||||||
|
handle->select = NULL;
|
||||||
|
}
|
||||||
|
#endif /* defined(__APPLE__) */
|
||||||
|
|
||||||
uv_read_stop(handle);
|
uv_read_stop(handle);
|
||||||
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
||||||
|
|
||||||
|
|||||||
@ -37,7 +37,7 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
|
|||||||
static int maybe_new_socket(uv_tcp_t* handle, int domain, int flags) {
|
static int maybe_new_socket(uv_tcp_t* handle, int domain, int flags) {
|
||||||
int sockfd;
|
int sockfd;
|
||||||
|
|
||||||
if (handle->io_watcher.fd != -1)
|
if (uv__stream_fd(handle) != -1)
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
sockfd = uv__socket(domain, SOCK_STREAM, 0);
|
sockfd = uv__socket(domain, SOCK_STREAM, 0);
|
||||||
@ -97,7 +97,7 @@ static int uv__connect(uv_connect_t* req,
|
|||||||
handle->delayed_error = 0;
|
handle->delayed_error = 0;
|
||||||
|
|
||||||
do
|
do
|
||||||
r = connect(handle->io_watcher.fd, addr, addrlen);
|
r = connect(uv__stream_fd(handle), addr, addrlen);
|
||||||
while (r == -1 && errno == EINTR);
|
while (r == -1 && errno == EINTR);
|
||||||
|
|
||||||
if (r == -1) {
|
if (r == -1) {
|
||||||
@ -166,7 +166,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
|
|||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle->io_watcher.fd < 0) {
|
if (uv__stream_fd(handle) < 0) {
|
||||||
uv__set_sys_error(handle->loop, EINVAL);
|
uv__set_sys_error(handle->loop, EINVAL);
|
||||||
rv = -1;
|
rv = -1;
|
||||||
goto out;
|
goto out;
|
||||||
@ -175,7 +175,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
|
|||||||
/* sizeof(socklen_t) != sizeof(int) on some systems. */
|
/* sizeof(socklen_t) != sizeof(int) on some systems. */
|
||||||
socklen = (socklen_t)*namelen;
|
socklen = (socklen_t)*namelen;
|
||||||
|
|
||||||
if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) {
|
if (getsockname(uv__stream_fd(handle), name, &socklen) == -1) {
|
||||||
uv__set_sys_error(handle->loop, errno);
|
uv__set_sys_error(handle->loop, errno);
|
||||||
rv = -1;
|
rv = -1;
|
||||||
} else {
|
} else {
|
||||||
@ -203,7 +203,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
|
|||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle->io_watcher.fd < 0) {
|
if (uv__stream_fd(handle) < 0) {
|
||||||
uv__set_sys_error(handle->loop, EINVAL);
|
uv__set_sys_error(handle->loop, EINVAL);
|
||||||
rv = -1;
|
rv = -1;
|
||||||
goto out;
|
goto out;
|
||||||
@ -212,7 +212,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
|
|||||||
/* sizeof(socklen_t) != sizeof(int) on some systems. */
|
/* sizeof(socklen_t) != sizeof(int) on some systems. */
|
||||||
socklen = (socklen_t)*namelen;
|
socklen = (socklen_t)*namelen;
|
||||||
|
|
||||||
if (getpeername(handle->io_watcher.fd, name, &socklen) == -1) {
|
if (getpeername(uv__stream_fd(handle), name, &socklen) == -1) {
|
||||||
uv__set_sys_error(handle->loop, errno);
|
uv__set_sys_error(handle->loop, errno);
|
||||||
rv = -1;
|
rv = -1;
|
||||||
} else {
|
} else {
|
||||||
@ -312,8 +312,8 @@ int uv__tcp_keepalive(int fd, int on, unsigned int delay) {
|
|||||||
|
|
||||||
|
|
||||||
int uv_tcp_nodelay(uv_tcp_t* handle, int on) {
|
int uv_tcp_nodelay(uv_tcp_t* handle, int on) {
|
||||||
if (handle->io_watcher.fd != -1)
|
if (uv__stream_fd(handle) != -1)
|
||||||
if (uv__tcp_nodelay(handle->io_watcher.fd, on))
|
if (uv__tcp_nodelay(uv__stream_fd(handle), on))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (on)
|
if (on)
|
||||||
@ -326,8 +326,8 @@ int uv_tcp_nodelay(uv_tcp_t* handle, int on) {
|
|||||||
|
|
||||||
|
|
||||||
int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) {
|
int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) {
|
||||||
if (handle->io_watcher.fd != -1)
|
if (uv__stream_fd(handle) != -1)
|
||||||
if (uv__tcp_keepalive(handle->io_watcher.fd, on, delay))
|
if (uv__tcp_keepalive(uv__stream_fd(handle), on, delay))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (on)
|
if (on)
|
||||||
@ -335,7 +335,7 @@ int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) {
|
|||||||
else
|
else
|
||||||
handle->flags &= ~UV_TCP_KEEPALIVE;
|
handle->flags &= ~UV_TCP_KEEPALIVE;
|
||||||
|
|
||||||
/* TODO Store delay if handle->io_watcher.fd == -1 but don't want to enlarge
|
/* TODO Store delay if uv__stream_fd(handle) == -1 but don't want to enlarge
|
||||||
* uv_tcp_t with an int that's almost never used...
|
* uv_tcp_t with an int that's almost never used...
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|||||||
@ -54,7 +54,7 @@ int uv_tty_set_mode(uv_tty_t* tty, int mode) {
|
|||||||
struct termios raw;
|
struct termios raw;
|
||||||
int fd;
|
int fd;
|
||||||
|
|
||||||
fd = tty->io_watcher.fd;
|
fd = uv__stream_fd(tty);
|
||||||
|
|
||||||
if (mode && tty->mode == 0) {
|
if (mode && tty->mode == 0) {
|
||||||
/* on */
|
/* on */
|
||||||
@ -105,7 +105,7 @@ fatal:
|
|||||||
int uv_tty_get_winsize(uv_tty_t* tty, int* width, int* height) {
|
int uv_tty_get_winsize(uv_tty_t* tty, int* width, int* height) {
|
||||||
struct winsize ws;
|
struct winsize ws;
|
||||||
|
|
||||||
if (ioctl(tty->io_watcher.fd, TIOCGWINSZ, &ws) < 0) {
|
if (ioctl(uv__stream_fd(tty), TIOCGWINSZ, &ws) < 0) {
|
||||||
uv__set_sys_error(tty->loop, errno);
|
uv__set_sys_error(tty->loop, errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user