http/2, http/3: decouple stream state from easy handle

- add `Curl_hash_offt` as hashmap between a `curl_off_t` and
  an object. Use this in h2+h3 connection filters to associate
  `data->id` with the internal stream state.
- changed implementations of all affected connection filters
- removed `h2_ctx*` and `h3_ctx*` from `struct HTTP` and thus
  the easy handle
- solves the problem of attaching "foreign protocol" easy handles
  during connection shutdown

Test 1616 verifies the new hash functions.

Closes #13204
This commit is contained in:
Stefan Eissing 2024-03-27 15:01:06 +01:00 committed by Daniel Stenberg
parent c03556fb18
commit c6655f7029
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
14 changed files with 561 additions and 232 deletions

View File

@ -670,6 +670,19 @@ size_t Curl_conn_get_max_concurrent(struct Curl_easy *data,
return (result || n <= 0)? 1 : (size_t)n;
}
int Curl_conn_get_stream_error(struct Curl_easy *data,
struct connectdata *conn,
int sockindex)
{
CURLcode result;
int n = 0;
struct Curl_cfilter *cf = conn->cfilter[sockindex];
result = cf? cf->cft->query(cf, data, CF_QUERY_STREAM_ERROR,
&n, NULL) : CURLE_UNKNOWN_OPTION;
return (result || n < 0)? 0 : n;
}
int Curl_conn_sockindex(struct Curl_easy *data, curl_socket_t sockfd)
{
if(data && data->conn &&

View File

@ -160,6 +160,7 @@ typedef CURLcode Curl_cft_cntrl(struct Curl_cfilter *cf,
#define CF_QUERY_SOCKET 3 /* - curl_socket_t */
#define CF_QUERY_TIMER_CONNECT 4 /* - struct curltime */
#define CF_QUERY_TIMER_APPCONNECT 5 /* - struct curltime */
#define CF_QUERY_STREAM_ERROR 6 /* error code - */
/**
* Query the cfilter for properties. Filters ignorant of a query will
@ -498,6 +499,12 @@ size_t Curl_conn_get_max_concurrent(struct Curl_easy *data,
struct connectdata *conn,
int sockindex);
/**
* Get the underlying error code for a transfer stream or 0 if not known.
*/
int Curl_conn_get_stream_error(struct Curl_easy *data,
struct connectdata *conn,
int sockindex);
/**
* Get the index of the given socket in the connection's sockets.

View File

@ -368,3 +368,25 @@ void Curl_hash_print(struct Curl_hash *h,
fprintf(stderr, "\n");
}
#endif
void Curl_hash_offt_init(struct Curl_hash *h,
unsigned int slots,
Curl_hash_dtor dtor)
{
Curl_hash_init(h, slots, Curl_hash_str, Curl_str_key_compare, dtor);
}
void *Curl_hash_offt_set(struct Curl_hash *h, curl_off_t id, void *elem)
{
return Curl_hash_add(h, &id, sizeof(id), elem);
}
int Curl_hash_offt_remove(struct Curl_hash *h, curl_off_t id)
{
return Curl_hash_delete(h, &id, sizeof(id));
}
void *Curl_hash_offt_get(struct Curl_hash *h, curl_off_t id)
{
return Curl_hash_pick(h, &id, sizeof(id));
}

View File

@ -98,5 +98,14 @@ Curl_hash_next_element(struct Curl_hash_iterator *iter);
void Curl_hash_print(struct Curl_hash *h,
void (*func)(void *));
/* Hash for `curl_off_t` as key */
void Curl_hash_offt_init(struct Curl_hash *h,
unsigned int slots,
Curl_hash_dtor dtor);
void *Curl_hash_offt_set(struct Curl_hash *h, curl_off_t id, void *elem);
int Curl_hash_offt_remove(struct Curl_hash *h, curl_off_t id);
void *Curl_hash_offt_get(struct Curl_hash *h, curl_off_t id);
#endif /* HEADER_CURL_HASH_H */

View File

@ -188,12 +188,8 @@ void Curl_http_exp100_got100(struct Curl_easy *data);
* HTTP unique setup
***************************************************************************/
struct HTTP {
#ifndef CURL_DISABLE_HTTP
void *h2_ctx; /* HTTP/2 implementation context */
void *h3_ctx; /* HTTP/3 implementation context */
#else
/* TODO: no longer used, we should remove it from SingleRequest */
char unused;
#endif
};
CURLcode Curl_http_size(struct Curl_easy *data);

View File

@ -29,6 +29,7 @@
#include <nghttp2/nghttp2.h>
#include "urldata.h"
#include "bufq.h"
#include "hash.h"
#include "http1.h"
#include "http2.h"
#include "http.h"
@ -129,6 +130,7 @@ struct cf_h2_ctx {
struct bufc_pool stream_bufcp; /* spares for stream buffers */
struct dynbuf scratch; /* scratch buffer for temp use */
struct Curl_hash streams; /* hash of `data->id` to `h2_stream_ctx` */
size_t drain_total; /* sum of all stream's UrlState drain */
uint32_t max_concurrent_streams;
int32_t goaway_error;
@ -155,6 +157,8 @@ static void cf_h2_ctx_clear(struct cf_h2_ctx *ctx)
Curl_bufq_free(&ctx->outbufq);
Curl_bufcp_free(&ctx->stream_bufcp);
Curl_dyn_free(&ctx->scratch);
Curl_hash_clean(&ctx->streams);
Curl_hash_destroy(&ctx->streams);
memset(ctx, 0, sizeof(*ctx));
ctx->call_data = save;
}
@ -200,13 +204,58 @@ struct h2_stream_ctx {
buffered data in stream->sendbuf to upload. */
};
#define H2_STREAM_CTX(d) ((struct h2_stream_ctx *)(((d) && \
(d)->req.p.http)? \
((struct HTTP *)(d)->req.p.http)->h2_ctx \
: NULL))
#define H2_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h2_ctx
#define H2_STREAM_ID(d) (H2_STREAM_CTX(d)? \
H2_STREAM_CTX(d)->id : -2)
#define H2_STREAM_CTX(ctx,data) ((struct h2_stream_ctx *)(\
data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
static struct h2_stream_ctx *h2_stream_ctx_create(struct cf_h2_ctx *ctx)
{
struct h2_stream_ctx *stream;
(void)ctx;
stream = calloc(1, sizeof(*stream));
if(!stream)
return NULL;
stream->id = -1;
Curl_bufq_initp(&stream->sendbuf, &ctx->stream_bufcp,
H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
Curl_dynhds_init(&stream->resp_trailers, 0, DYN_HTTP_REQUEST);
stream->resp_hds_len = 0;
stream->bodystarted = FALSE;
stream->status_code = -1;
stream->closed = FALSE;
stream->close_handled = FALSE;
stream->error = NGHTTP2_NO_ERROR;
stream->local_window_size = H2_STREAM_WINDOW_SIZE;
stream->upload_left = 0;
stream->nrcvd_data = 0;
return stream;
}
static void free_push_headers(struct h2_stream_ctx *stream)
{
size_t i;
for(i = 0; i<stream->push_headers_used; i++)
free(stream->push_headers[i]);
Curl_safefree(stream->push_headers);
stream->push_headers_used = 0;
}
static void h2_stream_ctx_free(struct h2_stream_ctx *stream)
{
Curl_bufq_free(&stream->sendbuf);
Curl_h1_req_parse_free(&stream->h1);
Curl_dynhds_free(&stream->resp_trailers);
free_push_headers(stream);
free(stream);
}
static void h2_stream_hash_free(void *stream)
{
DEBUGASSERT(stream);
h2_stream_ctx_free((struct h2_stream_ctx *)stream);
}
/*
* Mark this transfer to get "drained".
@ -243,49 +292,29 @@ static CURLcode http2_data_setup(struct Curl_cfilter *cf,
failf(data, "initialization failure, transfer not http initialized");
return CURLE_FAILED_INIT;
}
stream = H2_STREAM_CTX(data);
stream = H2_STREAM_CTX(ctx, data);
if(stream) {
*pstream = stream;
return CURLE_OK;
}
stream = calloc(1, sizeof(*stream));
stream = h2_stream_ctx_create(ctx);
if(!stream)
return CURLE_OUT_OF_MEMORY;
stream->id = -1;
Curl_bufq_initp(&stream->sendbuf, &ctx->stream_bufcp,
H2_STREAM_SEND_CHUNKS, BUFQ_OPT_NONE);
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
Curl_dynhds_init(&stream->resp_trailers, 0, DYN_HTTP_REQUEST);
stream->resp_hds_len = 0;
stream->bodystarted = FALSE;
stream->status_code = -1;
stream->closed = FALSE;
stream->close_handled = FALSE;
stream->error = NGHTTP2_NO_ERROR;
stream->local_window_size = H2_STREAM_WINDOW_SIZE;
stream->upload_left = 0;
stream->nrcvd_data = 0;
H2_STREAM_LCTX(data) = stream;
*pstream = stream;
return CURLE_OK;
if(!Curl_hash_offt_set(&ctx->streams, data->id, stream)) {
h2_stream_ctx_free(stream);
return CURLE_OUT_OF_MEMORY;
}
static void free_push_headers(struct h2_stream_ctx *stream)
{
size_t i;
for(i = 0; i<stream->push_headers_used; i++)
free(stream->push_headers[i]);
Curl_safefree(stream->push_headers);
stream->push_headers_used = 0;
*pstream = stream;
return CURLE_OK;
}
static void http2_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
DEBUGASSERT(ctx);
if(!stream)
@ -312,12 +341,7 @@ static void http2_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
nghttp2_session_send(ctx->h2);
}
Curl_bufq_free(&stream->sendbuf);
Curl_h1_req_parse_free(&stream->h1);
Curl_dynhds_free(&stream->resp_trailers);
free_push_headers(stream);
free(stream);
H2_STREAM_LCTX(data) = NULL;
Curl_hash_offt_remove(&ctx->streams, data->id);
}
static int h2_client_new(struct Curl_cfilter *cf,
@ -411,6 +435,7 @@ static CURLcode cf_h2_ctx_init(struct Curl_cfilter *cf,
Curl_bufq_initp(&ctx->inbufq, &ctx->stream_bufcp, H2_NW_RECV_CHUNKS, 0);
Curl_bufq_initp(&ctx->outbufq, &ctx->stream_bufcp, H2_NW_SEND_CHUNKS, 0);
Curl_dyn_init(&ctx->scratch, CURL_MAX_HTTP_HEADER);
Curl_hash_offt_init(&ctx->streams, 63, h2_stream_hash_free);
ctx->last_stream_id = 2147483647;
rc = nghttp2_session_callbacks_new(&cbs);
@ -709,6 +734,7 @@ static ssize_t send_callback(nghttp2_session *h2,
the struct are hidden from the user. */
struct curl_pushheaders {
struct Curl_easy *data;
struct h2_stream_ctx *stream;
const nghttp2_push_promise *frame;
};
@ -722,9 +748,8 @@ char *curl_pushheader_bynum(struct curl_pushheaders *h, size_t num)
if(!h || !GOOD_EASY_HANDLE(h->data))
return NULL;
else {
struct h2_stream_ctx *stream = H2_STREAM_CTX(h->data);
if(stream && num < stream->push_headers_used)
return stream->push_headers[num];
if(h->stream && num < h->stream->push_headers_used)
return h->stream->push_headers[num];
}
return NULL;
}
@ -747,7 +772,7 @@ char *curl_pushheader_byname(struct curl_pushheaders *h, const char *header)
!strcmp(header, ":") || strchr(header + 1, ':'))
return NULL;
stream = H2_STREAM_CTX(h->data);
stream = h->stream;
if(!stream)
return NULL;
@ -870,12 +895,10 @@ static int push_promise(struct Curl_cfilter *cf,
goto fail;
}
heads.data = data;
heads.frame = frame;
/* ask the application */
CURL_TRC_CF(data, cf, "Got PUSH_PROMISE, ask application");
stream = H2_STREAM_CTX(data);
stream = H2_STREAM_CTX(ctx, data);
if(!stream) {
failf(data, "Internal NULL stream");
discard_newhandle(cf, newhandle);
@ -883,6 +906,10 @@ static int push_promise(struct Curl_cfilter *cf,
goto fail;
}
heads.data = data;
heads.stream = stream;
heads.frame = frame;
rv = set_transfer_url(newhandle, &heads);
if(rv) {
discard_newhandle(cf, newhandle);
@ -953,7 +980,7 @@ static CURLcode on_stream_frame(struct Curl_cfilter *cf,
const nghttp2_frame *frame)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
int32_t stream_id = frame->hd.stream_id;
CURLcode result;
int rv;
@ -1184,7 +1211,7 @@ static int on_frame_recv(nghttp2_session *session, const nghttp2_frame *frame,
* servers send an explicit WINDOW_UPDATE, but not all seem to do that.
* To be safe, we UNHOLD a stream in order not to stall. */
if(CURL_WANT_SEND(data)) {
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
if(stream)
drain_stream(cf, data, stream);
}
@ -1243,7 +1270,7 @@ static int on_data_chunk_recv(nghttp2_session *session, uint8_t flags,
return 0;
}
stream = H2_STREAM_CTX(data_s);
stream = H2_STREAM_CTX(ctx, data_s);
if(!stream)
return NGHTTP2_ERR_CALLBACK_FAILURE;
@ -1266,6 +1293,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
uint32_t error_code, void *userp)
{
struct Curl_cfilter *cf = userp;
struct cf_h2_ctx *ctx = cf->ctx;
struct Curl_easy *data_s, *call_data = CF_DATA_CURRENT(cf);
struct h2_stream_ctx *stream;
int rv;
@ -1290,7 +1318,7 @@ static int on_stream_close(nghttp2_session *session, int32_t stream_id,
(void)nghttp2_session_set_stream_user_data(session, stream_id, 0);
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
stream = H2_STREAM_CTX(data_s);
stream = H2_STREAM_CTX(ctx, data_s);
if(!stream) {
CURL_TRC_CF(data_s, cf,
"[%d] on_stream_close, GOOD easy but no stream", stream_id);
@ -1325,6 +1353,7 @@ static int on_begin_headers(nghttp2_session *session,
const nghttp2_frame *frame, void *userp)
{
struct Curl_cfilter *cf = userp;
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream;
struct Curl_easy *data_s = NULL;
@ -1338,7 +1367,7 @@ static int on_begin_headers(nghttp2_session *session,
return 0;
}
stream = H2_STREAM_CTX(data_s);
stream = H2_STREAM_CTX(ctx, data_s);
if(!stream || !stream->bodystarted) {
return 0;
}
@ -1370,7 +1399,7 @@ static int on_header(nghttp2_session *session, const nghttp2_frame *frame,
internal error more than anything else! */
return NGHTTP2_ERR_CALLBACK_FAILURE;
stream = H2_STREAM_CTX(data_s);
stream = H2_STREAM_CTX(ctx, data_s);
if(!stream) {
failf(data_s, "Internal NULL stream");
return NGHTTP2_ERR_CALLBACK_FAILURE;
@ -1518,6 +1547,7 @@ static ssize_t req_body_read_callback(nghttp2_session *session,
void *userp)
{
struct Curl_cfilter *cf = userp;
struct cf_h2_ctx *ctx = cf->ctx;
struct Curl_easy *data_s;
struct h2_stream_ctx *stream = NULL;
CURLcode result;
@ -1534,7 +1564,7 @@ static ssize_t req_body_read_callback(nghttp2_session *session,
internal error more than anything else! */
return NGHTTP2_ERR_CALLBACK_FAILURE;
stream = H2_STREAM_CTX(data_s);
stream = H2_STREAM_CTX(ctx, data_s);
if(!stream)
return NGHTTP2_ERR_CALLBACK_FAILURE;
}
@ -1621,7 +1651,7 @@ static CURLcode http2_data_done_send(struct Curl_cfilter *cf,
{
struct cf_h2_ctx *ctx = cf->ctx;
CURLcode result = CURLE_OK;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
if(!ctx || !ctx->h2 || !stream)
goto out;
@ -1737,11 +1767,12 @@ static int sweight_in_effect(const struct Curl_easy *data)
* struct.
*/
static void h2_pri_spec(struct Curl_easy *data,
static void h2_pri_spec(struct cf_h2_ctx *ctx,
struct Curl_easy *data,
nghttp2_priority_spec *pri_spec)
{
struct Curl_data_priority *prio = &data->set.priority;
struct h2_stream_ctx *depstream = H2_STREAM_CTX(prio->parent);
struct h2_stream_ctx *depstream = H2_STREAM_CTX(ctx, prio->parent);
int32_t depstream_id = depstream? depstream->id:0;
nghttp2_priority_spec_init(pri_spec, depstream_id,
sweight_wanted(data),
@ -1759,7 +1790,7 @@ static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
int rv = 0;
if(stream && stream->id > 0 &&
@ -1769,7 +1800,7 @@ static CURLcode h2_progress_egress(struct Curl_cfilter *cf,
/* send new weight and/or dependency */
nghttp2_priority_spec pri_spec;
h2_pri_spec(data, &pri_spec);
h2_pri_spec(ctx, data, &pri_spec);
CURL_TRC_CF(data, cf, "[%d] Queuing PRIORITY", stream->id);
DEBUGASSERT(stream->id != -1);
rv = nghttp2_submit_priority(ctx->h2, NGHTTP2_FLAG_NONE,
@ -1839,7 +1870,7 @@ static CURLcode h2_progress_ingress(struct Curl_cfilter *cf,
* it is time to stop due to connection close or us not processing
* all network input */
while(!ctx->conn_closed && Curl_bufq_is_empty(&ctx->inbufq)) {
stream = H2_STREAM_CTX(data);
stream = H2_STREAM_CTX(ctx, data);
if(stream && (stream->closed || !data_max_bytes)) {
/* We would like to abort here and stop processing, so that
* the transfer loop can handle the data/close here. However,
@ -1885,7 +1916,7 @@ static ssize_t cf_h2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
ssize_t nread = -1;
CURLcode result;
struct cf_call_data save;
@ -2017,7 +2048,7 @@ static ssize_t h2_submit(struct h2_stream_ctx **pstream,
goto out;
}
h2_pri_spec(data, &pri_spec);
h2_pri_spec(ctx, data, &pri_spec);
if(!nghttp2_session_check_request_allowed(ctx->h2))
CURL_TRC_CF(data, cf, "send request NOT allowed (via nghttp2)");
@ -2114,7 +2145,7 @@ static ssize_t cf_h2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
struct cf_call_data save;
int rv;
ssize_t nwritten;
@ -2295,7 +2326,7 @@ static void cf_h2_adjust_pollset(struct Curl_cfilter *cf,
sock = Curl_conn_cf_get_socket(cf, data);
Curl_pollset_check(data, ps, sock, &want_recv, &want_send);
if(want_recv || want_send) {
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
struct cf_call_data save;
bool c_exhaust, s_exhaust;
@ -2396,7 +2427,7 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,
{
#ifdef NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
DEBUGASSERT(data);
if(ctx && ctx->h2 && stream) {
@ -2481,7 +2512,7 @@ static bool cf_h2_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
struct cf_h2_ctx *ctx = cf->ctx;
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
if(ctx && (!Curl_bufq_is_empty(&ctx->inbufq)
|| (stream && !Curl_bufq_is_empty(&stream->sendbuf))))
@ -2540,6 +2571,11 @@ static CURLcode cf_h2_query(struct Curl_cfilter *cf,
*pres1 = (effective_max > INT_MAX)? INT_MAX : (int)effective_max;
CF_DATA_RESTORE(cf, save);
return CURLE_OK;
case CF_QUERY_STREAM_ERROR: {
struct h2_stream_ctx *stream = H2_STREAM_CTX(ctx, data);
*pres1 = stream? (int)stream->error : 0;
return CURLE_OK;
}
default:
break;
}
@ -2769,8 +2805,11 @@ CURLcode Curl_http2_upgrade(struct Curl_easy *data,
CURLE_HTTP2_STREAM error! */
bool Curl_h2_http_1_1_error(struct Curl_easy *data)
{
struct h2_stream_ctx *stream = H2_STREAM_CTX(data);
return (stream && stream->error == NGHTTP2_HTTP_1_1_REQUIRED);
if(Curl_conn_is_http2(data, data->conn, FIRSTSOCKET)) {
int err = Curl_conn_get_stream_error(data, data->conn, FIRSTSOCKET);
return (err == NGHTTP2_HTTP_1_1_REQUIRED);
}
return FALSE;
}
#else /* !USE_NGHTTP2 */

View File

@ -27,6 +27,7 @@
#ifdef USE_MSH3
#include "urldata.h"
#include "hash.h"
#include "timeval.h"
#include "multiif.h"
#include "sendf.h"
@ -118,6 +119,7 @@ struct cf_msh3_ctx {
struct cf_call_data call_data;
struct curltime connect_started; /* time the current attempt started */
struct curltime handshake_at; /* time connect handshake finished */
struct Curl_hash streams; /* hash `data->id` to `stream_ctx` */
/* Flags written by msh3/msquic thread */
bool handshake_complete;
bool handshake_succeeded;
@ -127,6 +129,8 @@ struct cf_msh3_ctx {
BIT(active);
};
static struct cf_msh3_ctx *h3_get_msh3_ctx(struct Curl_easy *data);
/* How to access `call_data` from a cf_msh3 filter */
#undef CF_CTX_CALL_DATA
#define CF_CTX_CALL_DATA(cf) \
@ -153,18 +157,26 @@ struct stream_ctx {
bool recv_header_complete;
};
#define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \
((struct HTTP *)(d)->req.p.http)->h3_ctx \
: NULL))
#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx
#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
H3_STREAM_CTX(d)->id : -2)
#define H3_STREAM_CTX(ctx,data) ((struct stream_ctx *)((data && ctx)? \
Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
static void h3_stream_ctx_free(struct stream_ctx *stream)
{
Curl_bufq_free(&stream->recvbuf);
free(stream);
}
static void h3_stream_hash_free(void *stream)
{
DEBUGASSERT(stream);
h3_stream_ctx_free((struct stream_ctx *)stream);
}
static CURLcode h3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(stream)
return CURLE_OK;
@ -173,25 +185,29 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
if(!stream)
return CURLE_OUT_OF_MEMORY;
H3_STREAM_LCTX(data) = stream;
stream->req = ZERO_NULL;
msh3_lock_initialize(&stream->recv_lock);
Curl_bufq_init2(&stream->recvbuf, H3_STREAM_CHUNK_SIZE,
H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
CURL_TRC_CF(data, cf, "data setup");
if(!Curl_hash_offt_set(&ctx->streams, data->id, stream)) {
h3_stream_ctx_free(stream);
return CURLE_OUT_OF_MEMORY;
}
return CURLE_OK;
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)cf;
if(stream) {
CURL_TRC_CF(data, cf, "easy handle is done");
Curl_bufq_free(&stream->recvbuf);
free(stream);
H3_STREAM_LCTX(data) = NULL;
Curl_hash_offt_remove(&ctx->streams, data->id);
}
}
@ -213,7 +229,8 @@ static void drain_stream_from_other_thread(struct Curl_easy *data,
static void drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
@ -311,7 +328,8 @@ static int decode_status_code(const char *value, size_t len)
static CURLcode write_resp_raw(struct Curl_easy *data,
const void *mem, size_t memlen)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = h3_get_msh3_ctx(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
@ -337,10 +355,12 @@ static void MSH3_CALL msh3_header_received(MSH3_REQUEST *Request,
const MSH3_HEADER *hd)
{
struct Curl_easy *data = userp;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = h3_get_msh3_ctx(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;
(void)Request;
DEBUGF(infof(data, "[MSH3] header received, stream=%d", !!stream));
if(!stream || stream->recv_header_complete) {
return;
}
@ -386,7 +406,8 @@ static bool MSH3_CALL msh3_data_received(MSH3_REQUEST *Request,
const uint8_t *buf)
{
struct Curl_easy *data = IfContext;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = h3_get_msh3_ctx(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;
bool rv = FALSE;
@ -425,7 +446,8 @@ static void MSH3_CALL msh3_complete(MSH3_REQUEST *Request, void *IfContext,
bool aborted, uint64_t error)
{
struct Curl_easy *data = IfContext;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = h3_get_msh3_ctx(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)Request;
if(!stream)
@ -444,7 +466,8 @@ static void MSH3_CALL msh3_shutdown_complete(MSH3_REQUEST *Request,
void *IfContext)
{
struct Curl_easy *data = IfContext;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = h3_get_msh3_ctx(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(!stream)
return;
@ -456,7 +479,8 @@ static void MSH3_CALL msh3_data_sent(MSH3_REQUEST *Request,
void *IfContext, void *SendContext)
{
struct Curl_easy *data = IfContext;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = h3_get_msh3_ctx(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(!stream)
return;
(void)Request;
@ -468,7 +492,8 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
CURLcode *err)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
if(!stream) {
@ -501,7 +526,8 @@ out:
static void set_quic_expire(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
/* we have no indication from msh3 when it would be a good time
* to juggle the connection again. So, we compromise by calling
@ -518,17 +544,17 @@ static void set_quic_expire(struct Curl_cfilter *cf, struct Curl_easy *data)
static ssize_t cf_msh3_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
struct cf_call_data save;
(void)cf;
CURL_TRC_CF(data, cf, "cf_recv(len=%zu), stream=%d", len, !!stream);
if(!stream) {
*err = CURLE_RECV_ERROR;
return -1;
}
CF_DATA_SAVE(save, cf, data);
CURL_TRC_CF(data, cf, "req: recv with %zu byte buffer", len);
msh3_lock_acquire(&stream->recv_lock);
@ -570,7 +596,7 @@ static ssize_t cf_msh3_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err)
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
struct h1_req_parser h1;
struct dynhds h2_headers;
MSH3_HEADER *nva = NULL;
@ -682,7 +708,7 @@ static void cf_msh3_adjust_pollset(struct Curl_cfilter *cf,
struct easy_pollset *ps)
{
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
struct cf_call_data save;
CF_DATA_SAVE(save, cf, data);
@ -701,7 +727,8 @@ static void cf_msh3_adjust_pollset(struct Curl_cfilter *cf,
static bool cf_msh3_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
struct cf_call_data save;
bool pending = FALSE;
@ -737,7 +764,8 @@ static CURLcode cf_msh3_data_event(struct Curl_cfilter *cf,
struct Curl_easy *data,
int event, int arg1, void *arg2)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_msh3_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
struct cf_call_data save;
CURLcode result = CURLE_OK;
@ -785,6 +813,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
CURLcode result;
bool verify;
Curl_hash_offt_init(&ctx->streams, 63, h3_stream_hash_free);
conn_config = Curl_ssl_cf_get_primary_config(cf);
if(!conn_config)
return CURLE_FAILED_INIT;
@ -911,6 +940,7 @@ static void cf_msh3_close(struct Curl_cfilter *cf, struct Curl_easy *data)
MsH3ApiClose(ctx->api);
ctx->api = NULL;
}
Curl_hash_destroy(&ctx->streams);
if(ctx->active) {
/* We share our socket at cf->conn->sock[cf->sockindex] when active.
@ -1019,6 +1049,20 @@ struct Curl_cftype Curl_cft_http3 = {
cf_msh3_query,
};
static struct cf_msh3_ctx *h3_get_msh3_ctx(struct Curl_easy *data)
{
if(data && data->conn) {
struct Curl_cfilter *cf = data->conn->cfilter[FIRSTSOCKET];
while(cf) {
if(cf->cft == &Curl_cft_http3)
return cf->ctx;
cf = cf->next;
}
}
DEBUGF(infof(data, "no filter context found"));
return NULL;
}
CURLcode Curl_cf_msh3_create(struct Curl_cfilter **pcf,
struct Curl_easy *data,
struct connectdata *conn,

View File

@ -44,6 +44,7 @@
#endif
#include "urldata.h"
#include "hash.h"
#include "sendf.h"
#include "strdup.h"
#include "rand.h"
@ -131,6 +132,7 @@ struct cf_ngtcp2_ctx {
struct curltime reconnect_at; /* time the next attempt should start */
struct bufc_pool stream_bufcp; /* chunk pool for streams */
struct dynbuf scratch; /* temp buffer for header construction */
struct Curl_hash streams; /* hash `data->id` to `h3_stream_ctx` */
size_t max_stream_window; /* max flow window for one stream */
uint64_t max_idle_ms; /* max idle time for QUIC connection */
int qlogfd;
@ -160,18 +162,27 @@ struct h3_stream_ctx {
BIT(quic_flow_blocked); /* stream is blocked by QUIC flow control */
};
#define H3_STREAM_CTX(d) ((struct h3_stream_ctx *)(((d) && (d)->req.p.http)? \
((struct HTTP *)(d)->req.p.http)->h3_ctx \
: NULL))
#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx
#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
H3_STREAM_CTX(d)->id : -2)
#define H3_STREAM_CTX(ctx,data) ((struct h3_stream_ctx *)(\
data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
static void h3_stream_ctx_free(struct h3_stream_ctx *stream)
{
Curl_bufq_free(&stream->sendbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
}
static void h3_stream_hash_free(void *stream)
{
DEBUGASSERT(stream);
h3_stream_ctx_free((struct h3_stream_ctx *)stream);
}
static CURLcode h3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(!data || !data->req.p.http) {
failf(data, "initialization failure, transfer not http initialized");
@ -192,14 +203,18 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
stream->sendbuf_len_in_flight = 0;
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
H3_STREAM_LCTX(data) = stream;
if(!Curl_hash_offt_set(&ctx->streams, data->id, stream)) {
h3_stream_ctx_free(stream);
return CURLE_OUT_OF_MEMORY;
}
return CURLE_OK;
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)cf;
if(stream) {
@ -214,38 +229,46 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
stream->closed = TRUE;
}
Curl_bufq_free(&stream->sendbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
H3_STREAM_LCTX(data) = NULL;
Curl_hash_offt_remove(&ctx->streams, data->id);
}
}
static struct Curl_easy *get_stream_easy(struct Curl_cfilter *cf,
struct Curl_easy *data,
int64_t stream_id)
int64_t stream_id,
struct h3_stream_ctx **pstream)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *sdata;
struct h3_stream_ctx *stream;
(void)cf;
if(H3_STREAM_ID(data) == stream_id) {
stream = H3_STREAM_CTX(ctx, data);
if(stream && stream->id == stream_id) {
*pstream = stream;
return data;
}
else {
DEBUGASSERT(data->multi);
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if((sdata->conn == data->conn) && H3_STREAM_ID(sdata) == stream_id) {
if(sdata->conn != data->conn)
continue;
stream = H3_STREAM_CTX(ctx, sdata);
if(stream && stream->id == stream_id) {
*pstream = stream;
return sdata;
}
}
}
*pstream = NULL;
return NULL;
}
static void h3_drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
@ -552,12 +575,12 @@ static int cb_extend_max_stream_data(ngtcp2_conn *tconn, int64_t sid,
if(rv && rv != NGHTTP3_ERR_STREAM_NOT_FOUND) {
return NGTCP2_ERR_CALLBACK_FAILURE;
}
s_data = get_stream_easy(cf, data, stream_id);
stream = H3_STREAM_CTX(s_data);
if(stream && stream->quic_flow_blocked) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] unblock quic flow", stream_id);
s_data = get_stream_easy(cf, data, stream_id, &stream);
if(s_data && stream && stream->quic_flow_blocked) {
CURL_TRC_CF(s_data, cf, "[%" CURL_PRId64 "] unblock quic flow",
stream_id);
stream->quic_flow_blocked = FALSE;
h3_drain_stream(cf, data);
h3_drain_stream(cf, s_data);
}
return 0;
}
@ -723,7 +746,7 @@ static void cf_ngtcp2_adjust_pollset(struct Curl_cfilter *cf,
Curl_pollset_check(data, ps, ctx->q.sockfd, &want_recv, &want_send);
if(want_recv || want_send) {
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
struct cf_call_data save;
bool c_exhaust, s_exhaust;
@ -746,9 +769,10 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t sid,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
curl_int64_t stream_id = (curl_int64_t)sid;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)conn;
(void)stream_id;
@ -778,7 +802,7 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;
(void)conn;
@ -827,9 +851,10 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid,
int fin, void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
curl_int64_t stream_id = (curl_int64_t)sid;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
(void)conn;
(void)stream_id;
@ -864,7 +889,7 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t sid,
nghttp3_vec h3name = nghttp3_rcbuf_get_buf(name);
nghttp3_vec h3val = nghttp3_rcbuf_get_buf(value);
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
(void)conn;
(void)stream_id;
@ -1075,7 +1100,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t blen, CURLcode *err)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
struct cf_call_data save;
struct pkt_io_ctx pktx;
@ -1133,8 +1158,9 @@ static int cb_h3_acked_req_body(nghttp3_conn *conn, int64_t stream_id,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
size_t skiplen;
(void)cf;
@ -1167,8 +1193,9 @@ cb_h3_read_req_body(nghttp3_conn *conn, int64_t stream_id,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nwritten = 0;
size_t nvecs = 0;
(void)cf;
@ -1252,7 +1279,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
*err = h3_data_setup(cf, data);
if(*err)
goto out;
stream = H3_STREAM_CTX(data);
stream = H3_STREAM_CTX(ctx, data);
DEBUGASSERT(stream);
if(!stream) {
*err = CURLE_FAILED_INIT;
@ -1364,7 +1391,7 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t sent = 0;
struct cf_call_data save;
struct pkt_io_ctx pktx;
@ -1389,7 +1416,7 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
CURL_TRC_CF(data, cf, "failed to open stream -> %d", *err);
goto out;
}
stream = H3_STREAM_CTX(data);
stream = H3_STREAM_CTX(ctx, data);
}
else if(stream->upload_blocked_len) {
/* the data in `buf` has already been submitted or added to the
@ -1616,7 +1643,7 @@ static ssize_t read_pkt_to_send(void *userp,
else if(n < 0) {
switch(n) {
case NGTCP2_ERR_STREAM_DATA_BLOCKED: {
struct h3_stream_ctx *stream = H3_STREAM_CTX(x->data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, x->data);
DEBUGASSERT(ndatalen == -1);
nghttp3_conn_block_stream(ctx->h3conn, stream_id);
CURL_TRC_CF(x->data, x->cf, "[%" CURL_PRId64 "] block quic flow",
@ -1830,7 +1857,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
h3_data_done(cf, data);
break;
case CF_CTRL_DATA_DONE_SEND: {
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(stream && !stream->send_closed) {
stream->send_closed = TRUE;
stream->upload_left = Curl_bufq_len(&stream->sendbuf);
@ -1839,7 +1866,7 @@ static CURLcode cf_ngtcp2_data_event(struct Curl_cfilter *cf,
break;
}
case CF_CTRL_DATA_IDLE: {
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURL_TRC_CF(data, cf, "data idle");
if(stream && !stream->closed) {
result = check_and_set_expiry(cf, data, NULL);
@ -1870,6 +1897,8 @@ static void cf_ngtcp2_ctx_clear(struct cf_ngtcp2_ctx *ctx)
ngtcp2_conn_del(ctx->qconn);
Curl_bufcp_free(&ctx->stream_bufcp);
Curl_dyn_free(&ctx->scratch);
Curl_hash_clean(&ctx->streams);
Curl_hash_destroy(&ctx->streams);
Curl_ssl_peer_cleanup(&ctx->peer);
memset(ctx, 0, sizeof(*ctx));
@ -2008,6 +2037,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
Curl_bufcp_init(&ctx->stream_bufcp, H3_STREAM_CHUNK_SIZE,
H3_STREAM_POOL_SPARES);
Curl_dyn_init(&ctx->scratch, CURL_MAX_HTTP_HEADER);
Curl_hash_offt_init(&ctx->streams, 63, h3_stream_hash_free);
result = Curl_ssl_peer_init(&ctx->peer, cf, TRNSPRT_QUIC);
if(result)

View File

@ -32,6 +32,7 @@
#include <nghttp3/nghttp3.h>
#include "urldata.h"
#include "hash.h"
#include "sendf.h"
#include "strdup.h"
#include "rand.h"
@ -289,6 +290,7 @@ struct cf_osslq_ctx {
struct curltime first_byte_at; /* when first byte was recvd */
struct curltime reconnect_at; /* time the next attempt should start */
struct bufc_pool stream_bufcp; /* chunk pool for streams */
struct Curl_hash streams; /* hash `data->id` to `h3_stream_ctx` */
size_t max_stream_window; /* max flow window for one stream */
uint64_t max_idle_ms; /* max idle time for QUIC connection */
BIT(got_first_byte); /* if first byte was received */
@ -306,6 +308,8 @@ static void cf_osslq_ctx_clear(struct cf_osslq_ctx *ctx)
Curl_vquic_tls_cleanup(&ctx->tls);
vquic_ctx_free(&ctx->q);
Curl_bufcp_free(&ctx->stream_bufcp);
Curl_hash_clean(&ctx->streams);
Curl_hash_destroy(&ctx->streams);
Curl_ssl_peer_cleanup(&ctx->peer);
memset(ctx, 0, sizeof(*ctx));
@ -493,18 +497,29 @@ struct h3_stream_ctx {
BIT(quic_flow_blocked); /* stream is blocked by QUIC flow control */
};
#define H3_STREAM_CTX(d) ((struct h3_stream_ctx *)(((d) && (d)->req.p.http)? \
((struct HTTP *)(d)->req.p.http)->h3_ctx \
: NULL))
#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx
#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
H3_STREAM_CTX(d)->s.id : -2)
#define H3_STREAM_CTX(ctx,data) ((struct h3_stream_ctx *)(\
data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
static void h3_stream_ctx_free(struct h3_stream_ctx *stream)
{
cf_osslq_stream_cleanup(&stream->s);
Curl_bufq_free(&stream->sendbuf);
Curl_bufq_free(&stream->recvbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
}
static void h3_stream_hash_free(void *stream)
{
DEBUGASSERT(stream);
h3_stream_ctx_free((struct h3_stream_ctx *)stream);
}
static CURLcode h3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(!data || !data->req.p.http) {
failf(data, "initialization failure, transfer not http initialized");
@ -530,14 +545,18 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
stream->recv_buf_nonflow = 0;
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
H3_STREAM_LCTX(data) = stream;
if(!Curl_hash_offt_set(&ctx->streams, data->id, stream)) {
h3_stream_ctx_free(stream);
return CURLE_OUT_OF_MEMORY;
}
return CURLE_OK;
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)cf;
if(stream) {
@ -551,12 +570,7 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
stream->closed = TRUE;
}
cf_osslq_stream_cleanup(&stream->s);
Curl_bufq_free(&stream->sendbuf);
Curl_bufq_free(&stream->recvbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
H3_STREAM_LCTX(data) = NULL;
Curl_hash_offt_remove(&ctx->streams, data->id);
}
}
@ -565,7 +579,7 @@ static struct cf_osslq_stream *cf_osslq_get_qstream(struct Curl_cfilter *cf,
int64_t stream_id)
{
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
struct Curl_easy *sdata;
if(stream && stream->s.id == stream_id) {
@ -583,9 +597,11 @@ static struct cf_osslq_stream *cf_osslq_get_qstream(struct Curl_cfilter *cf,
else {
DEBUGASSERT(data->multi);
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if((sdata->conn == data->conn) && H3_STREAM_ID(sdata) == stream_id) {
stream = H3_STREAM_CTX(sdata);
return stream? &stream->s : NULL;
if(sdata->conn != data->conn)
continue;
stream = H3_STREAM_CTX(ctx, sdata);
if(stream && stream->s.id == stream_id) {
return &stream->s;
}
}
}
@ -595,7 +611,8 @@ static struct cf_osslq_stream *cf_osslq_get_qstream(struct Curl_cfilter *cf,
static void h3_drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
@ -625,8 +642,9 @@ static int cb_h3_stream_close(nghttp3_conn *conn, int64_t stream_id,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)conn;
(void)stream_id;
@ -659,7 +677,8 @@ static CURLcode write_resp_raw(struct Curl_cfilter *cf,
const void *mem, size_t memlen,
bool flow)
{
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
@ -689,8 +708,9 @@ static int cb_h3_recv_data(nghttp3_conn *conn, int64_t stream3_id,
void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;
(void)conn;
@ -717,8 +737,9 @@ static int cb_h3_deferred_consume(nghttp3_conn *conn, int64_t stream_id,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)conn;
(void)stream_id;
@ -735,10 +756,11 @@ static int cb_h3_recv_header(nghttp3_conn *conn, int64_t sid,
{
struct Curl_cfilter *cf = user_data;
curl_int64_t stream_id = sid;
struct cf_osslq_ctx *ctx = cf->ctx;
nghttp3_vec h3name = nghttp3_rcbuf_get_buf(name);
nghttp3_vec h3val = nghttp3_rcbuf_get_buf(value);
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
(void)conn;
(void)stream_id;
@ -795,9 +817,10 @@ static int cb_h3_end_headers(nghttp3_conn *conn, int64_t sid,
int fin, void *user_data, void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
curl_int64_t stream_id = sid;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
(void)conn;
(void)stream_id;
@ -826,9 +849,10 @@ static int cb_h3_stop_sending(nghttp3_conn *conn, int64_t sid,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
curl_int64_t stream_id = sid;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)conn;
(void)app_error_code;
@ -844,9 +868,10 @@ static int cb_h3_reset_stream(nghttp3_conn *conn, int64_t sid,
uint64_t app_error_code, void *user_data,
void *stream_user_data) {
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
curl_int64_t stream_id = sid;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
int rv;
(void)conn;
@ -869,8 +894,9 @@ cb_h3_read_req_body(nghttp3_conn *conn, int64_t stream_id,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nwritten = 0;
size_t nvecs = 0;
(void)cf;
@ -933,8 +959,9 @@ static int cb_h3_acked_stream_data(nghttp3_conn *conn, int64_t stream_id,
void *stream_user_data)
{
struct Curl_cfilter *cf = user_data;
struct cf_osslq_ctx *ctx = cf->ctx;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
size_t skiplen;
(void)cf;
@ -1047,6 +1074,7 @@ static CURLcode cf_osslq_ctx_start(struct Curl_cfilter *cf,
Curl_bufcp_init(&ctx->stream_bufcp, H3_STREAM_CHUNK_SIZE,
H3_STREAM_POOL_SPARES);
Curl_hash_offt_init(&ctx->streams, 63, h3_stream_hash_free);
result = Curl_ssl_peer_init(&ctx->peer, cf, TRNSPRT_QUIC);
if(result)
goto out;
@ -1325,7 +1353,7 @@ static CURLcode cf_progress_ingress(struct Curl_cfilter *cf,
DEBUGASSERT(data->multi);
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if(sdata->conn == data->conn && CURL_WANT_RECV(sdata)) {
stream = H3_STREAM_CTX(sdata);
stream = H3_STREAM_CTX(ctx, sdata);
if(stream && !stream->closed &&
!Curl_bufq_is_full(&stream->recvbuf)) {
result = cf_osslq_stream_recv(&stream->s, cf, sdata);
@ -1352,7 +1380,7 @@ static CURLcode cf_osslq_check_and_unblock(struct Curl_cfilter *cf,
if(ctx->h3.conn) {
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if(sdata->conn == data->conn) {
stream = H3_STREAM_CTX(sdata);
stream = H3_STREAM_CTX(ctx, sdata);
if(stream && stream->s.ssl && stream->s.send_blocked &&
!SSL_want_write(stream->s.ssl)) {
nghttp3_conn_unblock_stream(ctx->h3.conn, stream->s.id);
@ -1693,7 +1721,7 @@ static ssize_t h3_stream_open(struct Curl_cfilter *cf,
*err = h3_data_setup(cf, data);
if(*err)
goto out;
stream = H3_STREAM_CTX(data);
stream = H3_STREAM_CTX(ctx, data);
DEBUGASSERT(stream);
if(!stream) {
*err = CURLE_FAILED_INIT;
@ -1806,7 +1834,7 @@ static ssize_t cf_osslq_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err)
{
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
struct cf_call_data save;
ssize_t nwritten;
CURLcode result;
@ -1837,7 +1865,7 @@ static ssize_t cf_osslq_send(struct Curl_cfilter *cf, struct Curl_easy *data,
CURL_TRC_CF(data, cf, "failed to open stream -> %d", *err);
goto out;
}
stream = H3_STREAM_CTX(data);
stream = H3_STREAM_CTX(ctx, data);
}
else if(stream->upload_blocked_len) {
/* the data in `buf` has already been submitted or added to the
@ -1946,7 +1974,7 @@ static ssize_t cf_osslq_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
{
struct cf_osslq_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
struct cf_call_data save;
CURLcode result;
@ -2029,7 +2057,8 @@ out:
static bool cf_osslq_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
const struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_osslq_ctx *ctx = cf->ctx;
const struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)cf;
return stream && !Curl_bufq_is_empty(&stream->recvbuf);
}
@ -2058,7 +2087,7 @@ static CURLcode cf_osslq_data_event(struct Curl_cfilter *cf,
h3_data_done(cf, data);
break;
case CF_CTRL_DATA_DONE_SEND: {
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(stream && !stream->send_closed) {
stream->send_closed = TRUE;
stream->upload_left = Curl_bufq_len(&stream->sendbuf);
@ -2067,7 +2096,7 @@ static CURLcode cf_osslq_data_event(struct Curl_cfilter *cf,
break;
}
case CF_CTRL_DATA_IDLE: {
struct h3_stream_ctx *stream = H3_STREAM_CTX(data);
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURL_TRC_CF(data, cf, "data idle");
if(stream && !stream->closed) {
result = check_and_set_expiry(cf, data);

View File

@ -29,6 +29,7 @@
#include <openssl/err.h>
#include <openssl/ssl.h>
#include "bufq.h"
#include "hash.h"
#include "urldata.h"
#include "cfilters.h"
#include "cf-socket.h"
@ -98,6 +99,7 @@ struct cf_quiche_ctx {
struct curltime handshake_at; /* time connect handshake finished */
struct curltime reconnect_at; /* time the next attempt should start */
struct bufc_pool stream_bufcp; /* chunk pool for streams */
struct Curl_hash streams; /* hash `data->id` to `stream_ctx` */
curl_off_t data_recvd;
curl_uint64_t max_idle_ms; /* max idle time for QUIC conn */
BIT(goaway); /* got GOAWAY from server */
@ -129,6 +131,8 @@ static void cf_quiche_ctx_clear(struct cf_quiche_ctx *ctx)
Curl_ssl_peer_cleanup(&ctx->peer);
vquic_ctx_free(&ctx->q);
Curl_bufcp_free(&ctx->stream_bufcp);
Curl_hash_clean(&ctx->streams);
Curl_hash_destroy(&ctx->streams);
memset(ctx, 0, sizeof(*ctx));
}
@ -152,23 +156,33 @@ struct stream_ctx {
BIT(quic_flow_blocked); /* stream is blocked by QUIC flow control */
};
#define H3_STREAM_CTX(d) ((struct stream_ctx *)(((d) && (d)->req.p.http)? \
((struct HTTP *)(d)->req.p.http)->h3_ctx \
: NULL))
#define H3_STREAM_LCTX(d) ((struct HTTP *)(d)->req.p.http)->h3_ctx
#define H3_STREAM_ID(d) (H3_STREAM_CTX(d)? \
H3_STREAM_CTX(d)->id : -2)
#define H3_STREAM_CTX(ctx,data) ((struct stream_ctx *)(\
data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
static void h3_stream_ctx_free(struct stream_ctx *stream)
{
Curl_bufq_free(&stream->recvbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
}
static void h3_stream_hash_free(void *stream)
{
DEBUGASSERT(stream);
h3_stream_ctx_free((struct stream_ctx *)stream);
}
static void check_resumes(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct Curl_easy *sdata;
struct stream_ctx *stream;
DEBUGASSERT(data->multi);
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if(sdata->conn == data->conn) {
stream = H3_STREAM_CTX(sdata);
stream = H3_STREAM_CTX(ctx, sdata);
if(stream && stream->quic_flow_blocked) {
stream->quic_flow_blocked = FALSE;
Curl_expire(data, 0, EXPIRE_RUN_NOW);
@ -182,7 +196,7 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(stream)
return CURLE_OK;
@ -191,18 +205,23 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
if(!stream)
return CURLE_OUT_OF_MEMORY;
H3_STREAM_LCTX(data) = stream;
stream->id = -1;
Curl_bufq_initp(&stream->recvbuf, &ctx->stream_bufcp,
H3_STREAM_RECV_CHUNKS, BUFQ_OPT_SOFT_LIMIT);
Curl_h1_req_parse_init(&stream->h1, H1_PARSE_DEFAULT_MAX_LINE_LEN);
if(!Curl_hash_offt_set(&ctx->streams, data->id, stream)) {
h3_stream_ctx_free(stream);
return CURLE_OUT_OF_MEMORY;
}
return CURLE_OK;
}
static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)cf;
if(stream) {
@ -217,17 +236,15 @@ static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
}
stream->closed = TRUE;
}
Curl_bufq_free(&stream->recvbuf);
Curl_h1_req_parse_free(&stream->h1);
free(stream);
H3_STREAM_LCTX(data) = NULL;
Curl_hash_offt_remove(&ctx->streams, data->id);
}
}
static void drain_stream(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
unsigned char bits;
(void)cf;
@ -242,22 +259,32 @@ static void drain_stream(struct Curl_cfilter *cf,
static struct Curl_easy *get_stream_easy(struct Curl_cfilter *cf,
struct Curl_easy *data,
curl_uint64_t stream3_id)
curl_uint64_t stream_id,
struct stream_ctx **pstream)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct Curl_easy *sdata;
struct stream_ctx *stream;
(void)cf;
if(H3_STREAM_ID(data) == stream3_id) {
stream = H3_STREAM_CTX(ctx, data);
if(stream && stream->id == stream_id) {
*pstream = stream;
return data;
}
else {
DEBUGASSERT(data->multi);
for(sdata = data->multi->easyp; sdata; sdata = sdata->next) {
if((sdata->conn == data->conn) && H3_STREAM_ID(sdata) == stream3_id) {
if(sdata->conn != data->conn)
continue;
stream = H3_STREAM_CTX(ctx, sdata);
if(stream && stream->id == stream_id) {
*pstream = stream;
return sdata;
}
}
}
*pstream = NULL;
return NULL;
}
@ -270,7 +297,8 @@ static CURLcode write_resp_raw(struct Curl_cfilter *cf,
struct Curl_easy *data,
const void *mem, size_t memlen)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result = CURLE_OK;
ssize_t nwritten;
@ -300,7 +328,8 @@ static int cb_each_header(uint8_t *name, size_t name_len,
void *argp)
{
struct cb_ctx *x = argp;
struct stream_ctx *stream = H3_STREAM_CTX(x->data);
struct cf_quiche_ctx *ctx = x->cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, x->data);
CURLcode result;
if(!stream)
@ -340,7 +369,7 @@ static ssize_t stream_resp_read(void *reader_ctx,
{
struct cb_ctx *x = reader_ctx;
struct cf_quiche_ctx *ctx = x->cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(x->data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, x->data);
ssize_t nread;
if(!stream) {
@ -363,7 +392,8 @@ static ssize_t stream_resp_read(void *reader_ctx,
static CURLcode cf_recv_body(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nwritten;
struct cb_ctx cb_ctx;
CURLcode result = CURLE_OK;
@ -421,17 +451,15 @@ static const char *cf_ev_name(quiche_h3_event *ev)
static CURLcode h3_process_event(struct Curl_cfilter *cf,
struct Curl_easy *data,
curl_uint64_t stream3_id,
struct stream_ctx *stream,
quiche_h3_event *ev)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cb_ctx cb_ctx;
CURLcode result = CURLE_OK;
int rc;
if(!stream)
return CURLE_OK;
DEBUGASSERT(stream3_id == stream->id);
switch(quiche_h3_event_type(ev)) {
case QUICHE_H3_EVENT_HEADERS:
stream->resp_got_header = TRUE;
@ -440,10 +468,10 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf,
rc = quiche_h3_event_for_each_header(ev, cb_each_header, &cb_ctx);
if(rc) {
failf(data, "Error %d in HTTP/3 response header for stream[%"
CURL_PRIu64"]", rc, stream3_id);
CURL_PRIu64"]", rc, stream->id);
return CURLE_RECV_ERROR;
}
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] <- [HEADERS]", stream3_id);
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] <- [HEADERS]", stream->id);
break;
case QUICHE_H3_EVENT_DATA:
@ -453,7 +481,7 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf,
break;
case QUICHE_H3_EVENT_RESET:
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] RESET", stream3_id);
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] RESET", stream->id);
stream->closed = TRUE;
stream->reset = TRUE;
stream->send_closed = TRUE;
@ -461,7 +489,7 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf,
break;
case QUICHE_H3_EVENT_FINISHED:
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] CLOSED", stream3_id);
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] CLOSED", stream->id);
if(!stream->resp_hds_complete) {
result = write_resp_raw(cf, data, "\r\n", 2);
if(result)
@ -473,12 +501,12 @@ static CURLcode h3_process_event(struct Curl_cfilter *cf,
break;
case QUICHE_H3_EVENT_GOAWAY:
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] <- [GOAWAY]", stream3_id);
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] <- [GOAWAY]", stream->id);
break;
default:
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] recv, unhandled event %d",
stream3_id, quiche_h3_event_type(ev));
stream->id, quiche_h3_event_type(ev));
break;
}
return result;
@ -488,7 +516,7 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = NULL;
struct Curl_easy *sdata;
quiche_h3_event *ev;
CURLcode result;
@ -500,24 +528,21 @@ static CURLcode cf_poll_events(struct Curl_cfilter *cf,
break;
}
else if(stream3_id < 0) {
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] error poll: %"CURL_PRIu64,
stream? stream->id : -1, stream3_id);
CURL_TRC_CF(data, cf, "error poll: %"CURL_PRId64, stream3_id);
return CURLE_HTTP3;
}
sdata = get_stream_easy(cf, data, stream3_id);
if(!sdata) {
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] discard event %s for "
"unknown [%"CURL_PRIu64"]",
stream? stream->id : -1, cf_ev_name(ev), stream3_id);
sdata = get_stream_easy(cf, data, stream3_id, &stream);
if(!sdata || !stream) {
CURL_TRC_CF(data, cf, "discard event %s for unknown [%"CURL_PRId64"]",
cf_ev_name(ev), stream3_id);
}
else {
result = h3_process_event(cf, sdata, stream3_id, ev);
result = h3_process_event(cf, sdata, stream, ev);
drain_stream(cf, sdata);
if(result) {
CURL_TRC_CF(data, cf, "[%"CURL_PRIu64"] error processing event %s "
"for [%"CURL_PRIu64"] -> %d",
stream? stream->id : -1, cf_ev_name(ev),
CURL_TRC_CF(data, cf, "error processing event %s "
"for [%"CURL_PRIu64"] -> %d", cf_ev_name(ev),
stream3_id, result);
if(data == sdata) {
/* Only report this error to the caller if it is about the
@ -726,7 +751,8 @@ static ssize_t recv_closed_stream(struct Curl_cfilter *cf,
struct Curl_easy *data,
CURLcode *err)
{
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
DEBUGASSERT(stream);
@ -758,7 +784,7 @@ static ssize_t cf_quiche_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
char *buf, size_t len, CURLcode *err)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
ssize_t nread = -1;
CURLcode result;
@ -839,7 +865,7 @@ static ssize_t h3_open_stream(struct Curl_cfilter *cf,
CURLcode *err)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
size_t nheader, i;
curl_int64_t stream3_id;
struct dynhds h2_headers;
@ -851,7 +877,7 @@ static ssize_t h3_open_stream(struct Curl_cfilter *cf,
if(*err) {
return -1;
}
stream = H3_STREAM_CTX(data);
stream = H3_STREAM_CTX(ctx, data);
DEBUGASSERT(stream);
}
@ -958,7 +984,7 @@ static ssize_t cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data,
const void *buf, size_t len, CURLcode *err)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;
ssize_t nwritten;
@ -974,7 +1000,7 @@ static ssize_t cf_quiche_send(struct Curl_cfilter *cf, struct Curl_easy *data,
nwritten = h3_open_stream(cf, data, buf, len, err);
if(nwritten < 0)
goto out;
stream = H3_STREAM_CTX(data);
stream = H3_STREAM_CTX(ctx, data);
}
else if(stream->closed) {
if(stream->resp_hds_complete) {
@ -1067,7 +1093,7 @@ static bool stream_is_writeable(struct Curl_cfilter *cf,
struct Curl_easy *data)
{
struct cf_quiche_ctx *ctx = cf->ctx;
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
return stream && (quiche_conn_stream_writable(
ctx->qconn, (curl_uint64_t)stream->id, 1) > 0);
@ -1085,7 +1111,7 @@ static void cf_quiche_adjust_pollset(struct Curl_cfilter *cf,
Curl_pollset_check(data, ps, ctx->q.sockfd, &want_recv, &want_send);
if(want_recv || want_send) {
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
bool c_exhaust, s_exhaust;
c_exhaust = FALSE; /* Have not found any call in quiche that tells
@ -1107,7 +1133,8 @@ static void cf_quiche_adjust_pollset(struct Curl_cfilter *cf,
static bool cf_quiche_data_pending(struct Curl_cfilter *cf,
const struct Curl_easy *data)
{
const struct stream_ctx *stream = H3_STREAM_CTX(data);
struct cf_quiche_ctx *ctx = cf->ctx;
const struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)cf;
return stream && !Curl_bufq_is_empty(&stream->recvbuf);
}
@ -1129,6 +1156,7 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf,
struct Curl_easy *data,
int event, int arg1, void *arg2)
{
struct cf_quiche_ctx *ctx = cf->ctx;
CURLcode result = CURLE_OK;
(void)arg1;
@ -1146,7 +1174,7 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf,
h3_data_done(cf, data);
break;
case CF_CTRL_DATA_DONE_SEND: {
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(stream && !stream->send_closed) {
unsigned char body[1];
ssize_t sent;
@ -1161,7 +1189,7 @@ static CURLcode cf_quiche_data_event(struct Curl_cfilter *cf,
break;
}
case CF_CTRL_DATA_IDLE: {
struct stream_ctx *stream = H3_STREAM_CTX(data);
struct stream_ctx *stream = H3_STREAM_CTX(ctx, data);
if(stream && !stream->closed) {
result = cf_flush_egress(cf, data);
if(result)
@ -1196,6 +1224,7 @@ static CURLcode cf_connect_start(struct Curl_cfilter *cf,
ctx->max_idle_ms = CURL_QUIC_MAX_IDLE_MS;
Curl_bufcp_init(&ctx->stream_bufcp, H3_STREAM_CHUNK_SIZE,
H3_STREAM_POOL_SPARES);
Curl_hash_offt_init(&ctx->streams, 63, h3_stream_hash_free);
ctx->data_recvd = 0;
result = vquic_ctx_init(&ctx->q);

View File

@ -205,7 +205,7 @@ test1590 test1591 test1592 test1593 test1594 test1595 test1596 test1597 \
test1598 \
test1600 test1601 test1602 test1603 test1604 test1605 test1606 test1607 \
test1608 test1609 test1610 test1611 test1612 test1613 test1614 test1615 \
\
test1616 \
test1620 test1621 \
\
test1630 test1631 test1632 test1633 test1634 test1635 \

22
tests/data/test1616 Normal file
View File

@ -0,0 +1,22 @@
<testcase>
<info>
<keywords>
unittest
hash
</keywords>
</info>
#
# Client-side
<client>
<server>
none
</server>
<features>
unittest
</features>
<name>
Internal hash_offt create/add/destroy testing, exercising clean functions
</name>
</client>
</testcase>

View File

@ -34,7 +34,7 @@ UNITPROGS = unit1300 unit1302 unit1303 unit1304 unit1305 unit1307 \
unit1330 unit1394 unit1395 unit1396 unit1397 unit1398 \
unit1399 \
unit1600 unit1601 unit1602 unit1603 unit1604 unit1605 unit1606 unit1607 \
unit1608 unit1609 unit1610 unit1611 unit1612 unit1614 unit1615 \
unit1608 unit1609 unit1610 unit1611 unit1612 unit1614 unit1615 unit1616 \
unit1620 unit1621 \
unit1650 unit1651 unit1652 unit1653 unit1654 unit1655 \
unit1660 unit1661 \
@ -103,6 +103,8 @@ unit1614_SOURCES = unit1614.c $(UNITFILES)
unit1615_SOURCES = unit1615.c $(UNITFILES)
unit1616_SOURCES = unit1616.c $(UNITFILES)
unit1620_SOURCES = unit1620.c $(UNITFILES)
unit1621_SOURCES = unit1621.c $(UNITFILES)

87
tests/unit/unit1616.c Normal file
View File

@ -0,0 +1,87 @@
/***************************************************************************
* _ _ ____ _
* Project ___| | | | _ \| |
* / __| | | | |_) | |
* | (__| |_| | _ <| |___
* \___|\___/|_| \_\_____|
*
* Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
*
* This software is licensed as described in the file COPYING, which
* you should have received as part of this distribution. The terms
* are also available at https://curl.se/docs/copyright.html.
*
* You may opt to use, copy, modify, merge, publish, distribute and/or sell
* copies of the Software, and permit persons to whom the Software is
* furnished to do so, under the terms of the COPYING file.
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
* KIND, either express or implied.
*
* SPDX-License-Identifier: curl
*
***************************************************************************/
#include "curlcheck.h"
#define ENABLE_CURLX_PRINTF
#include "curlx.h"
#include "hash.h"
#include "memdebug.h" /* LAST include file */
static struct Curl_hash hash_static;
static void mydtor(void *elem)
{
int *ptr = (int *)elem;
free(ptr);
}
static CURLcode unit_setup(void)
{
Curl_hash_offt_init(&hash_static, 15, mydtor);
return CURLE_OK;
}
static void unit_stop(void)
{
Curl_hash_destroy(&hash_static);
}
UNITTEST_START
int *value, *v;
int *value2;
int *nodep;
curl_off_t key = 20;
curl_off_t key2 = 25;
value = malloc(sizeof(int));
abort_unless(value != NULL, "Out of memory");
*value = 199;
nodep = Curl_hash_offt_set(&hash_static, key, value);
if(!nodep)
free(value);
abort_unless(nodep, "insertion into hash failed");
v = Curl_hash_offt_get(&hash_static, key);
abort_unless(v == value, "lookup present entry failed");
v = Curl_hash_offt_get(&hash_static, key2);
abort_unless(!v, "lookup missing entry failed");
Curl_hash_clean(&hash_static);
/* Attempt to add another key/value pair */
value2 = malloc(sizeof(int));
abort_unless(value2 != NULL, "Out of memory");
*value2 = 204;
nodep = Curl_hash_offt_set(&hash_static, key2, value2);
if(!nodep)
free(value2);
abort_unless(nodep, "insertion into hash failed");
v = Curl_hash_offt_get(&hash_static, key2);
abort_unless(v == value2, "lookup present entry failed");
v = Curl_hash_offt_get(&hash_static, key);
abort_unless(!v, "lookup missing entry failed");
UNITTEST_STOP