diff --git a/docs/src/udp.rst b/docs/src/udp.rst index 53b1fea4..2ca0736b 100644 --- a/docs/src/udp.rst +++ b/docs/src/udp.rst @@ -42,6 +42,12 @@ Data types * any traffic, in effect "stealing" the port from the previous listener. */ UV_UDP_REUSEADDR = 4 + /* + * Indicates that the message was received by recvmmsg and that it's not at + * the beginning of the buffer allocated by alloc_cb - so the buffer provided + * must not be freed by the recv_cb callback. + */ + UV_UDP_MMSG_CHUNK = 8 }; .. c:type:: void (*uv_udp_send_cb)(uv_udp_send_t* req, int status) @@ -62,12 +68,13 @@ Data types * `buf`: :c:type:`uv_buf_t` with the received data. * `addr`: ``struct sockaddr*`` containing the address of the sender. Can be NULL. Valid for the duration of the callback only. - * `flags`: One or more or'ed UV_UDP_* constants. Right now only - ``UV_UDP_PARTIAL`` is used. + * `flags`: One or more or'ed UV_UDP_* constants. The callee is responsible for freeing the buffer, libuv does not reuse it. The buffer may be a null buffer (where `buf->base` == NULL and `buf->len` == 0) - on error. + on error. Don't free the buffer when the UV_UDP_MMSG_CHUNK flag is set. + The final callback receives the whole buffer (containing the first chunk) + with the UV_UDP_MMSG_CHUNK flag cleared. .. note:: The receive callback will be called with `nread` == 0 and `addr` == NULL when there is diff --git a/include/uv.h b/include/uv.h index 626cebab..defc0ac4 100644 --- a/include/uv.h +++ b/include/uv.h @@ -605,7 +605,13 @@ enum uv_udp_flags { * (provided they all set the flag) but only the last one to bind will receive * any traffic, in effect "stealing" the port from the previous listener. */ - UV_UDP_REUSEADDR = 4 + UV_UDP_REUSEADDR = 4, + /* + * Indicates that the message was received by recvmmsg and that it's not at + * the beginning of the buffer allocated by alloc_cb - so the buffer provided + * must not be freed by the recv_cb callback. + */ + UV_UDP_MMSG_CHUNK = 8 }; typedef void (*uv_udp_send_cb)(uv_udp_send_t* req, int status); diff --git a/src/unix/freebsd.c b/src/unix/freebsd.c index 57bd04e2..ef77e127 100644 --- a/src/unix/freebsd.c +++ b/src/unix/freebsd.c @@ -288,3 +288,28 @@ int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count) { uv__free(cp_times); return 0; } + + +int uv__sendmmsg(int fd, + struct uv__mmsghdr* mmsg, + unsigned int vlen, + unsigned int flags) { +#if __FreeBSD__ >= 11 + return sendmmsg(fd, mmsg, vlen, flags); +#else + return errno = ENOSYS, -1; +#endif +} + + +int uv__recvmmsg(int fd, + struct uv__mmsghdr* mmsg, + unsigned int vlen, + unsigned int flags, + struct timespec* timeout) { +#if __FreeBSD__ >= 11 + return recvmmsg(fd, mmsg, vlen, flags, timeout); +#else + return errno = ENOSYS, -1; +#endif +} diff --git a/src/unix/internal.h b/src/unix/internal.h index 469fd7d2..598554b6 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -31,6 +31,7 @@ #include /* O_CLOEXEC and O_NONBLOCK, if supported. */ #include #include +#include #if defined(__STRICT_ANSI__) # define inline __inline @@ -327,4 +328,27 @@ int uv__getsockpeername(const uv_handle_t* handle, struct sockaddr* name, int* namelen); +#if defined(__linux__) || \ + defined(__FreeBSD__) || \ + defined(__FreeBSD_kernel__) +#define HAVE_MMSG 1 +struct uv__mmsghdr { + struct msghdr msg_hdr; + unsigned int msg_len; +}; + +int uv__recvmmsg(int fd, + struct uv__mmsghdr* mmsg, + unsigned int vlen, + unsigned int flags, + struct timespec* timeout); +int uv__sendmmsg(int fd, + struct uv__mmsghdr* mmsg, + unsigned int vlen, + unsigned int flags); +#else +#define HAVE_MMSG 0 +#endif + + #endif /* UV_UNIX_INTERNAL_H_ */ diff --git a/src/unix/linux-syscalls.c b/src/unix/linux-syscalls.c index eb5a8fd2..742f26ad 100644 --- a/src/unix/linux-syscalls.c +++ b/src/unix/linux-syscalls.c @@ -126,6 +126,7 @@ # endif #endif /* __NR_getrandom */ +struct uv__mmsghdr; int uv__sendmmsg(int fd, struct uv__mmsghdr* mmsg, diff --git a/src/unix/linux-syscalls.h b/src/unix/linux-syscalls.h index 7ee1511a..2e8fa2a5 100644 --- a/src/unix/linux-syscalls.h +++ b/src/unix/linux-syscalls.h @@ -61,20 +61,6 @@ struct uv__statx { uint64_t unused1[14]; }; -struct uv__mmsghdr { - struct msghdr msg_hdr; - unsigned int msg_len; -}; - -int uv__recvmmsg(int fd, - struct uv__mmsghdr* mmsg, - unsigned int vlen, - unsigned int flags, - struct timespec* timeout); -int uv__sendmmsg(int fd, - struct uv__mmsghdr* mmsg, - unsigned int vlen, - unsigned int flags); ssize_t uv__preadv(int fd, const struct iovec *iov, int iovcnt, int64_t offset); ssize_t uv__pwritev(int fd, const struct iovec *iov, int iovcnt, int64_t offset); int uv__dup3(int oldfd, int newfd, int flags); diff --git a/src/unix/udp.c b/src/unix/udp.c index 7c5926e3..ed6b6c25 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -32,6 +32,8 @@ #endif #include +#define UV__UDP_DGRAM_MAXSIZE (64 * 1024) + #if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP) # define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP #endif @@ -49,6 +51,36 @@ static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain, unsigned int flags); +#if HAVE_MMSG + +#define UV__MMSG_MAXWIDTH 20 + +static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf); +static void uv__udp_sendmmsg(uv_udp_t* handle); + +static int uv__recvmmsg_avail; +static int uv__sendmmsg_avail; +static uv_once_t once = UV_ONCE_INIT; + +static void uv__udp_mmsg_init(void) { + int ret; + int s; + s = uv__socket(AF_INET, SOCK_DGRAM, 0); + if (s < 0) + return; + ret = uv__sendmmsg(s, NULL, 0, 0); + if (ret == 0 || errno != ENOSYS) { + uv__sendmmsg_avail = 1; + uv__recvmmsg_avail = 1; + } else { + ret = uv__recvmmsg(s, NULL, 0, 0, NULL); + if (ret == 0 || errno != ENOSYS) + uv__recvmmsg_avail = 1; + } + uv__close(s); +} + +#endif void uv__udp_close(uv_udp_t* handle) { uv__io_close(handle->loop, &handle->io_watcher); @@ -148,6 +180,60 @@ static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) { } } +#if HAVE_MMSG +static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) { + struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH]; + struct iovec iov[UV__MMSG_MAXWIDTH]; + struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH]; + ssize_t nread; + uv_buf_t chunk_buf; + size_t chunks; + int flags; + size_t k; + + /* prepare structures for recvmmsg */ + chunks = buf->len / UV__UDP_DGRAM_MAXSIZE; + if (chunks > ARRAY_SIZE(iov)) + chunks = ARRAY_SIZE(iov); + for (k = 0; k < chunks; ++k) { + iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE; + iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE; + msgs[k].msg_hdr.msg_iov = iov + k; + msgs[k].msg_hdr.msg_iovlen = 1; + msgs[k].msg_hdr.msg_name = peers + k; + msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]); + } + + do + nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL); + while (nread == -1 && errno == EINTR); + + if (nread < 1) { + if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK) + handle->recv_cb(handle, 0, buf, NULL, 0); + else + handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0); + } else { + /* count to zero, so the buffer base comes last */ + for (k = nread; k > 0 && handle->recv_cb != NULL;) { + k--; + flags = 0; + if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC) + flags |= UV_UDP_PARTIAL; + if (k != 0) + flags |= UV_UDP_MMSG_CHUNK; + + chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len); + handle->recv_cb(handle, + msgs[k].msg_len, + &chunk_buf, + msgs[k].msg_hdr.msg_name, + flags); + } + } + return nread; +} +#endif static void uv__udp_recvmsg(uv_udp_t* handle) { struct sockaddr_storage peer; @@ -167,13 +253,27 @@ static void uv__udp_recvmsg(uv_udp_t* handle) { do { buf = uv_buf_init(NULL, 0); - handle->alloc_cb((uv_handle_t*) handle, 64 * 1024, &buf); + handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf); if (buf.base == NULL || buf.len == 0) { handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0); return; } assert(buf.base != NULL); +#if HAVE_MMSG + uv_once(&once, uv__udp_mmsg_init); + if (uv__recvmmsg_avail) { + /* Returned space for more than 1 datagram, use it to receive + * multiple datagrams. */ + if (buf.len >= 2 * UV__UDP_DGRAM_MAXSIZE) { + nread = uv__udp_recvmmsg(handle, &buf); + if (nread > 0) + count -= nread; + continue; + } + } +#endif + memset(&h, 0, sizeof(h)); memset(&peer, 0, sizeof(peer)); h.msg_name = &peer; @@ -199,21 +299,120 @@ static void uv__udp_recvmsg(uv_udp_t* handle) { handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags); } + count--; } /* recv_cb callback may decide to pause or close the handle */ while (nread != -1 - && count-- > 0 + && count > 0 && handle->io_watcher.fd != -1 && handle->recv_cb != NULL); } +#if HAVE_MMSG +static void uv__udp_sendmmsg(uv_udp_t* handle) { + uv_udp_send_t* req; + struct uv__mmsghdr h[UV__MMSG_MAXWIDTH]; + struct uv__mmsghdr *p; + QUEUE* q; + ssize_t npkts; + size_t pkts; + size_t i; + + if (QUEUE_EMPTY(&handle->write_queue)) + return; + +write_queue_drain: + for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue); + pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue; + ++pkts, q = QUEUE_HEAD(q)) { + assert(q != NULL); + req = QUEUE_DATA(q, uv_udp_send_t, queue); + assert(req != NULL); + + p = &h[pkts]; + memset(p, 0, sizeof(*p)); + if (req->addr.ss_family == AF_UNSPEC) { + p->msg_hdr.msg_name = NULL; + p->msg_hdr.msg_namelen = 0; + } else { + p->msg_hdr.msg_name = &req->addr; + if (req->addr.ss_family == AF_INET6) + p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6); + else if (req->addr.ss_family == AF_INET) + p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in); + else if (req->addr.ss_family == AF_UNIX) + p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un); + else { + assert(0 && "unsupported address family"); + abort(); + } + } + h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs; + h[pkts].msg_hdr.msg_iovlen = req->nbufs; + } + + do + npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts, 0); + while (npkts == -1 && errno == EINTR); + + if (npkts < 1) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) + return; + for (i = 0, q = QUEUE_HEAD(&handle->write_queue); + i < pkts && q != &handle->write_queue; + ++i, q = QUEUE_HEAD(q)) { + assert(q != NULL); + req = QUEUE_DATA(q, uv_udp_send_t, queue); + assert(req != NULL); + + req->status = UV__ERR(errno); + QUEUE_REMOVE(&req->queue); + QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); + } + uv__io_feed(handle->loop, &handle->io_watcher); + return; + } + + for (i = 0, q = QUEUE_HEAD(&handle->write_queue); + i < pkts && q != &handle->write_queue; + ++i, q = QUEUE_HEAD(&handle->write_queue)) { + assert(q != NULL); + req = QUEUE_DATA(q, uv_udp_send_t, queue); + assert(req != NULL); + + req->status = req->bufs[0].len; + + /* Sending a datagram is an atomic operation: either all data + * is written or nothing is (and EMSGSIZE is raised). That is + * why we don't handle partial writes. Just pop the request + * off the write queue and onto the completed queue, done. + */ + QUEUE_REMOVE(&req->queue); + QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); + } + + /* couldn't batch everything, continue sending (jump to avoid stack growth) */ + if (!QUEUE_EMPTY(&handle->write_queue)) + goto write_queue_drain; + uv__io_feed(handle->loop, &handle->io_watcher); + return; +} +#endif static void uv__udp_sendmsg(uv_udp_t* handle) { uv_udp_send_t* req; - QUEUE* q; struct msghdr h; + QUEUE* q; ssize_t size; +#if HAVE_MMSG + uv_once(&once, uv__udp_mmsg_init); + if (uv__sendmmsg_avail) { + uv__udp_sendmmsg(handle); + return; + } +#endif + while (!QUEUE_EMPTY(&handle->write_queue)) { q = QUEUE_HEAD(&handle->write_queue); assert(q != NULL); @@ -263,7 +462,6 @@ static void uv__udp_sendmsg(uv_udp_t* handle) { } } - /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional * refinements for programs that use multicast. * @@ -870,7 +1068,7 @@ int uv_udp_set_source_membership(uv_udp_t* handle, src_addr6, membership); } - + err = uv_ip4_addr(source_addr, 0, src_addr4); if (err) return err; @@ -1012,7 +1210,7 @@ int uv_udp_set_multicast_loop(uv_udp_t* handle, int on) { * and use the general uv__setsockopt_maybe_char call otherwise. */ #if defined(__sun) || defined(_AIX) || defined(__OpenBSD__) || \ - defined(__MVS__) + defined(__MVS__) if (handle->flags & UV_HANDLE_IPV6) return uv__setsockopt(handle, IP_MULTICAST_LOOP,