cw-out: improved error handling

- remember error encountered in invoking write callback and always fail
  afterwards without further invokes

- check behaviour in test_02_17 with h2-pausing client

Reported-by: Pavel Kropachev
Fixes #13337
Closes #13340
This commit is contained in:
Stefan Eissing 2024-04-10 14:52:34 +02:00 committed by Daniel Stenberg
parent 5e3fd347c5
commit 270a25c011
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
9 changed files with 197 additions and 64 deletions

View File

@ -102,6 +102,8 @@ static void cw_out_buf_free(struct cw_out_buf *cwbuf)
struct cw_out_ctx {
struct Curl_cwriter super;
struct cw_out_buf *buf;
BIT(paused);
BIT(errored);
};
static CURLcode cw_out_write(struct Curl_easy *data,
@ -201,7 +203,10 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
size_t max_write, min_write;
size_t wlen, nwritten;
(void)ctx;
/* If we errored once, we do not invoke the client callback again */
if(ctx->errored)
return CURLE_WRITE_ERROR;
/* write callbacks may get NULLed by the client between calls. */
cw_get_writefunc(data, otype, &wcb, &wcb_data, &max_write, &min_write);
if(!wcb) {
@ -210,7 +215,7 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
}
*pconsumed = 0;
while(blen && !(data->req.keepon & KEEP_RECV_PAUSE)) {
while(blen && !ctx->paused) {
if(!flush_all && blen < min_write)
break;
wlen = max_write? CURLMIN(blen, max_write) : blen;
@ -230,10 +235,15 @@ static CURLcode cw_out_ptr_flush(struct cw_out_ctx *ctx,
}
/* mark the connection as RECV paused */
data->req.keepon |= KEEP_RECV_PAUSE;
ctx->paused = TRUE;
CURL_TRC_WRITE(data, "cw_out, PAUSE requested by client");
break;
}
if(nwritten != wlen) {
else if(CURL_WRITEFUNC_ERROR == nwritten) {
failf(data, "client returned ERROR on write of %zu bytes", wlen);
return CURLE_WRITE_ERROR;
}
else if(nwritten != wlen) {
failf(data, "Failure writing output to destination, "
"passed %zu returned %zd", wlen, nwritten);
return CURLE_WRITE_ERROR;
@ -287,7 +297,7 @@ static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx,
if(!cwbuf)
return CURLE_OK;
if(data->req.keepon & KEEP_RECV_PAUSE)
if(ctx->paused)
return CURLE_OK;
/* write the end of the chain until it blocks or gets empty */
@ -300,7 +310,7 @@ static CURLcode cw_out_flush_chain(struct cw_out_ctx *ctx,
return result;
if(*plast) {
/* could not write last, paused again? */
DEBUGASSERT(data->req.keepon & KEEP_RECV_PAUSE);
DEBUGASSERT(ctx->paused);
return CURLE_OK;
}
}
@ -342,14 +352,14 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,
bool flush_all,
const char *buf, size_t blen)
{
CURLcode result;
CURLcode result = CURLE_OK;
/* if we have buffered data and it is a different type than what
* we are writing now, try to flush all */
if(ctx->buf && ctx->buf->type != otype) {
result = cw_out_flush_chain(ctx, data, &ctx->buf, TRUE);
if(result)
return result;
goto out;
}
if(ctx->buf) {
@ -359,7 +369,7 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,
return result;
result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all);
if(result)
return result;
goto out;
}
else {
/* nothing buffered, try direct write */
@ -372,10 +382,18 @@ static CURLcode cw_out_do_write(struct cw_out_ctx *ctx,
/* did not write all, append the rest */
result = cw_out_append(ctx, otype, buf + consumed, blen - consumed);
if(result)
return result;
goto out;
}
}
return CURLE_OK;
out:
if(result) {
/* We do not want to invoked client callbacks a second time after
* encountering an error. See issue #13337 */
ctx->errored = TRUE;
cw_out_bufs_free(ctx);
}
return result;
}
static CURLcode cw_out_write(struct Curl_easy *data,
@ -413,10 +431,12 @@ bool Curl_cw_out_is_paused(struct Curl_easy *data)
return FALSE;
ctx = (struct cw_out_ctx *)cw_out;
return cw_out_bufs_len(ctx) > 0;
CURL_TRC_WRITE(data, "cw-out is%spaused", ctx->paused? "" : " not");
return ctx->paused;
}
static CURLcode cw_out_flush(struct Curl_easy *data, bool flush_all)
static CURLcode cw_out_flush(struct Curl_easy *data,
bool unpause, bool flush_all)
{
struct Curl_cwriter *cw_out;
CURLcode result = CURLE_OK;
@ -424,18 +444,31 @@ static CURLcode cw_out_flush(struct Curl_easy *data, bool flush_all)
cw_out = Curl_cwriter_get_by_type(data, &Curl_cwt_out);
if(cw_out) {
struct cw_out_ctx *ctx = (struct cw_out_ctx *)cw_out;
if(ctx->errored)
return CURLE_WRITE_ERROR;
if(unpause && ctx->paused)
ctx->paused = FALSE;
if(ctx->paused)
return CURLE_OK; /* not doing it */
result = cw_out_flush_chain(ctx, data, &ctx->buf, flush_all);
if(result) {
ctx->errored = TRUE;
cw_out_bufs_free(ctx);
return result;
}
}
return result;
}
CURLcode Curl_cw_out_flush(struct Curl_easy *data)
CURLcode Curl_cw_out_unpause(struct Curl_easy *data)
{
return cw_out_flush(data, FALSE);
CURL_TRC_WRITE(data, "cw-out unpause");
return cw_out_flush(data, TRUE, FALSE);
}
CURLcode Curl_cw_out_done(struct Curl_easy *data)
{
return cw_out_flush(data, TRUE);
CURL_TRC_WRITE(data, "cw-out done");
return cw_out_flush(data, FALSE, TRUE);
}

View File

@ -43,7 +43,7 @@ bool Curl_cw_out_is_paused(struct Curl_easy *data);
/**
* Flush any buffered date to the client, chunk collation still applies.
*/
CURLcode Curl_cw_out_flush(struct Curl_easy *data);
CURLcode Curl_cw_out_unpause(struct Curl_easy *data);
/**
* Mark EndOfStream reached and flush ALL data to the client.

View File

@ -58,7 +58,6 @@
#include "multiif.h"
#include "select.h"
#include "cfilters.h"
#include "cw-out.h"
#include "sendf.h" /* for failf function prototype */
#include "connect.h" /* for Curl_getconnectinfo */
#include "slist.h"
@ -1086,6 +1085,7 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action)
int oldstate;
int newstate;
bool recursive = FALSE;
bool keep_changed, unpause_read, not_all_paused;
if(!GOOD_EASY_HANDLE(data) || !data->conn)
/* crazy input, don't continue */
@ -1101,51 +1101,47 @@ CURLcode curl_easy_pause(struct Curl_easy *data, int action)
((action & CURLPAUSE_RECV)?KEEP_RECV_PAUSE:0) |
((action & CURLPAUSE_SEND)?KEEP_SEND_PAUSE:0);
if((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) == oldstate) {
/* Not changing any pause state, return */
DEBUGF(infof(data, "pause: no change, early return"));
return CURLE_OK;
}
keep_changed = ((newstate & (KEEP_RECV_PAUSE| KEEP_SEND_PAUSE)) != oldstate);
not_all_paused = (newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) !=
(KEEP_RECV_PAUSE|KEEP_SEND_PAUSE);
unpause_read = ((k->keepon & ~newstate & KEEP_SEND_PAUSE) &&
(data->mstate == MSTATE_PERFORMING ||
data->mstate == MSTATE_RATELIMITING));
/* Unpausing writes is detected on the next run in
* transfer.c:Curl_readwrite(). This is because this may result
* in a transfer error if the application's callbacks fail */
/* Unpause parts in active mime tree. */
if((k->keepon & ~newstate & KEEP_SEND_PAUSE) &&
(data->mstate == MSTATE_PERFORMING ||
data->mstate == MSTATE_RATELIMITING)) {
result = Curl_creader_unpause(data);
if(result)
return result;
}
/* put it back in the keepon */
/* Set the new keepon state, so it takes effect no matter what error
* may happen afterwards. */
k->keepon = newstate;
if(!(newstate & KEEP_RECV_PAUSE)) {
Curl_conn_ev_data_pause(data, FALSE);
result = Curl_cw_out_flush(data);
if(result)
return result;
}
/* if there's no error and we're not pausing both directions, we want
to have this handle checked soon */
if((newstate & (KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) !=
(KEEP_RECV_PAUSE|KEEP_SEND_PAUSE)) {
Curl_expire(data, 0, EXPIRE_RUN_NOW); /* get this handle going again */
/* If not completely pausing both directions now, run again in any case. */
if(not_all_paused) {
Curl_expire(data, 0, EXPIRE_RUN_NOW);
/* reset the too-slow time keeper */
data->state.keeps_speed.tv_sec = 0;
if(!Curl_cw_out_is_paused(data))
/* if not pausing again, force a recv/send check of this connection as
the data might've been read off the socket already */
data->state.select_bits = CURL_CSELECT_IN | CURL_CSELECT_OUT;
if(data->multi) {
if(Curl_update_timer(data->multi))
return CURLE_ABORTED_BY_CALLBACK;
/* Simulate socket events on next run for unpaused directions */
if(!(newstate & KEEP_SEND_PAUSE))
data->state.select_bits |= CURL_CSELECT_OUT;
if(!(newstate & KEEP_RECV_PAUSE))
data->state.select_bits |= CURL_CSELECT_IN;
/* On changes, tell application to update its timers. */
if(keep_changed && data->multi) {
if(Curl_update_timer(data->multi)) {
result = CURLE_ABORTED_BY_CALLBACK;
goto out;
}
}
}
if(!data->state.done)
if(unpause_read) {
result = Curl_creader_unpause(data);
if(result)
goto out;
}
out:
if(!result && !data->state.done && keep_changed)
/* This transfer may have been moved in or out of the bundle, update the
corresponding socket callback, if used */
result = Curl_updatesocket(data);

View File

@ -2521,7 +2521,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi,
Curl_posttransfer(data);
multi_done(data, result, TRUE);
}
else if(data->req.done) {
else if(data->req.done && !Curl_cwriter_is_paused(data)) {
/* call this even if the readwrite function returned error */
Curl_posttransfer(data);

View File

@ -506,6 +506,16 @@ void Curl_cwriter_remove_by_name(struct Curl_easy *data,
}
}
bool Curl_cwriter_is_paused(struct Curl_easy *data)
{
return Curl_cw_out_is_paused(data);
}
CURLcode Curl_cwriter_unpause(struct Curl_easy *data)
{
return Curl_cw_out_unpause(data);
}
CURLcode Curl_creader_read(struct Curl_easy *data,
struct Curl_creader *reader,
char *buf, size_t blen, size_t *nread, bool *eos)

View File

@ -180,6 +180,16 @@ CURLcode Curl_cwriter_write(struct Curl_easy *data,
struct Curl_cwriter *writer, int type,
const char *buf, size_t nbytes);
/**
* Return TRUE iff client writer is paused.
*/
bool Curl_cwriter_is_paused(struct Curl_easy *data);
/**
* Unpause client writer and flush any buffered date to the client.
*/
CURLcode Curl_cwriter_unpause(struct Curl_easy *data);
/**
* Default implementations for do_init, do_write, do_close that
* do nothing and pass the data through.

View File

@ -272,7 +272,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
DEBUGF(infof(data, "nread == 0, stream closed, bailing"));
else
DEBUGF(infof(data, "nread <= 0, server closed connection, bailing"));
k->keepon = 0; /* stop sending as well */
k->keepon &= ~(KEEP_RECV|KEEP_SEND); /* stop sending as well */
if(k->eos_written) /* already did write this to client, leave */
break;
}
@ -409,6 +409,14 @@ CURLcode Curl_readwrite(struct Curl_easy *data)
int didwhat = 0;
int select_bits;
/* Check if client writes had been paused and can resume now. */
if(!(k->keepon & KEEP_RECV_PAUSE) && Curl_cwriter_is_paused(data)) {
Curl_conn_ev_data_pause(data, FALSE);
result = Curl_cwriter_unpause(data);
if(result)
goto out;
}
if(data->state.select_bits) {
if(select_bits_paused(data, data->state.select_bits)) {
/* leave the bits unchanged, so they'll tell us what to do when

View File

@ -27,6 +27,7 @@
*/
/* This is based on the poc client of issue #11982
*/
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
@ -141,11 +142,24 @@ static int err(void)
exit(2);
}
static void usage(const char *msg)
{
if(msg)
fprintf(stderr, "%s\n", msg);
fprintf(stderr,
"usage: [options] url\n"
" pause downloads with following options:\n"
" -V http_version (http/1.1, h2, h3) http version to use\n"
);
}
struct handle
{
int idx;
int paused;
int resumed;
int errored;
int fail_write;
CURL *h;
};
@ -165,8 +179,15 @@ static size_t cb(void *data, size_t size, size_t nmemb, void *clientp)
++handle->paused;
fprintf(stderr, "INFO: [%d] write, PAUSING %d time on %lu bytes\n",
handle->idx, handle->paused, (long)realsize);
assert(handle->paused == 1);
return CURL_WRITEFUNC_PAUSE;
}
if(handle->fail_write) {
++handle->errored;
fprintf(stderr, "INFO: [%d] FAIL write of %lu bytes, %d time\n",
handle->idx, (long)realsize, handle->errored);
return CURL_WRITEFUNC_ERROR;
}
fprintf(stderr, "INFO: [%d] write, accepting %lu bytes\n",
handle->idx, (long)realsize);
return realsize;
@ -186,15 +207,43 @@ int main(int argc, char *argv[])
char *url, *host = NULL, *port = NULL;
int all_paused = 0;
int resume_round = -1;
int http_version = CURL_HTTP_VERSION_2_0;
int ch;
if(argc != 2) {
while((ch = getopt(argc, argv, "hV:")) != -1) {
switch(ch) {
case 'h':
usage(NULL);
return 2;
case 'V': {
if(!strcmp("http/1.1", optarg))
http_version = CURL_HTTP_VERSION_1_1;
else if(!strcmp("h2", optarg))
http_version = CURL_HTTP_VERSION_2_0;
else if(!strcmp("h3", optarg))
http_version = CURL_HTTP_VERSION_3ONLY;
else {
usage("invalid http version");
return 1;
}
break;
}
default:
usage("invalid option");
return 1;
}
}
argc -= optind;
argv += optind;
if(argc != 1) {
fprintf(stderr, "ERROR: need URL as argument\n");
return 2;
}
url = argv[1];
url = argv[0];
curl_global_init(CURL_GLOBAL_DEFAULT);
curl_global_trace("ids,time,http/2");
curl_global_trace("ids,time,http/2,http/3");
cu = curl_url();
if(!cu) {
@ -222,6 +271,8 @@ int main(int argc, char *argv[])
handles[i].idx = i;
handles[i].paused = 0;
handles[i].resumed = 0;
handles[i].errored = 0;
handles[i].fail_write = 1;
handles[i].h = curl_easy_init();
if(!handles[i].h ||
curl_easy_setopt(handles[i].h, CURLOPT_WRITEFUNCTION, cb) != CURLE_OK ||
@ -233,9 +284,11 @@ int main(int argc, char *argv[])
!= CURLE_OK ||
curl_easy_setopt(handles[i].h, CURLOPT_SSL_VERIFYPEER, 0L) != CURLE_OK ||
curl_easy_setopt(handles[i].h, CURLOPT_RESOLVE, resolve) != CURLE_OK ||
curl_easy_setopt(handles[i].h, CURLOPT_PIPEWAIT, 1L) ||
curl_easy_setopt(handles[i].h, CURLOPT_URL, url) != CURLE_OK) {
err();
}
curl_easy_setopt(handles[i].h, CURLOPT_HTTP_VERSION, (long)http_version);
}
multi_handle = curl_multi_init();
@ -269,6 +322,11 @@ int main(int argc, char *argv[])
fprintf(stderr, "ERROR: [%d] NOT resumed!\n", i);
as_expected = 0;
}
else if(handles[i].errored != 1) {
fprintf(stderr, "ERROR: [%d] NOT errored once, %d instead!\n",
i, handles[i].errored);
as_expected = 0;
}
}
if(!as_expected) {
fprintf(stderr, "ERROR: handles not in expected state "
@ -308,7 +366,7 @@ int main(int argc, char *argv[])
if(all_paused) {
fprintf(stderr, "INFO: all transfers paused\n");
/* give transfer some rounds to mess things up */
resume_round = rounds + 3;
resume_round = rounds + 2;
}
}
if(resume_round > 0 && rounds == resume_round) {

View File

@ -445,12 +445,30 @@ class TestDownload:
r.check_exit_code(0)
# test on paused transfers, based on issue #11982
def test_02_27_paused_no_cl(self, env: Env, httpd, nghttpx, repeat):
proto = 'h2'
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_27a_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
url = f'https://{env.authority_for(env.domain1, proto)}' \
'/tweak?&chunks=2&chunk_size=16000'
'/curltest/tweak/?&chunks=6&chunk_size=8000'
client = LocalClient(env=env, name='h2-pausing')
r = client.run(args=[url])
r = client.run(args=['-V', proto, url])
r.check_exit_code(0)
# test on paused transfers, based on issue #11982
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_27b_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
url = f'https://{env.authority_for(env.domain1, proto)}' \
'/curltest/tweak/?error=502'
client = LocalClient(env=env, name='h2-pausing')
r = client.run(args=['-V', proto, url])
r.check_exit_code(0)
# test on paused transfers, based on issue #11982
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])
def test_02_27c_paused_no_cl(self, env: Env, httpd, nghttpx, proto, repeat):
url = f'https://{env.authority_for(env.domain1, proto)}' \
'/curltest/tweak/?status=200&chunks=1&chunk_size=100'
client = LocalClient(env=env, name='h2-pausing')
r = client.run(args=['-V', proto, url])
r.check_exit_code(0)
@pytest.mark.parametrize("proto", ['http/1.1', 'h2', 'h3'])