diff --git a/src/unix/udp.c b/src/unix/udp.c index 17eaf251..177d39c9 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -28,60 +28,65 @@ #include -static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w); static void uv__udp_run_completed(uv_udp_t* handle); static void uv__udp_run_pending(uv_udp_t* handle); -static void uv__udp_recvmsg(uv_udp_t* handle); -static void uv__udp_sendmsg(uv_udp_t* handle); -static void uv__udp_io(EV_P_ ev_io* w, int events); +static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents); +static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents); static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain); static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], int bufcnt, struct sockaddr* addr, socklen_t addrlen, uv_udp_send_cb send_cb); -static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) { - int flags; - - if (ev_is_active(w)) { - return; - } - - assert(w == &handle->read_watcher - || w == &handle->write_watcher); - - flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE); - - w->data = handle; - ev_set_cb(w, uv__udp_io); +static void uv__udp_start_watcher(uv_udp_t* handle, + ev_io* w, + void (*cb)(EV_P_ ev_io*, int), + int flags) { + if (ev_is_active(w)) return; + ev_set_cb(w, cb); ev_io_set(w, handle->fd, flags); ev_io_start(handle->loop->ev, w); ev_unref(handle->loop->ev); } -void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) { - int flags; - - if (!ev_is_active(w)) { - return; - } - - assert(w == &handle->read_watcher - || w == &handle->write_watcher); - - flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE); - +static void uv__udp_stop_watcher(uv_udp_t* handle, ev_io* w) { + if (!ev_is_active(w)) return; ev_ref(handle->loop->ev); ev_io_stop(handle->loop->ev, w); - ev_io_set(w, -1, flags); + ev_io_set(w, -1, 0); ev_set_cb(w, NULL); - w->data = (void*)0xDEADBABE; +} + + +static void uv__udp_start_read_watcher(uv_udp_t* handle) { + uv__udp_start_watcher(handle, + &handle->read_watcher, + uv__udp_recvmsg, + EV_READ); +} + + +static void uv__udp_start_write_watcher(uv_udp_t* handle) { + uv__udp_start_watcher(handle, + &handle->write_watcher, + uv__udp_sendmsg, + EV_WRITE); +} + + +static void uv__udp_stop_read_watcher(uv_udp_t* handle) { + uv__udp_stop_watcher(handle, &handle->read_watcher); +} + + +static void uv__udp_stop_write_watcher(uv_udp_t* handle) { + uv__udp_stop_watcher(handle, &handle->write_watcher); } void uv__udp_start_close(uv_udp_t* handle) { - uv__udp_watcher_stop(handle, &handle->read_watcher); - uv__udp_watcher_stop(handle, &handle->write_watcher); + uv__udp_stop_write_watcher(handle); + uv__udp_stop_read_watcher(handle); uv__close(handle->fd); handle->fd = -1; } @@ -206,13 +211,18 @@ static void uv__udp_run_completed(uv_udp_t* handle) { } -static void uv__udp_recvmsg(uv_udp_t* handle) { +static void uv__udp_recvmsg(EV_P_ ev_io* w, int revents) { struct sockaddr_storage peer; struct msghdr h; + uv_udp_t* handle; ssize_t nread; uv_buf_t buf; int flags; + handle = container_of(w, uv_udp_t, read_watcher); + assert(handle->type == UV_UDP); + assert(revents & EV_READ); + assert(handle->recv_cb != NULL); assert(handle->alloc_cb != NULL); @@ -263,7 +273,13 @@ static void uv__udp_recvmsg(uv_udp_t* handle) { } -static void uv__udp_sendmsg(uv_udp_t* handle) { +static void uv__udp_sendmsg(EV_P_ ev_io* w, int revents) { + uv_udp_t* handle; + + handle = container_of(w, uv_udp_t, write_watcher); + assert(handle->type == UV_UDP); + assert(revents & EV_WRITE); + assert(!ngx_queue_empty(&handle->write_queue) || !ngx_queue_empty(&handle->write_completed_queue)); @@ -279,28 +295,11 @@ static void uv__udp_sendmsg(uv_udp_t* handle) { } else if (ngx_queue_empty(&handle->write_queue)) { /* Pending queue and completion queue empty, stop watcher. */ - uv__udp_watcher_stop(handle, &handle->write_watcher); + uv__udp_stop_write_watcher(handle); } } -static void uv__udp_io(EV_P_ ev_io* w, int events) { - uv_udp_t* handle; - - handle = w->data; - assert(handle != NULL); - assert(handle->type == UV_UDP); - assert(handle->fd >= 0); - assert(!(events & ~(EV_READ|EV_WRITE))); - - if (events & EV_READ) - uv__udp_recvmsg(handle); - - if (events & EV_WRITE) - uv__udp_sendmsg(handle); -} - - static int uv__bind(uv_udp_t* handle, int domain, struct sockaddr* addr, @@ -434,7 +433,7 @@ static int uv__udp_send(uv_udp_send_t* req, memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0])); ngx_queue_insert_tail(&handle->write_queue, &req->queue); - uv__udp_watcher_start(handle, &handle->write_watcher); + uv__udp_start_write_watcher(handle); return 0; } @@ -589,14 +588,14 @@ int uv_udp_recv_start(uv_udp_t* handle, handle->alloc_cb = alloc_cb; handle->recv_cb = recv_cb; - uv__udp_watcher_start(handle, &handle->read_watcher); + uv__udp_start_read_watcher(handle); return 0; } int uv_udp_recv_stop(uv_udp_t* handle) { - uv__udp_watcher_stop(handle, &handle->read_watcher); + uv__udp_stop_read_watcher(handle); handle->alloc_cb = NULL; handle->recv_cb = NULL; return 0;