http2: fix upload busy loop
- Set KEEP_SEND_PAUSE when a stream's remote HTTP/2 flow-control window is
  exhausted.
- Clear KEEP_SEND_PAUSE when a WINDOW_UPDATE is received for a paused
  stream.
- Also fix the http2 send compiler warnings reported in #10449.
Prior to this change, starting with 71b7e016 (which precedes 7.88.0),
libcurl could busy-loop and eat CPU during an HTTP/2 upload.
Reported-by: Jay Satiro
Fixes https://github.com/curl/curl/issues/10449
Fixes https://github.com/curl/curl/issues/10618
Closes https://github.com/curl/curl/pull/10627
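
For background (this paragraph and the sketch below are not part of the commit):
the busy loop came from the send path being driven over and over while the
stream's remote flow-control window was zero, so nothing could actually be
written. The fix parks the transfer behind a pause flag and only clears it when
the peer grants more window via WINDOW_UPDATE. The following standalone C
sketch illustrates that pattern with invented names (struct stream, stream_send,
stream_window_update, remote_window); it is a minimal simulation, not libcurl
or nghttp2 code:

/* Illustrative only: a toy sender that pauses instead of busy-looping when
 * the peer's flow-control window is exhausted, and resumes on a (simulated)
 * WINDOW_UPDATE. All names are made up for this sketch. */
#include <stdio.h>
#include <stdbool.h>
#include <string.h>

struct stream {
  size_t remote_window;   /* bytes the peer is still willing to accept */
  bool   send_paused;     /* analogous in spirit to curl's KEEP_SEND_PAUSE */
};

/* Try to send up to len bytes; returns the number "sent". Marks the stream
 * paused when the window is exhausted so the caller stops retrying. */
static size_t stream_send(struct stream *s, const char *buf, size_t len)
{
  size_t n = (len < s->remote_window) ? len : s->remote_window;
  (void)buf;
  if(n == 0) {
    s->send_paused = true;  /* window exhausted: park the sender */
    return 0;
  }
  s->remote_window -= n;
  return n;
}

/* Peer granted more window: grow it and clear the pause flag. */
static void stream_window_update(struct stream *s, size_t increment)
{
  s->remote_window += increment;
  s->send_paused = false;
}

int main(void)
{
  struct stream s = { 8, false };
  const char *data = "0123456789abcdef";
  size_t off = 0, len = strlen(data);

  while(off < len) {
    if(s.send_paused) {
      /* a real event loop would sleep/poll here instead of spinning; we
       * simply simulate the peer sending a WINDOW_UPDATE */
      printf("paused, waiting for WINDOW_UPDATE\n");
      stream_window_update(&s, 8);
      continue;
    }
    {
      size_t n = stream_send(&s, data + off, len - off);
      if(n) {
        off += n;
        printf("sent %zu/%zu bytes\n", off, len);
      }
    }
  }
  return 0;
}

The design point mirrored by the patch is that the sender blocks itself on a
flag rather than re-driving the write path, and only the window-update event
clears that flag, so no CPU is burnt while waiting.
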
parent c50a6eee04
commit d9ccc75b00

lib/http2.c (93 changes)

@@ -942,6 +942,17 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
       DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv RST", stream_id));
       stream->reset = TRUE;
       break;
+    case NGHTTP2_WINDOW_UPDATE:
+      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv WINDOW_UPDATE", stream_id));
+      if((data_s->req.keepon & KEEP_SEND_PAUSE) &&
+         (data_s->req.keepon & KEEP_SEND)) {
+        data_s->req.keepon &= ~KEEP_SEND_PAUSE;
+        drain_this(cf, data_s);
+        Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
+        DEBUGF(LOG_CF(data, cf, "[h2sid=%u] unpausing after win update",
+                      stream_id));
+      }
+      break;
     default:
       DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] recv frame %x",
                     stream_id, frame->hd.type));
@@ -1006,18 +1017,6 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
     return NGHTTP2_ERR_PAUSE;
   }
 
-#if 0
-  /* pause execution of nghttp2 if we received data for another handle
-     in order to process them first. */
-  if(CF_DATA_CURRENT(cf) != data_s) {
-    ctx->pause_stream_id = stream_id;
-    DEBUGF(LOG_CF(data_s, cf, "[h2sid=%u] not call_data -> NGHTTP2_ERR_PAUSE",
-                  stream_id));
-    drain_this(cf, data_s);
-    return NGHTTP2_ERR_PAUSE;
-  }
-#endif
-
   return 0;
 }
 
@@ -1763,7 +1762,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
     goto out;
   }
 
-  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] recv: win %u/%u",
+  DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_recv: win %u/%u",
          stream->stream_id,
          nghttp2_session_get_local_window_size(ctx->h2),
          nghttp2_session_get_stream_local_window_size(ctx->h2,
@@ -1976,19 +1975,20 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
   CURLcode result;
   struct h2h3req *hreq;
   struct cf_call_data save;
+  ssize_t nwritten;
 
   CF_DATA_SAVE(save, cf, data);
-  DEBUGF(LOG_CF(data, cf, "send len=%zu", len));
+  DEBUGF(LOG_CF(data, cf, "cf_send(len=%zu) start", len));
 
   if(stream->stream_id != -1) {
     if(stream->close_handled) {
       infof(data, "stream %u closed", stream->stream_id);
       *err = CURLE_HTTP2_STREAM;
-      len = -1;
+      nwritten = -1;
       goto out;
     }
     else if(stream->closed) {
-      len = http2_handle_stream_close(cf, data, stream, err);
+      nwritten = http2_handle_stream_close(cf, data, stream, err);
       goto out;
     }
     /* If stream_id != -1, we have dispatched request HEADERS, and now
@@ -1998,26 +1998,24 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
       rv = nghttp2_session_resume_data(ctx->h2, stream->stream_id);
       if(nghttp2_is_fatal(rv)) {
         *err = CURLE_SEND_ERROR;
-        len = -1;
+        nwritten = -1;
         goto out;
       }
       result = h2_session_send(cf, data);
       if(result) {
         *err = result;
-        len = -1;
+        nwritten = -1;
         goto out;
       }
-      len -= stream->upload_len;
 
       /* Nullify here because we call nghttp2_session_send() and they
          might refer to the old buffer. */
+      nwritten = (ssize_t)len - (ssize_t)stream->upload_len;
       stream->upload_mem = NULL;
       stream->upload_len = 0;
 
       if(should_close_session(ctx)) {
         DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
         *err = CURLE_HTTP2;
-        len = -1;
+        nwritten = -1;
         goto out;
       }
 
@@ -2029,26 +2027,36 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
       nghttp2_session_resume_data(ctx->h2, stream->stream_id);
     }
 
-#ifdef DEBUG_HTTP2
-    if(!len) {
-      infof(data, "http2_send: easy %p (stream %u) win %u/%u",
-            data, stream->stream_id,
-            nghttp2_session_get_remote_window_size(ctx->h2),
-            nghttp2_session_get_stream_remote_window_size(ctx->h2,
-                                                          stream->stream_id)
-        );
 
+    if(!nwritten) {
+      size_t rwin = nghttp2_session_get_stream_remote_window_size(ctx->h2,
+                                                        stream->stream_id);
+      DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send: win %u/%zu",
+             stream->stream_id,
+             nghttp2_session_get_remote_window_size(ctx->h2), rwin));
+      if(rwin == 0) {
+        /* We cannot upload more as the stream's remote window size
+         * is 0. We need to receive WIN_UPDATEs before we can continue.
+         */
+        data->req.keepon |= KEEP_SEND_PAUSE;
+        DEBUGF(LOG_CF(data, cf, "[h2sid=%u] pausing send as remote flow "
+               "window is exhausted", stream->stream_id));
+      }
     }
-    infof(data, "http2_send returns %zu for stream %u", len,
-          stream->stream_id);
-#endif
+    DEBUGF(LOG_CF(data, cf, "[h2sid=%u] cf_send returns %zd ",
+           stream->stream_id, nwritten));
 
     /* handled writing BODY for open stream. */
     goto out;
   }
 
   /* Stream has not been opened yet. `buf` is expected to contain
    * request headers. */
+  /* TODO: this assumes that the `buf` and `len` we are called with
+   * is *all* HEADERs and no body. We have no way to determine here
+   * if that is indeed the case. */
   result = Curl_pseudo_headers(data, buf, len, NULL, &hreq);
   if(result) {
     *err = result;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
   nheader = hreq->entries;
@@ -2057,7 +2065,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
   if(!nva) {
     Curl_pseudo_free(hreq);
     *err = CURLE_OUT_OF_MEMORY;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
   else {
@@ -2104,25 +2112,28 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
     DEBUGF(LOG_CF(data, cf, "send: nghttp2_submit_request error (%s)%u",
                   nghttp2_strerror(stream_id), stream_id));
     *err = CURLE_SEND_ERROR;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
 
   infof(data, "Using Stream ID: %u (easy handle %p)",
         stream_id, (void *)data);
   stream->stream_id = stream_id;
+  /* See TODO above. We assume that the whole buf was consumed by
+   * generating the request headers. */
+  nwritten = len;
 
   result = h2_session_send(cf, data);
   if(result) {
     *err = result;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
 
   if(should_close_session(ctx)) {
     DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
     *err = CURLE_HTTP2;
-    len = -1;
+    nwritten = -1;
     goto out;
   }
 
@@ -2137,7 +2148,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
 
 out:
   CF_DATA_RESTORE(cf, save);
-  return len;
+  return nwritten;
 }
 
 static int cf_h2_get_select_socks(struct Curl_cfilter *cf,
 
@@ -181,3 +181,41 @@ class TestUpload:
             respdata = open(curl.response_file(i)).readlines()
             assert respdata == indata
 
+    # PUT 100k
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_07_30_put_100k(self, env: Env, httpd, nghttpx, repeat, proto):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        fdata = os.path.join(env.gen_dir, 'data-100k')
+        count = 1
+        curl = CurlClient(env=env)
+        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]'
+        r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
+                          extra_args=['--parallel'])
+        assert r.exit_code == 0, f'{r}'
+        r.check_stats(count=count, exp_status=200)
+        exp_data = [f'{os.path.getsize(fdata)}']
+        r.check_stats(count=count, exp_status=200)
+        for i in range(count):
+            respdata = open(curl.response_file(i)).readlines()
+            assert respdata == exp_data
+
+    # PUT 10m
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_07_31_put_10m(self, env: Env, httpd, nghttpx, repeat, proto):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        fdata = os.path.join(env.gen_dir, 'data-10m')
+        count = 1
+        curl = CurlClient(env=env)
+        url = f'https://{env.authority_for(env.domain1, proto)}/curltest/put?id=[0-{count-1}]&chunk_delay=10ms'
+        r = curl.http_put(urls=[url], fdata=fdata, alpn_proto=proto,
+                          extra_args=['--parallel'])
+        assert r.exit_code == 0, f'{r}'
+        r.check_stats(count=count, exp_status=200)
+        exp_data = [f'{os.path.getsize(fdata)}']
+        r.check_stats(count=count, exp_status=200)
+        for i in range(count):
+            respdata = open(curl.response_file(i)).readlines()
+            assert respdata == exp_data
+
@@ -270,6 +270,29 @@ class CurlClient:
                          with_stats=with_stats,
                          with_headers=with_headers)
 
+    def http_put(self, urls: List[str], data=None, fdata=None,
+                 alpn_proto: Optional[str] = None,
+                 with_stats: bool = True,
+                 with_headers: bool = False,
+                 extra_args: Optional[List[str]] = None):
+        if extra_args is None:
+            extra_args = []
+        if fdata is not None:
+            extra_args.extend(['-T', fdata])
+        elif data is not None:
+            extra_args.extend(['-T', '-'])
+        extra_args.extend([
+            '-o', 'download_#1.data',
+        ])
+        if with_stats:
+            extra_args.extend([
+                '-w', '%{json}\\n'
+            ])
+        return self._raw(urls, intext=data,
+                         alpn_proto=alpn_proto, options=extra_args,
+                         with_stats=with_stats,
+                         with_headers=with_headers)
+
     def response_file(self, idx: int):
         return os.path.join(self._run_dir, f'download_{idx}.data')
 
@@ -303,7 +326,7 @@ class CurlClient:
                          duration=datetime.now() - start,
                          with_stats=with_stats)
 
-    def _raw(self, urls, timeout=10, options=None, insecure=False,
+    def _raw(self, urls, intext='', timeout=10, options=None, insecure=False,
              alpn_proto: Optional[str] = None,
              force_resolve=True,
              with_stats=False,
@@ -312,7 +335,7 @@ class CurlClient:
            urls=urls, timeout=timeout, options=options, insecure=insecure,
            alpn_proto=alpn_proto, force_resolve=force_resolve,
            with_headers=with_headers)
-        r = self._run(args, with_stats=with_stats)
+        r = self._run(args, intext=intext, with_stats=with_stats)
         if r.exit_code == 0 and with_headers:
             self._parse_headerfile(self._headerfile, r=r)
         if r.json:
@@ -325,6 +325,9 @@ class Httpd:
            f'    <Location /curltest/echo>',
            f'      SetHandler curltest-echo',
            f'    </Location>',
+            f'    <Location /curltest/put>',
+            f'      SetHandler curltest-put',
+            f'    </Location>',
            f'    <Location /curltest/tweak>',
            f'      SetHandler curltest-tweak',
            f'    </Location>',
@@ -35,6 +35,7 @@
 
 static void curltest_hooks(apr_pool_t *pool);
 static int curltest_echo_handler(request_rec *r);
+static int curltest_put_handler(request_rec *r);
 static int curltest_tweak_handler(request_rec *r);
 
 AP_DECLARE_MODULE(curltest) = {
@@ -81,6 +82,7 @@ static void curltest_hooks(apr_pool_t *pool)
 
   /* curl test handlers */
   ap_hook_handler(curltest_echo_handler, NULL, NULL, APR_HOOK_MIDDLE);
+  ap_hook_handler(curltest_put_handler, NULL, NULL, APR_HOOK_MIDDLE);
   ap_hook_handler(curltest_tweak_handler, NULL, NULL, APR_HOOK_MIDDLE);
 }
 
@@ -229,9 +231,9 @@ static int curltest_echo_handler(request_rec *r)
   rv = ap_pass_brigade(r->output_filters, bb);
 
 cleanup:
-  if(rv == APR_SUCCESS
-     || r->status != HTTP_OK
-     || c->aborted) {
+  if(rv == APR_SUCCESS ||
+     r->status != HTTP_OK ||
+     c->aborted) {
     ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "echo_handler: done");
     return OK;
   }
@@ -409,3 +411,104 @@ cleanup:
   }
   return AP_FILTER_ERROR;
 }
+
+static int curltest_put_handler(request_rec *r)
+{
+  conn_rec *c = r->connection;
+  apr_bucket_brigade *bb;
+  apr_bucket *b;
+  apr_status_t rv;
+  char buffer[16*1024];
+  const char *ct;
+  apr_off_t rbody_len = 0;
+  const char *request_id = "none";
+  apr_time_t chunk_delay = 0;
+  apr_array_header_t *args = NULL;
+  long l;
+  int i;
+
+  if(strcmp(r->handler, "curltest-put")) {
+    return DECLINED;
+  }
+  if(r->method_number != M_PUT) {
+    return DECLINED;
+  }
+
+  if(r->args) {
+    args = apr_cstr_split(r->args, "&", 1, r->pool);
+    for(i = 0; i < args->nelts; ++i) {
+      char *s, *val, *arg = APR_ARRAY_IDX(args, i, char*);
+      s = strchr(arg, '=');
+      if(s) {
+        *s = '\0';
+        val = s + 1;
+        if(!strcmp("id", arg)) {
+          /* just an id for repeated requests with curl's url globbing */
+          request_id = val;
+          continue;
+        }
+        else if(!strcmp("chunk_delay", arg)) {
+          rv = duration_parse(&chunk_delay, val, "s");
+          if(APR_SUCCESS == rv) {
+            continue;
+          }
+        }
+      }
+      ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "query parameter not "
+                    "understood: '%s' in %s",
+                    arg, r->args);
+      ap_die(HTTP_BAD_REQUEST, r);
+      return OK;
+    }
+  }
+
+  ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: processing");
+  r->status = 200;
+  r->clength = -1;
+  r->chunked = 1;
+  apr_table_unset(r->headers_out, "Content-Length");
+  /* Discourage content-encodings */
+  apr_table_unset(r->headers_out, "Content-Encoding");
+  apr_table_setn(r->subprocess_env, "no-brotli", "1");
+  apr_table_setn(r->subprocess_env, "no-gzip", "1");
+
+  ct = apr_table_get(r->headers_in, "content-type");
+  ap_set_content_type(r, ct? ct : "text/plain");
+
+  bb = apr_brigade_create(r->pool, c->bucket_alloc);
+  /* copy any request body into the response */
+  if((rv = ap_setup_client_block(r, REQUEST_CHUNKED_DECHUNK))) goto cleanup;
+  if(ap_should_client_block(r)) {
+    while(0 < (l = ap_get_client_block(r, &buffer[0], sizeof(buffer)))) {
+      ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r,
+                    "put_handler: read %ld bytes from request body", l);
+      if(chunk_delay) {
+        apr_sleep(chunk_delay);
+      }
+      rbody_len += l;
+    }
+  }
+  /* we are done */
+  rv = apr_brigade_printf(bb, NULL, NULL, "%"APR_OFF_T_FMT, rbody_len);
+  if(APR_SUCCESS != rv) goto cleanup;
+  b = apr_bucket_eos_create(c->bucket_alloc);
+  APR_BRIGADE_INSERT_TAIL(bb, b);
+  ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, r, "put_handler: request read");
+
+  rv = ap_pass_brigade(r->output_filters, bb);
+
+cleanup:
+  if(rv == APR_SUCCESS
+     || r->status != HTTP_OK
+     || c->aborted) {
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler: done");
+    return OK;
+  }
+  else {
+    /* no way to know what type of error occurred */
+    ap_log_rerror(APLOG_MARK, APLOG_TRACE1, rv, r, "put_handler failed");
+    return AP_FILTER_ERROR;
+  }
+  return DECLINED;
+}