client writer: handle pause before decoding
Adds a "cw-pause" client writer in the PROTOCOL phase that buffers output
when the client has paused the transfer. This prevents content decoding
from blowing the buffer in the "cw-out" writer. Added test_02_35 that
downloads two 100MB gzip bombs in parallel and pauses after 1MB of decoded
zeros.

This is a solution to issue #16280, with some limitations:

- cw-out still needs buffering of its own, since it can be paused "in the
  middle" of a write that started with some KB of gzipped zeros and
  exploded into several MB of calls to cw-out.
- cw-pause will then start buffering on its own *after* the write that
  caused the pause. cw-pause has no buffer limits, but the data it buffers
  is still content-encoded. Protocols like http/1.1 stop receiving, h2/h3
  have window sizes, so the cw-pause buffer should not grow out of control,
  at least for these protocols.
- the current limit on cw-out's buffer is ~75MB (for whatever historical
  reason). A content encoding that blows 16KB (the common h2 chunk size) up
  into more than 75MB would still blow the buffer, making the transfer
  fail. A gzip of zeros turns 16KB into ~16MB, so that still works.

A better solution would be to allow CURLE_AGAIN handling in the client
writer chain and make all content decoders handle that. This would stop the
explosion of decoded data on a pause right away, but it is a large change
of the decoder operations.

Reported-by: lf- on github
Fixes #16280
Closes #16296
parent 279a4772ae
commit f78700814d
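For orientation, the client-side pause that cw-pause has to absorb is driven through libcurl's public API: the application's write callback returns CURL_WRITEFUNC_PAUSE and later resumes the transfer with curl_easy_pause(). The sketch below shows that pattern in plain C; the URL, the 1MB threshold and resuming from the progress callback are illustrative assumptions, not part of this commit (the commit's own driver is the hx-download test client changed below).

#include <curl/curl.h>

struct pause_state {
  CURL *h;
  size_t seen;    /* decoded bytes delivered so far */
  int paused;
};

/* pause the transfer once roughly 1MB of decoded body has been delivered;
 * the data handed to this invocation is redelivered after unpausing */
static size_t write_cb(char *ptr, size_t size, size_t nmemb, void *userp)
{
  struct pause_state *s = userp;
  (void)ptr;
  if(!s->paused && (s->seen + size * nmemb) >= (size_t)(1024 * 1024)) {
    s->paused = 1;
    return CURL_WRITEFUNC_PAUSE;  /* tell libcurl to pause this transfer */
  }
  s->seen += size * nmemb;
  return size * nmemb;
}

/* resume from the progress callback, one place an application can do it */
static int xferinfo_cb(void *userp, curl_off_t dltotal, curl_off_t dlnow,
                       curl_off_t ultotal, curl_off_t ulnow)
{
  struct pause_state *s = userp;
  (void)dltotal; (void)dlnow; (void)ultotal; (void)ulnow;
  if(s->paused) {
    s->paused = 0;
    curl_easy_pause(s->h, CURLPAUSE_CONT);
  }
  return 0;
}

int main(void)
{
  struct pause_state s = {NULL, 0, 0};
  curl_global_init(CURL_GLOBAL_DEFAULT);
  s.h = curl_easy_init();
  if(s.h) {
    curl_easy_setopt(s.h, CURLOPT_URL,
                     "https://localhost/bomb-100m.txt"); /* placeholder URL */
    curl_easy_setopt(s.h, CURLOPT_ACCEPT_ENCODING, "");  /* enable decoding */
    curl_easy_setopt(s.h, CURLOPT_WRITEFUNCTION, write_cb);
    curl_easy_setopt(s.h, CURLOPT_WRITEDATA, &s);
    curl_easy_setopt(s.h, CURLOPT_XFERINFOFUNCTION, xferinfo_cb);
    curl_easy_setopt(s.h, CURLOPT_XFERINFODATA, &s);
    curl_easy_setopt(s.h, CURLOPT_NOPROGRESS, 0L);
    curl_easy_perform(s.h);
    curl_easy_cleanup(s.h);
  }
  curl_global_cleanup();
  return 0;
}

The hx-download client used by test_02_35 drives the same pause/resume through the multi interface, pausing after the offset given with its -P option.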
@@ -144,6 +144,7 @@ LIB_CFILES = \
   curl_threads.c \
   curl_trc.c \
   cw-out.c \
+  cw-pause.c \
   dict.c \
   dllmain.c \
   doh.c \
@@ -291,6 +292,7 @@ LIB_HFILES = \
   curl_trc.h \
   curlx.h \
   cw-out.h \
+  cw-pause.h \
   dict.h \
   doh.h \
   dynbuf.h \
lib/cw-out.c (59 changes)
@@ -32,6 +32,7 @@
 #include "multiif.h"
 #include "sendf.h"
 #include "cw-out.h"
+#include "cw-pause.h"

 /* The last 3 #include files should be in this order */
 #include "curl_printf.h"
@@ -198,7 +199,7 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
                                  const char *buf, size_t blen,
                                  size_t *pconsumed)
 {
-  curl_write_callback wcb;
+  curl_write_callback wcb = NULL;
   void *wcb_data;
   size_t max_write, min_write;
   size_t wlen, nwritten;
@@ -222,7 +223,7 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
     Curl_set_in_callback(data, TRUE);
     nwritten = wcb((char *)buf, 1, wlen, wcb_data);
     Curl_set_in_callback(data, FALSE);
-    CURL_TRC_WRITE(data, "cw_out, wrote %zu %s bytes -> %zu",
+    CURL_TRC_WRITE(data, "[OUT] wrote %zu %s bytes -> %zu",
                    wlen, (otype == CW_OUT_BODY) ? "body" : "header",
                    nwritten);
     if(CURL_WRITEFUNC_PAUSE == nwritten) {
@@ -236,7 +237,7 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
       /* mark the connection as RECV paused */
       data->req.keepon |= KEEP_RECV_PAUSE;
       ctx->paused = TRUE;
-      CURL_TRC_WRITE(data, "cw_out, PAUSE requested by client");
+      CURL_TRC_WRITE(data, "[OUT] PAUSE requested by client");
       break;
     }
     else if(CURL_WRITEFUNC_ERROR == nwritten) {
@@ -326,11 +327,16 @@ static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx,
 }

 static CURLcode cw_out_append(struct cw_out_ctx *ctx,
+                              struct Curl_easy *data,
                               cw_out_type otype,
                               const char *buf, size_t blen)
 {
-  if(cw_out_bufs_len(ctx) + blen > DYN_PAUSE_BUFFER)
+  CURL_TRC_WRITE(data, "[OUT] paused, buffering %zu more bytes (%zu/%d)",
+                 blen, cw_out_bufs_len(ctx), DYN_PAUSE_BUFFER);
+  if(cw_out_bufs_len(ctx) + blen > DYN_PAUSE_BUFFER) {
+    failf(data, "pause buffer not large enough -> CURLE_TOO_LARGE");
     return CURLE_TOO_LARGE;
+  }

   /* if we do not have a buffer, or it is of another type, make a new one.
    * And for CW_OUT_HDS always make a new one, so we "replay" headers
@@ -364,7 +370,7 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,

   if(ctx->buf) {
     /* still have buffered data, append and flush */
-    result = cw_out_append(ctx, otype, buf, blen);
+    result = cw_out_append(ctx, data, otype, buf, blen);
     if(result)
       return result;
     result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all);
@@ -380,7 +386,8 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,
       return result;
     if(consumed < blen) {
       /* did not write all, append the rest */
-      result = cw_out_append(ctx, otype, buf + consumed, blen - consumed);
+      result = cw_out_append(ctx, data, otype,
+                             buf + consumed, blen - consumed);
       if(result)
         goto out;
     }
@@ -430,23 +437,18 @@ bool Curl_cw_out_is_paused(struct Curl_easy *data)
     return FALSE;

   ctx = (struct cw_out_ctx *)cw_out;
   CURL_TRC_WRITE(data, "cw-out is%spaused", ctx->paused ? "" : " not");
   return ctx->paused;
 }

 static CURLcode cw_out_flush(struct Curl_easy *data,
-                             bool unpause, bool flush_all)
+                             struct Curl_cwriter *cw_out,
+                             bool flush_all)
 {
-  struct Curl_cwriter *cw_out;
+  struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out;
   CURLcode result = CURLE_OK;

-  cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out);
-  if(cw_out) {
-    struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out;
   if(ctx->errored)
     return CURLE_WRITE_ERROR;
-  if(unpause && ctx->paused)
-    ctx->paused = FALSE;
   if(ctx->paused)
     return CURLE_OK; /* not doing it */

@@ -456,18 +458,37 @@ static CURLcode cw_out_flush(struct Curl_easy *data,
       cw_out_bufs_free(ctx);
       return result;
     }
-  }
   return result;
 }

 CURLcode Curl_cw_out_unpause(struct Curl_easy *data)
 {
-  CURL_TRC_WRITE(data, "cw-out unpause");
-  return cw_out_flush(data, TRUE, FALSE);
+  struct Curl_cwriter *cw_out;
+  CURLcode result = CURLE_OK;
+
+  cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out);
+  if(cw_out) {
+    struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out;
+    CURL_TRC_WRITE(data, "[OUT] unpause");
+    ctx->paused = FALSE;
+    result = Curl_cw_pause_flush(data);
+    if(!result)
+      result = cw_out_flush(data, cw_out, FALSE);
+  }
+  return result;
 }

 CURLcode Curl_cw_out_done(struct Curl_easy *data)
 {
-  CURL_TRC_WRITE(data, "cw-out done");
-  return cw_out_flush(data, FALSE, TRUE);
+  struct Curl_cwriter *cw_out;
+  CURLcode result = CURLE_OK;
+
+  cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out);
+  if(cw_out) {
+    CURL_TRC_WRITE(data, "[OUT] done");
+    result = Curl_cw_pause_flush(data);
+    if(!result)
+      result = cw_out_flush(data, cw_out, TRUE);
+  }
+  return result;
 }
lib/cw-pause.c (new file, 242 lines)
@@ -0,0 +1,242 @@
/***************************************************************************
 *                                  _   _ ____  _
 *  Project                     ___| | | |  _ \| |
 *                             / __| | | | |_) | |
 *                            | (__| |_| |  _ <| |___
 *                             \___|\___/|_| \_\_____|
 *
 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
 *
 * This software is licensed as described in the file COPYING, which
 * you should have received as part of this distribution. The terms
 * are also available at https://curl.se/docs/copyright.html.
 *
 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
 * copies of the Software, and permit persons to whom the Software is
 * furnished to do so, under the terms of the COPYING file.
 *
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
 * KIND, either express or implied.
 *
 * SPDX-License-Identifier: curl
 *
 ***************************************************************************/

#include "curl_setup.h"

#include <curl/curl.h>

#include "urldata.h"
#include "bufq.h"
#include "cfilters.h"
#include "headers.h"
#include "multiif.h"
#include "sendf.h"
#include "cw-pause.h"

/* The last 3 #include files should be in this order */
#include "curl_printf.h"
#include "curl_memory.h"
#include "memdebug.h"


/* body dynbuf sizes */
#define CW_PAUSE_BUF_CHUNK       (16 * 1024)
/* when content decoding, write data in chunks */
#define CW_PAUSE_DEC_WRITE_CHUNK (4096)

struct cw_pause_buf {
  struct cw_pause_buf *next;
  struct bufq b;
  int type;
};

static struct cw_pause_buf *cw_pause_buf_create(int type, size_t buflen)
{
  struct cw_pause_buf *cwbuf = calloc(1, sizeof(*cwbuf));
  if(cwbuf) {
    cwbuf->type = type;
    if(type & CLIENTWRITE_BODY)
      Curl_bufq_init2(&cwbuf->b, CW_PAUSE_BUF_CHUNK, 1,
                      (BUFQ_OPT_SOFT_LIMIT|BUFQ_OPT_NO_SPARES));
    else
      Curl_bufq_init(&cwbuf->b, buflen, 1);
  }
  return cwbuf;
}

static void cw_pause_buf_free(struct cw_pause_buf *cwbuf)
{
  if(cwbuf) {
    Curl_bufq_free(&cwbuf->b);
    free(cwbuf);
  }
}

struct cw_pause_ctx {
  struct Curl_cwriter super;
  struct cw_pause_buf *buf;
  size_t buf_total;
};

static CURLcode cw_pause_write(struct Curl_easy *data,
                               struct Curl_cwriter *writer, int type,
                               const char *buf, size_t nbytes);
static void cw_pause_close(struct Curl_easy *data,
                           struct Curl_cwriter *writer);
static CURLcode cw_pause_init(struct Curl_easy *data,
                              struct Curl_cwriter *writer);

struct Curl_cwtype Curl_cwt_pause = {
  "cw-pause",
  NULL,
  cw_pause_init,
  cw_pause_write,
  cw_pause_close,
  sizeof(struct cw_pause_ctx)
};

static CURLcode cw_pause_init(struct Curl_easy *data,
                              struct Curl_cwriter *writer)
{
  struct cw_pause_ctx *ctx = writer->ctx;
  (void)data;
  ctx->buf = NULL;
  return CURLE_OK;
}

static void cw_pause_bufs_free(struct cw_pause_ctx *ctx)
{
  while(ctx->buf) {
    struct cw_pause_buf *next = ctx->buf->next;
    cw_pause_buf_free(ctx->buf);
    ctx->buf = next;
  }
}

static void cw_pause_close(struct Curl_easy *data, struct Curl_cwriter *writer)
{
  struct cw_pause_ctx *ctx = writer->ctx;

  (void)data;
  cw_pause_bufs_free(ctx);
}

static CURLcode cw_pause_flush(struct Curl_easy *data,
                               struct Curl_cwriter *cw_pause)
{
  struct cw_pause_ctx *ctx = (struct cw_pause_ctx *)cw_pause;
  bool decoding = Curl_cwriter_is_content_decoding(data);
  CURLcode result = CURLE_OK;

  /* write the end of the chain until it blocks or gets empty */
  while(ctx->buf && !Curl_cwriter_is_paused(data)) {
    struct cw_pause_buf **plast = &ctx->buf;
    size_t blen, wlen = 0;
    const unsigned char *buf = NULL;

    while((*plast)->next) /* got to last in list */
      plast = &(*plast)->next;
    if(Curl_bufq_peek(&(*plast)->b, &buf, &blen)) {
      wlen = (decoding && ((*plast)->type & CLIENTWRITE_BODY)) ?
             CURLMIN(blen, CW_PAUSE_DEC_WRITE_CHUNK) : blen;
      result = Curl_cwriter_write(data, cw_pause->next, (*plast)->type,
                                  (const char *)buf, wlen);
      CURL_TRC_WRITE(data, "[PAUSE] flushed %zu/%zu bytes, type=%x -> %d",
                     wlen, ctx->buf_total, (*plast)->type, result);
      Curl_bufq_skip(&(*plast)->b, wlen);
      DEBUGASSERT(ctx->buf_total >= wlen);
      ctx->buf_total -= wlen;
      if(result)
        return result;
    }
    else if((*plast)->type & CLIENTWRITE_EOS) {
      result = Curl_cwriter_write(data, cw_pause->next, (*plast)->type,
                                  (const char *)buf, 0);
      CURL_TRC_WRITE(data, "[PAUSE] flushed 0/%zu bytes, type=%x -> %d",
                     ctx->buf_total, (*plast)->type, result);
    }

    if(Curl_bufq_is_empty(&(*plast)->b)) {
      cw_pause_buf_free(*plast);
      *plast = NULL;
    }
  }
  return result;
}

static CURLcode cw_pause_write(struct Curl_easy *data,
                               struct Curl_cwriter *writer, int type,
                               const char *buf, size_t blen)
{
  struct cw_pause_ctx *ctx = writer->ctx;
  CURLcode result = CURLE_OK;
  size_t wlen = 0;
  bool decoding = Curl_cwriter_is_content_decoding(data);

  if(ctx->buf && !Curl_cwriter_is_paused(data)) {
    result = cw_pause_flush(data, writer);
    if(result)
      return result;
  }

  while(!ctx->buf && !Curl_cwriter_is_paused(data)) {
    int wtype = type;
    DEBUGASSERT(!ctx->buf);
    /* content decoding might blow up size considerably, write smaller
     * chunks to make pausing need buffer less. */
    wlen = (decoding && (type & CLIENTWRITE_BODY)) ?
           CURLMIN(blen, CW_PAUSE_DEC_WRITE_CHUNK) : blen;
    if(wlen < blen)
      wtype &= ~CLIENTWRITE_EOS;
    result = Curl_cwriter_write(data, writer->next, wtype, buf, wlen);
    CURL_TRC_WRITE(data, "[PAUSE] writing %zu/%zu bytes of type %x -> %d",
                   wlen, blen, wtype, result);
    if(result)
      return result;
    buf += wlen;
    blen -= wlen;
    if(!blen)
      return result;
  }

  do {
    size_t nwritten = 0;
    if(ctx->buf && (ctx->buf->type == type) && (type & CLIENTWRITE_BODY)) {
      /* same type and body, append to current buffer which has a soft
       * limit and should take everything up to OOM. */
      result = Curl_bufq_cwrite(&ctx->buf->b, buf, blen, &nwritten);
    }
    else {
      /* Need a new buf, type changed */
      struct cw_pause_buf *cwbuf = cw_pause_buf_create(type, blen);
      if(!cwbuf)
        return CURLE_OUT_OF_MEMORY;
      cwbuf->next = ctx->buf;
      ctx->buf = cwbuf;
      result = Curl_bufq_cwrite(&ctx->buf->b, buf, blen, &nwritten);
    }
    CURL_TRC_WRITE(data, "[PAUSE] buffer %zu more bytes of type %x, "
                   "total=%zu -> %d", nwritten, type, ctx->buf_total + wlen,
                   result);
    if(result)
      return result;
    buf += nwritten;
    blen -= nwritten;
    ctx->buf_total += nwritten;
  } while(blen);

  return result;
}

CURLcode Curl_cw_pause_flush(struct Curl_easy *data)
{
  struct Curl_cwriter *cw_pause;
  CURLcode result = CURLE_OK;

  cw_pause = Curl_cwriter_get_by_type(data, &Curl_cwt_pause);
  if(cw_pause)
    result = cw_pause_flush(data, cw_pause);

  return result;
}
lib/cw-pause.h (new file, 40 lines)
@@ -0,0 +1,40 @@
#ifndef HEADER_CURL_CW_PAUSE_H
#define HEADER_CURL_CW_PAUSE_H
/***************************************************************************
 *                                  _   _ ____  _
 *  Project                     ___| | | |  _ \| |
 *                             / __| | | | |_) | |
 *                            | (__| |_| |  _ <| |___
 *                             \___|\___/|_| \_\_____|
 *
 * Copyright (C) Daniel Stenberg, <daniel@haxx.se>, et al.
 *
 * This software is licensed as described in the file COPYING, which
 * you should have received as part of this distribution. The terms
 * are also available at https://curl.se/docs/copyright.html.
 *
 * You may opt to use, copy, modify, merge, publish, distribute and/or sell
 * copies of the Software, and permit persons to whom the Software is
 * furnished to do so, under the terms of the COPYING file.
 *
 * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
 * KIND, either express or implied.
 *
 * SPDX-License-Identifier: curl
 *
 ***************************************************************************/

#include "curl_setup.h"

#include "sendf.h"

/**
 * The client writer type "cw-pause" that buffers writes for
 * paused transfer writes.
 */
extern struct Curl_cwtype Curl_cwt_pause;

CURLcode Curl_cw_pause_flush(struct Curl_easy *data);


#endif /* HEADER_CURL_CW_PAUSE_H */
lib/http2.c (31 changes)
@@ -219,6 +219,7 @@ struct h2_stream_ctx {
   BIT(bodystarted);
   BIT(body_eos);      /* the complete body has been added to `sendbuf` and
                        * is being/has been processed from there. */
+  BIT(write_paused);  /* stream write is paused */
 };

 #define H2_STREAM_CTX(ctx,data)  ((struct h2_stream_ctx *)(\
@@ -289,14 +290,14 @@ static int32_t cf_h2_get_desired_local_win(struct Curl_cfilter *cf,

 static CURLcode cf_h2_update_local_win(struct Curl_cfilter *cf,
                                        struct Curl_easy *data,
-                                       struct h2_stream_ctx *stream,
-                                       bool paused)
+                                       struct h2_stream_ctx *stream)
 {
   struct cf_h2_ctx *ctx = cf->ctx;
   int32_t dwsize;
   int rv;

-  dwsize = paused ? 0 : cf_h2_get_desired_local_win(cf, data);
+  dwsize = (stream->write_paused || stream->xfer_result) ?
+           0 : cf_h2_get_desired_local_win(cf, data);
   if(dwsize != stream->local_window_size) {
     int32_t wsize = nghttp2_session_get_stream_effective_local_window_size(
       ctx->h2, stream->id);
@@ -332,13 +333,11 @@ static CURLcode cf_h2_update_local_win(struct Curl_cfilter *cf,

 static CURLcode cf_h2_update_local_win(struct Curl_cfilter *cf,
                                        struct Curl_easy *data,
-                                       struct h2_stream_ctx *stream,
-                                       bool paused)
+                                       struct h2_stream_ctx *stream)
 {
   (void)cf;
   (void)data;
   (void)stream;
-  (void)paused;
   return CURLE_OK;
 }
 #endif /* !NGHTTP2_HAS_SET_LOCAL_WINDOW_SIZE */
@@ -1058,7 +1057,7 @@ static void h2_xfer_write_resp_hd(struct Curl_cfilter *cf,
   if(!stream->xfer_result) {
     stream->xfer_result = Curl_xfer_write_resp_hd(data, buf, blen, eos);
     if(!stream->xfer_result && !eos)
-      stream->xfer_result = cf_h2_update_local_win(cf, data, stream, FALSE);
+      stream->xfer_result = cf_h2_update_local_win(cf, data, stream);
     if(stream->xfer_result)
       CURL_TRC_CF(data, cf, "[%d] error %d writing %zu bytes of headers",
                   stream->id, stream->xfer_result, blen);
@@ -1074,8 +1073,6 @@ static void h2_xfer_write_resp(struct Curl_cfilter *cf,
   /* If we already encountered an error, skip further writes */
   if(!stream->xfer_result)
     stream->xfer_result = Curl_xfer_write_resp(data, buf, blen, eos);
-  if(!stream->xfer_result && !eos)
-    stream->xfer_result = cf_h2_update_local_win(cf, data, stream, FALSE);
   /* If the transfer write is errored, we do not want any more data */
   if(stream->xfer_result) {
     struct cf_h2_ctx *ctx = cf->ctx;
@@ -1085,6 +1082,17 @@ static void h2_xfer_write_resp(struct Curl_cfilter *cf,
     nghttp2_submit_rst_stream(ctx->h2, 0, stream->id,
                               (uint32_t)NGHTTP2_ERR_CALLBACK_FAILURE);
   }
+  else if(!stream->write_paused && Curl_xfer_write_is_paused(data)) {
+    CURL_TRC_CF(data, cf, "[%d] stream output paused", stream->id);
+    stream->write_paused = TRUE;
+  }
+  else if(stream->write_paused && !Curl_xfer_write_is_paused(data)) {
+    CURL_TRC_CF(data, cf, "[%d] stream output unpaused", stream->id);
+    stream->write_paused = FALSE;
+  }
+
+  if(!stream->xfer_result && !eos)
+    stream->xfer_result = cf_h2_update_local_win(cf, data, stream);
 }

 static CURLcode on_stream_frame(struct Curl_cfilter *cf,
@@ -2579,7 +2587,10 @@ static CURLcode http2_data_pause(struct Curl_cfilter *cf,

   DEBUGASSERT(data);
   if(ctx && ctx->h2 && stream) {
-    CURLcode result = cf_h2_update_local_win(cf, data, stream, pause);
+    CURLcode result;
+
+    stream->write_paused = pause;
+    result = cf_h2_update_local_win(cf, data, stream);
     if(result)
       return result;
lib/sendf.c (41 changes)
@@ -42,6 +42,7 @@
 #include "connect.h"
 #include "content_encoding.h"
 #include "cw-out.h"
+#include "cw-pause.h"
 #include "vtls/vtls.h"
 #include "vssh/ssh.h"
 #include "easyif.h"
@@ -433,21 +434,37 @@ static CURLcode do_init_writer_stack(struct Curl_easy *data)
   if(result)
     return result;

-  result = Curl_cwriter_create(&writer, data, &cw_download, CURL_CW_PROTOCOL);
-  if(result)
-    return result;
+  /* This places the "pause" writer behind the "download" writer that
+   * is added below. Meaning the "download" can do checks on content length
+   * and other things *before* write outs are buffered for paused transfers. */
+  result = Curl_cwriter_create(&writer, data, &Curl_cwt_pause,
+                               CURL_CW_PROTOCOL);
+  if(!result) {
     result = Curl_cwriter_add(data, writer);
-  if(result) {
+    if(result)
       Curl_cwriter_free(data, writer);
   }
+  if(result)
     return result;

+  result = Curl_cwriter_create(&writer, data, &cw_download, CURL_CW_PROTOCOL);
+  if(!result) {
+    result = Curl_cwriter_add(data, writer);
+    if(result)
+      Curl_cwriter_free(data, writer);
+  }
+  if(result)
+    return result;
+
   result = Curl_cwriter_create(&writer, data, &cw_raw, CURL_CW_RAW);
-  if(result)
-    return result;
+  if(!result) {
     result = Curl_cwriter_add(data, writer);
-  if(result) {
+    if(result)
       Curl_cwriter_free(data, writer);
   }
+  if(result)
     return result;

   return result;
 }
@@ -494,6 +511,16 @@ struct Curl_cwriter *Curl_cwriter_get_by_type(struct Curl_easy *data,
   return NULL;
 }

+bool Curl_cwriter_is_content_decoding(struct Curl_easy *data)
+{
+  struct Curl_cwriter *writer;
+  for(writer = data->req.writer_stack; writer; writer = writer->next) {
+    if(writer->phase == CURL_CW_CONTENT_DECODE)
+      return TRUE;
+  }
+  return FALSE;
+}
+
 bool Curl_cwriter_is_paused(struct Curl_easy *data)
 {
   return Curl_cw_out_is_paused(data);
@@ -182,6 +182,8 @@ CURLcode Curl_cwriter_write(struct Curl_easy *data,
  */
 bool Curl_cwriter_is_paused(struct Curl_easy *data);

+bool Curl_cwriter_is_content_decoding(struct Curl_easy *data);
+
 /**
  * Unpause client writer and flush any buffered date to the client.
  */
@@ -880,6 +880,11 @@ CURLcode Curl_xfer_write_resp(struct Curl_easy *data,
   return result;
 }

+bool Curl_xfer_write_is_paused(struct Curl_easy *data)
+{
+  return Curl_cwriter_is_paused(data);
+}
+
 CURLcode Curl_xfer_write_resp_hd(struct Curl_easy *data,
                                  const char *hd0, size_t hdlen, bool is_eos)
 {
@@ -55,6 +55,8 @@ CURLcode Curl_xfer_write_resp(struct Curl_easy *data,
                               const char *buf, size_t blen,
                               bool is_eos);

+bool Curl_xfer_write_is_paused(struct Curl_easy *data);
+
 /**
  * Write a single "header" line from a server response.
  * @param hd0 the 0-terminated, single header line
@@ -159,6 +159,7 @@ struct transfer {
   int paused;
   int resumed;
   int done;
+  CURLcode result;
 };

 static size_t transfer_count = 1;
@@ -240,6 +241,7 @@ static int setup(CURL *hnd, const char *url, struct transfer *t,
   curl_easy_setopt(hnd, CURLOPT_HTTP_VERSION, http_version);
   curl_easy_setopt(hnd, CURLOPT_SSL_VERIFYPEER, 0L);
   curl_easy_setopt(hnd, CURLOPT_SSL_VERIFYHOST, 0L);
+  curl_easy_setopt(hnd, CURLOPT_ACCEPT_ENCODING, "");
   curl_easy_setopt(hnd, CURLOPT_BUFFERSIZE, (long)(128 * 1024));
   curl_easy_setopt(hnd, CURLOPT_WRITEFUNCTION, my_write_cb);
   curl_easy_setopt(hnd, CURLOPT_WRITEDATA, t);
@@ -472,7 +474,9 @@ int main(int argc, char *argv[])
         t = get_transfer_for_easy(e);
         if(t) {
           t->done = 1;
-          fprintf(stderr, "[t-%d] FINISHED\n", t->idx);
+          t->result = m->data.result;
+          fprintf(stderr, "[t-%d] FINISHED with result %d\n",
+                  t->idx, t->result);
           if(use_earlydata) {
             curl_off_t sent;
             curl_easy_getinfo(e, CURLINFO_EARLYDATA_SENT_T, &sent);
@@ -551,6 +555,8 @@ int main(int argc, char *argv[])
       curl_easy_cleanup(t->easy);
       t->easy = NULL;
     }
+    if(t->result)
+      result = t->result;
   }
   free(transfers);
@@ -56,6 +56,7 @@ class TestDownload:
         env.make_data_file(indir=indir, fname="data-1m", fsize=1024*1024)
         env.make_data_file(indir=indir, fname="data-10m", fsize=10*1024*1024)
         env.make_data_file(indir=indir, fname="data-50m", fsize=50*1024*1024)
+        env.make_data_gzipbomb(indir=indir, fname="bomb-100m.txt", fsize=100*1024*1024)

     # download 1 file
     @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
@@ -405,7 +406,7 @@ class TestDownload:
            '-n', f'{count}', '-m', f'{max_parallel}', '-a',
            '-A', f'{abort_offset}', '-V', proto, url
        ])
-        r.check_exit_code(0)
+        r.check_exit_code(42)  # CURLE_ABORTED_BY_CALLBACK
         srcfile = os.path.join(httpd.docs_dir, docname)
         # downloads should be there, but not necessarily complete
         self.check_downloads(client, srcfile, count, complete=False)
@@ -434,7 +435,7 @@ class TestDownload:
            '-n', f'{count}', '-m', f'{max_parallel}', '-a',
            '-F', f'{fail_offset}', '-V', proto, url
        ])
-        r.check_exit_code(0)
+        r.check_exit_code(23)  # CURLE_WRITE_ERROR
         srcfile = os.path.join(httpd.docs_dir, docname)
         # downloads should be there, but not necessarily complete
         self.check_downloads(client, srcfile, count, complete=False)
@@ -615,11 +616,11 @@ class TestDownload:
         assert reused_session, 'session was not reused for 2nd transfer'
         assert earlydata[0] == 0, f'{earlydata}'
         if proto == 'http/1.1':
-            assert earlydata[1] == 69, f'{earlydata}'
+            assert earlydata[1] == 111, f'{earlydata}'
         elif proto == 'h2':
-            assert earlydata[1] == 107, f'{earlydata}'
+            assert earlydata[1] == 127, f'{earlydata}'
         elif proto == 'h3':
-            assert earlydata[1] == 67, f'{earlydata}'
+            assert earlydata[1] == 109, f'{earlydata}'

     @pytest.mark.parametrize("proto", ['http/1.1', 'h2'])
     @pytest.mark.parametrize("max_host_conns", [0, 1, 5])
@@ -688,3 +689,30 @@ class TestDownload:
             n = int(m.group(1))
             assert n <= max_total_conns
         assert matched_lines > 0
+
+    # 2 parallel transfers, pause and resume. Load a 100 MB zip bomb from
+    # the server with "Content-Encoding: gzip" that gets exploded during
+    # response writing to the client. Client pauses after 1MB unzipped data
+    # and causes buffers to fill while the server sends more response
+    # data.
+    # * http/1.1: not much buffering is done as curl no longer
+    #   serves the connections that are paused
+    # * h2/h3: server continues sending what the stream window allows and
+    #   since the one connection involved unpaused transfers, data continues
+    #   to be received, requiring buffering.
+    @pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
+    def test_02_35_pause_bomb(self, env: Env, httpd, nghttpx, proto):
+        if proto == 'h3' and not env.have_h3():
+            pytest.skip("h3 not supported")
+        count = 2
+        pause_offset = 1024 * 1024
+        docname = 'bomb-100m.txt.var'
+        url = f'https://localhost:{env.https_port}/{docname}'
+        client = LocalClient(name='hx-download', env=env)
+        if not client.exists():
+            pytest.skip(f'example client not built: {client.name}')
+        r = client.run(args=[
+            '-n', f'{count}', '-m', f'{count}',
+            '-P', f'{pause_offset}', '-V', proto, url
+        ])
+        r.check_exit_code(0)
@@ -234,7 +234,7 @@ class TestCaddy:
                 earlydata[int(m.group(1))] = int(m.group(2))
         assert earlydata[0] == 0, f'{earlydata}'
         if proto == 'h3':
-            assert earlydata[1] == 71, f'{earlydata}'
+            assert earlydata[1] == 113, f'{earlydata}'
         else:
             # Caddy does not support early data on TCP
             assert earlydata[1] == 0, f'{earlydata}'
@@ -24,6 +24,7 @@
 #
 ###########################################################################
 #
+import gzip
 import logging
 import os
 import re
@@ -618,3 +619,24 @@
                 i = int(fsize / line_length) + 1
                 fd.write(f"{i:09d}-{s}"[0:remain-1] + "\n")
         return fpath
+
+    def make_data_gzipbomb(self, indir: str, fname: str, fsize: int) -> str:
+        fpath = os.path.join(indir, fname)
+        gzpath = f'{fpath}.gz'
+        varpath = f'{fpath}.var'
+
+        with open(fpath, 'w') as fd:
+            fd.write('not what we are looking for!\n')
+        count = int(fsize / 1024)
+        zero1k = bytearray(1024)
+        with gzip.open(gzpath, 'wb') as fd:
+            for _ in range(count):
+                fd.write(zero1k)
+        with open(varpath, 'w') as fd:
+            fd.write(f'URI: {fname}\n')
+            fd.write('\n')
+            fd.write(f'URI: {fname}.gz\n')
+            fd.write('Content-Type: text/plain\n')
+            fd.write('Content-Encoding: x-gzip\n')
+            fd.write('\n')
+        return fpath
@@ -48,7 +48,7 @@ class Httpd:
         'authn_core', 'authn_file',
         'authz_user', 'authz_core', 'authz_host',
         'auth_basic', 'auth_digest',
-        'alias', 'env', 'filter', 'headers', 'mime', 'setenvif',
+        'alias', 'env', 'filter', 'headers', 'mime', 'setenvif', 'negotiation',
         'socache_shmcb',
         'rewrite', 'http2', 'ssl', 'proxy', 'proxy_http', 'proxy_connect',
         'brotli',
@@ -269,6 +269,8 @@
             f'Listen {self.env.proxys_port}',
             f'TypesConfig "{self._conf_dir}/mime.types',
             'SSLSessionCache "shmcb:ssl_gcache_data(32000)"',
+            'AddEncoding x-gzip .gz .tgz .gzip',
+            'AddHandler type-map .var',
         ]
         if 'base' in self._extra_configs:
             conf.extend(self._extra_configs['base'])
@@ -399,8 +401,11 @@
             fd.write("\n".join(conf))
         with open(os.path.join(self._conf_dir, 'mime.types'), 'w') as fd:
             fd.write("\n".join([
                 'text/plain txt',
                 'text/html html',
                 'application/json json',
+                'application/x-gzip gzip',
+                'application/x-gzip gz',
                 ''
             ]))