diff --git a/src/tool_operate.c b/src/tool_operate.c index 864ec959b7..90380063b1 100644 --- a/src/tool_operate.c +++ b/src/tool_operate.c @@ -2534,6 +2534,9 @@ struct parastate { }; #if defined(DEBUGBUILD) && defined(USE_LIBUV) + +#define DEBUG_UV 0 + /* object to pass to the callbacks */ struct datauv { uv_timer_t timeout; @@ -2549,9 +2552,24 @@ struct contextuv { static CURLcode check_finished(struct parastate *s); -static void check_multi_info(struct contextuv *context) +static void check_multi_info(struct datauv *uv) { - (void)check_finished(context->uv->s); + CURLcode result; + + result = check_finished(uv->s); + if(result && !uv->s->result) + uv->s->result = result; + + if(uv->s->more_transfers) { + result = add_parallel_transfers(uv->s->global, uv->s->multi, + uv->s->share, + &uv->s->more_transfers, + &uv->s->added_transfers); + if(result && !uv->s->result) + uv->s->result = result; + if(result) + uv_stop(uv->loop); + } } /* callback from libuv on socket activity */ @@ -2567,17 +2585,19 @@ static void on_uv_socket(uv_poll_t *req, int status, int events) curl_multi_socket_action(c->uv->s->multi, c->sockfd, flags, &c->uv->s->still_running); - check_multi_info(c); } /* callback from libuv when timeout expires */ static void on_uv_timeout(uv_timer_t *req) { - struct contextuv *c = (struct contextuv *) req->data; - if(c) { - curl_multi_socket_action(c->uv->s->multi, CURL_SOCKET_TIMEOUT, 0, - &c->uv->s->still_running); - check_multi_info(c); + struct datauv *uv = (struct datauv *) req->data; +#if DEBUG_UV + fprintf(tool_stderr, "parallel_event: on_uv_timeout\n"); +#endif + if(uv && uv->s) { + curl_multi_socket_action(uv->s->multi, CURL_SOCKET_TIMEOUT, 0, + &uv->s->still_running); + check_multi_info(uv); } } @@ -2586,6 +2606,9 @@ static int cb_timeout(CURLM *multi, long timeout_ms, struct datauv *uv) { (void)multi; +#if DEBUG_UV + fprintf(tool_stderr, "parallel_event: cb_timeout=%ld\n", timeout_ms); +#endif if(timeout_ms < 0) uv_timer_stop(&uv->timeout); else { @@ -2656,6 +2679,8 @@ static int cb_socket(CURL *easy, curl_socket_t s, int action, uv_poll_stop(&c->poll_handle); destroy_context(c); curl_multi_assign(uv->s->multi, s, NULL); + /* check if we can do more now */ + check_multi_info(uv); } break; default: @@ -2670,9 +2695,11 @@ static CURLcode parallel_event(struct parastate *s) CURLcode result = CURLE_OK; struct datauv uv = { 0 }; + s->result = CURLE_OK; + uv.s = s; uv.loop = uv_default_loop(); uv_timer_init(uv.loop, &uv.timeout); - uv.s = s; + uv.timeout.data = &uv; /* setup event callbacks */ curl_multi_setopt(s->multi, CURLMOPT_SOCKETFUNCTION, cb_socket); @@ -2682,10 +2709,49 @@ static CURLcode parallel_event(struct parastate *s) /* kickstart the thing */ curl_multi_socket_action(s->multi, CURL_SOCKET_TIMEOUT, 0, - &uv.s->still_running); - uv_run(uv.loop, UV_RUN_DEFAULT); + &s->still_running); - return result; + while(!s->mcode && (s->still_running || s->more_transfers)) { +#if DEBUG_UV + fprintf(tool_stderr, "parallel_event: uv_run(), mcode=%d, %d running, " + "%d more\n", s->mcode, uv.s->still_running, s->more_transfers); +#endif + uv_run(uv.loop, UV_RUN_DEFAULT); +#if DEBUG_UV + fprintf(tool_stderr, "parallel_event: uv_run() returned\n"); +#endif + + result = check_finished(s); + if(result && !s->result) + s->result = result; + + /* early exit called */ + if(s->wrapitup) { + if(s->still_running && !s->wrapitup_processed) { + struct per_transfer *per; + for(per = transfers; per; per = per->next) { + if(per->added) + per->abort = TRUE; + } + s->wrapitup_processed = TRUE; + } + break; + } + + if(s->more_transfers) { + result = add_parallel_transfers(s->global, s->multi, s->share, + &s->more_transfers, &s->added_transfers); + if(result && !s->result) + s->result = result; + } + } + +#if DEBUG_UV + fprintf(tool_stderr, "DONE parallel_event -> %d, mcode=%d, %d running, " + "%d more\n", + s->result, s->mcode, uv.s->still_running, s->more_transfers); +#endif + return s->result; } #endif diff --git a/tests/http/test_02_download.py b/tests/http/test_02_download.py index ff6a0bd146..55d0b0ce0a 100644 --- a/tests/http/test_02_download.py +++ b/tests/http/test_02_download.py @@ -87,18 +87,19 @@ class TestDownload: r.check_response(http_status=200, count=100, connect_count=1) # download 100 files parallel - @pytest.mark.parametrize("proto", ['h2', 'h3']) - def test_02_04_download_100_parallel(self, env: Env, + @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) + def test_02_04_download_20_parallel(self, env: Env, httpd, nghttpx, repeat, proto): if proto == 'h3' and not env.have_h3(): pytest.skip("h3 not supported") - max_parallel = 50 + count = 20 + max_parallel = 10 curl = CurlClient(env=env) - urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-99]' + urln = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-{count-1}]' r = curl.http_download(urls=[urln], alpn_proto=proto, extra_args=[ '--parallel', '--parallel-max', f'{max_parallel}' ]) - r.check_response(http_status=200, count=100) + r.check_response(http_status=200, count=count) if proto == 'http/1.1': # http/1.1 parallel transfers will open multiple connections assert r.total_connects > 1, r.dump_logs() diff --git a/tests/http/testenv/client.py b/tests/http/testenv/client.py index e8ffb040aa..0a0030c75e 100644 --- a/tests/http/testenv/client.py +++ b/tests/http/testenv/client.py @@ -50,7 +50,7 @@ class LocalClient: self.name = name self.path = os.path.join(env.project_dir, f'tests/http/clients/{name}') self.env = env - self._run_env= run_env + self._run_env = run_env self._timeout = timeout if timeout else env.test_timeout self._curl = os.environ['CURL'] if 'CURL' in os.environ else env.curl self._run_dir = run_dir if run_dir else os.path.join(env.gen_dir, name) @@ -92,12 +92,18 @@ class LocalClient: exception = None myargs = [self.path] myargs.extend(args) + run_env = None + if self._run_env: + run_env = self._run_env.copy() + for key in ['CURL_DEBUG']: + if key in os.environ and key not in run_env: + run_env[key] = os.environ[key] try: with open(self._stdoutfile, 'w') as cout: with open(self._stderrfile, 'w') as cerr: p = subprocess.run(myargs, stderr=cerr, stdout=cout, cwd=self._run_dir, shell=False, - input=None, env=self._run_env, + input=None, env=run_env, timeout=self._timeout) exitcode = p.returncode except subprocess.TimeoutExpired: diff --git a/tests/http/testenv/curl.py b/tests/http/testenv/curl.py index f89b2c9a8e..3201850387 100644 --- a/tests/http/testenv/curl.py +++ b/tests/http/testenv/curl.py @@ -824,6 +824,9 @@ class CurlClient: urls = [urls] args = [self._curl, "-s", "--path-as-is"] + if 'CURL_TEST_EVENT' in os.environ: + args.append('--test-event') + if with_headers: args.extend(["-D", self._headerfile]) if def_tracing is not False and not self._silent: