http/2: simplify eos/blocked handling
- rely on the new flush to handle blocked sends. No longer do simulated EAGAIN on (partially) blocked sends with their need to handle repeats. - fix some debug handling CURL_SMALLREQSEND env var - add some assertings in request.c for affirming we do it right - enhance assertion output in test_16 for easier analysis Closes #14435
This commit is contained in:
parent
1e9c1e8f2e
commit
0bc5b2e37c
53
lib/http2.c
53
lib/http2.c
@ -207,8 +207,6 @@ struct h2_stream_ctx {
|
|||||||
BIT(reset); /* TRUE on stream reset */
|
BIT(reset); /* TRUE on stream reset */
|
||||||
BIT(close_handled); /* TRUE if stream closure is handled by libcurl */
|
BIT(close_handled); /* TRUE if stream closure is handled by libcurl */
|
||||||
BIT(bodystarted);
|
BIT(bodystarted);
|
||||||
BIT(send_closed); /* transfer is done sending, we might have still
|
|
||||||
buffered data in stream->sendbuf to upload. */
|
|
||||||
BIT(body_eos); /* the complete body has been added to `sendbuf` and
|
BIT(body_eos); /* the complete body has been added to `sendbuf` and
|
||||||
* is being/has been processed from there. */
|
* is being/has been processed from there. */
|
||||||
};
|
};
|
||||||
@ -347,10 +345,10 @@ static void drain_stream(struct Curl_cfilter *cf,
|
|||||||
|
|
||||||
(void)cf;
|
(void)cf;
|
||||||
bits = CURL_CSELECT_IN;
|
bits = CURL_CSELECT_IN;
|
||||||
if(!stream->send_closed &&
|
if(!stream->closed &&
|
||||||
(!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf)))
|
(!stream->body_eos || !Curl_bufq_is_empty(&stream->sendbuf)))
|
||||||
bits |= CURL_CSELECT_OUT;
|
bits |= CURL_CSELECT_OUT;
|
||||||
if(data->state.select_bits != bits) {
|
if(stream->closed || (data->state.select_bits != bits)) {
|
||||||
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
|
CURL_TRC_CF(data, cf, "[%d] DRAIN select_bits=%x",
|
||||||
stream->id, bits);
|
stream->id, bits);
|
||||||
data->state.select_bits = bits;
|
data->state.select_bits = bits;
|
||||||
@ -406,7 +404,6 @@ static void http2_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
|
|||||||
stream->id);
|
stream->id);
|
||||||
stream->closed = TRUE;
|
stream->closed = TRUE;
|
||||||
stream->reset = TRUE;
|
stream->reset = TRUE;
|
||||||
stream->send_closed = TRUE;
|
|
||||||
nghttp2_submit_rst_stream(ctx->h2, NGHTTP2_FLAG_NONE,
|
nghttp2_submit_rst_stream(ctx->h2, NGHTTP2_FLAG_NONE,
|
||||||
stream->id, NGHTTP2_STREAM_CLOSED);
|
stream->id, NGHTTP2_STREAM_CLOSED);
|
||||||
flush_egress = TRUE;
|
flush_egress = TRUE;
|
||||||
@ -1164,7 +1161,6 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
|
|||||||
if(frame->rst_stream.error_code) {
|
if(frame->rst_stream.error_code) {
|
||||||
stream->reset = TRUE;
|
stream->reset = TRUE;
|
||||||
}
|
}
|
||||||
stream->send_closed = TRUE;
|
|
||||||
drain_stream(cf, data, stream);
|
drain_stream(cf, data, stream);
|
||||||
break;
|
break;
|
||||||
case NGHTTP2_WINDOW_UPDATE:
|
case NGHTTP2_WINDOW_UPDATE:
|
||||||
@ -1430,7 +1426,6 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
|
|||||||
stream->error = error_code;
|
stream->error = error_code;
|
||||||
if(stream->error) {
|
if(stream->error) {
|
||||||
stream->reset = TRUE;
|
stream->reset = TRUE;
|
||||||
stream->send_closed = TRUE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if(stream->error)
|
if(stream->error)
|
||||||
@ -1742,35 +1737,6 @@ CURLcode Curl_http2_request_upgrade(struct dynbuf *req,
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
|
|
||||||
struct Curl_easy *data)
|
|
||||||
{
|
|
||||||
struct cf_h2_ctx *ctx = cf->ctx;
|
|
||||||
CURLcode result = CURLE_OK;
|
|
||||||
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
|
|
||||||
|
|
||||||
if(!ctx || !ctx->h2 || !stream)
|
|
||||||
goto out;
|
|
||||||
|
|
||||||
CURL_TRC_CF(data, cf, "[%d] data done send", stream->id);
|
|
||||||
if(!stream->send_closed) {
|
|
||||||
stream->send_closed = TRUE;
|
|
||||||
if(!Curl_bufq_is_empty(&stream->sendbuf)) {
|
|
||||||
/* TODO: if we had not seen EOS on send(), it seems the request
|
|
||||||
* is now aborted? */
|
|
||||||
/* we now know that everything that is buffered is all there is. */
|
|
||||||
stream->body_eos = TRUE;
|
|
||||||
/* resume sending here to trigger the callback to get called again so
|
|
||||||
that it can signal EOF to nghttp2 */
|
|
||||||
(void)nghttp2_session_resume_data(ctx->h2, stream->id);
|
|
||||||
drain_stream(cf, data, stream);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
out:
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
|
static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
|
||||||
struct Curl_easy *data,
|
struct Curl_easy *data,
|
||||||
struct h2_stream_ctx *stream,
|
struct h2_stream_ctx *stream,
|
||||||
@ -2339,18 +2305,6 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
|||||||
nwritten = -1;
|
nwritten = -1;
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
else if(stream && stream->body_eos &&
|
|
||||||
(!Curl_bufq_is_empty(&stream->sendbuf) ||
|
|
||||||
!Curl_bufq_is_empty(&ctx->outbufq))) {
|
|
||||||
/* We added the last send chunk to stream->sendbuf, but were unable
|
|
||||||
* to send it all off. Either the socket EAGAINed or the HTTP/2 flow
|
|
||||||
* control prevents it. This should be a call with `eos` set and
|
|
||||||
* we CURLE_AGAIN it until we flushed everything. */
|
|
||||||
CURL_TRC_CF(data, cf, "[%d] could not flush last send chunk -> EAGAIN",
|
|
||||||
stream->id);
|
|
||||||
*err = CURLE_AGAIN;
|
|
||||||
nwritten = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if(should_close_session(ctx)) {
|
if(should_close_session(ctx)) {
|
||||||
/* nghttp2 thinks this session is done. If the stream has not been
|
/* nghttp2 thinks this session is done. If the stream has not been
|
||||||
@ -2650,9 +2604,6 @@ static CURLcode cf_h2_cntrl(struct Curl_cfilter *cf,
|
|||||||
case CF_CTRL_FLUSH:
|
case CF_CTRL_FLUSH:
|
||||||
result = cf_h2_flush(cf, data);
|
result = cf_h2_flush(cf, data);
|
||||||
break;
|
break;
|
||||||
case CF_CTRL_DATA_DONE_SEND:
|
|
||||||
result = http2_data_done_send(cf, data);
|
|
||||||
break;
|
|
||||||
case CF_CTRL_DATA_DETACH:
|
case CF_CTRL_DATA_DETACH:
|
||||||
http2_data_done(cf, data);
|
http2_data_done(cf, data);
|
||||||
break;
|
break;
|
||||||
|
|||||||
@ -182,16 +182,17 @@ static CURLcode xfer_send(struct Curl_easy *data,
|
|||||||
bool eos = FALSE;
|
bool eos = FALSE;
|
||||||
|
|
||||||
*pnwritten = 0;
|
*pnwritten = 0;
|
||||||
|
DEBUGASSERT(hds_len <= blen);
|
||||||
#ifdef DEBUGBUILD
|
#ifdef DEBUGBUILD
|
||||||
{
|
{
|
||||||
/* Allow debug builds to override this logic to force short initial
|
/* Allow debug builds to override this logic to force short initial
|
||||||
sends
|
sends */
|
||||||
*/
|
size_t body_len = blen - hds_len;
|
||||||
char *p = getenv("CURL_SMALLREQSEND");
|
char *p = getenv("CURL_SMALLREQSEND");
|
||||||
if(p) {
|
if(p) {
|
||||||
size_t altsize = (size_t)strtoul(p, NULL, 10);
|
size_t body_small = (size_t)strtoul(p, NULL, 10);
|
||||||
if(altsize && altsize < blen)
|
if(body_small && body_small < body_len)
|
||||||
blen = altsize;
|
blen = hds_len + body_small;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -267,10 +268,12 @@ static CURLcode req_set_upload_done(struct Curl_easy *data)
|
|||||||
else if(data->req.writebytecount)
|
else if(data->req.writebytecount)
|
||||||
infof(data, "upload completely sent off: %" CURL_FORMAT_CURL_OFF_T
|
infof(data, "upload completely sent off: %" CURL_FORMAT_CURL_OFF_T
|
||||||
" bytes", data->req.writebytecount);
|
" bytes", data->req.writebytecount);
|
||||||
else if(!data->req.download_done)
|
else if(!data->req.download_done) {
|
||||||
|
DEBUGASSERT(Curl_bufq_is_empty(&data->req.sendbuf));
|
||||||
infof(data, Curl_creader_total_length(data)?
|
infof(data, Curl_creader_total_length(data)?
|
||||||
"We are completely uploaded and fine" :
|
"We are completely uploaded and fine" :
|
||||||
"Request completely sent off");
|
"Request completely sent off");
|
||||||
|
}
|
||||||
|
|
||||||
return Curl_xfer_send_close(data);
|
return Curl_xfer_send_close(data);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -111,11 +111,11 @@ class TestInfo:
|
|||||||
self.check_stat(idx, s, r, dl_size=30, ul_size=0)
|
self.check_stat(idx, s, r, dl_size=30, ul_size=0)
|
||||||
|
|
||||||
def check_stat(self, idx, s, r, dl_size=None, ul_size=None):
|
def check_stat(self, idx, s, r, dl_size=None, ul_size=None):
|
||||||
self.check_stat_times(s)
|
self.check_stat_times(s, idx)
|
||||||
# we always send something
|
# we always send something
|
||||||
self.check_stat_positive(s, 'size_request')
|
self.check_stat_positive(s, idx, 'size_request')
|
||||||
# we always receive response headers
|
# we always receive response headers
|
||||||
self.check_stat_positive(s, 'size_header')
|
self.check_stat_positive(s, idx, 'size_header')
|
||||||
if ul_size is not None:
|
if ul_size is not None:
|
||||||
assert s['size_upload'] == ul_size, f'stat #{idx}\n{r.dump_logs()}' # the file we sent
|
assert s['size_upload'] == ul_size, f'stat #{idx}\n{r.dump_logs()}' # the file we sent
|
||||||
assert s['size_request'] >= s['size_upload'], \
|
assert s['size_request'] >= s['size_upload'], \
|
||||||
@ -123,15 +123,15 @@ class TestInfo:
|
|||||||
if dl_size is not None:
|
if dl_size is not None:
|
||||||
assert s['size_download'] == dl_size, f'stat #{idx}\n{r.dump_logs()}' # the file we received
|
assert s['size_download'] == dl_size, f'stat #{idx}\n{r.dump_logs()}' # the file we received
|
||||||
|
|
||||||
def check_stat_positive(self, s, key):
|
def check_stat_positive(self, s, idx, key):
|
||||||
assert key in s, f'stat "{key}" missing: {s}'
|
assert key in s, f'stat #{idx} "{key}" missing: {s}'
|
||||||
assert s[key] > 0, f'stat "{key}" not positive: {s}'
|
assert s[key] > 0, f'stat #{idx} "{key}" not positive: {s}'
|
||||||
|
|
||||||
def check_stat_zero(self, s, key):
|
def check_stat_zero(self, s, key):
|
||||||
assert key in s, f'stat "{key}" missing: {s}'
|
assert key in s, f'stat "{key}" missing: {s}'
|
||||||
assert s[key] == 0, f'stat "{key}" not zero: {s}'
|
assert s[key] == 0, f'stat "{key}" not zero: {s}'
|
||||||
|
|
||||||
def check_stat_times(self, s):
|
def check_stat_times(self, s, idx):
|
||||||
# check timings reported on a transfer for consistency
|
# check timings reported on a transfer for consistency
|
||||||
url = s['url_effective']
|
url = s['url_effective']
|
||||||
# all stat keys which reporting timings
|
# all stat keys which reporting timings
|
||||||
@ -152,7 +152,7 @@ class TestInfo:
|
|||||||
for key in zero_keys:
|
for key in zero_keys:
|
||||||
self.check_stat_zero(s, key)
|
self.check_stat_zero(s, key)
|
||||||
for key in pos_keys:
|
for key in pos_keys:
|
||||||
self.check_stat_positive(s, key)
|
self.check_stat_positive(s, idx, key)
|
||||||
# assert that all timers before "time_pretransfer" are less or equal
|
# assert that all timers before "time_pretransfer" are less or equal
|
||||||
for key in ['time_appconnect', 'time_connect', 'time_namelookup']:
|
for key in ['time_appconnect', 'time_connect', 'time_namelookup']:
|
||||||
assert s[key] < s['time_pretransfer'], f'time "{key}" larger than' \
|
assert s[key] < s['time_pretransfer'], f'time "{key}" larger than' \
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user