multi: prepare multi_wait() for future shutdown usage

- new struct curl_pollfds and struct curl_waitfds
- add structs and methods to init/add/cleanup an array of pollfd and
  struct curl_waitfd. Use in multi_wait() and multi_waitfds() to
  populate the sets for polling.
- place USE_WINSOCK WSAEventSelect() setting into a separate loop over
  all collected pfds

Closes #13900
This commit is contained in:
Stefan Eissing 2024-06-06 12:40:38 +02:00 committed by Daniel Stenberg
parent c31041b17e
commit 374d178f14
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
3 changed files with 239 additions and 156 deletions

View File

@ -1227,12 +1227,8 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi,
unsigned int *fd_count)
{
struct Curl_easy *data;
unsigned int nfds = 0;
struct curl_waitfds cwfds;
struct easy_pollset ps;
unsigned int i;
CURLMcode result = CURLM_OK;
struct curl_waitfd *ufd;
unsigned int j;
if(!ufds)
return CURLM_BAD_FUNCTION_ARGUMENT;
@ -1243,44 +1239,17 @@ CURLMcode curl_multi_waitfds(struct Curl_multi *multi,
if(multi->in_callback)
return CURLM_RECURSIVE_API_CALL;
Curl_waitfds_init(&cwfds, ufds, size);
memset(&ps, 0, sizeof(ps));
for(data = multi->easyp; data; data = data->next) {
multi_getsock(data, &ps);
for(i = 0; i < ps.num; i++) {
if(nfds < size) {
curl_socket_t fd = ps.sockets[i];
int fd_idx = -1;
/* Simple linear search to skip an already added descriptor */
for(j = 0; j < nfds; j++) {
if(ufds[j].fd == fd) {
fd_idx = (int)j;
break;
}
}
if(fd_idx < 0) {
ufd = &ufds[nfds++];
ufd->fd = ps.sockets[i];
ufd->events = 0;
}
else
ufd = &ufds[fd_idx];
if(ps.actions[i] & CURL_POLL_IN)
ufd->events |= CURL_WAIT_POLLIN;
if(ps.actions[i] & CURL_POLL_OUT)
ufd->events |= CURL_WAIT_POLLOUT;
}
else
return CURLM_OUT_OF_MEMORY;
}
if(Curl_waitfds_add_ps(&cwfds, &ps))
return CURLM_OUT_OF_MEMORY;
}
if(fd_count)
*fd_count = nfds;
return result;
*fd_count = cwfds.n;
return CURLM_OK;
}
#ifdef USE_WINSOCK
@ -1299,29 +1268,6 @@ static void reset_socket_fdwrite(curl_socket_t s)
}
#endif
static CURLMcode ufds_increase(struct pollfd **pfds, unsigned int *pfds_len,
unsigned int inc, bool *is_malloced)
{
struct pollfd *new_fds, *old_fds = *pfds;
unsigned int new_len = *pfds_len + inc;
new_fds = calloc(new_len, sizeof(struct pollfd));
if(!new_fds) {
if(*is_malloced)
free(old_fds);
*pfds = NULL;
*pfds_len = 0;
return CURLM_OUT_OF_MEMORY;
}
memcpy(new_fds, old_fds, (*pfds_len) * sizeof(struct pollfd));
if(*is_malloced)
free(old_fds);
*pfds = new_fds;
*pfds_len = new_len;
*is_malloced = TRUE;
return CURLM_OK;
}
#define NUM_POLLS_ON_STACK 10
static CURLMcode multi_wait(struct Curl_multi *multi,
@ -1338,10 +1284,8 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
long timeout_internal;
int retcode = 0;
struct pollfd a_few_on_stack[NUM_POLLS_ON_STACK];
struct pollfd *ufds = &a_few_on_stack[0];
unsigned int ufds_len = NUM_POLLS_ON_STACK;
unsigned int nfds = 0, curl_nfds = 0; /* how many ufds are in use */
bool ufds_malloc = FALSE;
struct curl_pollfds cpfds;
unsigned int curl_nfds = 0; /* how many pfds are for curl transfers */
#ifdef USE_WINSOCK
WSANETWORKEVENTS wsa_events;
DEBUGASSERT(multi->wsa_event != WSA_INVALID_EVENT);
@ -1359,105 +1303,62 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
if(timeout_ms < 0)
return CURLM_BAD_FUNCTION_ARGUMENT;
memset(ufds, 0, ufds_len * sizeof(struct pollfd));
Curl_pollfds_init(&cpfds, a_few_on_stack, NUM_POLLS_ON_STACK);
memset(&ps, 0, sizeof(ps));
/* Add the curl handles to our pollfds first */
for(data = multi->easyp; data; data = data->next) {
multi_getsock(data, &ps);
for(i = 0; i < ps.num; i++) {
short events = 0;
#ifdef USE_WINSOCK
long mask = 0;
#endif
if(ps.actions[i] & CURL_POLL_IN) {
#ifdef USE_WINSOCK
mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
#endif
events |= POLLIN;
}
if(ps.actions[i] & CURL_POLL_OUT) {
#ifdef USE_WINSOCK
mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
reset_socket_fdwrite(ps.sockets[i]);
#endif
events |= POLLOUT;
}
if(events) {
if(nfds && ps.sockets[i] == ufds[nfds-1].fd) {
ufds[nfds-1].events |= events;
}
else {
if(nfds >= ufds_len) {
if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
return CURLM_OUT_OF_MEMORY;
}
DEBUGASSERT(nfds < ufds_len);
ufds[nfds].fd = ps.sockets[i];
ufds[nfds].events = events;
++nfds;
}
}
#ifdef USE_WINSOCK
if(mask) {
if(WSAEventSelect(ps.sockets[i], multi->wsa_event, mask) != 0) {
if(ufds_malloc)
free(ufds);
return CURLM_INTERNAL_ERROR;
}
}
#endif
if(Curl_pollfds_add_ps(&cpfds, &ps)) {
Curl_pollfds_cleanup(&cpfds);
return CURLM_OUT_OF_MEMORY;
}
}
curl_nfds = nfds; /* what curl internally used in ufds */
curl_nfds = cpfds.n; /* what curl internally uses in cpfds */
/* Add external file descriptions from poll-like struct curl_waitfd */
for(i = 0; i < extra_nfds; i++) {
#ifdef USE_WINSOCK
long mask = 0;
unsigned short events = 0;
if(extra_fds[i].events & CURL_WAIT_POLLIN)
mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
events |= POLLIN;
if(extra_fds[i].events & CURL_WAIT_POLLPRI)
mask |= FD_OOB;
if(extra_fds[i].events & CURL_WAIT_POLLOUT) {
mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
reset_socket_fdwrite(extra_fds[i].fd);
}
if(WSAEventSelect(extra_fds[i].fd, multi->wsa_event, mask) != 0) {
if(ufds_malloc)
free(ufds);
return CURLM_INTERNAL_ERROR;
}
#endif
if(nfds >= ufds_len) {
if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
return CURLM_OUT_OF_MEMORY;
}
DEBUGASSERT(nfds < ufds_len);
ufds[nfds].fd = extra_fds[i].fd;
ufds[nfds].events = 0;
if(extra_fds[i].events & CURL_WAIT_POLLIN)
ufds[nfds].events |= POLLIN;
if(extra_fds[i].events & CURL_WAIT_POLLPRI)
ufds[nfds].events |= POLLPRI;
events |= POLLPRI;
if(extra_fds[i].events & CURL_WAIT_POLLOUT)
ufds[nfds].events |= POLLOUT;
++nfds;
events |= POLLOUT;
if(Curl_pollfds_add_sock(&cpfds, extra_fds[i].fd, events)) {
Curl_pollfds_cleanup(&cpfds);
return CURLM_OUT_OF_MEMORY;
}
}
#ifdef USE_WINSOCK
/* Set the WSA events based on the collected pollds */
for(i = 0; i < cpfds.n; i++) {
long mask = 0;
if(cpfds.pfds[i].events & POLLIN)
mask |= FD_READ|FD_ACCEPT|FD_CLOSE;
if(cpfds.pfds[i].events & POLLPRI)
mask |= FD_OOB;
if(cpfds.pfds[i].events & POLLOUT) {
mask |= FD_WRITE|FD_CONNECT|FD_CLOSE;
reset_socket_fdwrite(cpfds.pfds[i].fd);
}
if(mask) {
if(WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, mask) != 0) {
Curl_pollfds_cleanup(&cpfds);
return CURLM_INTERNAL_ERROR;
}
}
}
#endif
#ifdef ENABLE_WAKEUP
#ifndef USE_WINSOCK
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(nfds >= ufds_len) {
if(ufds_increase(&ufds, &ufds_len, 100, &ufds_malloc))
return CURLM_OUT_OF_MEMORY;
if(Curl_pollfds_add_sock(&cpfds, multi->wakeup_pair[0], POLLIN)) {
Curl_pollfds_cleanup(&cpfds);
return CURLM_OUT_OF_MEMORY;
}
DEBUGASSERT(nfds < ufds_len);
ufds[nfds].fd = multi->wakeup_pair[0];
ufds[nfds].events = POLLIN;
++nfds;
}
#endif
#endif
@ -1471,21 +1372,23 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
timeout_ms = (int)timeout_internal;
#if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK)
if(nfds || use_wakeup) {
if(cpfds.n || use_wakeup) {
#else
if(nfds) {
if(cpfds.n) {
#endif
int pollrc;
#ifdef USE_WINSOCK
if(nfds)
pollrc = Curl_poll(ufds, nfds, 0); /* just pre-check with WinSock */
if(cpfds.n) /* just pre-check with WinSock */
pollrc = Curl_poll(cpfds.pfds, cpfds.n, 0);
else
pollrc = 0;
#else
pollrc = Curl_poll(ufds, nfds, timeout_ms); /* wait... */
pollrc = Curl_poll(cpfds.pfds, cpfds.n, timeout_ms); /* wait... */
#endif
if(pollrc < 0)
if(pollrc < 0) {
Curl_pollfds_cleanup(&cpfds);
return CURLM_UNRECOVERABLE_POLL;
}
if(pollrc > 0) {
retcode = pollrc;
@ -1503,7 +1406,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
struct, the bit values of the actual underlying poll() implementation
may not be the same as the ones in the public libcurl API! */
for(i = 0; i < extra_nfds; i++) {
unsigned r = (unsigned)ufds[curl_nfds + i].revents;
unsigned r = (unsigned)cpfds.pfds[curl_nfds + i].revents;
unsigned short mask = 0;
#ifdef USE_WINSOCK
curl_socket_t s = extra_fds[i].fd;
@ -1557,7 +1460,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
#else
#ifdef ENABLE_WAKEUP
if(use_wakeup && multi->wakeup_pair[0] != CURL_SOCKET_BAD) {
if(ufds[curl_nfds + extra_nfds].revents & POLLIN) {
if(cpfds.pfds[curl_nfds + extra_nfds].revents & POLLIN) {
char buf[64];
ssize_t nread;
while(1) {
@ -1581,14 +1484,12 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
}
}
if(ufds_malloc)
free(ufds);
if(ret)
*ret = retcode;
#if defined(ENABLE_WAKEUP) && defined(USE_WINSOCK)
if(extrawait && !nfds && !use_wakeup) {
if(extrawait && !cpfds.n && !use_wakeup) {
#else
if(extrawait && !nfds) {
if(extrawait && !cpfds.n) {
#endif
long sleep_ms = 0;
@ -1604,6 +1505,7 @@ static CURLMcode multi_wait(struct Curl_multi *multi,
}
}
Curl_pollfds_cleanup(&cpfds);
return CURLM_OK;
}

View File

@ -47,6 +47,10 @@
#include "select.h"
#include "timediff.h"
#include "warnless.h"
/* The last 3 #include files should be in this order */
#include "curl_printf.h"
#include "curl_memory.h"
#include "memdebug.h"
/*
* Internal function used for waiting a specific amount of ms
@ -401,3 +405,147 @@ int Curl_poll(struct pollfd ufds[], unsigned int nfds, timediff_t timeout_ms)
return r;
}
void Curl_pollfds_init(struct curl_pollfds *cpfds,
struct pollfd *static_pfds,
unsigned int static_count)
{
DEBUGASSERT(cpfds);
memset(cpfds, 0, sizeof(*cpfds));
if(static_pfds && static_count) {
cpfds->pfds = static_pfds;
cpfds->count = static_count;
}
}
void Curl_pollfds_cleanup(struct curl_pollfds *cpfds)
{
DEBUGASSERT(cpfds);
if(cpfds->allocated_pfds) {
free(cpfds->pfds);
}
memset(cpfds, 0, sizeof(*cpfds));
}
static CURLcode cpfds_increase(struct curl_pollfds *cpfds, unsigned int inc)
{
struct pollfd *new_fds;
unsigned int new_count = cpfds->count + inc;
new_fds = calloc(new_count, sizeof(struct pollfd));
if(!new_fds)
return CURLE_OUT_OF_MEMORY;
memcpy(new_fds, cpfds->pfds, cpfds->count * sizeof(struct pollfd));
if(cpfds->allocated_pfds)
free(cpfds->pfds);
cpfds->pfds = new_fds;
cpfds->count = new_count;
cpfds->allocated_pfds = TRUE;
return CURLE_OK;
}
static CURLcode cpfds_add_sock(struct curl_pollfds *cpfds,
curl_socket_t sock, short events, bool fold)
{
int i;
if(fold && cpfds->n <= INT_MAX) {
for(i = (int)cpfds->n - 1; i >= 0; --i) {
if(sock == cpfds->pfds[i].fd) {
cpfds->pfds[i].events |= events;
return CURLE_OK;
}
}
}
/* not folded, add new entry */
if(cpfds->n >= cpfds->count) {
if(cpfds_increase(cpfds, 100))
return CURLE_OUT_OF_MEMORY;
}
cpfds->pfds[cpfds->n].fd = sock;
cpfds->pfds[cpfds->n].events = events;
++cpfds->n;
return CURLE_OK;
}
CURLcode Curl_pollfds_add_sock(struct curl_pollfds *cpfds,
curl_socket_t sock, short events)
{
return cpfds_add_sock(cpfds, sock, events, FALSE);
}
CURLcode Curl_pollfds_add_ps(struct curl_pollfds *cpfds,
struct easy_pollset *ps)
{
size_t i;
DEBUGASSERT(cpfds);
DEBUGASSERT(ps);
for(i = 0; i < ps->num; i++) {
short events = 0;
if(ps->actions[i] & CURL_POLL_IN)
events |= POLLIN;
if(ps->actions[i] & CURL_POLL_OUT)
events |= POLLOUT;
if(events) {
if(cpfds_add_sock(cpfds, ps->sockets[i], events, TRUE))
return CURLE_OUT_OF_MEMORY;
}
}
return CURLE_OK;
}
void Curl_waitfds_init(struct curl_waitfds *cwfds,
struct curl_waitfd *static_wfds,
unsigned int static_count)
{
DEBUGASSERT(cwfds);
DEBUGASSERT(static_wfds);
memset(cwfds, 0, sizeof(*cwfds));
cwfds->wfds = static_wfds;
cwfds->count = static_count;
}
static CURLcode cwfds_add_sock(struct curl_waitfds *cwfds,
curl_socket_t sock, short events)
{
int i;
if(cwfds->n <= INT_MAX) {
for(i = (int)cwfds->n - 1; i >= 0; --i) {
if(sock == cwfds->wfds[i].fd) {
cwfds->wfds[i].events |= events;
return CURLE_OK;
}
}
}
/* not folded, add new entry */
if(cwfds->n >= cwfds->count)
return CURLE_OUT_OF_MEMORY;
cwfds->wfds[cwfds->n].fd = sock;
cwfds->wfds[cwfds->n].events = events;
++cwfds->n;
return CURLE_OK;
}
CURLcode Curl_waitfds_add_ps(struct curl_waitfds *cwfds,
struct easy_pollset *ps)
{
size_t i;
DEBUGASSERT(cwfds);
DEBUGASSERT(ps);
for(i = 0; i < ps->num; i++) {
short events = 0;
if(ps->actions[i] & CURL_POLL_IN)
events |= CURL_WAIT_POLLIN;
if(ps->actions[i] & CURL_POLL_OUT)
events |= CURL_WAIT_POLLOUT;
if(events) {
if(cwfds_add_sock(cwfds, ps->sockets[i], events))
return CURLE_OUT_OF_MEMORY;
}
}
return CURLE_OK;
}

View File

@ -111,4 +111,37 @@ int Curl_wait_ms(timediff_t timeout_ms);
} while(0)
#endif
struct curl_pollfds {
struct pollfd *pfds;
unsigned int n;
unsigned int count;
BIT(allocated_pfds);
};
void Curl_pollfds_init(struct curl_pollfds *cpfds,
struct pollfd *static_pfds,
unsigned int static_count);
void Curl_pollfds_cleanup(struct curl_pollfds *cpfds);
CURLcode Curl_pollfds_add_ps(struct curl_pollfds *cpfds,
struct easy_pollset *ps);
CURLcode Curl_pollfds_add_sock(struct curl_pollfds *cpfds,
curl_socket_t sock, short events);
struct curl_waitfds {
struct curl_waitfd *wfds;
unsigned int n;
unsigned int count;
};
void Curl_waitfds_init(struct curl_waitfds *cwfds,
struct curl_waitfd *static_wfds,
unsigned int static_count);
CURLcode Curl_waitfds_add_ps(struct curl_waitfds *cwfds,
struct easy_pollset *ps);
#endif /* HEADER_CURL_SELECT_H */