conncache: connection shutdown, multi_socket handling

- implement the socket hash user/reader/writer processing also
  for connections that are being shut down by the connection cache.
- split out handling of current vs. last pollset socket event handling
  into a function available in other code parts
- add `shutdown_poll` pollset to `connectdata` struct so that changes
  in the pollset can be recorded during shutdown. (The internal handle
  cannot keep it since it might be used for many connections)

Reported-by: calvin2021y on github
Fixes #14252
Closes #14257
This commit is contained in:
Stefan Eissing 2024-07-22 17:04:30 +02:00 committed by Daniel Stenberg
parent 8193ca59e1
commit ae620a70a0
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
4 changed files with 67 additions and 61 deletions

View File

@ -62,9 +62,9 @@ static void connc_run_conn_shutdown(struct Curl_easy *data,
bool *done); bool *done);
static void connc_run_conn_shutdown_handler(struct Curl_easy *data, static void connc_run_conn_shutdown_handler(struct Curl_easy *data,
struct connectdata *conn); struct connectdata *conn);
static CURLcode connc_update_shutdown_ev(struct Curl_multi *multi, static CURLMcode connc_update_shutdown_ev(struct Curl_multi *multi,
struct Curl_easy *data, struct Curl_easy *data,
struct connectdata *conn); struct connectdata *conn);
static void connc_shutdown_all(struct conncache *connc, int timeout_ms); static void connc_shutdown_all(struct conncache *connc, int timeout_ms);
static CURLcode bundle_create(struct connectbundle **bundlep) static CURLcode bundle_create(struct connectbundle **bundlep)
@ -725,7 +725,10 @@ static void connc_discard_conn(struct conncache *connc,
if(data->multi && data->multi->socket_cb) { if(data->multi && data->multi->socket_cb) {
DEBUGASSERT(connc == &data->multi->conn_cache); DEBUGASSERT(connc == &data->multi->conn_cache);
if(connc_update_shutdown_ev(data->multi, data, conn)) { /* Start with an empty shutdown pollset, so out internal closure handle
* is added to the sockets. */
memset(&conn->shutdown_poll, 0, sizeof(conn->shutdown_poll));
if(connc_update_shutdown_ev(data->multi, connc->closure_handle, conn)) {
DEBUGF(infof(data, "[CCACHE] update events for shutdown failed, " DEBUGF(infof(data, "[CCACHE] update events for shutdown failed, "
"discarding #%" CURL_FORMAT_CURL_OFF_T, "discarding #%" CURL_FORMAT_CURL_OFF_T,
conn->connection_id)); conn->connection_id));
@ -738,11 +741,6 @@ static void connc_discard_conn(struct conncache *connc,
DEBUGF(infof(data, "[CCACHE] added #%" CURL_FORMAT_CURL_OFF_T DEBUGF(infof(data, "[CCACHE] added #%" CURL_FORMAT_CURL_OFF_T
" to shutdown list of length %zu", conn->connection_id, " to shutdown list of length %zu", conn->connection_id,
Curl_llist_count(&connc->shutdowns.conn_list))); Curl_llist_count(&connc->shutdowns.conn_list)));
/* Forget what this transfer last polled, the connection is ours now.
* If we do not clear this, the event handling for `data` will tell
* the callback to remove the connection socket after we return here. */
memset(&data->last_poll, 0, sizeof(data->last_poll));
} }
void Curl_conncache_disconnect(struct Curl_easy *data, void Curl_conncache_disconnect(struct Curl_easy *data,
@ -967,21 +965,16 @@ static void connc_disconnect(struct Curl_easy *data,
/* the transfer must be detached from the connection */ /* the transfer must be detached from the connection */
DEBUGASSERT(data && !data->conn); DEBUGASSERT(data && !data->conn);
if(connc && connc->multi && connc->multi->socket_cb) {
unsigned int i;
for(i = 0; i < 2; ++i) {
if(CURL_SOCKET_BAD == conn->sock[i])
continue;
/* remove all connection's sockets from event handling */
connc->multi->in_callback = TRUE;
connc->multi->socket_cb(data, conn->sock[i], CURL_POLL_REMOVE,
connc->multi->socket_userp, NULL);
connc->multi->in_callback = FALSE;
}
}
Curl_attach_connection(data, conn); Curl_attach_connection(data, conn);
if(connc && connc->multi && connc->multi->socket_cb) {
struct easy_pollset ps;
/* With an empty pollset, all previously polled sockets will be removed
* via the multi_socket API callback. */
memset(&ps, 0, sizeof(ps));
(void)Curl_multi_pollset_ev(connc->multi, data, &ps, &conn->shutdown_poll);
}
connc_run_conn_shutdown_handler(data, conn); connc_run_conn_shutdown_handler(data, conn);
if(do_shutdown) { if(do_shutdown) {
/* Make a last attempt to shutdown handlers and filters, if /* Make a last attempt to shutdown handlers and filters, if
@ -1003,13 +996,12 @@ static void connc_disconnect(struct Curl_easy *data,
} }
static CURLcode connc_update_shutdown_ev(struct Curl_multi *multi, static CURLMcode connc_update_shutdown_ev(struct Curl_multi *multi,
struct Curl_easy *data, struct Curl_easy *data,
struct connectdata *conn) struct connectdata *conn)
{ {
struct easy_pollset ps; struct easy_pollset ps;
unsigned int i; CURLMcode mresult;
int rc;
DEBUGASSERT(data); DEBUGASSERT(data);
DEBUGASSERT(multi); DEBUGASSERT(multi);
@ -1020,22 +1012,11 @@ static CURLcode connc_update_shutdown_ev(struct Curl_multi *multi,
Curl_conn_adjust_pollset(data, &ps); Curl_conn_adjust_pollset(data, &ps);
Curl_detach_connection(data); Curl_detach_connection(data);
if(!ps.num) mresult = Curl_multi_pollset_ev(multi, data, &ps, &conn->shutdown_poll);
return CURLE_FAILED_INIT;
for(i = 0; i < ps.num; ++i) { if(!mresult) /* Remember for next time */
DEBUGF(infof(data, "[CCACHE] set socket=%" CURL_FORMAT_SOCKET_T memcpy(&conn->shutdown_poll, &ps, sizeof(ps));
" events=%d on #%" CURL_FORMAT_CURL_OFF_T, return mresult;
ps.sockets[i], ps.actions[i], conn->connection_id));
multi->in_callback = TRUE;
rc = multi->socket_cb(data, ps.sockets[i], ps.actions[i],
multi->socket_userp, NULL);
multi->in_callback = FALSE;
if(rc == -1)
return CURLE_FAILED_INIT;
}
return CURLE_OK;
} }
void Curl_conncache_multi_socket(struct Curl_multi *multi, void Curl_conncache_multi_socket(struct Curl_multi *multi,

View File

@ -2914,34 +2914,48 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
struct Curl_easy *data) struct Curl_easy *data)
{ {
struct easy_pollset cur_poll; struct easy_pollset cur_poll;
CURLMcode mresult;
/* Fill in the 'current' struct with the state as it is now: what sockets to
supervise and for what actions */
multi_getsock(data, &cur_poll);
mresult = Curl_multi_pollset_ev(multi, data, &cur_poll, &data->last_poll);
if(!mresult) /* Remember for next time */
memcpy(&data->last_poll, &cur_poll, sizeof(cur_poll));
return mresult;
}
CURLMcode Curl_multi_pollset_ev(struct Curl_multi *multi,
struct Curl_easy *data,
struct easy_pollset *ps,
struct easy_pollset *last_ps)
{
unsigned int i; unsigned int i;
struct Curl_sh_entry *entry; struct Curl_sh_entry *entry;
curl_socket_t s; curl_socket_t s;
int rc; int rc;
/* Fill in the 'current' struct with the state as it is now: what sockets to
supervise and for what actions */
multi_getsock(data, &cur_poll);
/* We have 0 .. N sockets already and we get to know about the 0 .. M /* We have 0 .. N sockets already and we get to know about the 0 .. M
sockets we should have from now on. Detect the differences, remove no sockets we should have from now on. Detect the differences, remove no
longer supervised ones and add new ones */ longer supervised ones and add new ones */
/* walk over the sockets we got right now */ /* walk over the sockets we got right now */
for(i = 0; i < cur_poll.num; i++) { for(i = 0; i < ps->num; i++) {
unsigned char cur_action = cur_poll.actions[i]; unsigned char cur_action = ps->actions[i];
unsigned char last_action = 0; unsigned char last_action = 0;
int comboaction; int comboaction;
s = cur_poll.sockets[i]; s = ps->sockets[i];
/* get it from the hash */ /* get it from the hash */
entry = sh_getentry(&multi->sockhash, s); entry = sh_getentry(&multi->sockhash, s);
if(entry) { if(entry) {
/* check if new for this transfer */ /* check if new for this transfer */
unsigned int j; unsigned int j;
for(j = 0; j< data->last_poll.num; j++) { for(j = 0; j< last_ps->num; j++) {
if(s == data->last_poll.sockets[j]) { if(s == last_ps->sockets[j]) {
last_action = data->last_poll.actions[j]; last_action = last_ps->actions[j];
break; break;
} }
} }
@ -2964,14 +2978,15 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
if(cur_action & CURL_POLL_OUT) if(cur_action & CURL_POLL_OUT)
entry->writers++; entry->writers++;
} }
else if(!last_action) { else if(!last_action &&
!Curl_hash_pick(&entry->transfers, (char *)&data, /* hash key */
sizeof(struct Curl_easy *))) {
/* a new transfer using this socket */ /* a new transfer using this socket */
entry->users++; entry->users++;
if(cur_action & CURL_POLL_IN) if(cur_action & CURL_POLL_IN)
entry->readers++; entry->readers++;
if(cur_action & CURL_POLL_OUT) if(cur_action & CURL_POLL_OUT)
entry->writers++; entry->writers++;
/* add 'data' to the transfer hash on this socket! */ /* add 'data' to the transfer hash on this socket! */
if(!Curl_hash_add(&entry->transfers, (char *)&data, /* hash key */ if(!Curl_hash_add(&entry->transfers, (char *)&data, /* hash key */
sizeof(struct Curl_easy *), data)) { sizeof(struct Curl_easy *), data)) {
@ -3004,15 +3019,15 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
entry->action = (unsigned int)comboaction; entry->action = (unsigned int)comboaction;
} }
/* Check for last_poll.sockets that no longer appear in cur_poll.sockets. /* Check for last_poll.sockets that no longer appear in ps->sockets.
* Need to remove the easy handle from the multi->sockhash->transfers and * Need to remove the easy handle from the multi->sockhash->transfers and
* remove multi->sockhash entry when this was the last transfer */ * remove multi->sockhash entry when this was the last transfer */
for(i = 0; i< data->last_poll.num; i++) { for(i = 0; i < last_ps->num; i++) {
unsigned int j; unsigned int j;
bool stillused = FALSE; bool stillused = FALSE;
s = data->last_poll.sockets[i]; s = last_ps->sockets[i];
for(j = 0; j < cur_poll.num; j++) { for(j = 0; j < ps->num; j++) {
if(s == cur_poll.sockets[j]) { if(s == ps->sockets[j]) {
/* this is still supervised */ /* this is still supervised */
stillused = TRUE; stillused = TRUE;
break; break;
@ -3025,7 +3040,7 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
/* if this is NULL here, the socket has been closed and notified so /* if this is NULL here, the socket has been closed and notified so
already by Curl_multi_closed() */ already by Curl_multi_closed() */
if(entry) { if(entry) {
unsigned char oldactions = data->last_poll.actions[i]; unsigned char oldactions = last_ps->actions[i];
/* this socket has been removed. Decrease user count */ /* this socket has been removed. Decrease user count */
entry->users--; entry->users--;
if(oldactions & CURL_POLL_OUT) if(oldactions & CURL_POLL_OUT)
@ -3055,8 +3070,6 @@ static CURLMcode singlesocket(struct Curl_multi *multi,
} }
} /* for loop over num */ } /* for loop over num */
/* Remember for next time */
memcpy(&data->last_poll, &cur_poll, sizeof(data->last_poll));
return CURLM_OK; return CURLM_OK;
} }

View File

@ -84,6 +84,15 @@ void Curl_multiuse_state(struct Curl_easy *data,
void Curl_multi_closed(struct Curl_easy *data, curl_socket_t s); void Curl_multi_closed(struct Curl_easy *data, curl_socket_t s);
/* Compare the two pollsets to notify the multi_socket API of changes
* in socket polling, e.g calling multi->socket_cb() with the changes if
* differences are seen.
*/
CURLMcode Curl_multi_pollset_ev(struct Curl_multi *multi,
struct Curl_easy *data,
struct easy_pollset *ps,
struct easy_pollset *last_ps);
/* /*
* Add a handle and move it into PERFORM state at once. For pushed streams. * Add a handle and move it into PERFORM state at once. For pushed streams.
*/ */

View File

@ -854,6 +854,9 @@ struct connectdata {
struct curltime start[2]; /* when filter shutdown started */ struct curltime start[2]; /* when filter shutdown started */
unsigned int timeout_ms; /* 0 means no timeout */ unsigned int timeout_ms; /* 0 means no timeout */
} shutdown; } shutdown;
/* Last pollset used in connection shutdown. Used to detect changes
* for multi_socket API. */
struct easy_pollset shutdown_poll;
struct ssl_primary_config ssl_config; struct ssl_primary_config ssl_config;
#ifndef CURL_DISABLE_PROXY #ifndef CURL_DISABLE_PROXY