unix: replace ev_io with uv__io_t
Replace ev_io usage with wrapper constructs. This is preliminary work for the transition to a libev-less linux backend.
This commit is contained in:
parent
7a64ec49ff
commit
3bc9707054
@ -70,6 +70,17 @@ typedef struct {
|
||||
char* errmsg;
|
||||
} uv_lib_t;
|
||||
|
||||
struct uv__io_s;
|
||||
struct uv_loop_s;
|
||||
|
||||
typedef struct uv__io_s uv__io_t;
|
||||
typedef void (*uv__io_cb)(struct uv_loop_s* loop, uv__io_t* handle, int events);
|
||||
|
||||
struct uv__io_s {
|
||||
ev_io io_watcher;
|
||||
uv__io_cb cb;
|
||||
};
|
||||
|
||||
#define UV_REQ_TYPE_PRIVATE /* empty */
|
||||
|
||||
#if __linux__
|
||||
@ -78,7 +89,7 @@ typedef struct {
|
||||
struct uv__inotify_watchers { \
|
||||
struct uv_fs_event_s* rbh_root; \
|
||||
} inotify_watchers; \
|
||||
ev_io inotify_read_watcher; \
|
||||
uv__io_t inotify_read_watcher; \
|
||||
int inotify_fd;
|
||||
#elif defined(PORT_SOURCE_FILE)
|
||||
# define UV_LOOP_PRIVATE_PLATFORM_FIELDS \
|
||||
@ -142,8 +153,8 @@ typedef struct {
|
||||
#define UV_STREAM_PRIVATE_FIELDS \
|
||||
uv_connect_t *connect_req; \
|
||||
uv_shutdown_t *shutdown_req; \
|
||||
ev_io read_watcher; \
|
||||
ev_io write_watcher; \
|
||||
uv__io_t read_watcher; \
|
||||
uv__io_t write_watcher; \
|
||||
ngx_queue_t write_queue; \
|
||||
ngx_queue_t write_completed_queue; \
|
||||
int delayed_error; \
|
||||
@ -160,8 +171,8 @@ typedef struct {
|
||||
#define UV_UDP_PRIVATE_FIELDS \
|
||||
uv_alloc_cb alloc_cb; \
|
||||
uv_udp_recv_cb recv_cb; \
|
||||
ev_io read_watcher; \
|
||||
ev_io write_watcher; \
|
||||
uv__io_t read_watcher; \
|
||||
uv__io_t write_watcher; \
|
||||
ngx_queue_t write_queue; \
|
||||
ngx_queue_t write_completed_queue; \
|
||||
|
||||
@ -173,7 +184,7 @@ typedef struct {
|
||||
|
||||
/* UV_POLL */
|
||||
#define UV_POLL_PRIVATE_FIELDS \
|
||||
ev_io io_watcher;
|
||||
uv__io_t io_watcher;
|
||||
|
||||
|
||||
/* UV_PREPARE */ \
|
||||
@ -238,7 +249,6 @@ typedef struct {
|
||||
struct uv_fs_event_s* rbe_parent; \
|
||||
int rbe_color; \
|
||||
} node; \
|
||||
ev_io read_watcher; \
|
||||
uv_fs_event_cb cb;
|
||||
|
||||
#elif defined(__APPLE__) \
|
||||
|
||||
@ -592,3 +592,42 @@ uv_err_t uv_chdir(const char* dir) {
|
||||
return uv__new_sys_error(errno);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__io_rw(struct ev_loop* ev, ev_io* w, int events) {
|
||||
uv_loop_t* loop = ev_userdata(ev);
|
||||
uv__io_t* handle = container_of(w, uv__io_t, io_watcher);
|
||||
handle->cb(loop, handle, events & (EV_READ|EV_WRITE|EV_ERROR));
|
||||
}
|
||||
|
||||
|
||||
void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
|
||||
ev_io_init(&handle->io_watcher, uv__io_rw, fd, events & (EV_READ|EV_WRITE));
|
||||
handle->cb = cb;
|
||||
}
|
||||
|
||||
|
||||
void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
|
||||
ev_io_set(&handle->io_watcher, fd, events);
|
||||
handle->cb = cb;
|
||||
}
|
||||
|
||||
|
||||
void uv__io_start(uv_loop_t* loop, uv__io_t* handle) {
|
||||
ev_io_start(loop->ev, &handle->io_watcher);
|
||||
}
|
||||
|
||||
|
||||
void uv__io_stop(uv_loop_t* loop, uv__io_t* handle) {
|
||||
ev_io_stop(loop->ev, &handle->io_watcher);
|
||||
}
|
||||
|
||||
|
||||
void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event) {
|
||||
ev_feed_event(loop->ev, &handle->io_watcher, event);
|
||||
}
|
||||
|
||||
|
||||
int uv__io_active(uv__io_t* handle) {
|
||||
return ev_is_active(&handle->io_watcher);
|
||||
}
|
||||
|
||||
@ -78,6 +78,10 @@
|
||||
} \
|
||||
while (0)
|
||||
|
||||
#define UV__IO_READ EV_READ
|
||||
#define UV__IO_WRITE EV_WRITE
|
||||
#define UV__IO_ERROR EV_ERROR
|
||||
|
||||
/* flags */
|
||||
enum {
|
||||
UV_CLOSING = 0x01, /* uv_close() called but not finished. */
|
||||
@ -127,6 +131,13 @@ int uv__socket(int domain, int type, int protocol);
|
||||
int uv__dup(int fd);
|
||||
int uv_async_stop(uv_async_t* handle);
|
||||
|
||||
void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events);
|
||||
void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events);
|
||||
void uv__io_start(uv_loop_t* loop, uv__io_t* handle);
|
||||
void uv__io_stop(uv_loop_t* loop, uv__io_t* handle);
|
||||
void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event);
|
||||
int uv__io_active(uv__io_t* handle);
|
||||
|
||||
/* loop */
|
||||
int uv__loop_init(uv_loop_t* loop, int default_loop);
|
||||
void uv__loop_delete(uv_loop_t* loop);
|
||||
@ -143,8 +154,7 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
|
||||
uv_handle_type type);
|
||||
int uv__stream_open(uv_stream_t*, int fd, int flags);
|
||||
void uv__stream_destroy(uv_stream_t* stream);
|
||||
void uv__stream_io(EV_P_ ev_io* watcher, int revents);
|
||||
void uv__server_io(EV_P_ ev_io* watcher, int revents);
|
||||
void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events);
|
||||
int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
|
||||
int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
|
||||
socklen_t addrlen, uv_connect_cb cb);
|
||||
@ -156,11 +166,9 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
|
||||
|
||||
/* pipe */
|
||||
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
|
||||
void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);
|
||||
|
||||
/* poll */
|
||||
void uv__poll_close(uv_poll_t* handle);
|
||||
int uv__poll_active(const uv_poll_t* handle);
|
||||
|
||||
/* various */
|
||||
void uv__async_close(uv_async_t* handle);
|
||||
|
||||
@ -51,7 +51,7 @@ static int compare_watchers(const uv_fs_event_t* a, const uv_fs_event_t* b) {
|
||||
RB_GENERATE_STATIC(uv__inotify_watchers, uv_fs_event_s, node, compare_watchers)
|
||||
|
||||
|
||||
static void uv__inotify_read(EV_P_ ev_io* w, int revents);
|
||||
static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int revents);
|
||||
|
||||
|
||||
static int new_inotify_fd(void) {
|
||||
@ -85,11 +85,11 @@ static int init_inotify(uv_loop_t* loop) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ev_io_init(&loop->inotify_read_watcher,
|
||||
uv__inotify_read,
|
||||
loop->inotify_fd,
|
||||
EV_READ);
|
||||
ev_io_start(loop->ev, &loop->inotify_read_watcher);
|
||||
uv__io_init(&loop->inotify_read_watcher,
|
||||
uv__inotify_read,
|
||||
loop->inotify_fd,
|
||||
UV__IO_READ);
|
||||
uv__io_start(loop, &loop->inotify_read_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -112,22 +112,18 @@ static void remove_watcher(uv_fs_event_t* handle) {
|
||||
}
|
||||
|
||||
|
||||
static void uv__inotify_read(EV_P_ ev_io* w, int revents) {
|
||||
static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int events) {
|
||||
const struct uv__inotify_event* e;
|
||||
uv_fs_event_t* handle;
|
||||
uv_loop_t* uv_loop;
|
||||
const char* filename;
|
||||
ssize_t size;
|
||||
int events;
|
||||
const char *p;
|
||||
/* needs to be large enough for sizeof(inotify_event) + strlen(filename) */
|
||||
char buf[4096];
|
||||
|
||||
uv_loop = container_of(w, uv_loop_t, inotify_read_watcher);
|
||||
|
||||
while (1) {
|
||||
do {
|
||||
size = read(uv_loop->inotify_fd, buf, sizeof buf);
|
||||
size = read(loop->inotify_fd, buf, sizeof buf);
|
||||
}
|
||||
while (size == -1 && errno == EINTR);
|
||||
|
||||
@ -148,7 +144,7 @@ static void uv__inotify_read(EV_P_ ev_io* w, int revents) {
|
||||
if (e->mask & ~(UV__IN_ATTRIB|UV__IN_MODIFY))
|
||||
events |= UV_RENAME;
|
||||
|
||||
handle = find_watcher(uv_loop, e->wd);
|
||||
handle = find_watcher(loop, e->wd);
|
||||
if (handle == NULL)
|
||||
continue; /* Handle has already been closed. */
|
||||
|
||||
|
||||
@ -66,10 +66,11 @@ void uv__loop_delete(uv_loop_t* loop) {
|
||||
uv_ares_destroy(loop, loop->channel);
|
||||
ev_loop_destroy(loop->ev);
|
||||
#if __linux__
|
||||
if (loop->inotify_fd == -1) return;
|
||||
ev_io_stop(loop->ev, &loop->inotify_read_watcher);
|
||||
close(loop->inotify_fd);
|
||||
loop->inotify_fd = -1;
|
||||
if (loop->inotify_fd != -1) {
|
||||
uv__io_stop(loop, &loop->inotify_read_watcher);
|
||||
close(loop->inotify_fd);
|
||||
loop->inotify_fd = -1;
|
||||
}
|
||||
#endif
|
||||
#if HAVE_PORTS_FS
|
||||
if (loop->fs_fd != -1)
|
||||
|
||||
@ -29,6 +29,8 @@
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events);
|
||||
|
||||
|
||||
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
|
||||
uv__stream_init(loop, (uv_stream_t*)handle, UV_NAMED_PIPE);
|
||||
@ -138,8 +140,11 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
|
||||
uv__set_sys_error(handle->loop, errno);
|
||||
} else {
|
||||
handle->connection_cb = cb;
|
||||
ev_io_init(&handle->read_watcher, uv__pipe_accept, handle->fd, EV_READ);
|
||||
ev_io_start(handle->loop->ev, &handle->read_watcher);
|
||||
uv__io_init(&handle->read_watcher,
|
||||
uv__pipe_accept,
|
||||
handle->fd,
|
||||
UV__IO_READ);
|
||||
uv__io_start(handle->loop, &handle->read_watcher);
|
||||
}
|
||||
|
||||
out:
|
||||
@ -211,8 +216,8 @@ void uv_pipe_connect(uv_connect_t* req,
|
||||
uv__stream_open((uv_stream_t*)handle,
|
||||
sockfd,
|
||||
UV_STREAM_READABLE | UV_STREAM_WRITABLE);
|
||||
ev_io_start(handle->loop->ev, &handle->read_watcher);
|
||||
ev_io_start(handle->loop->ev, &handle->write_watcher);
|
||||
uv__io_start(handle->loop, &handle->read_watcher);
|
||||
uv__io_start(handle->loop, &handle->write_watcher);
|
||||
status = 0;
|
||||
|
||||
out:
|
||||
@ -235,14 +240,14 @@ out:
|
||||
|
||||
|
||||
/* TODO merge with uv__server_io()? */
|
||||
void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
|
||||
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
|
||||
struct sockaddr_un saddr;
|
||||
uv_pipe_t* pipe;
|
||||
int saved_errno;
|
||||
int sockfd;
|
||||
|
||||
saved_errno = errno;
|
||||
pipe = watcher->data;
|
||||
pipe = container_of(w, uv_pipe_t, read_watcher);
|
||||
|
||||
assert(pipe->type == UV_NAMED_PIPE);
|
||||
|
||||
@ -257,7 +262,7 @@ void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) {
|
||||
pipe->connection_cb((uv_stream_t*)pipe, 0);
|
||||
if (pipe->accepted_fd == sockfd) {
|
||||
/* The user hasn't called uv_accept() yet */
|
||||
ev_io_stop(pipe->loop->ev, &pipe->read_watcher);
|
||||
uv__io_stop(pipe->loop, &pipe->read_watcher);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -27,11 +27,13 @@
|
||||
#include <errno.h>
|
||||
|
||||
|
||||
static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) {
|
||||
uv_poll_t* handle = watcher->data;
|
||||
int events;
|
||||
static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) {
|
||||
uv_poll_t* handle;
|
||||
int pevents;
|
||||
|
||||
if (ev_events & EV_ERROR) {
|
||||
handle = container_of(w, uv_poll_t, io_watcher);
|
||||
|
||||
if (events & UV__IO_ERROR) {
|
||||
/* An error happened. Libev has implicitly stopped the watcher, but we */
|
||||
/* need to fix the refcount. */
|
||||
uv__handle_stop(handle);
|
||||
@ -40,16 +42,13 @@ static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) {
|
||||
return;
|
||||
}
|
||||
|
||||
assert(ev_events & (EV_READ | EV_WRITE));
|
||||
assert((ev_events & ~(EV_READ | EV_WRITE)) == 0);
|
||||
pevents = 0;
|
||||
if (events & UV__IO_READ)
|
||||
pevents |= UV_READABLE;
|
||||
if (events & UV__IO_WRITE)
|
||||
pevents |= UV_WRITABLE;
|
||||
|
||||
events = 0;
|
||||
if (ev_events & EV_READ)
|
||||
events |= UV_READABLE;
|
||||
if (ev_events & EV_WRITE)
|
||||
events |= UV_WRITABLE;
|
||||
|
||||
handle->poll_cb(handle, 0, events);
|
||||
handle->poll_cb(handle, 0, pevents);
|
||||
}
|
||||
|
||||
|
||||
@ -59,9 +58,7 @@ int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) {
|
||||
|
||||
handle->fd = fd;
|
||||
handle->poll_cb = NULL;
|
||||
|
||||
ev_init(&handle->io_watcher, uv__poll_io);
|
||||
handle->io_watcher.data = handle;
|
||||
uv__io_init(&handle->io_watcher, uv__poll_io, fd, 0);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -74,7 +71,7 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,
|
||||
|
||||
|
||||
static void uv__poll_stop(uv_poll_t* handle) {
|
||||
ev_io_stop(handle->loop->ev, &handle->io_watcher);
|
||||
uv__io_stop(handle->loop, &handle->io_watcher);
|
||||
uv__handle_stop(handle);
|
||||
}
|
||||
|
||||
@ -86,25 +83,25 @@ int uv_poll_stop(uv_poll_t* handle) {
|
||||
}
|
||||
|
||||
|
||||
int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) {
|
||||
int ev_events;
|
||||
int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
|
||||
int events;
|
||||
|
||||
assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0);
|
||||
assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
|
||||
assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));
|
||||
|
||||
if (events == 0) {
|
||||
if (pevents == 0) {
|
||||
uv__poll_stop(handle);
|
||||
return 0;
|
||||
}
|
||||
|
||||
ev_events = 0;
|
||||
if (events & UV_READABLE)
|
||||
ev_events |= EV_READ;
|
||||
if (events & UV_WRITABLE)
|
||||
ev_events |= EV_WRITE;
|
||||
events = 0;
|
||||
if (pevents & UV_READABLE)
|
||||
events |= UV__IO_READ;
|
||||
if (pevents & UV_WRITABLE)
|
||||
events |= UV__IO_WRITE;
|
||||
|
||||
ev_io_set(&handle->io_watcher, handle->fd, ev_events);
|
||||
ev_io_start(handle->loop->ev, &handle->io_watcher);
|
||||
uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events);
|
||||
uv__io_start(handle->loop, &handle->io_watcher);
|
||||
|
||||
handle->poll_cb = poll_cb;
|
||||
uv__handle_start(handle);
|
||||
@ -116,8 +113,3 @@ int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) {
|
||||
void uv__poll_close(uv_poll_t* handle) {
|
||||
uv__poll_stop(handle);
|
||||
}
|
||||
|
||||
|
||||
int uv__poll_active(const uv_poll_t* handle) {
|
||||
return ev_is_active(&handle->io_watcher);
|
||||
}
|
||||
|
||||
@ -38,6 +38,7 @@
|
||||
static void uv__stream_connect(uv_stream_t*);
|
||||
static void uv__write(uv_stream_t* stream);
|
||||
static void uv__read(uv_stream_t* stream);
|
||||
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events);
|
||||
|
||||
|
||||
static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
|
||||
@ -71,15 +72,8 @@ void uv__stream_init(uv_loop_t* loop,
|
||||
ngx_queue_init(&stream->write_completed_queue);
|
||||
stream->write_queue_size = 0;
|
||||
|
||||
ev_init(&stream->read_watcher, uv__stream_io);
|
||||
stream->read_watcher.data = stream;
|
||||
|
||||
ev_init(&stream->write_watcher, uv__stream_io);
|
||||
stream->write_watcher.data = stream;
|
||||
|
||||
assert(ngx_queue_empty(&stream->write_queue));
|
||||
assert(ngx_queue_empty(&stream->write_completed_queue));
|
||||
assert(stream->write_queue_size == 0);
|
||||
uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0);
|
||||
uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0);
|
||||
}
|
||||
|
||||
|
||||
@ -111,13 +105,9 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
||||
}
|
||||
}
|
||||
|
||||
/* Associate the fd with each ev_io watcher. */
|
||||
ev_io_set(&stream->read_watcher, fd, EV_READ);
|
||||
ev_io_set(&stream->write_watcher, fd, EV_WRITE);
|
||||
|
||||
/* These should have been set up by uv_tcp_init or uv_pipe_init. */
|
||||
assert(stream->read_watcher.cb == uv__stream_io);
|
||||
assert(stream->write_watcher.cb == uv__stream_io);
|
||||
/* Associate the fd with each watcher. */
|
||||
uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ);
|
||||
uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -174,19 +164,16 @@ void uv__stream_destroy(uv_stream_t* stream) {
|
||||
}
|
||||
|
||||
|
||||
void uv__server_io(EV_P_ ev_io* watcher, int revents) {
|
||||
void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
|
||||
int fd;
|
||||
struct sockaddr_storage addr;
|
||||
uv_stream_t* stream = watcher->data;
|
||||
|
||||
assert(watcher == &stream->read_watcher ||
|
||||
watcher == &stream->write_watcher);
|
||||
assert(revents == EV_READ);
|
||||
uv_stream_t* stream = container_of(w, uv_stream_t, read_watcher);
|
||||
|
||||
assert(events == UV__IO_READ);
|
||||
assert(!(stream->flags & UV_CLOSING));
|
||||
|
||||
if (stream->accepted_fd >= 0) {
|
||||
ev_io_stop(EV_A, &stream->read_watcher);
|
||||
uv__io_stop(loop, &stream->read_watcher);
|
||||
return;
|
||||
}
|
||||
|
||||
@ -216,7 +203,7 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
|
||||
stream->connection_cb((uv_stream_t*)stream, 0);
|
||||
if (stream->accepted_fd >= 0) {
|
||||
/* The user hasn't yet accepted called uv_accept() */
|
||||
ev_io_stop(stream->loop->ev, &stream->read_watcher);
|
||||
uv__io_stop(stream->loop, &stream->read_watcher);
|
||||
return;
|
||||
}
|
||||
}
|
||||
@ -252,7 +239,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
|
||||
goto out;
|
||||
}
|
||||
|
||||
ev_io_start(streamServer->loop->ev, &streamServer->read_watcher);
|
||||
uv__io_start(streamServer->loop, &streamServer->read_watcher);
|
||||
streamServer->accepted_fd = -1;
|
||||
status = 0;
|
||||
|
||||
@ -312,7 +299,7 @@ static void uv__drain(uv_stream_t* stream) {
|
||||
assert(!uv_write_queue_head(stream));
|
||||
assert(stream->write_queue_size == 0);
|
||||
|
||||
ev_io_stop(stream->loop->ev, &stream->write_watcher);
|
||||
uv__io_stop(stream->loop, &stream->write_watcher);
|
||||
|
||||
/* Shutdown? */
|
||||
if ((stream->flags & UV_STREAM_SHUTTING) &&
|
||||
@ -366,7 +353,7 @@ static void uv__write_req_finish(uv_write_t* req) {
|
||||
* callback called in the near future.
|
||||
*/
|
||||
ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
|
||||
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
|
||||
uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE);
|
||||
}
|
||||
|
||||
|
||||
@ -517,7 +504,7 @@ start:
|
||||
assert(!stream->blocking);
|
||||
|
||||
/* We're not done. */
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
uv__io_start(stream->loop, &stream->write_watcher);
|
||||
}
|
||||
|
||||
|
||||
@ -576,7 +563,6 @@ static void uv__read(uv_stream_t* stream) {
|
||||
struct msghdr msg;
|
||||
struct cmsghdr* cmsg;
|
||||
char cmsg_space[64];
|
||||
struct ev_loop* ev = stream->loop->ev;
|
||||
|
||||
/* XXX: Maybe instead of having UV_STREAM_READING we just test if
|
||||
* tcp->read_cb is NULL or not?
|
||||
@ -619,7 +605,7 @@ static void uv__read(uv_stream_t* stream) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
/* Wait for the next one. */
|
||||
if (stream->flags & UV_STREAM_READING) {
|
||||
ev_io_start(ev, &stream->read_watcher);
|
||||
uv__io_start(stream->loop, &stream->read_watcher);
|
||||
}
|
||||
uv__set_sys_error(stream->loop, EAGAIN);
|
||||
|
||||
@ -647,7 +633,7 @@ static void uv__read(uv_stream_t* stream) {
|
||||
} else if (nread == 0) {
|
||||
/* EOF */
|
||||
uv__set_artificial_error(stream->loop, UV_EOF);
|
||||
ev_io_stop(ev, &stream->read_watcher);
|
||||
uv__io_stop(stream->loop, &stream->read_watcher);
|
||||
if (!ev_is_active(&stream->write_watcher))
|
||||
uv__handle_stop(stream);
|
||||
|
||||
@ -728,40 +714,43 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
|
||||
stream->shutdown_req = req;
|
||||
stream->flags |= UV_STREAM_SHUTTING;
|
||||
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
uv__io_start(stream->loop, &stream->write_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__stream_pending(uv_stream_t* handle) {
|
||||
uv__stream_io(handle->loop->ev, &handle->write_watcher, EV_WRITE);
|
||||
uv__stream_io(handle->loop, &handle->write_watcher, UV__IO_WRITE);
|
||||
}
|
||||
|
||||
|
||||
void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
|
||||
uv_stream_t* stream = watcher->data;
|
||||
static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) {
|
||||
uv_stream_t* stream;
|
||||
|
||||
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
|
||||
stream->type == UV_TTY);
|
||||
assert(watcher == &stream->read_watcher ||
|
||||
watcher == &stream->write_watcher);
|
||||
/* either UV__IO_READ or UV__IO_WRITE but not both */
|
||||
assert(!!(events & UV__IO_READ) ^ !!(events & UV__IO_WRITE));
|
||||
|
||||
if (events & UV__IO_READ)
|
||||
stream = container_of(w, uv_stream_t, read_watcher);
|
||||
else
|
||||
stream = container_of(w, uv_stream_t, write_watcher);
|
||||
|
||||
assert(stream->type == UV_TCP ||
|
||||
stream->type == UV_NAMED_PIPE ||
|
||||
stream->type == UV_TTY);
|
||||
assert(!(stream->flags & UV_CLOSING));
|
||||
|
||||
if (stream->connect_req) {
|
||||
if (stream->connect_req)
|
||||
uv__stream_connect(stream);
|
||||
} else {
|
||||
assert(revents & (EV_READ | EV_WRITE));
|
||||
else if (events & UV__IO_READ) {
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
if (revents & EV_READ) {
|
||||
uv__read((uv_stream_t*)stream);
|
||||
}
|
||||
|
||||
if (revents & EV_WRITE) {
|
||||
uv__write(stream);
|
||||
uv__write_callbacks(stream);
|
||||
}
|
||||
uv__read(stream);
|
||||
}
|
||||
else {
|
||||
assert(stream->fd >= 0);
|
||||
uv__write(stream);
|
||||
uv__write_callbacks(stream);
|
||||
}
|
||||
}
|
||||
|
||||
@ -796,7 +785,7 @@ static void uv__stream_connect(uv_stream_t* stream) {
|
||||
return;
|
||||
|
||||
if (error == 0)
|
||||
ev_io_start(stream->loop->ev, &stream->read_watcher);
|
||||
uv__io_start(stream->loop, &stream->read_watcher);
|
||||
|
||||
stream->connect_req = NULL;
|
||||
uv__req_unregister(stream->loop, req);
|
||||
@ -869,8 +858,7 @@ int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
|
||||
}
|
||||
}
|
||||
|
||||
assert(stream->write_watcher.data == stream);
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
uv__io_start(stream->loop, &stream->write_watcher);
|
||||
|
||||
if (stream->delayed_error)
|
||||
uv__make_pending(stream);
|
||||
@ -931,8 +919,6 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
|
||||
|
||||
assert(!ngx_queue_empty(&stream->write_queue));
|
||||
assert(stream->write_watcher.cb == uv__stream_io);
|
||||
assert(stream->write_watcher.data == stream);
|
||||
assert(stream->write_watcher.fd == stream->fd);
|
||||
|
||||
/* If the queue was empty when this function began, we should attempt to
|
||||
* do the write immediately. Otherwise start the write_watcher and wait
|
||||
@ -948,7 +934,7 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
|
||||
*/
|
||||
assert(!stream->blocking);
|
||||
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
uv__io_start(stream->loop, &stream->write_watcher);
|
||||
}
|
||||
|
||||
return 0;
|
||||
@ -993,7 +979,7 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
/* These should have been set by uv_tcp_init. */
|
||||
assert(stream->read_watcher.cb == uv__stream_io);
|
||||
|
||||
ev_io_start(stream->loop->ev, &stream->read_watcher);
|
||||
uv__io_start(stream->loop, &stream->read_watcher);
|
||||
uv__handle_start(stream);
|
||||
|
||||
return 0;
|
||||
@ -1013,7 +999,7 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
||||
|
||||
|
||||
int uv_read_stop(uv_stream_t* stream) {
|
||||
ev_io_stop(stream->loop->ev, &stream->read_watcher);
|
||||
uv__io_stop(stream->loop, &stream->read_watcher);
|
||||
uv__handle_stop(stream);
|
||||
stream->flags &= ~UV_STREAM_READING;
|
||||
stream->read_cb = NULL;
|
||||
@ -1035,7 +1021,7 @@ int uv_is_writable(const uv_stream_t* stream) {
|
||||
|
||||
void uv__stream_close(uv_stream_t* handle) {
|
||||
uv_read_stop(handle);
|
||||
ev_io_stop(handle->loop->ev, &handle->write_watcher);
|
||||
uv__io_stop(handle->loop, &handle->write_watcher);
|
||||
|
||||
close(handle->fd);
|
||||
handle->fd = -1;
|
||||
|
||||
@ -201,9 +201,8 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
|
||||
tcp->connection_cb = cb;
|
||||
|
||||
/* Start listening for connections. */
|
||||
ev_io_set(&tcp->read_watcher, tcp->fd, EV_READ);
|
||||
ev_set_cb(&tcp->read_watcher, uv__server_io);
|
||||
ev_io_start(tcp->loop->ev, &tcp->read_watcher);
|
||||
uv__io_set(&tcp->read_watcher, uv__server_io, tcp->fd, UV__IO_READ);
|
||||
uv__io_start(tcp->loop, &tcp->read_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -31,33 +31,31 @@
|
||||
|
||||
static void uv__udp_run_completed(uv_udp_t* handle);
|
||||
static void uv__udp_run_pending(uv_udp_t* handle);
|
||||
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents);
|
||||
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents);
|
||||
static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents);
|
||||
static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents);
|
||||
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
|
||||
static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[],
|
||||
int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb);
|
||||
|
||||
|
||||
static void uv__udp_start_watcher(uv_udp_t* handle,
|
||||
ev_io* w,
|
||||
void (*cb)(EV_P_ ev_io*, int),
|
||||
int flags) {
|
||||
if (ev_is_active(w)) return;
|
||||
ev_set_cb(w, cb);
|
||||
ev_io_set(w, handle->fd, flags);
|
||||
ev_io_start(handle->loop->ev, w);
|
||||
uv__io_t* w,
|
||||
uv__io_cb cb,
|
||||
int events) {
|
||||
if (uv__io_active(w)) return;
|
||||
uv__io_init(w, cb, handle->fd, events);
|
||||
uv__io_start(handle->loop, w);
|
||||
uv__handle_start(handle);
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_stop_watcher(uv_udp_t* handle, ev_io* w) {
|
||||
if (!ev_is_active(w)) return;
|
||||
ev_io_stop(handle->loop->ev, w);
|
||||
ev_io_set(w, -1, 0);
|
||||
ev_set_cb(w, NULL);
|
||||
static void uv__udp_stop_watcher(uv_udp_t* handle, uv__io_t* w) {
|
||||
if (!uv__io_active(w)) return;
|
||||
uv__io_stop(handle->loop, w);
|
||||
|
||||
if (!ev_is_active(&handle->read_watcher) &&
|
||||
!ev_is_active(&handle->write_watcher)) {
|
||||
if (!uv__io_active(&handle->read_watcher) &&
|
||||
!uv__io_active(&handle->write_watcher))
|
||||
{
|
||||
uv__handle_stop(handle);
|
||||
}
|
||||
}
|
||||
@ -67,7 +65,7 @@ static void uv__udp_start_read_watcher(uv_udp_t* handle) {
|
||||
uv__udp_start_watcher(handle,
|
||||
&handle->read_watcher,
|
||||
uv__udp_recvmsg,
|
||||
EV_READ);
|
||||
UV__IO_READ);
|
||||
}
|
||||
|
||||
|
||||
@ -75,7 +73,7 @@ static void uv__udp_start_write_watcher(uv_udp_t* handle) {
|
||||
uv__udp_start_watcher(handle,
|
||||
&handle->write_watcher,
|
||||
uv__udp_sendmsg,
|
||||
EV_WRITE);
|
||||
UV__IO_WRITE);
|
||||
}
|
||||
|
||||
|
||||
@ -101,8 +99,8 @@ void uv__udp_finish_close(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
ngx_queue_t* q;
|
||||
|
||||
assert(!ev_is_active(&handle->write_watcher));
|
||||
assert(!ev_is_active(&handle->read_watcher));
|
||||
assert(!uv__io_active(&handle->write_watcher));
|
||||
assert(!uv__io_active(&handle->read_watcher));
|
||||
assert(handle->fd == -1);
|
||||
|
||||
uv__udp_run_completed(handle);
|
||||
@ -216,7 +214,7 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) {
|
||||
static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
|
||||
struct sockaddr_storage peer;
|
||||
struct msghdr h;
|
||||
uv_udp_t* handle;
|
||||
@ -278,7 +276,7 @@ static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) {
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) {
|
||||
static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, int revents) {
|
||||
uv_udp_t* handle;
|
||||
|
||||
handle = container_of(w, uv_udp_t, write_watcher);
|
||||
@ -296,7 +294,7 @@ static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) {
|
||||
|
||||
if (!ngx_queue_empty(&handle->write_completed_queue)) {
|
||||
/* Schedule completion callbacks. */
|
||||
ev_feed_event(handle->loop->ev, &handle->write_watcher, EV_WRITE);
|
||||
uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);
|
||||
}
|
||||
else if (ngx_queue_empty(&handle->write_queue)) {
|
||||
/* Pending queue and completion queue empty, stop watcher. */
|
||||
@ -649,7 +647,7 @@ int uv_udp_recv_start(uv_udp_t* handle,
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (ev_is_active(&handle->read_watcher)) {
|
||||
if (uv__io_active(&handle->read_watcher)) {
|
||||
uv__set_artificial_error(handle->loop, UV_EALREADY);
|
||||
return -1;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user