vquic: recv_mmsg, use fewer, but larger buffers

Reported-by: koujaz on github
Fixes #15267
Closes #15454
This commit is contained in:
Stefan Eissing 2024-10-30 12:09:07 +01:00 committed by Daniel Stenberg
parent 922235e56b
commit 9b863ac670
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
4 changed files with 97 additions and 12 deletions

View File

@ -4248,6 +4248,51 @@ void Curl_multi_xfer_ulbuf_release(struct Curl_easy *data, char *buf)
data->multi->xfer_ulbuf_borrowed = FALSE; data->multi->xfer_ulbuf_borrowed = FALSE;
} }
CURLcode Curl_multi_xfer_sockbuf_borrow(struct Curl_easy *data,
size_t blen, char **pbuf)
{
DEBUGASSERT(data);
DEBUGASSERT(data->multi);
*pbuf = NULL;
if(!data->multi) {
failf(data, "transfer has no multi handle");
return CURLE_FAILED_INIT;
}
if(data->multi->xfer_sockbuf_borrowed) {
failf(data, "attempt to borrow xfer_sockbuf when already borrowed");
return CURLE_AGAIN;
}
if(data->multi->xfer_sockbuf && blen > data->multi->xfer_sockbuf_len) {
/* not large enough, get a new one */
free(data->multi->xfer_sockbuf);
data->multi->xfer_sockbuf = NULL;
data->multi->xfer_sockbuf_len = 0;
}
if(!data->multi->xfer_sockbuf) {
data->multi->xfer_sockbuf = malloc(blen);
if(!data->multi->xfer_sockbuf) {
failf(data, "could not allocate xfer_sockbuf of %zu bytes", blen);
return CURLE_OUT_OF_MEMORY;
}
data->multi->xfer_sockbuf_len = blen;
}
data->multi->xfer_sockbuf_borrowed = TRUE;
*pbuf = data->multi->xfer_sockbuf;
return CURLE_OK;
}
void Curl_multi_xfer_sockbuf_release(struct Curl_easy *data, char *buf)
{
(void)buf;
DEBUGASSERT(data);
DEBUGASSERT(data->multi);
DEBUGASSERT(!buf || data->multi->xfer_sockbuf == buf);
data->multi->xfer_sockbuf_borrowed = FALSE;
}
static void multi_xfer_bufs_free(struct Curl_multi *multi) static void multi_xfer_bufs_free(struct Curl_multi *multi)
{ {
DEBUGASSERT(multi); DEBUGASSERT(multi);
@ -4257,6 +4302,9 @@ static void multi_xfer_bufs_free(struct Curl_multi *multi)
Curl_safefree(multi->xfer_ulbuf); Curl_safefree(multi->xfer_ulbuf);
multi->xfer_ulbuf_len = 0; multi->xfer_ulbuf_len = 0;
multi->xfer_ulbuf_borrowed = FALSE; multi->xfer_ulbuf_borrowed = FALSE;
Curl_safefree(multi->xfer_sockbuf);
multi->xfer_sockbuf_len = 0;
multi->xfer_sockbuf_borrowed = FALSE;
} }
struct Curl_easy *Curl_multi_get_handle(struct Curl_multi *multi, struct Curl_easy *Curl_multi_get_handle(struct Curl_multi *multi,

View File

@ -124,6 +124,9 @@ struct Curl_multi {
/* buffer used for upload data, lazy initialized */ /* buffer used for upload data, lazy initialized */
char *xfer_ulbuf; /* the actual buffer */ char *xfer_ulbuf; /* the actual buffer */
size_t xfer_ulbuf_len; /* the allocated length */ size_t xfer_ulbuf_len; /* the allocated length */
/* buffer used for socket I/O operations, lazy initialized */
char *xfer_sockbuf; /* the actual buffer */
size_t xfer_sockbuf_len; /* the allocated length */
/* 'sockhash' is the lookup hash for socket descriptor => easy handles (note /* 'sockhash' is the lookup hash for socket descriptor => easy handles (note
the pluralis form, there can be more than one easy handle waiting on the the pluralis form, there can be more than one easy handle waiting on the
@ -181,6 +184,7 @@ struct Curl_multi {
burn */ burn */
BIT(xfer_buf_borrowed); /* xfer_buf is currently being borrowed */ BIT(xfer_buf_borrowed); /* xfer_buf is currently being borrowed */
BIT(xfer_ulbuf_borrowed); /* xfer_ulbuf is currently being borrowed */ BIT(xfer_ulbuf_borrowed); /* xfer_ulbuf is currently being borrowed */
BIT(xfer_sockbuf_borrowed); /* xfer_sockbuf is currently being borrowed */
#ifdef DEBUGBUILD #ifdef DEBUGBUILD
BIT(warned); /* true after user warned of DEBUGBUILD */ BIT(warned); /* true after user warned of DEBUGBUILD */
#endif #endif

View File

@ -144,6 +144,30 @@ CURLcode Curl_multi_xfer_ulbuf_borrow(struct Curl_easy *data,
*/ */
void Curl_multi_xfer_ulbuf_release(struct Curl_easy *data, char *buf); void Curl_multi_xfer_ulbuf_release(struct Curl_easy *data, char *buf);
/**
* Borrow the socket scratch buffer from the multi, suitable
* for the given transfer `data`. The buffer may only be used for
* direct socket I/O operation by one connection at a time and MUST be
* returned to the multi before the I/O call returns.
* Pointers into the buffer remain only valid as long as it is borrowed.
*
* @param data the easy handle
* @param blen requested length of the buffer
* @param pbuf on return, the buffer to use or NULL on error
* @return CURLE_OK when buffer is available and is returned.
* CURLE_OUT_OF_MEMORy on failure to allocate the buffer,
* CURLE_FAILED_INIT if the easy handle is without multi.
* CURLE_AGAIN if the buffer is borrowed already.
*/
CURLcode Curl_multi_xfer_sockbuf_borrow(struct Curl_easy *data,
size_t blen, char **pbuf);
/**
* Release the borrowed buffer. All references into the buffer become
* invalid after this.
* @param buf the buffer pointer borrowed for coding error checks.
*/
void Curl_multi_xfer_sockbuf_release(struct Curl_easy *data, char *buf);
/** /**
* Get the transfer handle for the given id. Returns NULL if not found. * Get the transfer handle for the given id. Returns NULL if not found.
*/ */

View File

@ -39,6 +39,7 @@
#include "curl_ngtcp2.h" #include "curl_ngtcp2.h"
#include "curl_osslq.h" #include "curl_osslq.h"
#include "curl_quiche.h" #include "curl_quiche.h"
#include "multiif.h"
#include "rand.h" #include "rand.h"
#include "vquic.h" #include "vquic.h"
#include "vquic_int.h" #include "vquic_int.h"
@ -141,8 +142,8 @@ static CURLcode do_sendmsg(struct Curl_cfilter *cf,
/* Only set this, when we need it. macOS, for example, /* Only set this, when we need it. macOS, for example,
* does not seem to like a msg_control of length 0. */ * does not seem to like a msg_control of length 0. */
msg.msg_control = msg_ctrl; msg.msg_control = msg_ctrl;
assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(uint16_t))); assert(sizeof(msg_ctrl) >= CMSG_SPACE(sizeof(int)));
msg.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); msg.msg_controllen = CMSG_SPACE(sizeof(int));
cm = CMSG_FIRSTHDR(&msg); cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP; cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT; cm->cmsg_type = UDP_SEGMENT;
@ -321,7 +322,7 @@ CURLcode vquic_send_tail_split(struct Curl_cfilter *cf, struct Curl_easy *data,
} }
#if defined(HAVE_SENDMMSG) || defined(HAVE_SENDMSG) #if defined(HAVE_SENDMMSG) || defined(HAVE_SENDMSG)
static size_t msghdr_get_udp_gro(struct msghdr *msg) static size_t vquic_msghdr_get_udp_gro(struct msghdr *msg)
{ {
int gso_size = 0; int gso_size = 0;
#if defined(__linux__) && defined(UDP_GRO) #if defined(__linux__) && defined(UDP_GRO)
@ -357,21 +358,28 @@ static CURLcode recvmmsg_packets(struct Curl_cfilter *cf,
size_t max_pkts, size_t max_pkts,
vquic_recv_pkt_cb *recv_cb, void *userp) vquic_recv_pkt_cb *recv_cb, void *userp)
{ {
#define MMSG_NUM 64 #define MMSG_NUM 16
struct iovec msg_iov[MMSG_NUM]; struct iovec msg_iov[MMSG_NUM];
struct mmsghdr mmsg[MMSG_NUM]; struct mmsghdr mmsg[MMSG_NUM];
uint8_t msg_ctrl[MMSG_NUM * CMSG_SPACE(sizeof(uint16_t))]; uint8_t msg_ctrl[MMSG_NUM * CMSG_SPACE(sizeof(int))];
uint8_t bufs[MMSG_NUM][2*1024];
struct sockaddr_storage remote_addr[MMSG_NUM]; struct sockaddr_storage remote_addr[MMSG_NUM];
size_t total_nread, pkts; size_t total_nread = 0, pkts;
int mcount, i, n; int mcount, i, n;
char errstr[STRERROR_LEN]; char errstr[STRERROR_LEN];
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
size_t gso_size; size_t gso_size;
size_t pktlen; size_t pktlen;
size_t offset, to; size_t offset, to;
char *sockbuf = NULL;
uint8_t (*bufs)[64*1024] = NULL;
DEBUGASSERT(max_pkts > 0); DEBUGASSERT(max_pkts > 0);
result = Curl_multi_xfer_sockbuf_borrow(data, MMSG_NUM * sizeof(bufs[0]),
&sockbuf);
if(result)
goto out;
bufs = (uint8_t (*)[64*1024])sockbuf;
pkts = 0; pkts = 0;
total_nread = 0; total_nread = 0;
while(pkts < max_pkts) { while(pkts < max_pkts) {
@ -384,8 +392,8 @@ static CURLcode recvmmsg_packets(struct Curl_cfilter *cf,
mmsg[i].msg_hdr.msg_iovlen = 1; mmsg[i].msg_hdr.msg_iovlen = 1;
mmsg[i].msg_hdr.msg_name = &remote_addr[i]; mmsg[i].msg_hdr.msg_name = &remote_addr[i];
mmsg[i].msg_hdr.msg_namelen = sizeof(remote_addr[i]); mmsg[i].msg_hdr.msg_namelen = sizeof(remote_addr[i]);
mmsg[i].msg_hdr.msg_control = &msg_ctrl[i]; mmsg[i].msg_hdr.msg_control = &msg_ctrl[i * CMSG_SPACE(sizeof(int))];
mmsg[i].msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t)); mmsg[i].msg_hdr.msg_controllen = CMSG_SPACE(sizeof(int));
} }
while((mcount = recvmmsg(qctx->sockfd, mmsg, n, 0, NULL)) == -1 && while((mcount = recvmmsg(qctx->sockfd, mmsg, n, 0, NULL)) == -1 &&
@ -415,7 +423,7 @@ static CURLcode recvmmsg_packets(struct Curl_cfilter *cf,
for(i = 0; i < mcount; ++i) { for(i = 0; i < mcount; ++i) {
total_nread += mmsg[i].msg_len; total_nread += mmsg[i].msg_len;
gso_size = msghdr_get_udp_gro(&mmsg[i].msg_hdr); gso_size = vquic_msghdr_get_udp_gro(&mmsg[i].msg_hdr);
if(gso_size == 0) { if(gso_size == 0) {
gso_size = mmsg[i].msg_len; gso_size = mmsg[i].msg_len;
} }
@ -443,6 +451,7 @@ out:
if(total_nread || result) if(total_nread || result)
CURL_TRC_CF(data, cf, "recvd %zu packets with %zu bytes -> %d", CURL_TRC_CF(data, cf, "recvd %zu packets with %zu bytes -> %d",
pkts, total_nread, result); pkts, total_nread, result);
Curl_multi_xfer_sockbuf_release(data, sockbuf);
return result; return result;
} }
@ -461,7 +470,7 @@ static CURLcode recvmsg_packets(struct Curl_cfilter *cf,
ssize_t nread; ssize_t nread;
char errstr[STRERROR_LEN]; char errstr[STRERROR_LEN];
CURLcode result = CURLE_OK; CURLcode result = CURLE_OK;
uint8_t msg_ctrl[CMSG_SPACE(sizeof(uint16_t))]; uint8_t msg_ctrl[CMSG_SPACE(sizeof(int))];
size_t gso_size; size_t gso_size;
size_t pktlen; size_t pktlen;
size_t offset, to; size_t offset, to;
@ -503,7 +512,7 @@ static CURLcode recvmsg_packets(struct Curl_cfilter *cf,
total_nread += (size_t)nread; total_nread += (size_t)nread;
gso_size = msghdr_get_udp_gro(&msg); gso_size = vquic_msghdr_get_udp_gro(&msg);
if(gso_size == 0) { if(gso_size == 0) {
gso_size = (size_t)nread; gso_size = (size_t)nread;
} }