freebsd,linux: add recvmmsg() + sendmmsg() udp implementation

This commit adds support for recvmmsg() and sendmmsg(), extensions to
recvmsg() and sendmsg() that allow the caller to receive and send
multiple messages on a socket with a single system call. This has
performance benefits for some applications; see the sketch after the
commit metadata below.

Co-authored-by: Ondřej Surý <ondrej@sury.org>
Co-authored-by: Witold Kręcicki <wpk@culm.net>

PR-URL: https://github.com/libuv/libuv/pull/2532
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Saúl Ibarra Corretgé <s@saghul.net>
Reviewed-By: Santiago Gimeno <santiago.gimeno@gmail.com>
Authored by Marek Vavrusa on 2015-07-08 18:57:40 +02:00; committed by Saúl Ibarra Corretgé
parent 56598f3d10
commit 3d7136639a
7 changed files with 271 additions and 24 deletions
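For context, here is a minimal, hypothetical sketch (not taken from this change) of what batched receiving looks like when calling the underlying recvmmsg() syscall directly on Linux. It assumes an already bound, non-blocking UDP socket `fd` and omits most error handling.

#define _GNU_SOURCE
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <netinet/in.h>

/* Drain up to 8 datagrams from `fd` with a single system call. */
static void drain_socket(int fd) {
  struct mmsghdr msgs[8];
  struct iovec iov[8];
  struct sockaddr_in peers[8];
  char bufs[8][2048];
  int i, n;

  memset(msgs, 0, sizeof(msgs));
  for (i = 0; i < 8; i++) {
    iov[i].iov_base = bufs[i];
    iov[i].iov_len = sizeof(bufs[i]);
    msgs[i].msg_hdr.msg_iov = &iov[i];
    msgs[i].msg_hdr.msg_iovlen = 1;
    msgs[i].msg_hdr.msg_name = &peers[i];
    msgs[i].msg_hdr.msg_namelen = sizeof(peers[i]);
  }

  n = recvmmsg(fd, msgs, 8, 0, NULL);  /* one syscall, up to 8 datagrams */
  for (i = 0; i < n; i++)
    printf("datagram %d: %u bytes\n", i, msgs[i].msg_len);
}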


@@ -42,6 +42,12 @@ Data types
* any traffic, in effect "stealing" the port from the previous listener.
*/
UV_UDP_REUSEADDR = 4
/*
* Indicates that the message was received by recvmmsg and that it's not at
* the beginning of the buffer allocated by alloc_cb - so the buffer provided
* must not be freed by the recv_cb callback.
*/
UV_UDP_MMSG_CHUNK = 8
};

.. c:type:: void (*uv_udp_send_cb)(uv_udp_send_t* req, int status)

@@ -62,12 +68,13 @@ Data types
* `buf`: :c:type:`uv_buf_t` with the received data.
* `addr`: ``struct sockaddr*`` containing the address of the sender.
Can be NULL. Valid for the duration of the callback only.
* `flags`: One or more or'ed UV_UDP_* constants.
The callee is responsible for freeing the buffer, libuv does not reuse it.
The buffer may be a null buffer (where `buf->base` == NULL and `buf->len` == 0)
on error. Don't free the buffer when the UV_UDP_MMSG_CHUNK flag is set.
The final callback receives the whole buffer (containing the first chunk)
with the UV_UDP_MMSG_CHUNK flag cleared.

.. note::
The receive callback will be called with `nread` == 0 and `addr` == NULL when there is
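The freeing rule above is easiest to see in a callback. Below is a minimal, hypothetical recv_cb that honours UV_UDP_MMSG_CHUNK; it assumes the buffer handed out by alloc_cb was obtained with malloc().

#include <stdlib.h>
#include <uv.h>

static void on_recv(uv_udp_t* handle,
                    ssize_t nread,
                    const uv_buf_t* buf,
                    const struct sockaddr* addr,
                    unsigned flags) {
  (void) handle;
  (void) addr;

  if (nread > 0) {
    /* Process nread bytes starting at buf->base. */
  }

  if (flags & UV_UDP_MMSG_CHUNK)
    return;            /* Chunk inside a larger allocation: do not free. */

  free(buf->base);     /* Final (or non-mmsg) callback owns the buffer. */
}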


@@ -605,7 +605,13 @@ enum uv_udp_flags {
* (provided they all set the flag) but only the last one to bind will receive
* any traffic, in effect "stealing" the port from the previous listener.
*/
UV_UDP_REUSEADDR = 4,
/*
* Indicates that the message was received by recvmmsg and that it's not at
* the beginning of the buffer allocated by alloc_cb - so the buffer provided
* must not be freed by the recv_cb callback.
*/
UV_UDP_MMSG_CHUNK = 8
};

typedef void (*uv_udp_send_cb)(uv_udp_send_t* req, int status);


@@ -288,3 +288,28 @@ int uv_cpu_info(uv_cpu_info_t** cpu_infos, int* count) {
uv__free(cp_times);
return 0;
}
int uv__sendmmsg(int fd,
struct uv__mmsghdr* mmsg,
unsigned int vlen,
unsigned int flags) {
#if __FreeBSD__ >= 11
return sendmmsg(fd, mmsg, vlen, flags);
#else
return errno = ENOSYS, -1;
#endif
}
int uv__recvmmsg(int fd,
struct uv__mmsghdr* mmsg,
unsigned int vlen,
unsigned int flags,
struct timespec* timeout) {
#if __FreeBSD__ >= 11
return recvmmsg(fd, mmsg, vlen, flags, timeout);
#else
return errno = ENOSYS, -1;
#endif
}


@@ -31,6 +31,7 @@
#include <fcntl.h> /* O_CLOEXEC and O_NONBLOCK, if supported. */
#include <stdio.h>
#include <errno.h>
#include <sys/socket.h>
#if defined(__STRICT_ANSI__)
# define inline __inline
@@ -327,4 +328,27 @@ int uv__getsockpeername(const uv_handle_t* handle,
struct sockaddr* name,
int* namelen);
#if defined(__linux__) || \
defined(__FreeBSD__) || \
defined(__FreeBSD_kernel__)
#define HAVE_MMSG 1
struct uv__mmsghdr {
struct msghdr msg_hdr;
unsigned int msg_len;
};
int uv__recvmmsg(int fd,
struct uv__mmsghdr* mmsg,
unsigned int vlen,
unsigned int flags,
struct timespec* timeout);
int uv__sendmmsg(int fd,
struct uv__mmsghdr* mmsg,
unsigned int vlen,
unsigned int flags);
#else
#define HAVE_MMSG 0
#endif
#endif /* UV_UNIX_INTERNAL_H_ */


@@ -126,6 +126,7 @@
# endif
#endif /* __NR_getrandom */
struct uv__mmsghdr;
int uv__sendmmsg(int fd,
struct uv__mmsghdr* mmsg,


@@ -61,20 +61,6 @@ struct uv__statx {
uint64_t unused1[14];
};
struct uv__mmsghdr {
struct msghdr msg_hdr;
unsigned int msg_len;
};
int uv__recvmmsg(int fd,
struct uv__mmsghdr* mmsg,
unsigned int vlen,
unsigned int flags,
struct timespec* timeout);
int uv__sendmmsg(int fd,
struct uv__mmsghdr* mmsg,
unsigned int vlen,
unsigned int flags);
ssize_t uv__preadv(int fd, const struct iovec *iov, int iovcnt, int64_t offset);
ssize_t uv__pwritev(int fd, const struct iovec *iov, int iovcnt, int64_t offset);
int uv__dup3(int oldfd, int newfd, int flags);


@@ -32,6 +32,8 @@
#endif
#include <sys/un.h>
#define UV__UDP_DGRAM_MAXSIZE (64 * 1024)
#if defined(IPV6_JOIN_GROUP) && !defined(IPV6_ADD_MEMBERSHIP)
# define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP
#endif
@@ -49,6 +51,36 @@ static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
int domain,
unsigned int flags);
#if HAVE_MMSG
#define UV__MMSG_MAXWIDTH 20
static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf);
static void uv__udp_sendmmsg(uv_udp_t* handle);
static int uv__recvmmsg_avail;
static int uv__sendmmsg_avail;
static uv_once_t once = UV_ONCE_INIT;
static void uv__udp_mmsg_init(void) {
int ret;
int s;
s = uv__socket(AF_INET, SOCK_DGRAM, 0);
if (s < 0)
return;
ret = uv__sendmmsg(s, NULL, 0, 0);
if (ret == 0 || errno != ENOSYS) {
uv__sendmmsg_avail = 1;
uv__recvmmsg_avail = 1;
} else {
ret = uv__recvmmsg(s, NULL, 0, 0, NULL);
if (ret == 0 || errno != ENOSYS)
uv__recvmmsg_avail = 1;
}
uv__close(s);
}
#endif
void uv__udp_close(uv_udp_t* handle) {
uv__io_close(handle->loop, &handle->io_watcher);
@@ -148,6 +180,60 @@ static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
}
}
#if HAVE_MMSG
static int uv__udp_recvmmsg(uv_udp_t* handle, uv_buf_t* buf) {
struct sockaddr_in6 peers[UV__MMSG_MAXWIDTH];
struct iovec iov[UV__MMSG_MAXWIDTH];
struct uv__mmsghdr msgs[UV__MMSG_MAXWIDTH];
ssize_t nread;
uv_buf_t chunk_buf;
size_t chunks;
int flags;
size_t k;
/* prepare structures for recvmmsg */
chunks = buf->len / UV__UDP_DGRAM_MAXSIZE;
if (chunks > ARRAY_SIZE(iov))
chunks = ARRAY_SIZE(iov);
for (k = 0; k < chunks; ++k) {
iov[k].iov_base = buf->base + k * UV__UDP_DGRAM_MAXSIZE;
iov[k].iov_len = UV__UDP_DGRAM_MAXSIZE;
msgs[k].msg_hdr.msg_iov = iov + k;
msgs[k].msg_hdr.msg_iovlen = 1;
msgs[k].msg_hdr.msg_name = peers + k;
msgs[k].msg_hdr.msg_namelen = sizeof(peers[0]);
}
do
nread = uv__recvmmsg(handle->io_watcher.fd, msgs, chunks, 0, NULL);
while (nread == -1 && errno == EINTR);
if (nread < 1) {
if (nread == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
handle->recv_cb(handle, 0, buf, NULL, 0);
else
handle->recv_cb(handle, UV__ERR(errno), buf, NULL, 0);
} else {
/* count to zero, so the buffer base comes last */
for (k = nread; k > 0 && handle->recv_cb != NULL;) {
k--;
flags = 0;
if (msgs[k].msg_hdr.msg_flags & MSG_TRUNC)
flags |= UV_UDP_PARTIAL;
if (k != 0)
flags |= UV_UDP_MMSG_CHUNK;
chunk_buf = uv_buf_init(iov[k].iov_base, iov[k].iov_len);
handle->recv_cb(handle,
msgs[k].msg_len,
&chunk_buf,
msgs[k].msg_hdr.msg_name,
flags);
}
}
return nread;
}
#endif
static void uv__udp_recvmsg(uv_udp_t* handle) {
struct sockaddr_storage peer;
@@ -167,13 +253,27 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
do {
buf = uv_buf_init(NULL, 0);
handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
if (buf.base == NULL || buf.len == 0) {
handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
return;
}
assert(buf.base != NULL);
#if HAVE_MMSG
uv_once(&once, uv__udp_mmsg_init);
if (uv__recvmmsg_avail) {
/* Returned space for more than 1 datagram, use it to receive
* multiple datagrams. */
if (buf.len >= 2 * UV__UDP_DGRAM_MAXSIZE) {
nread = uv__udp_recvmmsg(handle, &buf);
if (nread > 0)
count -= nread;
continue;
}
}
#endif
memset(&h, 0, sizeof(h));
memset(&peer, 0, sizeof(peer));
h.msg_name = &peer;
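As the comment in the hunk above notes, the recvmmsg() path is only taken when alloc_cb returns room for at least two 64 KB datagrams. A hedged, application-level sketch of an alloc_cb that opts into that batching (assumes malloc()-backed buffers, paired with a recv_cb like the one sketched earlier):

#include <stdlib.h>
#include <uv.h>

/* Offer space for several maximum-size UDP datagrams so that, where
 * recvmmsg() is available, libuv can fill multiple chunks per syscall. */
static void on_alloc(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  size_t len = 4 * 64 * 1024;  /* room for 4 chunks of 64 KB */
  (void) handle;
  (void) suggested_size;
  buf->base = malloc(len);
  buf->len = buf->base != NULL ? len : 0;
}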
@@ -199,21 +299,120 @@ static void uv__udp_recvmsg(uv_udp_t* handle) {
handle->recv_cb(handle, nread, &buf, (const struct sockaddr*) &peer, flags);
}
count--;
}
/* recv_cb callback may decide to pause or close the handle */
while (nread != -1
&& count > 0
&& handle->io_watcher.fd != -1
&& handle->recv_cb != NULL);
}
#if HAVE_MMSG
static void uv__udp_sendmmsg(uv_udp_t* handle) {
uv_udp_send_t* req;
struct uv__mmsghdr h[UV__MMSG_MAXWIDTH];
struct uv__mmsghdr *p;
QUEUE* q;
ssize_t npkts;
size_t pkts;
size_t i;
if (QUEUE_EMPTY(&handle->write_queue))
return;
write_queue_drain:
for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue);
pkts < UV__MMSG_MAXWIDTH && q != &handle->write_queue;
++pkts, q = QUEUE_HEAD(q)) {
assert(q != NULL);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
assert(req != NULL);
p = &h[pkts];
memset(p, 0, sizeof(*p));
if (req->addr.ss_family == AF_UNSPEC) {
p->msg_hdr.msg_name = NULL;
p->msg_hdr.msg_namelen = 0;
} else {
p->msg_hdr.msg_name = &req->addr;
if (req->addr.ss_family == AF_INET6)
p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in6);
else if (req->addr.ss_family == AF_INET)
p->msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
else if (req->addr.ss_family == AF_UNIX)
p->msg_hdr.msg_namelen = sizeof(struct sockaddr_un);
else {
assert(0 && "unsupported address family");
abort();
}
}
h[pkts].msg_hdr.msg_iov = (struct iovec*) req->bufs;
h[pkts].msg_hdr.msg_iovlen = req->nbufs;
}
do
npkts = uv__sendmmsg(handle->io_watcher.fd, h, pkts, 0);
while (npkts == -1 && errno == EINTR);
if (npkts < 1) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS)
return;
for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
i < pkts && q != &handle->write_queue;
++i, q = QUEUE_HEAD(q)) {
assert(q != NULL);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
assert(req != NULL);
req->status = UV__ERR(errno);
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
}
uv__io_feed(handle->loop, &handle->io_watcher);
return;
}
for (i = 0, q = QUEUE_HEAD(&handle->write_queue);
i < pkts && q != &handle->write_queue;
++i, q = QUEUE_HEAD(&handle->write_queue)) {
assert(q != NULL);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
assert(req != NULL);
req->status = req->bufs[0].len;
/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
}
/* couldn't batch everything, continue sending (jump to avoid stack growth) */
if (!QUEUE_EMPTY(&handle->write_queue))
goto write_queue_drain;
uv__io_feed(handle->loop, &handle->io_watcher);
return;
}
#endif
static void uv__udp_sendmsg(uv_udp_t* handle) {
uv_udp_send_t* req;
struct msghdr h;
QUEUE* q;
ssize_t size;
#if HAVE_MMSG
uv_once(&once, uv__udp_mmsg_init);
if (uv__sendmmsg_avail) {
uv__udp_sendmmsg(handle);
return;
}
#endif
while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
assert(q != NULL);
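Because uv__udp_sendmsg() now drains the write queue through uv__udp_sendmmsg() whenever the syscall is available, applications benefit simply by queuing several sends before returning to the event loop. A hedged sketch (assumes an initialized `handle`, a filled-in `dest` address, and skips request bookkeeping):

#include <string.h>
#include <uv.h>

static void on_send(uv_udp_send_t* req, int status) {
  (void) req;
  (void) status;  /* A real application would check the status here. */
}

/* Queue several datagrams in one loop iteration; with this change the
 * pending requests can go out in a single sendmmsg() call. */
static void send_burst(uv_udp_t* handle, const struct sockaddr* dest) {
  static uv_udp_send_t reqs[8];
  static char payload[] = "ping";
  uv_buf_t buf = uv_buf_init(payload, sizeof(payload) - 1);
  int i;

  for (i = 0; i < 8; i++)
    uv_udp_send(&reqs[i], handle, &buf, 1, dest, on_send);
}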
@@ -263,7 +462,6 @@ static void uv__udp_sendmsg(uv_udp_t* handle) {
}
}
/* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
* refinements for programs that use multicast.
*