unix: try to write immediately in uv_udp_send
This commit is contained in:
parent
bf6e90f4d6
commit
41891222bc
171
src/unix/udp.c
171
src/unix/udp.c
@ -38,10 +38,9 @@
|
||||
|
||||
|
||||
static void uv__udp_run_completed(uv_udp_t* handle);
|
||||
static void uv__udp_run_pending(uv_udp_t* handle);
|
||||
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
|
||||
static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
|
||||
static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
|
||||
static void uv__udp_recvmsg(uv_udp_t* handle);
|
||||
static void uv__udp_sendmsg(uv_udp_t* handle);
|
||||
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
|
||||
int domain,
|
||||
unsigned int flags);
|
||||
@ -65,25 +64,19 @@ void uv__udp_finish_close(uv_udp_t* handle) {
|
||||
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
|
||||
assert(handle->io_watcher.fd == -1);
|
||||
|
||||
uv__udp_run_completed(handle);
|
||||
|
||||
while (!QUEUE_EMPTY(&handle->write_queue)) {
|
||||
q = QUEUE_HEAD(&handle->write_queue);
|
||||
QUEUE_REMOVE(q);
|
||||
|
||||
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
||||
uv__req_unregister(handle->loop, req);
|
||||
|
||||
if (req->bufs != req->bufsml)
|
||||
free(req->bufs);
|
||||
req->bufs = NULL;
|
||||
|
||||
if (req->send_cb != NULL)
|
||||
req->send_cb(req, -ECANCELED);
|
||||
req->status = -ECANCELED;
|
||||
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
|
||||
}
|
||||
|
||||
handle->send_queue_size = 0;
|
||||
handle->send_queue_count = 0;
|
||||
uv__udp_run_completed(handle);
|
||||
|
||||
assert(handle->send_queue_size == 0);
|
||||
assert(handle->send_queue_count == 0);
|
||||
|
||||
/* Now tear down the handle. */
|
||||
handle->recv_cb = NULL;
|
||||
@ -92,52 +85,6 @@ void uv__udp_finish_close(uv_udp_t* handle) {
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_run_pending(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
QUEUE* q;
|
||||
struct msghdr h;
|
||||
ssize_t size;
|
||||
|
||||
while (!QUEUE_EMPTY(&handle->write_queue)) {
|
||||
q = QUEUE_HEAD(&handle->write_queue);
|
||||
assert(q != NULL);
|
||||
|
||||
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
||||
assert(req != NULL);
|
||||
|
||||
memset(&h, 0, sizeof h);
|
||||
h.msg_name = &req->addr;
|
||||
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
|
||||
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
|
||||
h.msg_iov = (struct iovec*) req->bufs;
|
||||
h.msg_iovlen = req->nbufs;
|
||||
|
||||
do {
|
||||
size = sendmsg(handle->io_watcher.fd, &h, 0);
|
||||
}
|
||||
while (size == -1 && errno == EINTR);
|
||||
|
||||
/* TODO try to write once or twice more in the
|
||||
* hope that the socket becomes readable again?
|
||||
*/
|
||||
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||
break;
|
||||
|
||||
req->status = (size == -1 ? -errno : size);
|
||||
|
||||
/* Sending a datagram is an atomic operation: either all data
|
||||
* is written or nothing is (and EMSGSIZE is raised). That is
|
||||
* why we don't handle partial writes. Just pop the request
|
||||
* off the write queue and onto the completed queue, done.
|
||||
*/
|
||||
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
|
||||
handle->send_queue_count--;
|
||||
QUEUE_REMOVE(&req->queue);
|
||||
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_run_completed(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
QUEUE* q;
|
||||
@ -149,6 +96,9 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
|
||||
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
||||
uv__req_unregister(handle->loop, req);
|
||||
|
||||
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
|
||||
handle->send_queue_count--;
|
||||
|
||||
if (req->bufs != req->bufsml)
|
||||
free(req->bufs);
|
||||
req->bufs = NULL;
|
||||
@ -164,33 +114,40 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
|
||||
else
|
||||
req->send_cb(req, req->status);
|
||||
}
|
||||
|
||||
if (QUEUE_EMPTY(&handle->write_queue)) {
|
||||
/* Pending queue and completion queue empty, stop watcher. */
|
||||
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
||||
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
|
||||
uv__handle_stop(handle);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
|
||||
if (revents & UV__POLLIN)
|
||||
uv__udp_recvmsg(loop, w, revents);
|
||||
uv_udp_t* handle;
|
||||
|
||||
if (revents & UV__POLLOUT)
|
||||
uv__udp_sendmsg(loop, w, revents);
|
||||
handle = container_of(w, uv_udp_t, io_watcher);
|
||||
assert(handle->type == UV_UDP);
|
||||
|
||||
if (revents & UV__POLLIN)
|
||||
uv__udp_recvmsg(handle);
|
||||
|
||||
if (revents & UV__POLLOUT) {
|
||||
uv__udp_sendmsg(handle);
|
||||
uv__udp_run_completed(handle);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_recvmsg(uv_loop_t* loop,
|
||||
uv__io_t* w,
|
||||
unsigned int revents) {
|
||||
static void uv__udp_recvmsg(uv_udp_t* handle) {
|
||||
struct sockaddr_storage peer;
|
||||
struct msghdr h;
|
||||
uv_udp_t* handle;
|
||||
ssize_t nread;
|
||||
uv_buf_t buf;
|
||||
int flags;
|
||||
int count;
|
||||
|
||||
handle = container_of(w, uv_udp_t, io_watcher);
|
||||
assert(handle->type == UV_UDP);
|
||||
assert(revents & UV__POLLIN);
|
||||
|
||||
assert(handle->recv_cb != NULL);
|
||||
assert(handle->alloc_cb != NULL);
|
||||
|
||||
@ -247,35 +204,47 @@ static void uv__udp_recvmsg(uv_loop_t* loop,
|
||||
}
|
||||
|
||||
|
||||
static void uv__udp_sendmsg(uv_loop_t* loop,
|
||||
uv__io_t* w,
|
||||
unsigned int revents) {
|
||||
uv_udp_t* handle;
|
||||
|
||||
handle = container_of(w, uv_udp_t, io_watcher);
|
||||
assert(handle->type == UV_UDP);
|
||||
assert(revents & UV__POLLOUT);
|
||||
static void uv__udp_sendmsg(uv_udp_t* handle) {
|
||||
uv_udp_send_t* req;
|
||||
QUEUE* q;
|
||||
struct msghdr h;
|
||||
ssize_t size;
|
||||
|
||||
assert(!QUEUE_EMPTY(&handle->write_queue)
|
||||
|| !QUEUE_EMPTY(&handle->write_completed_queue));
|
||||
|
||||
/* Write out pending data first. */
|
||||
uv__udp_run_pending(handle);
|
||||
while (!QUEUE_EMPTY(&handle->write_queue)) {
|
||||
q = QUEUE_HEAD(&handle->write_queue);
|
||||
assert(q != NULL);
|
||||
|
||||
/* Drain 'request completed' queue. */
|
||||
uv__udp_run_completed(handle);
|
||||
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
||||
assert(req != NULL);
|
||||
|
||||
if (!QUEUE_EMPTY(&handle->write_completed_queue)) {
|
||||
/* Schedule completion callbacks. */
|
||||
memset(&h, 0, sizeof h);
|
||||
h.msg_name = &req->addr;
|
||||
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
|
||||
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
|
||||
h.msg_iov = (struct iovec*) req->bufs;
|
||||
h.msg_iovlen = req->nbufs;
|
||||
|
||||
do {
|
||||
size = sendmsg(handle->io_watcher.fd, &h, 0);
|
||||
} while (size == -1 && errno == EINTR);
|
||||
|
||||
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||
break;
|
||||
|
||||
req->status = (size == -1 ? -errno : size);
|
||||
|
||||
/* Sending a datagram is an atomic operation: either all data
|
||||
* is written or nothing is (and EMSGSIZE is raised). That is
|
||||
* why we don't handle partial writes. Just pop the request
|
||||
* off the write queue and onto the completed queue, done.
|
||||
*/
|
||||
QUEUE_REMOVE(&req->queue);
|
||||
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
|
||||
uv__io_feed(handle->loop, &handle->io_watcher);
|
||||
}
|
||||
else if (QUEUE_EMPTY(&handle->write_queue)) {
|
||||
/* Pending queue and completion queue empty, stop watcher. */
|
||||
uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
|
||||
|
||||
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
|
||||
uv__handle_stop(handle);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -415,6 +384,7 @@ int uv__udp_send(uv_udp_send_t* req,
|
||||
unsigned int addrlen,
|
||||
uv_udp_send_cb send_cb) {
|
||||
int err;
|
||||
int empty_queue;
|
||||
|
||||
assert(nbufs > 0);
|
||||
|
||||
@ -422,8 +392,13 @@ int uv__udp_send(uv_udp_send_t* req,
|
||||
if (err)
|
||||
return err;
|
||||
|
||||
uv__req_init(handle->loop, req, UV_UDP_SEND);
|
||||
/* It's legal for send_queue_count > 0 even when the write_queue is empty;
|
||||
* it means there are error-state requests in the write_completed_queue that
|
||||
* will touch up send_queue_size/count later.
|
||||
*/
|
||||
empty_queue = (handle->send_queue_count == 0);
|
||||
|
||||
uv__req_init(handle->loop, req, UV_UDP_SEND);
|
||||
assert(addrlen <= sizeof(req->addr));
|
||||
memcpy(&req->addr, addr, addrlen);
|
||||
req->send_cb = send_cb;
|
||||
@ -441,9 +416,13 @@ int uv__udp_send(uv_udp_send_t* req,
|
||||
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
|
||||
handle->send_queue_count++;
|
||||
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
|
||||
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
||||
uv__handle_start(handle);
|
||||
|
||||
if (empty_queue)
|
||||
uv__udp_sendmsg(handle);
|
||||
else
|
||||
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user