diff --git a/lib/vquic/curl_ngtcp2.c b/lib/vquic/curl_ngtcp2.c index d6480c004d..0d9d87f34a 100644 --- a/lib/vquic/curl_ngtcp2.c +++ b/lib/vquic/curl_ngtcp2.c @@ -138,6 +138,7 @@ struct cf_ngtcp2_ctx { uint64_t used_bidi_streams; /* bidi streams we have opened */ uint64_t max_bidi_streams; /* max bidi streams we can open */ int qlogfd; + BIT(conn_closed); /* connection is closed */ }; /* How to access `call_data` from a cf_ngtcp2 filter */ @@ -175,6 +176,8 @@ struct h3_stream_ctx { #define H3_STREAM_CTX(ctx,data) ((struct h3_stream_ctx *)(\ data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL)) +#define H3_STREAM_CTX_ID(ctx,id) ((struct h3_stream_ctx *)(\ + Curl_hash_offt_get(&(ctx)->streams, (id)))) static void h3_stream_ctx_free(struct h3_stream_ctx *stream) { @@ -222,28 +225,37 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf, return CURLE_OK; } +static void cf_ngtcp2_stream_close(struct Curl_cfilter *cf, + struct Curl_easy *data, + struct h3_stream_ctx *stream) +{ + struct cf_ngtcp2_ctx *ctx = cf->ctx; + DEBUGASSERT(data); + DEBUGASSERT(stream); + if(!stream->closed && ctx->qconn && ctx->h3conn) { + CURLcode result; + + nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL); + ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL); + stream->closed = TRUE; + (void)ngtcp2_conn_shutdown_stream(ctx->qconn, 0, stream->id, + NGHTTP3_H3_REQUEST_CANCELLED); + result = cf_progress_egress(cf, data, NULL); + if(result) + CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] cancel stream -> %d", + stream->id, result); + } +} + static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_ngtcp2_ctx *ctx = cf->ctx; struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data); - CURLcode result; - (void)cf; if(stream) { CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] easy handle is done", stream->id); - if(ctx->h3conn && !stream->closed) { - nghttp3_conn_shutdown_stream_read(ctx->h3conn, stream->id); - nghttp3_conn_close_stream(ctx->h3conn, stream->id, - NGHTTP3_H3_REQUEST_CANCELLED); - nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL); - ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL); - stream->closed = TRUE; - result = cf_progress_egress(cf, data, NULL); - if(result) - CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result); - } - + cf_ngtcp2_stream_close(cf, data, stream); Curl_hash_offt_remove(&ctx->streams, data->id); } } @@ -414,6 +426,52 @@ static int cb_handshake_completed(ngtcp2_conn *tconn, void *user_data) return 0; } +static void cf_ngtcp2_conn_close(struct Curl_cfilter *cf, + struct Curl_easy *data); + +static bool cf_ngtcp2_err_is_fatal(int code) +{ + return (NGTCP2_ERR_FATAL >= code) || + (NGTCP2_ERR_DROP_CONN == code) || + (NGTCP2_ERR_IDLE_CLOSE == code); +} + +static void cf_ngtcp2_err_set(struct Curl_cfilter *cf, + struct Curl_easy *data, int code) +{ + struct cf_ngtcp2_ctx *ctx = cf->ctx; + if(!ctx->last_error.error_code) { + if(NGTCP2_ERR_CRYPTO == code) { + ngtcp2_ccerr_set_tls_alert(&ctx->last_error, + ngtcp2_conn_get_tls_alert(ctx->qconn), + NULL, 0); + } + else { + ngtcp2_ccerr_set_liberr(&ctx->last_error, code, NULL, 0); + } + } + if(cf_ngtcp2_err_is_fatal(code)) + cf_ngtcp2_conn_close(cf, data); +} + +static bool cf_ngtcp2_h3_err_is_fatal(int code) +{ + return (NGHTTP3_ERR_FATAL >= code) || + (NGHTTP3_ERR_H3_CLOSED_CRITICAL_STREAM == code); +} + +static void cf_ngtcp2_h3_err_set(struct Curl_cfilter *cf, + struct Curl_easy *data, int code) +{ + struct cf_ngtcp2_ctx *ctx = cf->ctx; + if(!ctx->last_error.error_code) { + ngtcp2_ccerr_set_application_error(&ctx->last_error, + nghttp3_err_infer_quic_app_error_code(code), NULL, 0); + } + if(cf_ngtcp2_h3_err_is_fatal(code)) + cf_ngtcp2_conn_close(cf, data); +} + static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags, int64_t sid, uint64_t offset, const uint8_t *buf, size_t buflen, @@ -425,28 +483,23 @@ static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags, nghttp3_ssize nconsumed; int fin = (flags & NGTCP2_STREAM_DATA_FLAG_FIN) ? 1 : 0; struct Curl_easy *data = stream_user_data; - struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data); (void)offset; (void)data; nconsumed = nghttp3_conn_read_stream(ctx->h3conn, stream_id, buf, buflen, fin); - CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd", - stream_id, buflen, nconsumed); + if(!data) + data = CF_DATA_CURRENT(cf); + if(data) + CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd", + stream_id, buflen, nconsumed); if(nconsumed < 0) { - /* consume all bytes */ - ngtcp2_conn_extend_max_stream_offset(tconn, stream_id, buflen); - ngtcp2_conn_extend_max_offset(tconn, buflen); - if(!data || (stream && stream->reset) || - NGHTTP3_ERR_H3_STREAM_CREATION_ERROR == (int)nconsumed) { - struct Curl_easy *cdata = CF_DATA_CURRENT(cf); - CURL_TRC_CF(cdata, cf, "[%" CURL_PRId64 "] discard data for stream %s", - stream_id, (data && stream)? "reset" : "unknown"); - return 0; + struct h3_stream_ctx *stream = H3_STREAM_CTX_ID(ctx, stream_id); + if(data && stream) { + CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] error on known stream, " + "reset=%d, closed=%d", + stream_id, stream->reset, stream->closed); } - ngtcp2_ccerr_set_application_error( - &ctx->last_error, - nghttp3_err_infer_quic_app_error_code((int)nconsumed), NULL, 0); return NGTCP2_ERR_CALLBACK_FAILURE; } @@ -486,26 +539,28 @@ static int cb_stream_close(ngtcp2_conn *tconn, uint32_t flags, void *user_data, void *stream_user_data) { struct Curl_cfilter *cf = user_data; + struct cf_ngtcp2_ctx *ctx = cf->ctx; struct Curl_easy *data = stream_user_data; curl_int64_t stream_id = (curl_int64_t)sid; - struct cf_ngtcp2_ctx *ctx = cf->ctx; int rv; (void)tconn; - (void)data; /* stream is closed... */ + if(!data) + data = CF_DATA_CURRENT(cf); + if(!data) + return NGTCP2_ERR_CALLBACK_FAILURE; if(!(flags & NGTCP2_STREAM_CLOSE_FLAG_APP_ERROR_CODE_SET)) { app_error_code = NGHTTP3_H3_NO_ERROR; } rv = nghttp3_conn_close_stream(ctx->h3conn, stream_id, app_error_code); - CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] quic close(err=%" + CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] quic close(app_error=%" CURL_PRIu64 ") -> %d", stream_id, (curl_uint64_t)app_error_code, rv); if(rv && rv != NGHTTP3_ERR_STREAM_NOT_FOUND) { - ngtcp2_ccerr_set_application_error( - &ctx->last_error, nghttp3_err_infer_quic_app_error_code(rv), NULL, 0); + cf_ngtcp2_h3_err_set(cf, data, rv); return NGTCP2_ERR_CALLBACK_FAILURE; } @@ -725,7 +780,7 @@ static CURLcode check_and_set_expiry(struct Curl_cfilter *cf, if(rv) { failf(data, "ngtcp2_conn_handle_expiry returned error: %s", ngtcp2_strerror(rv)); - ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0); + cf_ngtcp2_err_set(cf, data, rv); return CURLE_SEND_ERROR; } result = cf_progress_ingress(cf, data, pktx); @@ -832,16 +887,13 @@ static void h3_xfer_write_resp(struct Curl_cfilter *cf, { /* If we already encountered an error, skip further writes */ - if(!stream->xfer_result) + if(!stream->xfer_result) { stream->xfer_result = Curl_xfer_write_resp(data, buf, blen, eos); - /* If the transfer write is errored, we do not want any more data */ - if(stream->xfer_result) { - struct cf_ngtcp2_ctx *ctx = cf->ctx; - CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes " - "of data, cancelling stream", - stream->id, stream->xfer_result, blen); - nghttp3_conn_close_stream(ctx->h3conn, stream->id, - NGHTTP3_H3_REQUEST_CANCELLED); + /* If the transfer write is errored, we do not want any more data */ + if(stream->xfer_result) { + CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes " + "of data", stream->id, stream->xfer_result, blen); + } } } @@ -1150,7 +1202,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, pktx_init(&pktx, cf, data); - if(!stream) { + if(!stream || ctx->conn_closed) { *err = CURLE_RECV_ERROR; goto out; } @@ -1163,6 +1215,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data, if(stream->xfer_result) { CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id); + cf_ngtcp2_stream_close(cf, data, stream); *err = stream->xfer_result; nread = -1; goto out; @@ -1451,6 +1504,12 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, } if(!stream || stream->id < 0) { + if(ctx->conn_closed) { + CURL_TRC_CF(data, cf, "cannot open stream on closed connection"); + *err = CURLE_SEND_ERROR; + sent = -1; + goto out; + } sent = h3_stream_open(cf, data, buf, len, err); if(sent < 0) { CURL_TRC_CF(data, cf, "failed to open stream -> %d", *err); @@ -1458,6 +1517,13 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, } stream = H3_STREAM_CTX(ctx, data); } + else if(stream->xfer_result) { + CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id); + cf_ngtcp2_stream_close(cf, data, stream); + *err = stream->xfer_result; + sent = -1; + goto out; + } else if(stream->upload_blocked_len) { /* the data in `buf` has already been submitted or added to the * buffers, but have been EAGAINed on the last invocation. */ @@ -1492,6 +1558,12 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data, sent = -1; goto out; } + else if(ctx->conn_closed) { + CURL_TRC_CF(data, cf, "cannot send on closed connection"); + *err = CURLE_SEND_ERROR; + sent = -1; + goto out; + } else { sent = Curl_bufq_write(&stream->sendbuf, buf, len, err); CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] cf_send, add to " @@ -1568,16 +1640,7 @@ static CURLcode recv_pkt(const unsigned char *pkt, size_t pktlen, if(rv) { CURL_TRC_CF(pktx->data, pktx->cf, "ingress, read_pkt -> %s (%d)", ngtcp2_strerror(rv), rv); - if(!ctx->last_error.error_code) { - if(rv == NGTCP2_ERR_CRYPTO) { - ngtcp2_ccerr_set_tls_alert(&ctx->last_error, - ngtcp2_conn_get_tls_alert(ctx->qconn), - NULL, 0); - } - else { - ngtcp2_ccerr_set_liberr(&ctx->last_error, rv, NULL, 0); - } - } + cf_ngtcp2_err_set(pktx->cf, pktx->data, rv); if(rv == NGTCP2_ERR_CRYPTO) /* this is a "TLS problem", but a failed certificate verification @@ -1660,9 +1723,7 @@ static ssize_t read_pkt_to_send(void *userp, if(veccnt < 0) { failf(x->data, "nghttp3_conn_writev_stream returned error: %s", nghttp3_strerror((int)veccnt)); - ngtcp2_ccerr_set_application_error( - &ctx->last_error, - nghttp3_err_infer_quic_app_error_code((int)veccnt), NULL, 0); + cf_ngtcp2_h3_err_set(x->cf, x->data, (int)veccnt); *err = CURLE_SEND_ERROR; return -1; } @@ -1709,7 +1770,7 @@ static ssize_t read_pkt_to_send(void *userp, DEBUGASSERT(ndatalen == -1); failf(x->data, "ngtcp2_conn_writev_stream returned error: %s", ngtcp2_strerror((int)n)); - ngtcp2_ccerr_set_liberr(&ctx->last_error, (int)n, NULL, 0); + cf_ngtcp2_err_set(x->cf, x->data, (int)n); *err = CURLE_SEND_ERROR; nwritten = -1; goto out; @@ -1946,6 +2007,31 @@ static void cf_ngtcp2_ctx_clear(struct cf_ngtcp2_ctx *ctx) ctx->call_data = save; } +static void cf_ngtcp2_conn_close(struct Curl_cfilter *cf, + struct Curl_easy *data) +{ + struct cf_ngtcp2_ctx *ctx = cf->ctx; + if(ctx && ctx->qconn && !ctx->conn_closed) { + char buffer[NGTCP2_MAX_UDP_PAYLOAD_SIZE]; + struct pkt_io_ctx pktx; + ngtcp2_ssize rc; + + ctx->conn_closed = TRUE; + pktx_init(&pktx, cf, data); + rc = ngtcp2_conn_write_connection_close(ctx->qconn, NULL, /* path */ + NULL, /* pkt_info */ + (uint8_t *)buffer, sizeof(buffer), + &ctx->last_error, pktx.ts); + CURL_TRC_CF(data, cf, "closing connection(err_type=%d, err_code=%" + CURL_PRIu64 ") -> %d", ctx->last_error.type, + (curl_uint64_t)ctx->last_error.error_code, (int)rc); + if(rc > 0) { + while((send(ctx->q.sockfd, buffer, (SEND_TYPE_ARG3)rc, 0) == -1) && + SOCKERRNO == EINTR); + } + } +} + static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data) { struct cf_ngtcp2_ctx *ctx = cf->ctx; @@ -1953,24 +2039,10 @@ static void cf_ngtcp2_close(struct Curl_cfilter *cf, struct Curl_easy *data) CF_DATA_SAVE(save, cf, data); if(ctx && ctx->qconn) { - char buffer[NGTCP2_MAX_UDP_PAYLOAD_SIZE]; - struct pkt_io_ctx pktx; - ngtcp2_ssize rc; - - CURL_TRC_CF(data, cf, "close"); - pktx_init(&pktx, cf, data); - rc = ngtcp2_conn_write_connection_close(ctx->qconn, NULL, /* path */ - NULL, /* pkt_info */ - (uint8_t *)buffer, sizeof(buffer), - &ctx->last_error, pktx.ts); - if(rc > 0) { - while((send(ctx->q.sockfd, buffer, (SEND_TYPE_ARG3)rc, 0) == -1) && - SOCKERRNO == EINTR); - } - + cf_ngtcp2_conn_close(cf, data); cf_ngtcp2_ctx_clear(ctx); + CURL_TRC_CF(data, cf, "close"); } - cf->connected = FALSE; CF_DATA_RESTORE(cf, save); } @@ -2259,7 +2331,10 @@ static CURLcode cf_ngtcp2_query(struct Curl_cfilter *cf, * by callback. QUIC counts the number over the lifetime of the * connection, ever increasing. * We count the *open* transfers plus the budget for new ones. */ - if(ctx->max_bidi_streams) { + if(!ctx->qconn || ctx->conn_closed) { + *pres1 = 0; + } + else if(ctx->max_bidi_streams) { uint64_t avail_bidi_streams = 0; uint64_t max_streams = CONN_INUSE(cf->conn); if(ctx->max_bidi_streams > ctx->used_bidi_streams) @@ -2312,9 +2387,9 @@ static bool cf_ngtcp2_conn_is_alive(struct Curl_cfilter *cf, const ngtcp2_transport_params *rp; struct cf_call_data save; - CF_DATA_SAVE(save, cf, data); + CF_DATA_SAVE(save, cf, data); *input_pending = FALSE; - if(!ctx->qconn) + if(!ctx->qconn || ctx->conn_closed) goto out; /* Both sides of the QUIC connection announce they max idle times in diff --git a/tests/http/test_16_info.py b/tests/http/test_16_info.py index b5459abf75..80377cde33 100644 --- a/tests/http/test_16_info.py +++ b/tests/http/test_16_info.py @@ -62,9 +62,9 @@ class TestInfo: curl = CurlClient(env=env) url = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-{count-1}]' r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True) - r.check_stats(count=count, http_status=200) - for s in r.stats: - self.check_stat(s, dl_size=30, ul_size=0) + r.check_stats(count=count, http_status=200, exitcode=0) + for idx, s in enumerate(r.stats): + self.check_stat(idx, s, r, dl_size=30, ul_size=0) # download plain file with a 302 redirect @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) @@ -77,9 +77,9 @@ class TestInfo: r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True, extra_args=[ '--location' ]) - r.check_stats(count=count, http_status=200) - for s in r.stats: - self.check_stat(s, dl_size=30, ul_size=0) + r.check_stats(count=count, http_status=200, exitcode=0) + for idx, s in enumerate(r.stats): + self.check_stat(idx, s, r, dl_size=30, ul_size=0) @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3']) def test_16_03_info_upload(self, env: Env, httpd, nghttpx, proto, repeat): @@ -91,11 +91,13 @@ class TestInfo: curl = CurlClient(env=env) url = f'https://{env.authority_for(env.domain1, proto)}/curltest/echo?id=[0-{count-1}]' r = curl.http_upload(urls=[url], data=f'@{fdata}', alpn_proto=proto, - with_headers=True) + with_headers=True, extra_args=[ + '--trace-config', 'http/2,http/3' + ]) r.check_response(count=count, http_status=200) - r.check_stats(count=count, http_status=200) - for s in r.stats: - self.check_stat(s, dl_size=fsize, ul_size=fsize) + r.check_stats(count=count, http_status=200, exitcode=0) + for idx, s in enumerate(r.stats): + self.check_stat(idx, s, r, dl_size=fsize, ul_size=fsize) # download plain file via http: ('time_appconnect' is 0) @pytest.mark.parametrize("proto", ['http/1.1']) @@ -104,21 +106,22 @@ class TestInfo: curl = CurlClient(env=env) url = f'http://{env.domain1}:{env.http_port}/data.json?[0-{count-1}]' r = curl.http_download(urls=[url], alpn_proto=proto, with_stats=True) - r.check_stats(count=count, http_status=200) - for s in r.stats: - self.check_stat(s, dl_size=30, ul_size=0) + r.check_stats(count=count, http_status=200, exitcode=0) + for idx, s in enumerate(r.stats): + self.check_stat(idx, s, r, dl_size=30, ul_size=0) - def check_stat(self, s, dl_size=None, ul_size=None): + def check_stat(self, idx, s, r, dl_size=None, ul_size=None): self.check_stat_times(s) # we always send something self.check_stat_positive(s, 'size_request') # we always receive response headers self.check_stat_positive(s, 'size_header') if ul_size is not None: - assert s['size_upload'] == ul_size # the file we sent - assert s['size_request'] >= s['size_upload'], f'"size_request" smaller than "size_upload", {s}' + assert s['size_upload'] == ul_size, f'stat #{idx}\n{r.dump_logs()}' # the file we sent + assert s['size_request'] >= s['size_upload'], \ + f'stat #{idx}, "size_request" smaller than "size_upload", {s}\n{r.dump_logs()}' if dl_size is not None: - assert s['size_download'] == dl_size # the file we received + assert s['size_download'] == dl_size, f'stat #{idx}\n{r.dump_logs()}' # the file we received def check_stat_positive(self, s, key): assert key in s, f'stat "{key}" missing: {s}'