conncache: count shutdowns against host and max limits
Count connections to a host against a possibly configured destination limit. Trigger multi `connchange` when a connection has been shutdown, so pending transfers can try to get a connection once again. Reported-by: baranyaib90 on github Fixes #15857 Closes #15879
This commit is contained in:
parent
508861eb80
commit
bd3c027ac9
@ -100,6 +100,8 @@ static void cpool_shutdown_all(struct cpool *cpool,
|
||||
struct Curl_easy *data, int timeout_ms);
|
||||
static void cpool_close_and_destroy_all(struct cpool *cpool);
|
||||
static struct connectdata *cpool_get_oldest_idle(struct cpool *cpool);
|
||||
static size_t cpool_shutdown_dest_count(struct cpool *cpool,
|
||||
const char *destination);
|
||||
|
||||
static struct cpool_bundle *cpool_bundle_create(const char *dest,
|
||||
size_t dest_len)
|
||||
@ -285,6 +287,7 @@ int Curl_cpool_check_limits(struct Curl_easy *data,
|
||||
struct cpool_bundle *bundle;
|
||||
size_t dest_limit = 0;
|
||||
size_t total_limit = 0;
|
||||
size_t shutdowns;
|
||||
int result = CPOOL_LIMIT_OK;
|
||||
|
||||
if(!cpool)
|
||||
@ -300,8 +303,12 @@ int Curl_cpool_check_limits(struct Curl_easy *data,
|
||||
|
||||
CPOOL_LOCK(cpool);
|
||||
if(dest_limit) {
|
||||
size_t live;
|
||||
|
||||
bundle = cpool_find_bundle(cpool, conn);
|
||||
while(bundle && (Curl_llist_count(&bundle->conns) >= dest_limit)) {
|
||||
live = bundle ? Curl_llist_count(&bundle->conns) : 0;
|
||||
shutdowns = cpool_shutdown_dest_count(cpool, conn->destination);
|
||||
while(!shutdowns && bundle && live >= dest_limit) {
|
||||
struct connectdata *oldest_idle = NULL;
|
||||
/* The bundle is full. Extract the oldest connection that may
|
||||
* be removed now, if there is one. */
|
||||
@ -317,15 +324,18 @@ int Curl_cpool_check_limits(struct Curl_easy *data,
|
||||
|
||||
/* in case the bundle was destroyed in disconnect, look it up again */
|
||||
bundle = cpool_find_bundle(cpool, conn);
|
||||
live = bundle ? Curl_llist_count(&bundle->conns) : 0;
|
||||
shutdowns = cpool_shutdown_dest_count(cpool, conn->destination);
|
||||
}
|
||||
if(bundle && (Curl_llist_count(&bundle->conns) >= dest_limit)) {
|
||||
if((live + shutdowns) >= dest_limit) {
|
||||
result = CPOOL_LIMIT_DEST;
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
if(total_limit) {
|
||||
while(cpool->num_conn >= total_limit) {
|
||||
shutdowns = Curl_llist_count(&cpool->shutdowns);
|
||||
while((cpool->num_conn + shutdowns) >= total_limit) {
|
||||
struct connectdata *oldest_idle = cpool_get_oldest_idle(cpool);
|
||||
if(!oldest_idle)
|
||||
break;
|
||||
@ -335,8 +345,9 @@ int Curl_cpool_check_limits(struct Curl_easy *data,
|
||||
"limit of %zu",
|
||||
oldest_idle->connection_id, cpool->num_conn, total_limit));
|
||||
Curl_cpool_disconnect(data, oldest_idle, FALSE);
|
||||
shutdowns = Curl_llist_count(&cpool->shutdowns);
|
||||
}
|
||||
if(cpool->num_conn >= total_limit) {
|
||||
if((cpool->num_conn + shutdowns) >= total_limit) {
|
||||
result = CPOOL_LIMIT_TOTAL;
|
||||
goto out;
|
||||
}
|
||||
@ -374,7 +385,8 @@ CURLcode Curl_cpool_add_conn(struct Curl_easy *data,
|
||||
cpool->num_conn++;
|
||||
DEBUGF(infof(data, "Added connection %" FMT_OFF_T ". "
|
||||
"The cache now contains %zu members",
|
||||
conn->connection_id, cpool->num_conn));
|
||||
conn->connection_id,
|
||||
cpool->num_conn + Curl_llist_count(&cpool->shutdowns)));
|
||||
out:
|
||||
CPOOL_UNLOCK(cpool);
|
||||
|
||||
@ -612,6 +624,21 @@ bool Curl_cpool_find(struct Curl_easy *data,
|
||||
return result;
|
||||
}
|
||||
|
||||
/* How many connections to the given destination are in shutdown? */
|
||||
static size_t cpool_shutdown_dest_count(struct cpool *cpool,
|
||||
const char *destination)
|
||||
{
|
||||
size_t n = 0;
|
||||
struct Curl_llist_node *e = Curl_llist_head(&cpool->shutdowns);
|
||||
while(e) {
|
||||
struct connectdata *conn = Curl_node_elem(e);
|
||||
if(!strcmp(destination, conn->destination))
|
||||
++n;
|
||||
e = Curl_node_next(e);
|
||||
}
|
||||
return n;
|
||||
}
|
||||
|
||||
static void cpool_shutdown_discard_all(struct cpool *cpool)
|
||||
{
|
||||
struct Curl_llist_node *e = Curl_llist_head(&cpool->shutdowns);
|
||||
@ -742,12 +769,12 @@ static void cpool_discard_conn(struct cpool *cpool,
|
||||
|
||||
/* Add the connection to our shutdown list for non-blocking shutdown
|
||||
* during multi processing. */
|
||||
if(data->multi && data->multi->max_shutdown_connections > 0 &&
|
||||
(data->multi->max_shutdown_connections >=
|
||||
(long)Curl_llist_count(&cpool->shutdowns))) {
|
||||
if(data->multi && data->multi->max_total_connections > 0 &&
|
||||
(data->multi->max_total_connections <=
|
||||
(long)(cpool->num_conn + Curl_llist_count(&cpool->shutdowns)))) {
|
||||
DEBUGF(infof(data, "[CCACHE] discarding oldest shutdown connection "
|
||||
"due to limit of %ld",
|
||||
data->multi->max_shutdown_connections));
|
||||
"due to connection limit of %ld",
|
||||
data->multi->max_total_connections));
|
||||
cpool_shutdown_destroy_oldest(cpool);
|
||||
}
|
||||
|
||||
@ -767,8 +794,8 @@ static void cpool_discard_conn(struct cpool *cpool,
|
||||
|
||||
Curl_llist_append(&cpool->shutdowns, conn, &conn->cpool_node);
|
||||
DEBUGF(infof(data, "[CCACHE] added #%" FMT_OFF_T
|
||||
" to shutdown list of length %zu", conn->connection_id,
|
||||
Curl_llist_count(&cpool->shutdowns)));
|
||||
" to shutdowns, now %zu conns in shutdown",
|
||||
conn->connection_id, Curl_llist_count(&cpool->shutdowns)));
|
||||
}
|
||||
|
||||
void Curl_cpool_disconnect(struct Curl_easy *data,
|
||||
@ -1049,6 +1076,11 @@ static void cpool_close_and_destroy(struct cpool *cpool,
|
||||
Curl_detach_connection(data);
|
||||
|
||||
Curl_conn_free(data, conn);
|
||||
|
||||
if(cpool && cpool->multi) {
|
||||
DEBUGF(infof(data, "[CCACHE] trigger multi connchanged"));
|
||||
Curl_multi_connchanged(cpool->multi);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
10
lib/multi.c
10
lib/multi.c
@ -3031,9 +3031,11 @@ CURLMcode curl_multi_perform(CURLM *m, int *running_handles)
|
||||
|
||||
sigpipe_apply(multi->cpool.idata, &pipe_st);
|
||||
Curl_cpool_multi_perform(multi);
|
||||
|
||||
sigpipe_restore(&pipe_st);
|
||||
|
||||
if(multi_ischanged(m, TRUE))
|
||||
process_pending_handles(m);
|
||||
|
||||
/*
|
||||
* Simply remove all expired timers from the splay since handles are dealt
|
||||
* with unconditionally by this function and curl_multi_timeout() requires
|
||||
@ -3629,6 +3631,9 @@ out:
|
||||
}
|
||||
sigpipe_restore(&mrc.pipe_st);
|
||||
|
||||
if(multi_ischanged(multi, TRUE))
|
||||
process_pending_handles(multi);
|
||||
|
||||
if(running_handles)
|
||||
*running_handles = (int)multi->num_alive;
|
||||
|
||||
@ -3686,9 +3691,6 @@ CURLMcode curl_multi_setopt(CURLM *m,
|
||||
break;
|
||||
case CURLMOPT_MAX_TOTAL_CONNECTIONS:
|
||||
multi->max_total_connections = va_arg(param, long);
|
||||
/* for now, let this also decide the max number of connections
|
||||
* in shutdown handling */
|
||||
multi->max_shutdown_connections = va_arg(param, long);
|
||||
break;
|
||||
/* options formerly used for pipelining */
|
||||
case CURLMOPT_MAX_PIPELINE_LENGTH:
|
||||
|
||||
@ -148,8 +148,6 @@ struct Curl_multi {
|
||||
|
||||
long max_total_connections; /* if >0, a fixed limit of the maximum number
|
||||
of connections in total */
|
||||
long max_shutdown_connections; /* if >0, a fixed limit of the maximum number
|
||||
of connections in shutdown handling */
|
||||
|
||||
/* timer callback and user data pointer for the *socket() API */
|
||||
curl_multi_timer_callback timer_cb;
|
||||
|
||||
@ -312,10 +312,11 @@ int main(int argc, char *argv[])
|
||||
struct curl_slist *host = NULL;
|
||||
char *resolve = NULL;
|
||||
size_t max_host_conns = 0;
|
||||
size_t max_total_conns = 0;
|
||||
int fresh_connect = 0;
|
||||
int result = 0;
|
||||
|
||||
while((ch = getopt(argc, argv, "aefhm:n:xA:F:M:P:r:V:")) != -1) {
|
||||
while((ch = getopt(argc, argv, "aefhm:n:xA:F:M:P:r:T:V:")) != -1) {
|
||||
switch(ch) {
|
||||
case 'h':
|
||||
usage(NULL);
|
||||
@ -355,6 +356,9 @@ int main(int argc, char *argv[])
|
||||
free(resolve);
|
||||
resolve = strdup(optarg);
|
||||
break;
|
||||
case 'T':
|
||||
max_total_conns = (size_t)strtol(optarg, NULL, 10);
|
||||
break;
|
||||
case 'V': {
|
||||
if(!strcmp("http/1.1", optarg))
|
||||
http_version = CURL_HTTP_VERSION_1_1;
|
||||
@ -413,6 +417,8 @@ int main(int argc, char *argv[])
|
||||
|
||||
multi_handle = curl_multi_init();
|
||||
curl_multi_setopt(multi_handle, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
|
||||
curl_multi_setopt(multi_handle, CURLMOPT_MAX_TOTAL_CONNECTIONS,
|
||||
(long)max_total_conns);
|
||||
curl_multi_setopt(multi_handle, CURLMOPT_MAX_HOST_CONNECTIONS,
|
||||
(long)max_host_conns);
|
||||
|
||||
|
||||
@ -638,8 +638,8 @@ class TestDownload:
|
||||
def test_02_33_max_host_conns(self, env: Env, httpd, nghttpx, proto, max_host_conns):
|
||||
if proto == 'h3' and not env.have_h3():
|
||||
pytest.skip("h3 not supported")
|
||||
count = 100
|
||||
max_parallel = 100
|
||||
count = 50
|
||||
max_parallel = 50
|
||||
docname = 'data-10k'
|
||||
port = env.port_for(proto)
|
||||
url = f'https://{env.domain1}:{port}/{docname}'
|
||||
@ -657,3 +657,46 @@ class TestDownload:
|
||||
r.check_exit_code(0)
|
||||
srcfile = os.path.join(httpd.docs_dir, docname)
|
||||
self.check_downloads(client, srcfile, count)
|
||||
if max_host_conns > 0:
|
||||
matched_lines = 0
|
||||
for line in r.trace_lines:
|
||||
m = re.match(r'.*The cache now contains (\d+) members.*', line)
|
||||
if m:
|
||||
matched_lines += 1
|
||||
n = int(m.group(1))
|
||||
assert n <= max_host_conns
|
||||
assert matched_lines > 0
|
||||
|
||||
@pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
|
||||
@pytest.mark.parametrize("max_total_conns", [0, 1, 5])
|
||||
def test_02_34_max_total_conns(self, env: Env, httpd, nghttpx, proto, max_total_conns):
|
||||
if proto == 'h3' and not env.have_h3():
|
||||
pytest.skip("h3 not supported")
|
||||
count = 50
|
||||
max_parallel = 50
|
||||
docname = 'data-10k'
|
||||
port = env.port_for(proto)
|
||||
url = f'https://{env.domain1}:{port}/{docname}'
|
||||
client = LocalClient(name='hx-download', env=env)
|
||||
if not client.exists():
|
||||
pytest.skip(f'example client not built: {client.name}')
|
||||
r = client.run(args=[
|
||||
'-n', f'{count}',
|
||||
'-m', f'{max_parallel}',
|
||||
'-x', # always use a fresh connection
|
||||
'-T', str(max_total_conns), # limit total connections
|
||||
'-r', f'{env.domain1}:{port}:127.0.0.1',
|
||||
'-V', proto, url
|
||||
])
|
||||
r.check_exit_code(0)
|
||||
srcfile = os.path.join(httpd.docs_dir, docname)
|
||||
self.check_downloads(client, srcfile, count)
|
||||
if max_total_conns > 0:
|
||||
matched_lines = 0
|
||||
for line in r.trace_lines:
|
||||
m = re.match(r'.*The cache now contains (\d+) members.*', line)
|
||||
if m:
|
||||
matched_lines += 1
|
||||
n = int(m.group(1))
|
||||
assert n <= max_total_conns
|
||||
assert matched_lines > 0
|
||||
|
||||
Loading…
Reference in New Issue
Block a user