multi: multi_wait improvements

- only call `multi_getsock()` once for all transfers
- realloc pollset array on demand
- fold repeated sockets (see the sketch below)

Closes #13150
Stefan Eissing 2024-03-20 08:08:43 +01:00 committed by Daniel Stenberg
parent 303bb8785c
commit 2d2c27e5a3
2 changed files with 87 additions and 61 deletions
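
The diff below is easier to follow with the two mechanisms from the commit message in mind. Here is a minimal, standalone sketch of them (POSIX poll() assumed; pollset_grow() and pollset_add() are illustrative names, not curl API): grow the pollfd array in chunks when it fills up, and fold events for a socket that matches the previously added entry into that entry instead of appending a duplicate.

/* Standalone sketch, not curl code: grow-on-demand pollfd array plus
   folding of a repeated socket into the previous entry. */
#include <poll.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

struct pollset {
  struct pollfd *pfds;   /* may initially point at a stack buffer */
  unsigned int len;      /* allocated entries */
  unsigned int n;        /* entries in use */
  bool is_malloced;      /* whether pfds needs free() */
};

/* grow by 'inc' entries, copying the entries in use */
static int pollset_grow(struct pollset *ps, unsigned int inc)
{
  struct pollfd *bigger = calloc(ps->len + inc, sizeof(struct pollfd));
  if(!bigger)
    return -1;
  memcpy(bigger, ps->pfds, ps->n * sizeof(struct pollfd));
  if(ps->is_malloced)
    free(ps->pfds);
  ps->pfds = bigger;
  ps->len += inc;
  ps->is_malloced = true;
  return 0;
}

/* add events for a socket; fold into the previous entry if it is the same fd */
static int pollset_add(struct pollset *ps, int fd, short events)
{
  if(!events)
    return 0;
  if(ps->n && ps->pfds[ps->n - 1].fd == fd) {
    ps->pfds[ps->n - 1].events |= events;   /* fold repeated socket */
    return 0;
  }
  if(ps->n >= ps->len && pollset_grow(ps, 100))  /* chunk of 100, as in the patch */
    return -1;
  ps->pfds[ps->n].fd = fd;
  ps->pfds[ps->n].events = events;
  ps->n++;
  return 0;
}

Note that the fold only compares against the most recently added entry, so it is a cheap partial dedup of identical sockets reported back to back (for example transfers multiplexed over the same connection when they appear consecutively), not a full one.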


@@ -1289,6 +1289,29 @@ static void reset_socket_fdwrite(curl_socket_t s)
 }
 #endif
 
+static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len,
+                               unsigned int inc, bool *is_malloced)
+{
+  struct pollfd *new_fds, *old_fds = *pfds;
+  unsigned int new_len = *pfds_len + inc;
+
+  new_fds = calloc(new_len, sizeof(struct pollfd));
+  if(!new_fds) {
+    if(*is_malloced)
+      free(old_fds);
+    *pfds = NULL;
+    *pfds_len = 0;
+    return CURLM_OUT_OF_MEMORY;
+  }
+  memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd));
+  if(*is_malloced)
+    free(old_fds);
+  *pfds = new_fds;
+  *pfds_len = new_len;
+  *is_malloced = TRUE;
+  return CURLM_OK;
+}
+
 #define NUM_POLLS_ON_STACK 10
 
 static CURLMcode multi_wait(struct Curl_multi *multi,
@@ -1302,12 +1325,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   struct Curl_easy *data;
   struct easy_pollset ps;
   size_t i;
-  unsigned int nfds = 0;
-  unsigned int curlfds;
   long timeout_internal;
   int retcode = 0;
   struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
   struct pollfd *ufds = &a_few_on_stack[0];
+  unsigned int ufds_len = NUM_POLLS_ON_STACK;
+  unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */
   bool ufds_malloc = FALSE;
 #ifdef USE_WINSOCK
   WSANETWORKEVENTS wsa_events;
@@ -1326,13 +1349,6 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   if(timeout_ms < 0)
     return CURLM_BAD_FUNCTION_ARGUMENT;
 
-  /* Count up how many fds we have from the multi handle */
-  memset(&ps, 0, sizeof(ps));
-  for(data = multi->easyp; data; data = data->next) {
-    multi_getsock(data, &ps);
-    nfds += ps.num;
-  }
-
   /* If the internally desired timeout is actually shorter than requested from
      the outside, then use the shorter time! But only if the internal timer
      is actually larger than -1! */
@@ -1340,69 +1356,60 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
   if((timeout_internal >= 0) && (timeout_internal < (long)timeout_ms))
     timeout_ms = (int)timeout_internal;
 
-  curlfds = nfds; /* number of internal file descriptors */
-  nfds += extra_nfds; /* add the externally provided ones */
-
-#ifdef ENABLE_WAKEUP
-#ifdef USE_WINSOCK
-  if(use_wakeup) {
-#else
-  if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-#endif
-    ++nfds;
-  }
-#endif
-
-  if(nfds > NUM_POLLS_ON_STACK) {
-    /* 'nfds' is a 32 bit value and 'struct pollfd' is typically 8 bytes
-       big, so at 2^29 sockets this value might wrap. When a process gets
-       the capability to actually handle over 500 million sockets this
-       calculation needs an integer overflow check. */
-    ufds = malloc(nfds * sizeof(struct pollfd));
-    if(!ufds)
-      return CURLM_OUT_OF_MEMORY;
-    ufds_malloc = TRUE;
-  }
   nfds = 0;
+  memset(ufds, 0, ufds_len * sizeof(struct pollfd));
+  memset(&ps, 0, sizeof(ps));
 
-  /* only do the second loop if we found descriptors in the first stage run
-     above */
-
-  if(curlfds) {
   /* Add the curl handles to our pollfds first */
   for(data = multi->easyp; data; data = data->next) {
     multi_getsock(data, &ps);
     for(i = 0; i < ps.num; i++) {
-      struct pollfd *ufd = &ufds[nfds++];
+      short events = 0;
 #ifdef USE_WINSOCK
       long mask = 0;
 #endif
-      ufd->fd = ps.sockets[i];
-      ufd->events = 0;
       if(ps.actions[i] & CURL_POLL_IN) {
 #ifdef USE_WINSOCK
         mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
 #endif
-        ufd->events |= POLLIN;
+        events |= POLLIN;
      }
      if(ps.actions[i] & CURL_POLL_OUT) {
 #ifdef USE_WINSOCK
        mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
        reset_socket_fdwrite(ps.sockets[i]);
 #endif
-        ufd->events |= POLLOUT;
+        events |= POLLOUT;
+      }
+      if(events) {
+        if(nfds && ps.sockets[i] == ufds[nfds-1].fd) {
+          ufds[nfds-1].events |= events;
+        }
+        else {
+          if(nfds >= ufds_len) {
+            if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+              return CURLM_OUT_OF_MEMORY;
+          }
+          DEBUGASSERT(nfds < ufds_len);
+          ufds[nfds].fd = ps.sockets[i];
+          ufds[nfds].events = events;
+          ++nfds;
+        }
       }
 #ifdef USE_WINSOCK
+      if(mask) {
        if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) {
          if(ufds_malloc)
            free(ufds);
          return CURLM_INTERNAL_ERROR;
        }
+      }
 #endif
     }
   }
-  }
+  curl_nfds = nfds; /* what curl internally used in ufds */
 
   /* Add external file descriptions from poll-like struct curl_waitfd */
   for(i = 0; i < extra_nfds; i++) {
@@ -1422,6 +1429,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
       return CURLM_INTERNAL_ERROR;
     }
 #endif
+    if(nfds >= ufds_len) {
+      if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
+        return CURLM_OUT_OF_MEMORY;
+    }
+    DEBUGASSERT(nfds < ufds_len);
     ufds[nfds].fd = extra_fds[i].fd;
     ufds[nfds].events = 0;
     if(extra_fds[i].events & CURL_WAIT_POLLIN)
@ -1436,6 +1448,11 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#ifdef ENABLE_WAKEUP #ifdef ENABLE_WAKEUP
#ifndef USE_WINSOCK #ifndef USE_WINSOCK
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) { if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(nfds >= ufds_len) {
if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
return CURLM_OUT_OF_MEMORY;
}
DEBUGASSERT(nfds < ufds_len);
ufds[nfds].fd = multi->wakeup_pair[0]; ufds[nfds].fd = multi->wakeup_pair[0];
ufds[nfds].events = POLLIN; ufds[nfds].events = POLLIN;
++nfds; ++nfds;
@@ -1475,7 +1492,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
        struct, the bit values of the actual underlying poll() implementation
        may not be the same as the ones in the public libcurl API! */
     for(i = 0; i < extra_nfds; i++) {
-      unsigned r = ufds[curlfds + i].revents;
+      unsigned r = ufds[curl_nfds + i].revents;
       unsigned short mask = 0;
 #ifdef USE_WINSOCK
       curl_socket_t s = extra_fds[i].fd;
@@ -1508,7 +1525,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
 #ifdef USE_WINSOCK
     /* Count up all our own sockets that had activity,
        and remove them from the event. */
-    if(curlfds) {
+    if(curl_nfds) {
 
       for(data = multi->easyp; data; data = data->next) {
         multi_getsock(data, &ps);
@@ -1529,7 +1546,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
 #else
 #ifdef ENABLE_WAKEUP
     if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
-      if(ufds[curlfds + extra_nfds].revents & POLLIN) {
+      if(ufds[curl_nfds + extra_nfds].revents & POLLIN) {
         char buf[64];
         ssize_t nread;
         while(1) {


@@ -49,13 +49,15 @@ class ScoreCard:
                  nghttpx: Optional[Nghttpx],
                  caddy: Optional[Caddy],
                  verbose: int,
-                 curl_verbose: int):
+                 curl_verbose: int,
+                 download_parallel: int = 0):
         self.verbose = verbose
         self.env = env
         self.httpd = httpd
         self.nghttpx = nghttpx
         self.caddy = caddy
         self._silent_curl = not curl_verbose
+        self._download_parallel = download_parallel
 
     def info(self, msg):
         if self.verbose > 0:
@@ -138,6 +140,7 @@ class ScoreCard:
         return {
             'count': count,
             'samples': sample_size,
+            'max-parallel': 1,
             'speed': mean(samples) if len(samples) else -1,
             'errors': errors,
             'stats': RunProfile.AverageStats(profiles),
@@ -164,6 +167,7 @@ class ScoreCard:
         return {
             'count': count,
             'samples': sample_size,
+            'max-parallel': 1,
             'speed': mean(samples) if len(samples) else -1,
             'errors': errors,
             'stats': RunProfile.AverageStats(profiles),
@@ -174,6 +178,7 @@ class ScoreCard:
         samples = []
         errors = []
         profiles = []
+        max_parallel = self._download_parallel if self._download_parallel > 0 else count
         url = f'{url}?[0-{count - 1}]'
         self.info(f'parallel...')
         for i in range(sample_size):
@@ -182,7 +187,7 @@ class ScoreCard:
                                  with_headers=False,
                                  with_profile=True,
                                  extra_args=['--parallel',
-                                             '--parallel-max', str(count)])
+                                             '--parallel-max', str(max_parallel)])
             err = self._check_downloads(r, count)
             if err:
                 errors.append(err)
@@ -193,6 +198,7 @@ class ScoreCard:
         return {
             'count': count,
             'samples': sample_size,
+            'max-parallel': max_parallel,
             'speed': mean(samples) if len(samples) else -1,
             'errors': errors,
             'stats': RunProfile.AverageStats(profiles),
@@ -436,7 +442,7 @@ class ScoreCard:
                 for mkey, mval in server_score[sskey].items():
                     if mkey not in measures:
                         measures.append(mkey)
-                        m_names[mkey] = f'{mkey}({mval["count"]}x)'
+                        m_names[mkey] = f'{mkey}({mval["count"]}x{mval["max-parallel"]})'
 
         print('Downloads')
         print(f' {"Server":<8} {"Size":>8}', end='')
@@ -543,6 +549,8 @@ def main():
                         default=None, help="evaluate download size")
     parser.add_argument("--download-count", action='store', type=int,
                         default=50, help="perform that many downloads")
+    parser.add_argument("--download-parallel", action='store', type=int,
+                        default=0, help="perform that many downloads in parallel (default all)")
     parser.add_argument("-r", "--requests", action='store_true',
                         default=False, help="evaluate requests")
     parser.add_argument("--request-count", action='store', type=int,
@@ -607,7 +615,8 @@ def main():
         assert caddy.start()
 
     card = ScoreCard(env=env, httpd=httpd, nghttpx=nghttpx, caddy=caddy,
-                     verbose=args.verbose, curl_verbose=args.curl_verbose)
+                     verbose=args.verbose, curl_verbose=args.curl_verbose,
+                     download_parallel=args.download_parallel)
     score = card.score_proto(proto=protocol,
                              handshakes=handshakes,
                              downloads=downloads,