curl: fix --test-event --parallel
(in debug-builds) Fix implementation in curl using libuv to process parallel transfers. Add pytest capabilities to run test cases with --test-event. - fix uv_timer handling to carry correct 'data' pointing to uv context. - fix uv_loop handling to reap and add transfers when possible - fix return code when a transfer errored Closes #14413
This commit is contained in:
parent
1b25448760
commit
b102763c19
@ -2534,6 +2534,9 @@ struct parastate {
|
||||
};
|
||||
|
||||
#if defined(DEBUGBUILD) && defined(USE_LIBUV)
|
||||
|
||||
#define DEBUG_UV 0
|
||||
|
||||
/* object to pass to the callbacks */
|
||||
struct datauv {
|
||||
uv_timer_t timeout;
|
||||
@ -2549,9 +2552,24 @@ struct contextuv {
|
||||
|
||||
static CURLcode check_finished(struct parastate *s);
|
||||
|
||||
static void check_multi_info(struct contextuv *context)
|
||||
static void check_multi_info(struct datauv *uv)
|
||||
{
|
||||
(void)check_finished(context->uv->s);
|
||||
CURLcode result;
|
||||
|
||||
result = check_finished(uv->s);
|
||||
if(result && !uv->s->result)
|
||||
uv->s->result = result;
|
||||
|
||||
if(uv->s->more_transfers) {
|
||||
result = add_parallel_transfers(uv->s->global, uv->s->multi,
|
||||
uv->s->share,
|
||||
&uv->s->more_transfers,
|
||||
&uv->s->added_transfers);
|
||||
if(result && !uv->s->result)
|
||||
uv->s->result = result;
|
||||
if(result)
|
||||
uv_stop(uv->loop);
|
||||
}
|
||||
}
|
||||
|
||||
/* callback from libuv on socket activity */
|
||||
@ -2567,17 +2585,19 @@ static void on_uv_socket(uv_poll_t *req, int status, int events)
|
||||
|
||||
curl_multi_socket_action(c->uv->s->multi, c->sockfd, flags,
|
||||
&c->uv->s->still_running);
|
||||
check_multi_info(c);
|
||||
}
|
||||
|
||||
/* callback from libuv when timeout expires */
|
||||
static void on_uv_timeout(uv_timer_t *req)
|
||||
{
|
||||
struct contextuv *c = (struct contextuv *) req->data;
|
||||
if(c) {
|
||||
curl_multi_socket_action(c->uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
|
||||
&c->uv->s->still_running);
|
||||
check_multi_info(c);
|
||||
struct datauv *uv = (struct datauv *) req->data;
|
||||
#if DEBUG_UV
|
||||
fprintf(tool_stderr, "parallel_event: on_uv_timeout\n");
|
||||
#endif
|
||||
if(uv && uv->s) {
|
||||
curl_multi_socket_action(uv->s->multi, CURL_SOCKET_TIMEOUT, 0,
|
||||
&uv->s->still_running);
|
||||
check_multi_info(uv);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2586,6 +2606,9 @@ static int cb_timeout(CURLM *multi, long timeout_ms,
|
||||
struct datauv *uv)
|
||||
{
|
||||
(void)multi;
|
||||
#if DEBUG_UV
|
||||
fprintf(tool_stderr, "parallel_event: cb_timeout=%ld\n", timeout_ms);
|
||||
#endif
|
||||
if(timeout_ms < 0)
|
||||
uv_timer_stop(&uv->timeout);
|
||||
else {
|
||||
@ -2656,6 +2679,8 @@ static int cb_socket(CURL *easy, curl_socket_t s, int action,
|
||||
uv_poll_stop(&c->poll_handle);
|
||||
destroy_context(c);
|
||||
curl_multi_assign(uv->s->multi, s, NULL);
|
||||
/* check if we can do more now */
|
||||
check_multi_info(uv);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
@ -2670,9 +2695,11 @@ static CURLcode parallel_event(struct parastate *s)
|
||||
CURLcode result = CURLE_OK;
|
||||
struct datauv uv = { 0 };
|
||||
|
||||
s->result = CURLE_OK;
|
||||
uv.s = s;
|
||||
uv.loop = uv_default_loop();
|
||||
uv_timer_init(uv.loop, &uv.timeout);
|
||||
uv.s = s;
|
||||
uv.timeout.data = &uv;
|
||||
|
||||
/* setup event callbacks */
|
||||
curl_multi_setopt(s->multi, CURLMOPT_SOCKETFUNCTION, cb_socket);
|
||||
@ -2682,10 +2709,49 @@ static CURLcode parallel_event(struct parastate *s)
|
||||
|
||||
/* kickstart the thing */
|
||||
curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0,
|
||||
&uv.s->still_running);
|
||||
uv_run(uv.loop, UV_RUN_DEFAULT);
|
||||
&s->still_running);
|
||||
|
||||
return result;
|
||||
while(!s->mcode && (s->still_running || s->more_transfers)) {
|
||||
#if DEBUG_UV
|
||||
fprintf(tool_stderr, "parallel_event: uv_run(), mcode=%d, %d running, "
|
||||
"%d more\n", s->mcode, uv.s->still_running, s->more_transfers);
|
||||
#endif
|
||||
uv_run(uv.loop, UV_RUN_DEFAULT);
|
||||
#if DEBUG_UV
|
||||
fprintf(tool_stderr, "parallel_event: uv_run() returned\n");
|
||||
#endif
|
||||
|
||||
result = check_finished(s);
|
||||
if(result && !s->result)
|
||||
s->result = result;
|
||||
|
||||
/* early exit called */
|
||||
if(s->wrapitup) {
|
||||
if(s->still_running && !s->wrapitup_processed) {
|
||||
struct per_transfer *per;
|
||||
for(per = transfers; per; per = per->next) {
|
||||
if(per->added)
|
||||
per->abort = TRUE;
|
||||
}
|
||||
s->wrapitup_processed = TRUE;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
if(s->more_transfers) {
|
||||
result = add_parallel_transfers(s->global, s->multi, s->share,
|
||||
&s->more_transfers, &s->added_transfers);
|
||||
if(result && !s->result)
|
||||
s->result = result;
|
||||
}
|
||||
}
|
||||
|
||||
#if DEBUG_UV
|
||||
fprintf(tool_stderr, "DONE parallel_event -> %d, mcode=%d, %d running, "
|
||||
"%d more\n",
|
||||
s->result, s->mcode, uv.s->still_running, s->more_transfers);
|
||||
#endif
|
||||
return s->result;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
@ -87,18 +87,19 @@ class TestDownload:
|
||||
r.check_response(http_status=200, count=100, connect_count=1)
|
||||
|
||||
# download 100 files parallel
|
||||
@pytest.mark.parametrize("proto", ['h2', 'h3'])
|
||||
def test_02_04_download_100_parallel(self, env: Env,
|
||||
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
|
||||
def test_02_04_download_20_parallel(self, env: Env,
|
||||
httpd, nghttpx, repeat, proto):
|
||||
if proto == 'h3' and not env.have_h3():
|
||||
pytest.skip("h3 not supported")
|
||||
max_parallel = 50
|
||||
count = 20
|
||||
max_parallel = 10
|
||||
curl = CurlClient(env=env)
|
||||
urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-99]'
|
||||
urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-{count-1}]'
|
||||
r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[
|
||||
'--parallel', '--parallel-max', f'{max_parallel}'
|
||||
])
|
||||
r.check_response(http_status=200, count=100)
|
||||
r.check_response(http_status=200, count=count)
|
||||
if proto == 'http/1.1':
|
||||
# http/1.1 parallel transfers will open multiple connections
|
||||
assert r.total_connects > 1, r.dump_logs()
|
||||
|
||||
@ -50,7 +50,7 @@ class LocalClient:
|
||||
self.name = name
|
||||
self.path = os.path.join(env.project_dir, f'tests/http/clients/{name}')
|
||||
self.env = env
|
||||
self._run_env= run_env
|
||||
self._run_env = run_env
|
||||
self._timeout = timeout if timeout else env.test_timeout
|
||||
self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl
|
||||
self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, name)
|
||||
@ -92,12 +92,18 @@ class LocalClient:
|
||||
exception = None
|
||||
myargs = [self.path]
|
||||
myargs.extend(args)
|
||||
run_env = None
|
||||
if self._run_env:
|
||||
run_env = self._run_env.copy()
|
||||
for key in ['CURL_DEBUG']:
|
||||
if key in os.environ and key not in run_env:
|
||||
run_env[key] = os.environ[key]
|
||||
try:
|
||||
with open(self._stdoutfile, 'w') as cout:
|
||||
with open(self._stderrfile, 'w') as cerr:
|
||||
p = subprocess.run(myargs, stderr=cerr, stdout=cout,
|
||||
cwd=self._run_dir, shell=False,
|
||||
input=None, env=self._run_env,
|
||||
input=None, env=run_env,
|
||||
timeout=self._timeout)
|
||||
exitcode = p.returncode
|
||||
except subprocess.TimeoutExpired:
|
||||
|
||||
@ -824,6 +824,9 @@ class CurlClient:
|
||||
urls = [urls]
|
||||
|
||||
args = [self._curl, "-s", "--path-as-is"]
|
||||
if 'CURL_TEST_EVENT' in os.environ:
|
||||
args.append('--test-event')
|
||||
|
||||
if with_headers:
|
||||
args.extend(["-D", self._headerfile])
|
||||
if def_tracing is not False and not self._silent:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user