diff --git a/src/unix/udp.c b/src/unix/udp.c index d99fe104..d8b07e18 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -38,10 +38,9 @@ static void uv__udp_run_completed(uv_udp_t* handle); -static void uv__udp_run_pending(uv_udp_t* handle); static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents); -static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents); -static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents); +static void uv__udp_recvmsg(uv_udp_t* handle); +static void uv__udp_sendmsg(uv_udp_t* handle); static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain, unsigned int flags); @@ -65,25 +64,19 @@ void uv__udp_finish_close(uv_udp_t* handle) { assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT)); assert(handle->io_watcher.fd == -1); - uv__udp_run_completed(handle); - while (!QUEUE_EMPTY(&handle->write_queue)) { q = QUEUE_HEAD(&handle->write_queue); QUEUE_REMOVE(q); req = QUEUE_DATA(q, uv_udp_send_t, queue); - uv__req_unregister(handle->loop, req); - - if (req->bufs != req->bufsml) - free(req->bufs); - req->bufs = NULL; - - if (req->send_cb != NULL) - req->send_cb(req, -ECANCELED); + req->status = -ECANCELED; + QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); } - handle->send_queue_size = 0; - handle->send_queue_count = 0; + uv__udp_run_completed(handle); + + assert(handle->send_queue_size == 0); + assert(handle->send_queue_count == 0); /* Now tear down the handle. */ handle->recv_cb = NULL; @@ -92,52 +85,6 @@ void uv__udp_finish_close(uv_udp_t* handle) { } -static void uv__udp_run_pending(uv_udp_t* handle) { - uv_udp_send_t* req; - QUEUE* q; - struct msghdr h; - ssize_t size; - - while (!QUEUE_EMPTY(&handle->write_queue)) { - q = QUEUE_HEAD(&handle->write_queue); - assert(q != NULL); - - req = QUEUE_DATA(q, uv_udp_send_t, queue); - assert(req != NULL); - - memset(&h, 0, sizeof h); - h.msg_name = &req->addr; - h.msg_namelen = (req->addr.ss_family == AF_INET6 ? - sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); - h.msg_iov = (struct iovec*) req->bufs; - h.msg_iovlen = req->nbufs; - - do { - size = sendmsg(handle->io_watcher.fd, &h, 0); - } - while (size == -1 && errno == EINTR); - - /* TODO try to write once or twice more in the - * hope that the socket becomes readable again? - */ - if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) - break; - - req->status = (size == -1 ? -errno : size); - - /* Sending a datagram is an atomic operation: either all data - * is written or nothing is (and EMSGSIZE is raised). That is - * why we don't handle partial writes. Just pop the request - * off the write queue and onto the completed queue, done. - */ - handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs); - handle->send_queue_count--; - QUEUE_REMOVE(&req->queue); - QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); - } -} - - static void uv__udp_run_completed(uv_udp_t* handle) { uv_udp_send_t* req; QUEUE* q; @@ -149,6 +96,9 @@ static void uv__udp_run_completed(uv_udp_t* handle) { req = QUEUE_DATA(q, uv_udp_send_t, queue); uv__req_unregister(handle->loop, req); + handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs); + handle->send_queue_count--; + if (req->bufs != req->bufsml) free(req->bufs); req->bufs = NULL; @@ -164,33 +114,40 @@ static void uv__udp_run_completed(uv_udp_t* handle) { else req->send_cb(req, req->status); } + + if (QUEUE_EMPTY(&handle->write_queue)) { + /* Pending queue and completion queue empty, stop watcher. */ + uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT); + if (!uv__io_active(&handle->io_watcher, UV__POLLIN)) + uv__handle_stop(handle); + } } static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { - if (revents & UV__POLLIN) - uv__udp_recvmsg(loop, w, revents); + uv_udp_t* handle; - if (revents & UV__POLLOUT) - uv__udp_sendmsg(loop, w, revents); + handle = container_of(w, uv_udp_t, io_watcher); + assert(handle->type == UV_UDP); + + if (revents & UV__POLLIN) + uv__udp_recvmsg(handle); + + if (revents & UV__POLLOUT) { + uv__udp_sendmsg(handle); + uv__udp_run_completed(handle); + } } -static void uv__udp_recvmsg(uv_loop_t* loop, - uv__io_t* w, - unsigned int revents) { +static void uv__udp_recvmsg(uv_udp_t* handle) { struct sockaddr_storage peer; struct msghdr h; - uv_udp_t* handle; ssize_t nread; uv_buf_t buf; int flags; int count; - handle = container_of(w, uv_udp_t, io_watcher); - assert(handle->type == UV_UDP); - assert(revents & UV__POLLIN); - assert(handle->recv_cb != NULL); assert(handle->alloc_cb != NULL); @@ -247,35 +204,47 @@ static void uv__udp_recvmsg(uv_loop_t* loop, } -static void uv__udp_sendmsg(uv_loop_t* loop, - uv__io_t* w, - unsigned int revents) { - uv_udp_t* handle; - - handle = container_of(w, uv_udp_t, io_watcher); - assert(handle->type == UV_UDP); - assert(revents & UV__POLLOUT); +static void uv__udp_sendmsg(uv_udp_t* handle) { + uv_udp_send_t* req; + QUEUE* q; + struct msghdr h; + ssize_t size; assert(!QUEUE_EMPTY(&handle->write_queue) || !QUEUE_EMPTY(&handle->write_completed_queue)); - /* Write out pending data first. */ - uv__udp_run_pending(handle); + while (!QUEUE_EMPTY(&handle->write_queue)) { + q = QUEUE_HEAD(&handle->write_queue); + assert(q != NULL); - /* Drain 'request completed' queue. */ - uv__udp_run_completed(handle); + req = QUEUE_DATA(q, uv_udp_send_t, queue); + assert(req != NULL); - if (!QUEUE_EMPTY(&handle->write_completed_queue)) { - /* Schedule completion callbacks. */ + memset(&h, 0, sizeof h); + h.msg_name = &req->addr; + h.msg_namelen = (req->addr.ss_family == AF_INET6 ? + sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in)); + h.msg_iov = (struct iovec*) req->bufs; + h.msg_iovlen = req->nbufs; + + do { + size = sendmsg(handle->io_watcher.fd, &h, 0); + } while (size == -1 && errno == EINTR); + + if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) + break; + + req->status = (size == -1 ? -errno : size); + + /* Sending a datagram is an atomic operation: either all data + * is written or nothing is (and EMSGSIZE is raised). That is + * why we don't handle partial writes. Just pop the request + * off the write queue and onto the completed queue, done. + */ + QUEUE_REMOVE(&req->queue); + QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); uv__io_feed(handle->loop, &handle->io_watcher); } - else if (QUEUE_EMPTY(&handle->write_queue)) { - /* Pending queue and completion queue empty, stop watcher. */ - uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT); - - if (!uv__io_active(&handle->io_watcher, UV__POLLIN)) - uv__handle_stop(handle); - } } @@ -415,6 +384,7 @@ int uv__udp_send(uv_udp_send_t* req, unsigned int addrlen, uv_udp_send_cb send_cb) { int err; + int empty_queue; assert(nbufs > 0); @@ -422,8 +392,13 @@ int uv__udp_send(uv_udp_send_t* req, if (err) return err; - uv__req_init(handle->loop, req, UV_UDP_SEND); + /* It's legal for send_queue_count > 0 even when the write_queue is empty; + * it means there are error-state requests in the write_completed_queue that + * will touch up send_queue_size/count later. + */ + empty_queue = (handle->send_queue_count == 0); + uv__req_init(handle->loop, req, UV_UDP_SEND); assert(addrlen <= sizeof(req->addr)); memcpy(&req->addr, addr, addrlen); req->send_cb = send_cb; @@ -441,9 +416,13 @@ int uv__udp_send(uv_udp_send_t* req, handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs); handle->send_queue_count++; QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue); - uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT); uv__handle_start(handle); + if (empty_queue) + uv__udp_sendmsg(handle); + else + uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT); + return 0; }