h2/h3: replace state.drain counter with state.dselect_bits
- `drain` was used by http/2 and http/3 implementations to indicate that the transfer requires send/recv independant from its socket poll state. Intended as a counter, it was used as bool flag only. - a similar mechanism exists on `connectdata->cselect_bits` where specific protocols can indicate something similar, only for the whole connection. - `cselect_bits` are cleard in transfer.c on use and, importantly, also set when the transfer loop expended its `maxloops` tries. `drain` was not cleared by transfer and the http2/3 implementations had to take care of that. - `dselect_bits` is cleared *and* set by the transfer loop. http2/3 does no longer clear it, only set when new events happen. This change unifies the handling of socket poll overrides, extending `cselect_bits` by a easy handle specific value and a common treatment in transfers. Closes #11005
This commit is contained in:
parent
a97e4eb95f
commit
cab2d56ea5
109
lib/http2.c
109
lib/http2.c
@ -154,34 +154,6 @@ static void cf_h2_ctx_free(struct cf_h2_ctx *ctx)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This specific transfer on this connection has been "drained".
|
||||
*/
|
||||
static void drained_transfer(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
if(data->state.drain) {
|
||||
struct cf_h2_ctx *ctx = cf->ctx;
|
||||
DEBUGASSERT(ctx->drain_total > 0);
|
||||
ctx->drain_total--;
|
||||
data->state.drain = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Mark this transfer to get "drained".
|
||||
*/
|
||||
static void drain_this(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
if(!data->state.drain) {
|
||||
struct cf_h2_ctx *ctx = cf->ctx;
|
||||
data->state.drain = 1;
|
||||
ctx->drain_total++;
|
||||
DEBUGASSERT(ctx->drain_total > 0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* All about the H3 internals of a stream
|
||||
*/
|
||||
@ -213,6 +185,25 @@ struct stream_ctx {
|
||||
#define H2_STREAM_ID(d) (H2_STREAM_CTX(d)? \
|
||||
H2_STREAM_CTX(d)->id : -2)
|
||||
|
||||
/*
|
||||
* Mark this transfer to get "drained".
|
||||
*/
|
||||
static void drain_stream(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct stream_ctx *stream)
|
||||
{
|
||||
int bits;
|
||||
|
||||
(void)cf;
|
||||
bits = CURL_CSELECT_IN;
|
||||
if(stream->upload_left)
|
||||
bits |= CURL_CSELECT_OUT;
|
||||
if(data->state.dselect_bits != bits) {
|
||||
data->state.dselect_bits = bits;
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
}
|
||||
}
|
||||
|
||||
static CURLcode http2_data_setup(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data,
|
||||
struct stream_ctx **pstream)
|
||||
@ -276,8 +267,6 @@ static void http2_data_done(struct Curl_cfilter *cf,
|
||||
(void)nghttp2_session_send(ctx->h2);
|
||||
}
|
||||
|
||||
drained_transfer(cf, data);
|
||||
|
||||
/* -1 means unassigned and 0 means cleared */
|
||||
if(nghttp2_session_get_stream_user_data(ctx->h2, stream->id)) {
|
||||
int rv = nghttp2_session_set_stream_user_data(ctx->h2,
|
||||
@ -515,8 +504,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf,
|
||||
while(Curl_bufq_peek(&ctx->inbufq, &buf, &blen)) {
|
||||
|
||||
rv = nghttp2_session_mem_recv(ctx->h2, (const uint8_t *)buf, blen);
|
||||
DEBUGF(LOG_CF(data, cf,
|
||||
"fed %zu bytes from nw to nghttp2 -> %zd", blen, rv));
|
||||
if(rv < 0) {
|
||||
failf(data,
|
||||
"process_pending_input: nghttp2_session_mem_recv() returned "
|
||||
@ -526,7 +513,6 @@ static int h2_process_pending_input(struct Curl_cfilter *cf,
|
||||
}
|
||||
Curl_bufq_skip(&ctx->inbufq, (size_t)rv);
|
||||
if(Curl_bufq_is_empty(&ctx->inbufq)) {
|
||||
DEBUGF(LOG_CF(data, cf, "all data in connection buffer processed"));
|
||||
break;
|
||||
}
|
||||
else {
|
||||
@ -975,8 +961,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
|
||||
}
|
||||
}
|
||||
if(frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
|
||||
drain_this(cf, data);
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
drain_stream(cf, data, stream);
|
||||
}
|
||||
break;
|
||||
case NGHTTP2_HEADERS:
|
||||
@ -1005,10 +990,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
|
||||
|
||||
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] %zu header bytes",
|
||||
stream_id, Curl_bufq_len(&stream->recvbuf)));
|
||||
if(CF_DATA_CURRENT(cf) != data) {
|
||||
drain_this(cf, data);
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
}
|
||||
drain_stream(cf, data, stream);
|
||||
break;
|
||||
case NGHTTP2_PUSH_PROMISE:
|
||||
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv PUSH_PROMISE", stream_id));
|
||||
@ -1031,16 +1013,14 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
|
||||
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv RST", stream_id));
|
||||
stream->closed = TRUE;
|
||||
stream->reset = TRUE;
|
||||
drain_this(cf, data);
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
drain_stream(cf, data, stream);
|
||||
break;
|
||||
case NGHTTP2_WINDOW_UPDATE:
|
||||
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] recv WINDOW_UPDATE", stream_id));
|
||||
if((data->req.keepon & KEEP_SEND_HOLD) &&
|
||||
(data->req.keepon & KEEP_SEND)) {
|
||||
data->req.keepon &= ~KEEP_SEND_HOLD;
|
||||
drain_this(cf, data);
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
drain_stream(cf, data, stream);
|
||||
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] un-holding after win update",
|
||||
stream_id));
|
||||
}
|
||||
@ -1156,10 +1136,7 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
|
||||
}
|
||||
|
||||
/* if we receive data for another handle, wake that up */
|
||||
if(CF_DATA_CURRENT(cf) != data_s) {
|
||||
drain_this(cf, data_s);
|
||||
Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
|
||||
}
|
||||
drain_stream(cf, data_s, stream);
|
||||
|
||||
DEBUGASSERT((size_t)nwritten == len);
|
||||
DEBUGF(LOG_CF(data_s, cf, "[h2sid=%d] %zd/%zu DATA recvd, "
|
||||
@ -1196,10 +1173,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
|
||||
if(stream->error)
|
||||
stream->reset = TRUE;
|
||||
|
||||
if(CF_DATA_CURRENT(cf) != data_s) {
|
||||
drain_this(cf, data_s);
|
||||
Curl_expire(data_s, 0, EXPIRE_RUN_NOW);
|
||||
}
|
||||
drain_stream(cf, data_s, stream);
|
||||
|
||||
/* remove `data_s` from the nghttp2 stream */
|
||||
rv = nghttp2_session_set_stream_user_data(session, stream_id, 0);
|
||||
@ -1529,7 +1503,7 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
|
||||
/* resume sending here to trigger the callback to get called again so
|
||||
that it can signal EOF to nghttp2 */
|
||||
(void)nghttp2_session_resume_data(ctx->h2, stream->id);
|
||||
drain_this(cf, data);
|
||||
drain_stream(cf, data, stream);
|
||||
}
|
||||
|
||||
out:
|
||||
@ -1543,14 +1517,17 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
|
||||
struct stream_ctx *stream = H2_STREAM_CTX(data);
|
||||
ssize_t rv = 0;
|
||||
|
||||
drained_transfer(cf, data);
|
||||
|
||||
if(stream->error == NGHTTP2_REFUSED_STREAM) {
|
||||
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] REFUSED_STREAM, try again on a new "
|
||||
"connection", stream->id));
|
||||
connclose(cf->conn, "REFUSED_STREAM"); /* don't use this anymore */
|
||||
data->state.refused_stream = TRUE;
|
||||
*err = CURLE_RECV_ERROR; /* trigger Curl_retry_request() later */
|
||||
*err = CURLE_SEND_ERROR; /* trigger Curl_retry_request() later */
|
||||
return -1;
|
||||
}
|
||||
else if(stream->reset) {
|
||||
failf(data, "HTTP/2 stream %u was reset", stream->id);
|
||||
*err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
|
||||
return -1;
|
||||
}
|
||||
else if(stream->error != NGHTTP2_NO_ERROR) {
|
||||
@ -1560,11 +1537,6 @@ static ssize_t http2_handle_stream_close(struct Curl_cfilter *cf,
|
||||
*err = CURLE_HTTP2_STREAM;
|
||||
return -1;
|
||||
}
|
||||
else if(stream->reset) {
|
||||
failf(data, "HTTP/2 stream %u was reset", stream->id);
|
||||
*err = stream->bodystarted? CURLE_PARTIAL_FILE : CURLE_RECV_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(!stream->bodystarted) {
|
||||
failf(data, "HTTP/2 stream %u was closed cleanly, but before getting "
|
||||
@ -1691,7 +1663,6 @@ static ssize_t stream_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
ssize_t nread = -1;
|
||||
|
||||
*err = CURLE_AGAIN;
|
||||
drained_transfer(cf, data);
|
||||
if(!Curl_bufq_is_empty(&stream->recvbuf)) {
|
||||
nread = Curl_bufq_read(&stream->recvbuf,
|
||||
(unsigned char *)buf, len, err);
|
||||
@ -1755,8 +1726,8 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
|
||||
}
|
||||
|
||||
nread = Curl_bufq_slurp(&ctx->inbufq, nw_in_reader, cf, &result);
|
||||
DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
|
||||
Curl_bufq_len(&ctx->inbufq), nread, result));
|
||||
/* DEBUGF(LOG_CF(data, cf, "read %zd bytes nw data -> %zd, %d",
|
||||
Curl_bufq_len(&ctx->inbufq), nread, result)); */
|
||||
if(nread < 0) {
|
||||
if(result != CURLE_AGAIN) {
|
||||
failf(data, "Failed receiving HTTP2 data");
|
||||
@ -1832,7 +1803,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
if(stream->closed) {
|
||||
DEBUGF(LOG_CF(data, cf, "[h2sid=%d] closed stream, set drain",
|
||||
stream->id));
|
||||
drain_this(cf, data);
|
||||
drain_stream(cf, data, stream);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2040,9 +2011,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
}
|
||||
|
||||
if(should_close_session(ctx)) {
|
||||
if(stream->closed) {
|
||||
nwritten = http2_handle_stream_close(cf, data, err);
|
||||
}
|
||||
else {
|
||||
DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
|
||||
*err = CURLE_HTTP2;
|
||||
nwritten = -1;
|
||||
}
|
||||
goto out;
|
||||
}
|
||||
|
||||
@ -2085,9 +2061,14 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
}
|
||||
|
||||
if(should_close_session(ctx)) {
|
||||
if(stream->closed) {
|
||||
nwritten = http2_handle_stream_close(cf, data, err);
|
||||
}
|
||||
else {
|
||||
DEBUGF(LOG_CF(data, cf, "send: nothing to do in this session"));
|
||||
*err = CURLE_HTTP2;
|
||||
nwritten = -1;
|
||||
}
|
||||
goto out;
|
||||
}
|
||||
}
|
||||
|
||||
@ -753,7 +753,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
|
||||
|
||||
if(maxloops <= 0) {
|
||||
/* we mark it as read-again-please */
|
||||
conn->cselect_bits = CURL_CSELECT_IN;
|
||||
data->state.dselect_bits = CURL_CSELECT_IN;
|
||||
*comeback = TRUE;
|
||||
}
|
||||
|
||||
@ -1065,16 +1065,22 @@ CURLcode Curl_readwrite(struct connectdata *conn,
|
||||
CURLcode result;
|
||||
struct curltime now;
|
||||
int didwhat = 0;
|
||||
int select_bits;
|
||||
|
||||
|
||||
if(data->state.dselect_bits) {
|
||||
select_bits = data->state.dselect_bits;
|
||||
data->state.dselect_bits = 0;
|
||||
}
|
||||
else if(conn->cselect_bits) {
|
||||
select_bits = conn->cselect_bits;
|
||||
conn->cselect_bits = 0;
|
||||
}
|
||||
else {
|
||||
curl_socket_t fd_read;
|
||||
curl_socket_t fd_write;
|
||||
int select_res = conn->cselect_bits;
|
||||
|
||||
conn->cselect_bits = 0;
|
||||
|
||||
/* only use the proper socket if the *_HOLD bit is not set simultaneously as
|
||||
then we are in rate limiting state in that transfer direction */
|
||||
|
||||
/* only use the proper socket if the *_HOLD bit is not set simultaneously
|
||||
as then we are in rate limiting state in that transfer direction */
|
||||
if((k->keepon & KEEP_RECVBITS) == KEEP_RECV)
|
||||
fd_read = conn->sockfd;
|
||||
else
|
||||
@ -1085,20 +1091,10 @@ CURLcode Curl_readwrite(struct connectdata *conn,
|
||||
else
|
||||
fd_write = CURL_SOCKET_BAD;
|
||||
|
||||
#if defined(USE_HTTP2) || defined(USE_HTTP3)
|
||||
if(data->state.drain) {
|
||||
select_res |= CURL_CSELECT_IN;
|
||||
DEBUGF(infof(data, "Curl_readwrite: forcibly told to drain data"));
|
||||
if((k->keepon & KEEP_SENDBITS) == KEEP_SEND)
|
||||
select_res |= CURL_CSELECT_OUT;
|
||||
select_bits = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
|
||||
}
|
||||
#endif
|
||||
|
||||
if(!select_res) /* Call for select()/poll() only, if read/write/error
|
||||
status is not known. */
|
||||
select_res = Curl_socket_check(fd_read, CURL_SOCKET_BAD, fd_write, 0);
|
||||
|
||||
if(select_res == CURL_CSELECT_ERR) {
|
||||
if(select_bits == CURL_CSELECT_ERR) {
|
||||
failf(data, "select/poll returned error");
|
||||
result = CURLE_SEND_ERROR;
|
||||
goto out;
|
||||
@ -1106,7 +1102,7 @@ CURLcode Curl_readwrite(struct connectdata *conn,
|
||||
|
||||
#ifdef USE_HYPER
|
||||
if(conn->datastream) {
|
||||
result = conn->datastream(data, conn, &didwhat, done, select_res);
|
||||
result = conn->datastream(data, conn, &didwhat, done, select_bits);
|
||||
if(result || *done)
|
||||
goto out;
|
||||
}
|
||||
@ -1115,14 +1111,14 @@ CURLcode Curl_readwrite(struct connectdata *conn,
|
||||
/* We go ahead and do a read if we have a readable socket or if
|
||||
the stream was rewound (in which case we have data in a
|
||||
buffer) */
|
||||
if((k->keepon & KEEP_RECV) && (select_res & CURL_CSELECT_IN)) {
|
||||
if((k->keepon & KEEP_RECV) && (select_bits & CURL_CSELECT_IN)) {
|
||||
result = readwrite_data(data, conn, k, &didwhat, done, comeback);
|
||||
if(result || *done)
|
||||
goto out;
|
||||
}
|
||||
|
||||
/* If we still have writing to do, we check if we have a writable socket. */
|
||||
if((k->keepon & KEEP_SEND) && (select_res & CURL_CSELECT_OUT)) {
|
||||
if((k->keepon & KEEP_SEND) && (select_bits & CURL_CSELECT_OUT)) {
|
||||
/* write */
|
||||
|
||||
result = readwrite_upload(data, conn, &didwhat);
|
||||
|
||||
@ -1319,6 +1319,8 @@ struct UrlState {
|
||||
char *scratch; /* huge buffer[set.buffer_size*2] for upload CRLF replacing */
|
||||
long followlocation; /* redirect counter */
|
||||
int requests; /* request counter: redirects + authentication retakes */
|
||||
int dselect_bits; /* != 0 -> bitmask of socket events for this transfer
|
||||
* overriding anything the socket may report */
|
||||
#ifdef HAVE_SIGNAL
|
||||
/* storage for the previous bag^H^H^HSIGPIPE signal handler :-) */
|
||||
void (*prev_signal)(int sig);
|
||||
@ -1374,9 +1376,6 @@ struct UrlState {
|
||||
curl_off_t infilesize; /* size of file to upload, -1 means unknown.
|
||||
Copied from set.filesize at start of operation */
|
||||
#if defined(USE_HTTP2) || defined(USE_HTTP3)
|
||||
size_t drain; /* Increased when this stream has data to read, even if its
|
||||
socket is not necessarily is readable. Decreased when
|
||||
checked. */
|
||||
struct Curl_data_priority priority; /* shallow copy of data->set */
|
||||
#endif
|
||||
|
||||
|
||||
@ -189,12 +189,33 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
}
|
||||
}
|
||||
|
||||
static void notify_drain(struct Curl_cfilter *cf,
|
||||
static void drain_stream_from_other_thread(struct Curl_easy *data,
|
||||
struct stream_ctx *stream)
|
||||
{
|
||||
int bits;
|
||||
|
||||
/* risky */
|
||||
bits = CURL_CSELECT_IN;
|
||||
if(stream && !stream->upload_done)
|
||||
bits |= CURL_CSELECT_OUT;
|
||||
if(data->state.dselect_bits != bits) {
|
||||
data->state.dselect_bits = bits;
|
||||
/* cannot expire from other thread */
|
||||
}
|
||||
}
|
||||
|
||||
static void drain_stream(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
struct stream_ctx *stream = H3_STREAM_CTX(data);
|
||||
int bits;
|
||||
|
||||
(void)cf;
|
||||
if(!data->state.drain) {
|
||||
data->state.drain = 1;
|
||||
bits = CURL_CSELECT_IN;
|
||||
if(stream && !stream->upload_done)
|
||||
bits |= CURL_CSELECT_OUT;
|
||||
if(data->state.dselect_bits != bits) {
|
||||
data->state.dselect_bits = bits;
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
}
|
||||
}
|
||||
@ -350,7 +371,7 @@ static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request,
|
||||
}
|
||||
}
|
||||
|
||||
data->state.drain = 1;
|
||||
drain_stream_from_other_thread(data, stream);
|
||||
msh3_lock_release(&stream->recv_lock);
|
||||
}
|
||||
|
||||
@ -469,7 +490,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
|
||||
nread = 0;
|
||||
|
||||
out:
|
||||
data->state.drain = 0;
|
||||
return nread;
|
||||
}
|
||||
|
||||
@ -508,7 +528,6 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
|
||||
if(stream->recv_error) {
|
||||
failf(data, "request aborted");
|
||||
data->state.drain = 0;
|
||||
*err = stream->recv_error;
|
||||
goto out;
|
||||
}
|
||||
@ -522,10 +541,8 @@ static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
len, nread, *err));
|
||||
if(nread < 0)
|
||||
goto out;
|
||||
if(!Curl_bufq_is_empty(&stream->recvbuf) ||
|
||||
stream->closed) {
|
||||
notify_drain(cf, data);
|
||||
}
|
||||
if(stream->closed)
|
||||
drain_stream(cf, data);
|
||||
}
|
||||
else if(stream->closed) {
|
||||
nread = recv_closed_stream(cf, data, err);
|
||||
@ -669,15 +686,14 @@ static int cf_msh3_get_select_socks(struct Curl_cfilter *cf,
|
||||
|
||||
if(stream->recv_error) {
|
||||
bitmap |= GETSOCK_READSOCK(0);
|
||||
notify_drain(cf, data);
|
||||
drain_stream(cf, data);
|
||||
}
|
||||
else if(stream->req) {
|
||||
bitmap |= GETSOCK_READSOCK(0);
|
||||
notify_drain(cf, data);
|
||||
drain_stream(cf, data);
|
||||
}
|
||||
}
|
||||
DEBUGF(LOG_CF(data, cf, "select_sock %u -> %d",
|
||||
(uint32_t)data->state.drain, bitmap));
|
||||
DEBUGF(LOG_CF(data, cf, "select_sock -> %d", bitmap));
|
||||
CF_DATA_RESTORE(cf, save);
|
||||
return bitmap;
|
||||
}
|
||||
@ -698,6 +714,8 @@ static bool cf_msh3_data_pending(struct Curl_cfilter *cf,
|
||||
Curl_bufq_len(&stream->recvbuf)));
|
||||
pending = !Curl_bufq_is_empty(&stream->recvbuf);
|
||||
msh3_lock_release(&stream->recv_lock);
|
||||
if(pending)
|
||||
drain_stream(cf, (struct Curl_easy *)data);
|
||||
}
|
||||
|
||||
CF_DATA_RESTORE(cf, save);
|
||||
|
||||
@ -709,11 +709,6 @@ static void report_consumed_data(struct Curl_cfilter *cf,
|
||||
consumed);
|
||||
ngtcp2_conn_extend_max_offset(ctx->qconn, consumed);
|
||||
}
|
||||
if(!stream->closed && data->state.drain &&
|
||||
Curl_bufq_is_empty(&stream->recvbuf)) {
|
||||
/* nothing buffered any more */
|
||||
data->state.drain = 0;
|
||||
}
|
||||
}
|
||||
|
||||
static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
|
||||
@ -995,12 +990,18 @@ static int cf_ngtcp2_get_select_socks(struct Curl_cfilter *cf,
|
||||
return rv;
|
||||
}
|
||||
|
||||
static void notify_drain(struct Curl_cfilter *cf,
|
||||
static void drain_stream(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
struct stream_ctx *stream = H3_STREAM_CTX(data);
|
||||
int bits;
|
||||
|
||||
(void)cf;
|
||||
if(!data->state.drain) {
|
||||
data->state.drain = 1;
|
||||
bits = CURL_CSELECT_IN;
|
||||
if(stream && !stream->upload_done)
|
||||
bits |= CURL_CSELECT_OUT;
|
||||
if(data->state.dselect_bits != bits) {
|
||||
data->state.dselect_bits = bits;
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
}
|
||||
}
|
||||
@ -1028,7 +1029,7 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
|
||||
if(app_error_code == NGHTTP3_H3_INTERNAL_ERROR) {
|
||||
stream->reset = TRUE;
|
||||
}
|
||||
notify_drain(cf, data);
|
||||
drain_stream(cf, data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1082,9 +1083,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
|
||||
(void)stream3_id;
|
||||
|
||||
result = write_resp_raw(cf, data, buf, buflen, TRUE);
|
||||
if(CF_DATA_CURRENT(cf) != data) {
|
||||
notify_drain(cf, data);
|
||||
}
|
||||
drain_stream(cf, data);
|
||||
return result? -1 : 0;
|
||||
}
|
||||
|
||||
@ -1129,9 +1128,7 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t stream_id,
|
||||
if(stream->status_code / 100 != 1) {
|
||||
stream->resp_hds_complete = TRUE;
|
||||
}
|
||||
if(CF_DATA_CURRENT(cf) != data) {
|
||||
notify_drain(cf, data);
|
||||
}
|
||||
drain_stream(cf, data);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1358,7 +1355,6 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
|
||||
nread = 0;
|
||||
|
||||
out:
|
||||
data->state.drain = 0;
|
||||
return nread;
|
||||
}
|
||||
|
||||
@ -1413,16 +1409,13 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
}
|
||||
|
||||
if(nread > 0) {
|
||||
if(1 || !Curl_bufq_is_empty(&stream->recvbuf)) {
|
||||
notify_drain(cf, data);
|
||||
}
|
||||
drain_stream(cf, data);
|
||||
}
|
||||
else {
|
||||
if(stream->closed) {
|
||||
nread = recv_closed_stream(cf, data, err);
|
||||
goto out;
|
||||
}
|
||||
data->state.drain = FALSE;
|
||||
*err = CURLE_AGAIN;
|
||||
nread = -1;
|
||||
}
|
||||
@ -1468,7 +1461,7 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
|
||||
if((data->req.keepon & KEEP_SEND_HOLD) &&
|
||||
(data->req.keepon & KEEP_SEND)) {
|
||||
data->req.keepon &= ~KEEP_SEND_HOLD;
|
||||
notify_drain(cf, data);
|
||||
drain_stream(cf, data);
|
||||
DEBUGF(LOG_CF(data, cf, "[h3sid=%" PRId64 "] unpausing acks",
|
||||
stream_id));
|
||||
}
|
||||
|
||||
@ -299,11 +299,18 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
}
|
||||
}
|
||||
|
||||
static void notify_drain(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
static void drain_stream(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
struct stream_ctx *stream = H3_STREAM_CTX(data);
|
||||
int bits;
|
||||
|
||||
(void)cf;
|
||||
if(!data->state.drain) {
|
||||
data->state.drain = 1;
|
||||
bits = CURL_CSELECT_IN;
|
||||
if(stream && !stream->upload_done)
|
||||
bits |= CURL_CSELECT_OUT;
|
||||
if(data->state.dselect_bits != bits) {
|
||||
data->state.dselect_bits = bits;
|
||||
Curl_expire(data, 0, EXPIRE_RUN_NOW);
|
||||
}
|
||||
}
|
||||
@ -579,9 +586,7 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf,
|
||||
}
|
||||
else {
|
||||
result = h3_process_event(cf, sdata, stream3_id, ev);
|
||||
if(sdata != data) {
|
||||
notify_drain(cf, sdata);
|
||||
}
|
||||
drain_stream(cf, sdata);
|
||||
if(result) {
|
||||
DEBUGF(LOG_CF(data, cf, "[h3sid=%"PRId64"] error processing event %s "
|
||||
"for [h3sid=%"PRId64"] -> %d",
|
||||
@ -848,15 +853,20 @@ static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
}
|
||||
|
||||
if(nread > 0) {
|
||||
data->state.drain = (!Curl_bufq_is_empty(&stream->recvbuf) ||
|
||||
stream->closed);
|
||||
if(stream->closed)
|
||||
drain_stream(cf, data);
|
||||
}
|
||||
else {
|
||||
data->state.drain = FALSE;
|
||||
if(stream->closed) {
|
||||
nread = recv_closed_stream(cf, data, err);
|
||||
goto out;
|
||||
}
|
||||
else if(quiche_conn_is_draining(ctx->qconn)) {
|
||||
failf(data, "QUIC connection is draining");
|
||||
*err = CURLE_HTTP3;
|
||||
nread = -1;
|
||||
goto out;
|
||||
}
|
||||
*err = CURLE_AGAIN;
|
||||
nread = -1;
|
||||
}
|
||||
@ -1065,24 +1075,9 @@ static bool stream_is_writeable(struct Curl_cfilter *cf,
|
||||
{
|
||||
struct cf_quiche_ctx *ctx = cf->ctx;
|
||||
struct stream_ctx *stream = H3_STREAM_CTX(data);
|
||||
quiche_stream_iter *qiter;
|
||||
bool is_writable = FALSE;
|
||||
|
||||
if(!stream)
|
||||
return FALSE;
|
||||
/* surely, there must be a better way */
|
||||
qiter = quiche_conn_writable(ctx->qconn);
|
||||
if(qiter) {
|
||||
uint64_t stream_id;
|
||||
while(quiche_stream_iter_next(qiter, &stream_id)) {
|
||||
if(stream_id == (uint64_t)stream->id) {
|
||||
is_writable = TRUE;
|
||||
break;
|
||||
}
|
||||
}
|
||||
quiche_stream_iter_free(qiter);
|
||||
}
|
||||
return is_writable;
|
||||
return stream &&
|
||||
quiche_conn_stream_writable(ctx->qconn, (uint64_t)stream->id, 1);
|
||||
}
|
||||
|
||||
static int cf_quiche_get_select_socks(struct Curl_cfilter *cf,
|
||||
@ -1152,6 +1147,7 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf,
|
||||
}
|
||||
case CF_CTRL_DATA_IDLE:
|
||||
result = cf_flush_egress(cf, data);
|
||||
if(result)
|
||||
DEBUGF(LOG_CF(data, cf, "data idle, flush egress -> %d", result));
|
||||
break;
|
||||
default:
|
||||
|
||||
@ -398,7 +398,6 @@ static CURLcode recvmsg_packets(struct Curl_cfilter *cf,
|
||||
;
|
||||
if(nread == -1) {
|
||||
if(SOCKERRNO == EAGAIN || SOCKERRNO == EWOULDBLOCK) {
|
||||
DEBUGF(LOG_CF(data, cf, "ingress, recvmsg -> EAGAIN"));
|
||||
goto out;
|
||||
}
|
||||
if(!cf->connected && SOCKERRNO == ECONNREFUSED) {
|
||||
|
||||
@ -89,6 +89,6 @@ class TestErrors:
|
||||
assert len(r.stats) == count, f'did not get all stats: {r}'
|
||||
invalid_stats = []
|
||||
for idx, s in enumerate(r.stats):
|
||||
if 'exitcode' not in s or s['exitcode'] not in [18, 56, 92, 95]:
|
||||
if 'exitcode' not in s or s['exitcode'] not in [18, 55, 56, 92, 95]:
|
||||
invalid_stats.append(f'request {idx} exit with {s["exitcode"]}\n{s}')
|
||||
assert len(invalid_stats) == 0, f'failed: {invalid_stats}'
|
||||
|
||||
Loading…
Reference in New Issue
Block a user