unix: try to write immediately in uv_udp_send
This commit is contained in:
parent
bf6e90f4d6
commit
41891222bc
171
src/unix/udp.c
171
src/unix/udp.c
@@ -38,10 +38,9 @@
|
|||||||
|
|
||||||
|
|
||||||
static void uv__udp_run_completed(uv_udp_t* handle);
|
static void uv__udp_run_completed(uv_udp_t* handle);
|
||||||
static void uv__udp_run_pending(uv_udp_t* handle);
|
|
||||||
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
|
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
|
||||||
static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
|
static void uv__udp_recvmsg(uv_udp_t* handle);
|
||||||
static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
|
static void uv__udp_sendmsg(uv_udp_t* handle);
|
||||||
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
|
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
|
||||||
int domain,
|
int domain,
|
||||||
unsigned int flags);
|
unsigned int flags);
|
||||||
@@ -65,25 +64,19 @@ void uv__udp_finish_close(uv_udp_t* handle) {
|
|||||||
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
|
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
|
||||||
assert(handle->io_watcher.fd == -1);
|
assert(handle->io_watcher.fd == -1);
|
||||||
|
|
||||||
uv__udp_run_completed(handle);
|
|
||||||
|
|
||||||
while (!QUEUE_EMPTY(&handle->write_queue)) {
|
while (!QUEUE_EMPTY(&handle->write_queue)) {
|
||||||
q = QUEUE_HEAD(&handle->write_queue);
|
q = QUEUE_HEAD(&handle->write_queue);
|
||||||
QUEUE_REMOVE(q);
|
QUEUE_REMOVE(q);
|
||||||
|
|
||||||
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
||||||
uv__req_unregister(handle->loop, req);
|
req->status = -ECANCELED;
|
||||||
|
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
|
||||||
if (req->bufs != req->bufsml)
|
|
||||||
free(req->bufs);
|
|
||||||
req->bufs = NULL;
|
|
||||||
|
|
||||||
if (req->send_cb != NULL)
|
|
||||||
req->send_cb(req, -ECANCELED);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
handle->send_queue_size = 0;
|
uv__udp_run_completed(handle);
|
||||||
handle->send_queue_count = 0;
|
|
||||||
|
assert(handle->send_queue_size == 0);
|
||||||
|
assert(handle->send_queue_count == 0);
|
||||||
|
|
||||||
/* Now tear down the handle. */
|
/* Now tear down the handle. */
|
||||||
handle->recv_cb = NULL;
|
handle->recv_cb = NULL;
|
||||||
@@ -92,52 +85,6 @@ void uv__udp_finish_close(uv_udp_t* handle) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void uv__udp_run_pending(uv_udp_t* handle) {
|
|
||||||
uv_udp_send_t* req;
|
|
||||||
QUEUE* q;
|
|
||||||
struct msghdr h;
|
|
||||||
ssize_t size;
|
|
||||||
|
|
||||||
while (!QUEUE_EMPTY(&handle->write_queue)) {
|
|
||||||
q = QUEUE_HEAD(&handle->write_queue);
|
|
||||||
assert(q != NULL);
|
|
||||||
|
|
||||||
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
|
||||||
assert(req != NULL);
|
|
||||||
|
|
||||||
memset(&h, 0, sizeof h);
|
|
||||||
h.msg_name = &req->addr;
|
|
||||||
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
|
|
||||||
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
|
|
||||||
h.msg_iov = (struct iovec*) req->bufs;
|
|
||||||
h.msg_iovlen = req->nbufs;
|
|
||||||
|
|
||||||
do {
|
|
||||||
size = sendmsg(handle->io_watcher.fd, &h, 0);
|
|
||||||
}
|
|
||||||
while (size == -1 && errno == EINTR);
|
|
||||||
|
|
||||||
/* TODO try to write once or twice more in the
|
|
||||||
* hope that the socket becomes readable again?
|
|
||||||
*/
|
|
||||||
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
|
||||||
break;
|
|
||||||
|
|
||||||
req->status = (size == -1 ? -errno : size);
|
|
||||||
|
|
||||||
/* Sending a datagram is an atomic operation: either all data
|
|
||||||
* is written or nothing is (and EMSGSIZE is raised). That is
|
|
||||||
* why we don't handle partial writes. Just pop the request
|
|
||||||
* off the write queue and onto the completed queue, done.
|
|
||||||
*/
|
|
||||||
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
|
|
||||||
handle->send_queue_count--;
|
|
||||||
QUEUE_REMOVE(&req->queue);
|
|
||||||
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void uv__udp_run_completed(uv_udp_t* handle) {
|
static void uv__udp_run_completed(uv_udp_t* handle) {
|
||||||
uv_udp_send_t* req;
|
uv_udp_send_t* req;
|
||||||
QUEUE* q;
|
QUEUE* q;
|
||||||
@@ -149,6 +96,9 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
|
|||||||
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
||||||
uv__req_unregister(handle->loop, req);
|
uv__req_unregister(handle->loop, req);
|
||||||
|
|
||||||
|
handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
|
||||||
|
handle->send_queue_count--;
|
||||||
|
|
||||||
if (req->bufs != req->bufsml)
|
if (req->bufs != req->bufsml)
|
||||||
free(req->bufs);
|
free(req->bufs);
|
||||||
req->bufs = NULL;
|
req->bufs = NULL;
|
||||||
@@ -164,33 +114,40 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
|
|||||||
else
|
else
|
||||||
req->send_cb(req, req->status);
|
req->send_cb(req, req->status);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (QUEUE_EMPTY(&handle->write_queue)) {
|
||||||
|
/* Pending queue and completion queue empty, stop watcher. */
|
||||||
|
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
||||||
|
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
|
||||||
|
uv__handle_stop(handle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
|
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
|
||||||
if (revents & UV__POLLIN)
|
uv_udp_t* handle;
|
||||||
uv__udp_recvmsg(loop, w, revents);
|
|
||||||
|
|
||||||
if (revents & UV__POLLOUT)
|
handle = container_of(w, uv_udp_t, io_watcher);
|
||||||
uv__udp_sendmsg(loop, w, revents);
|
assert(handle->type == UV_UDP);
|
||||||
|
|
||||||
|
if (revents & UV__POLLIN)
|
||||||
|
uv__udp_recvmsg(handle);
|
||||||
|
|
||||||
|
if (revents & UV__POLLOUT) {
|
||||||
|
uv__udp_sendmsg(handle);
|
||||||
|
uv__udp_run_completed(handle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void uv__udp_recvmsg(uv_loop_t* loop,
|
static void uv__udp_recvmsg(uv_udp_t* handle) {
|
||||||
uv__io_t* w,
|
|
||||||
unsigned int revents) {
|
|
||||||
struct sockaddr_storage peer;
|
struct sockaddr_storage peer;
|
||||||
struct msghdr h;
|
struct msghdr h;
|
||||||
uv_udp_t* handle;
|
|
||||||
ssize_t nread;
|
ssize_t nread;
|
||||||
uv_buf_t buf;
|
uv_buf_t buf;
|
||||||
int flags;
|
int flags;
|
||||||
int count;
|
int count;
|
||||||
|
|
||||||
handle = container_of(w, uv_udp_t, io_watcher);
|
|
||||||
assert(handle->type == UV_UDP);
|
|
||||||
assert(revents & UV__POLLIN);
|
|
||||||
|
|
||||||
assert(handle->recv_cb != NULL);
|
assert(handle->recv_cb != NULL);
|
||||||
assert(handle->alloc_cb != NULL);
|
assert(handle->alloc_cb != NULL);
|
||||||
|
|
||||||
@@ -247,35 +204,47 @@ static void uv__udp_recvmsg(uv_loop_t* loop,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void uv__udp_sendmsg(uv_loop_t* loop,
|
static void uv__udp_sendmsg(uv_udp_t* handle) {
|
||||||
uv__io_t* w,
|
uv_udp_send_t* req;
|
||||||
unsigned int revents) {
|
QUEUE* q;
|
||||||
uv_udp_t* handle;
|
struct msghdr h;
|
||||||
|
ssize_t size;
|
||||||
handle = container_of(w, uv_udp_t, io_watcher);
|
|
||||||
assert(handle->type == UV_UDP);
|
|
||||||
assert(revents & UV__POLLOUT);
|
|
||||||
|
|
||||||
assert(!QUEUE_EMPTY(&handle->write_queue)
|
assert(!QUEUE_EMPTY(&handle->write_queue)
|
||||||
|| !QUEUE_EMPTY(&handle->write_completed_queue));
|
|| !QUEUE_EMPTY(&handle->write_completed_queue));
|
||||||
|
|
||||||
/* Write out pending data first. */
|
while (!QUEUE_EMPTY(&handle->write_queue)) {
|
||||||
uv__udp_run_pending(handle);
|
q = QUEUE_HEAD(&handle->write_queue);
|
||||||
|
assert(q != NULL);
|
||||||
|
|
||||||
/* Drain 'request completed' queue. */
|
req = QUEUE_DATA(q, uv_udp_send_t, queue);
|
||||||
uv__udp_run_completed(handle);
|
assert(req != NULL);
|
||||||
|
|
||||||
if (!QUEUE_EMPTY(&handle->write_completed_queue)) {
|
memset(&h, 0, sizeof h);
|
||||||
/* Schedule completion callbacks. */
|
h.msg_name = &req->addr;
|
||||||
|
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
|
||||||
|
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
|
||||||
|
h.msg_iov = (struct iovec*) req->bufs;
|
||||||
|
h.msg_iovlen = req->nbufs;
|
||||||
|
|
||||||
|
do {
|
||||||
|
size = sendmsg(handle->io_watcher.fd, &h, 0);
|
||||||
|
} while (size == -1 && errno == EINTR);
|
||||||
|
|
||||||
|
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
|
||||||
|
break;
|
||||||
|
|
||||||
|
req->status = (size == -1 ? -errno : size);
|
||||||
|
|
||||||
|
/* Sending a datagram is an atomic operation: either all data
|
||||||
|
* is written or nothing is (and EMSGSIZE is raised). That is
|
||||||
|
* why we don't handle partial writes. Just pop the request
|
||||||
|
* off the write queue and onto the completed queue, done.
|
||||||
|
*/
|
||||||
|
QUEUE_REMOVE(&req->queue);
|
||||||
|
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
|
||||||
uv__io_feed(handle->loop, &handle->io_watcher);
|
uv__io_feed(handle->loop, &handle->io_watcher);
|
||||||
}
|
}
|
||||||
else if (QUEUE_EMPTY(&handle->write_queue)) {
|
|
||||||
/* Pending queue and completion queue empty, stop watcher. */
|
|
||||||
uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
|
|
||||||
|
|
||||||
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
|
|
||||||
uv__handle_stop(handle);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -415,6 +384,7 @@ int uv__udp_send(uv_udp_send_t* req,
|
|||||||
unsigned int addrlen,
|
unsigned int addrlen,
|
||||||
uv_udp_send_cb send_cb) {
|
uv_udp_send_cb send_cb) {
|
||||||
int err;
|
int err;
|
||||||
|
int empty_queue;
|
||||||
|
|
||||||
assert(nbufs > 0);
|
assert(nbufs > 0);
|
||||||
|
|
||||||
@@ -422,8 +392,13 @@ int uv__udp_send(uv_udp_send_t* req,
|
|||||||
if (err)
|
if (err)
|
||||||
return err;
|
return err;
|
||||||
|
|
||||||
uv__req_init(handle->loop, req, UV_UDP_SEND);
|
/* It's legal for send_queue_count > 0 even when the write_queue is empty;
|
||||||
|
* it means there are error-state requests in the write_completed_queue that
|
||||||
|
* will touch up send_queue_size/count later.
|
||||||
|
*/
|
||||||
|
empty_queue = (handle->send_queue_count == 0);
|
||||||
|
|
||||||
|
uv__req_init(handle->loop, req, UV_UDP_SEND);
|
||||||
assert(addrlen <= sizeof(req->addr));
|
assert(addrlen <= sizeof(req->addr));
|
||||||
memcpy(&req->addr, addr, addrlen);
|
memcpy(&req->addr, addr, addrlen);
|
||||||
req->send_cb = send_cb;
|
req->send_cb = send_cb;
|
||||||
@@ -441,9 +416,13 @@ int uv__udp_send(uv_udp_send_t* req,
|
|||||||
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
|
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
|
||||||
handle->send_queue_count++;
|
handle->send_queue_count++;
|
||||||
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
|
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
|
||||||
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
|
||||||
uv__handle_start(handle);
|
uv__handle_start(handle);
|
||||||
|
|
||||||
|
if (empty_queue)
|
||||||
|
uv__udp_sendmsg(handle);
|
||||||
|
else
|
||||||
|
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user