unix: clean up udp read/write watchers

This commit is contained in:
Ben Noordhuis 2012-01-14 01:58:52 +01:00
parent 28b0867f03
commit 454e0212b0

View File

@ -28,60 +28,65 @@
#include <stdlib.h>
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w);
static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_run_pending(uv_udp_t* handle);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static void uv__udp_io(EV_P_ ev_io* w, int events);
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents);
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[],
int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb);
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) {
int flags;
if (ev_is_active(w)) {
return;
}
assert(w == &handle->read_watcher
|| w == &handle->write_watcher);
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
w->data = handle;
ev_set_cb(w, uv__udp_io);
static void uv__udp_start_watcher(uv_udp_t* handle,
ev_io* w,
void (*cb)(EV_P_ ev_io*, int),
int flags) {
if (ev_is_active(w)) return;
ev_set_cb(w, cb);
ev_io_set(w, handle->fd, flags);
ev_io_start(handle->loop->ev, w);
ev_unref(handle->loop->ev);
}
void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) {
int flags;
if (!ev_is_active(w)) {
return;
}
assert(w == &handle->read_watcher
|| w == &handle->write_watcher);
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
static void uv__udp_stop_watcher(uv_udp_t* handle, ev_io* w) {
if (!ev_is_active(w)) return;
ev_ref(handle->loop->ev);
ev_io_stop(handle->loop->ev, w);
ev_io_set(w, -1, flags);
ev_io_set(w, -1, 0);
ev_set_cb(w, NULL);
w->data = (void*)0xDEADBABE;
}
static void uv__udp_start_read_watcher(uv_udp_t* handle) {
uv__udp_start_watcher(handle,
&handle->read_watcher,
uv__udp_recvmsg,
EV_READ);
}
static void uv__udp_start_write_watcher(uv_udp_t* handle) {
uv__udp_start_watcher(handle,
&handle->write_watcher,
uv__udp_sendmsg,
EV_WRITE);
}
static void uv__udp_stop_read_watcher(uv_udp_t* handle) {
uv__udp_stop_watcher(handle, &handle->read_watcher);
}
static void uv__udp_stop_write_watcher(uv_udp_t* handle) {
uv__udp_stop_watcher(handle, &handle->write_watcher);
}
void uv__udp_start_close(uv_udp_t* handle) {
uv__udp_watcher_stop(handle, &handle->read_watcher);
uv__udp_watcher_stop(handle, &handle->write_watcher);
uv__udp_stop_write_watcher(handle);
uv__udp_stop_read_watcher(handle);
uv__close(handle->fd);
handle->fd = -1;
}
@ -206,13 +211,18 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
}
static void uv__udp_recvmsg(uv_udp_t* handle) {
static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) {
struct sockaddr_storage peer;
struct msghdr h;
uv_udp_t* handle;
ssize_t nread;
uv_buf_t buf;
int flags;
handle = container_of(w, uv_udp_t, read_watcher);
assert(handle->type == UV_UDP);
assert(revents & EV_READ);
assert(handle->recv_cb != NULL);
assert(handle->alloc_cb != NULL);
@ -263,7 +273,13 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
}
static void uv__udp_sendmsg(uv_udp_t* handle) {
static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) {
uv_udp_t* handle;
handle = container_of(w, uv_udp_t, write_watcher);
assert(handle->type == UV_UDP);
assert(revents & EV_WRITE);
assert(!ngx_queue_empty(&handle->write_queue)
|| !ngx_queue_empty(&handle->write_completed_queue));
@ -279,28 +295,11 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
}
else if (ngx_queue_empty(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__udp_watcher_stop(handle, &handle->write_watcher);
uv__udp_stop_write_watcher(handle);
}
}
static void uv__udp_io(EV_P_ ev_io* w, int events) {
uv_udp_t* handle;
handle = w->data;
assert(handle != NULL);
assert(handle->type == UV_UDP);
assert(handle->fd >= 0);
assert(!(events & ~(EV_READ|EV_WRITE)));
if (events & EV_READ)
uv__udp_recvmsg(handle);
if (events & EV_WRITE)
uv__udp_sendmsg(handle);
}
static int uv__bind(uv_udp_t* handle,
int domain,
struct sockaddr* addr,
@ -434,7 +433,7 @@ static int uv__udp_send(uv_udp_send_t* req,
memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
ngx_queue_insert_tail(&handle->write_queue, &req->queue);
uv__udp_watcher_start(handle, &handle->write_watcher);
uv__udp_start_write_watcher(handle);
return 0;
}
@ -589,14 +588,14 @@ int uv_udp_recv_start(uv_udp_t* handle,
handle->alloc_cb = alloc_cb;
handle->recv_cb = recv_cb;
uv__udp_watcher_start(handle, &handle->read_watcher);
uv__udp_start_read_watcher(handle);
return 0;
}
int uv_udp_recv_stop(uv_udp_t* handle) {
uv__udp_watcher_stop(handle, &handle->read_watcher);
uv__udp_stop_read_watcher(handle);
handle->alloc_cb = NULL;
handle->recv_cb = NULL;
return 0;