websocket: introduce blocking sends

When using `curl_ws_send()`, perform a blocking send of the data under
the following conditions:

- the websocket is in raw mode and the call is done from within a curl
  callback. A partial write of the data could subsequently mess up the
  ws framing, as a callback has a hard time handling this.

- the websocket is encoding the data itself, has added it to its
  internal sendbuf. A partial flush of the buffer has unclear semantics
  for the caller, as they will have no idea what to send again.

Fixes WebSockets tests with CURL_DBG_SOCK_WBLOCK=90 set.
Closes #14458
This commit is contained in:
Stefan Eissing 2024-08-08 16:00:24 +02:00 committed by Daniel Stenberg
parent 0a5ea09a91
commit 3e64569a9e
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
8 changed files with 299 additions and 60 deletions

View File

@ -479,6 +479,8 @@
- curl_multi_remove_handle for any of the above. See section 2.3.
- Calling curl_ws_send() from a callback
2.2 Better support for same name resolves
If a name resolve has been initiated for name NN and a second easy handle

View File

@ -109,6 +109,10 @@ Traces reading of upload data from the application in order to send it to the se
Traces writing of download data, received from the server, to the application.
## `ws`
Tracing of WebSocket operations when this protocol is enabled in your build.
# TRACE GROUPS
Besides the specific component names there are the following group names

View File

@ -53,6 +53,10 @@ If **CURLWS_RAW_MODE** is enabled in CURLOPT_WS_OPTIONS(3), the
To send a message consisting of multiple frames, set the *CURLWS_CONT* bit
in all frames except the final one.
Warning: while it is possible to invoke this function from a callback,
such a call is blocking in this situation, e.g. only returns after all data
has been sent or an error is encountered.
# FLAGS
## CURLWS_TEXT

View File

@ -91,6 +91,23 @@ static size_t chunk_read(struct buf_chunk *chunk,
}
}
static size_t chunk_unwrite(struct buf_chunk *chunk, size_t len)
{
size_t n = chunk->w_offset - chunk->r_offset;
DEBUGASSERT(chunk->w_offset >= chunk->r_offset);
if(!n) {
return 0;
}
else if(n <= len) {
chunk->r_offset = chunk->w_offset = 0;
return n;
}
else {
chunk->w_offset -= len;
return len;
}
}
static ssize_t chunk_slurpn(struct buf_chunk *chunk, size_t max_len,
Curl_bufq_reader *reader,
void *reader_ctx, CURLcode *err)
@ -363,6 +380,49 @@ static void prune_head(struct bufq *q)
}
}
static struct buf_chunk *chunk_prev(struct buf_chunk *head,
struct buf_chunk *chunk)
{
while(head) {
if(head == chunk)
return NULL;
if(head->next == chunk)
return head;
head = head->next;
}
return NULL;
}
static void prune_tail(struct bufq *q)
{
struct buf_chunk *chunk;
while(q->tail && chunk_is_empty(q->tail)) {
chunk = q->tail;
q->tail = chunk_prev(q->head, chunk);
if(q->tail)
q->tail->next = NULL;
if(q->head == chunk)
q->head = q->tail;
if(q->pool) {
bufcp_put(q->pool, chunk);
--q->chunk_count;
}
else if((q->chunk_count > q->max_chunks) ||
(q->opts & BUFQ_OPT_NO_SPARES)) {
/* SOFT_LIMIT allowed us more than max. free spares until
* we are at max again. Or free them if we are configured
* to not use spares. */
free(chunk);
--q->chunk_count;
}
else {
chunk->next = q->spare;
q->spare = chunk;
}
}
}
static struct buf_chunk *get_non_full_tail(struct bufq *q)
{
struct buf_chunk *chunk;
@ -428,6 +488,15 @@ CURLcode Curl_bufq_cwrite(struct bufq *q,
return result;
}
CURLcode Curl_bufq_unwrite(struct bufq *q, size_t len)
{
while(len && q->tail) {
len -= chunk_unwrite(q->head, len);
prune_tail(q);
}
return len? CURLE_AGAIN : CURLE_OK;
}
ssize_t Curl_bufq_read(struct bufq *q, unsigned char *buf, size_t len,
CURLcode *err)
{

View File

@ -182,6 +182,12 @@ CURLcode Curl_bufq_cwrite(struct bufq *q,
const char *buf, size_t len,
size_t *pnwritten);
/**
* Remove `len` bytes from the end of the buffer queue again.
* Returns CURLE_AGAIN if less than `len` bytes were in the queue.
*/
CURLcode Curl_bufq_unwrite(struct bufq *q, size_t len);
/**
* Read buf from the start of the buffer queue. The buf is copied
* and the amount of copied bytes is returned.

View File

@ -221,6 +221,24 @@ void Curl_trc_ftp(struct Curl_easy *data, const char *fmt, ...)
}
#endif /* !CURL_DISABLE_FTP */
#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
struct curl_trc_feat Curl_trc_feat_ws = {
"WS",
CURL_LOG_LVL_NONE,
};
void Curl_trc_ws(struct Curl_easy *data, const char *fmt, ...)
{
DEBUGASSERT(!strchr(fmt, '\n'));
if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ws)) {
va_list ap;
va_start(ap, fmt);
trc_infof(data, &Curl_trc_feat_ws, fmt, ap);
va_end(ap);
}
}
#endif /* USE_WEBSOCKETS && !CURL_DISABLE_HTTP */
#define TRC_CT_NONE (0)
#define TRC_CT_PROTOCOL (1<<(0))
#define TRC_CT_NETWORK (1<<(1))
@ -240,6 +258,9 @@ static struct trc_feat_def trc_feats[] = {
#ifndef CURL_DISABLE_DOH
{ &Curl_doh_trc, TRC_CT_NETWORK },
#endif
#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
{ &Curl_trc_feat_ws, TRC_CT_PROTOCOL },
#endif
};
struct trc_cft_def {

View File

@ -89,6 +89,11 @@ void Curl_failf(struct Curl_easy *data,
do { if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ftp)) \
Curl_trc_ftp(data, __VA_ARGS__); } while(0)
#endif /* !CURL_DISABLE_FTP */
#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
#define CURL_TRC_WS(data, ...) \
do { if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ws)) \
Curl_trc_ws(data, __VA_ARGS__); } while(0)
#endif /* USE_WEBSOCKETS && !CURL_DISABLE_HTTP */
#else /* CURL_HAVE_C99 */
@ -100,6 +105,9 @@ void Curl_failf(struct Curl_easy *data,
#ifndef CURL_DISABLE_FTP
#define CURL_TRC_FTP Curl_trc_ftp
#endif
#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
#define CURL_TRC_WS Curl_trc_ws
#endif
#endif /* !CURL_HAVE_C99 */
@ -148,6 +156,11 @@ extern struct curl_trc_feat Curl_trc_feat_ftp;
void Curl_trc_ftp(struct Curl_easy *data,
const char *fmt, ...) CURL_PRINTF(2, 3);
#endif
#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
extern struct curl_trc_feat Curl_trc_feat_ws;
void Curl_trc_ws(struct Curl_easy *data,
const char *fmt, ...) CURL_PRINTF(2, 3);
#endif
#else /* defined(CURL_DISABLE_VERBOSE_STRINGS) */

240
lib/ws.c
View File

@ -37,6 +37,7 @@
#include "ws.h"
#include "easyif.h"
#include "transfer.h"
#include "select.h"
#include "nonblock.h"
/* The last 3 #include files should be in this order */
@ -136,6 +137,9 @@ static void ws_dec_info(struct ws_decoder *dec, struct Curl_easy *data,
}
}
static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws,
const char *buffer, size_t buflen);
typedef ssize_t ws_write_payload(const unsigned char *buf, size_t buflen,
int frame_age, int frame_flags,
curl_off_t payload_offset,
@ -773,7 +777,7 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
}
}
#endif
DEBUGF(infof(data, "WS, using chunk size %zu", chunk_size));
CURL_TRC_WS(data, "WS, using chunk size %zu", chunk_size);
Curl_bufq_init2(&ws->recvbuf, chunk_size, WS_CHUNK_COUNT,
BUFQ_OPT_SOFT_LIMIT);
Curl_bufq_init2(&ws->sendbuf, chunk_size, WS_CHUNK_COUNT,
@ -970,8 +974,8 @@ CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer,
infof(data, "connection expectedly closed?");
return CURLE_GOT_NOTHING;
}
DEBUGF(infof(data, "curl_ws_recv, added %zu bytes from network",
Curl_bufq_len(&ws->recvbuf)));
CURL_TRC_WS(data, "curl_ws_recv, added %zu bytes from network",
Curl_bufq_len(&ws->recvbuf));
}
result = ws_dec_pass(&ws->dec, data, &ws->recvbuf,
@ -1001,14 +1005,14 @@ CURL_EXTERN CURLcode curl_ws_recv(struct Curl_easy *data, void *buffer,
ctx.payload_len, ctx.bufidx);
*metap = &ws->frame;
*nread = ws->frame.len;
/* infof(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %"
CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)",
buflen, *nread, ws->frame.offset, ws->frame.bytesleft); */
CURL_TRC_WS(data, "curl_ws_recv(len=%zu) -> %zu bytes (frame at %"
CURL_FORMAT_CURL_OFF_T ", %" CURL_FORMAT_CURL_OFF_T " left)",
buflen, *nread, ws->frame.offset, ws->frame.bytesleft);
return CURLE_OK;
}
static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
bool complete)
bool blocking)
{
if(!Curl_bufq_is_empty(&ws->sendbuf)) {
CURLcode result;
@ -1016,7 +1020,11 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
size_t outlen, n;
while(Curl_bufq_peek(&ws->sendbuf, &out, &outlen)) {
if(data->set.connect_only)
if(blocking) {
result = ws_send_raw_blocking(data, ws, (char *)out, outlen);
n = result? 0 : outlen;
}
else if(data->set.connect_only || Curl_is_in_callback(data))
result = Curl_senddata(data, out, outlen, &n);
else {
result = Curl_xfer_send(data, out, outlen, FALSE, &n);
@ -1024,22 +1032,14 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
result = CURLE_AGAIN;
}
if(result) {
if(result == CURLE_AGAIN) {
if(!complete) {
infof(data, "WS: flush EAGAIN, %zu bytes remain in buffer",
Curl_bufq_len(&ws->sendbuf));
return result;
}
/* TODO: the current design does not allow for buffered writes.
* We need to flush the buffer now. There is no ws_flush() later */
n = 0;
continue;
}
else if(result) {
failf(data, "WS: flush, write error %d", result);
return result;
}
if(result == CURLE_AGAIN) {
CURL_TRC_WS(data, "flush EAGAIN, %zu bytes remain in buffer",
Curl_bufq_len(&ws->sendbuf));
return result;
}
else if(result) {
failf(data, "WS: flush, write error %d", result);
return result;
}
else {
infof(data, "WS: flushed %zu bytes", n);
@ -1050,6 +1050,83 @@ static CURLcode ws_flush(struct Curl_easy *data, struct websocket *ws,
return CURLE_OK;
}
static CURLcode ws_send_raw_blocking(CURL *data, struct websocket *ws,
const char *buffer, size_t buflen)
{
CURLcode result = CURLE_OK;
size_t nwritten;
(void)ws;
while(buflen) {
result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten);
if(result)
return result;
DEBUGASSERT(nwritten <= buflen);
buffer += nwritten;
buflen -= nwritten;
if(buflen) {
curl_socket_t sock = data->conn->sock[FIRSTSOCKET];
timediff_t left_ms;
int ev;
CURL_TRC_WS(data, "ws_send_raw_blocking() partial, %zu left to send",
buflen);
left_ms = Curl_timeleft(data, NULL, FALSE);
if(left_ms < 0) {
failf(data, "Timeout waiting for socket becoming writable");
return CURLE_SEND_ERROR;
}
/* POLLOUT socket */
if(sock == CURL_SOCKET_BAD)
return CURLE_SEND_ERROR;
ev = Curl_socket_check(CURL_SOCKET_BAD, CURL_SOCKET_BAD, sock,
left_ms? left_ms : 500);
if(ev < 0) {
failf(data, "Error while waiting for socket becoming writable");
return CURLE_SEND_ERROR;
}
}
}
return result;
}
static CURLcode ws_send_raw(CURL *data, const void *buffer,
size_t buflen, size_t *pnwritten)
{
struct websocket *ws = data->conn->proto.ws;
CURLcode result;
if(!ws) {
failf(data, "Not a websocket transfer");
return CURLE_SEND_ERROR;
}
if(!buflen)
return CURLE_OK;
if(Curl_is_in_callback(data)) {
/* When invoked from inside callbacks, we do a blocking send as the
* callback will probably not implement partial writes that may then
* mess up the ws framing subsequently.
* We need any pending data to be flushed before sending. */
result = ws_flush(data, ws, TRUE);
if(result)
return result;
result = ws_send_raw_blocking(data, ws, buffer, buflen);
}
else {
/* We need any pending data to be sent or EAGAIN this call. */
result = ws_flush(data, ws, FALSE);
if(result)
return result;
result = Curl_senddata(data, buffer, buflen, pnwritten);
}
CURL_TRC_WS(data, "ws_send_raw(len=%zu) -> %d, %zu",
buflen, result, *pnwritten);
return result;
}
CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
size_t buflen, size_t *sent,
curl_off_t fragsize,
@ -1057,60 +1134,53 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
{
struct websocket *ws;
ssize_t n;
size_t nwritten, space;
size_t space, payload_added;
CURLcode result;
CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T
", flags=%x), raw=%d",
buflen, fragsize, flags, data->set.ws_raw_mode);
*sent = 0;
if(!data->conn && data->set.connect_only) {
result = Curl_connect_only_attach(data);
if(result)
return result;
goto out;
}
if(!data->conn) {
failf(data, "No associated connection");
return CURLE_SEND_ERROR;
result = CURLE_SEND_ERROR;
goto out;
}
if(!data->conn->proto.ws) {
failf(data, "Not a websocket transfer");
return CURLE_SEND_ERROR;
result = CURLE_SEND_ERROR;
goto out;
}
ws = data->conn->proto.ws;
/* try flushing any content still waiting to be sent. */
result = ws_flush(data, ws, FALSE);
if(result)
goto out;
if(data->set.ws_raw_mode) {
/* In raw mode, we write directly to the connection */
if(fragsize || flags) {
DEBUGF(infof(data, "ws_send: "
"fragsize and flags cannot be non-zero in raw mode"));
failf(data, "ws_send, raw mode: fragsize and flags cannot be non-zero");
return CURLE_BAD_FUNCTION_ARGUMENT;
}
if(!buflen)
/* nothing to do */
return CURLE_OK;
/* raw mode sends exactly what was requested, and this is from within
the write callback */
if(Curl_is_in_callback(data)) {
result = Curl_xfer_send(data, buffer, buflen, FALSE, &nwritten);
}
else
result = Curl_senddata(data, buffer, buflen, &nwritten);
infof(data, "WS: wanted to send %zu bytes, sent %zu bytes",
buflen, nwritten);
*sent = nwritten;
return result;
result = ws_send_raw(data, buffer, buflen, sent);
goto out;
}
/* Not RAW mode, buf we do the frame encoding */
result = ws_flush(data, ws, FALSE);
if(result)
return result;
/* TODO: the current design does not allow partial writes, afaict.
* It is not clear how the application is supposed to react. */
space = Curl_bufq_space(&ws->sendbuf);
DEBUGF(infof(data, "curl_ws_send(len=%zu), sendbuf len=%zu space %zu",
buflen, Curl_bufq_len(&ws->sendbuf), space));
if(space < 14)
return CURLE_AGAIN;
CURL_TRC_WS(data, "curl_ws_send(len=%zu), sendbuf=%zu space_left=%zu",
buflen, Curl_bufq_len(&ws->sendbuf), space);
if(space < 14) {
result = CURLE_AGAIN;
goto out;
}
if(flags & CURLWS_OFFSET) {
if(fragsize) {
@ -1118,7 +1188,7 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
n = ws_enc_write_head(data, &ws->enc, flags, fragsize,
&ws->sendbuf, &result);
if(n < 0)
return result;
goto out;
}
else {
if((curl_off_t)buflen > ws->enc.payload_remain) {
@ -1132,16 +1202,66 @@ CURL_EXTERN CURLcode curl_ws_send(CURL *data, const void *buffer,
n = ws_enc_write_head(data, &ws->enc, flags, (curl_off_t)buflen,
&ws->sendbuf, &result);
if(n < 0)
return result;
goto out;
}
n = ws_enc_write_payload(&ws->enc, data,
buffer, buflen, &ws->sendbuf, &result);
if(n < 0)
return result;
goto out;
payload_added = (size_t)n;
*sent = (size_t)n;
return ws_flush(data, ws, TRUE);
while(!result && (buflen || !Curl_bufq_is_empty(&ws->sendbuf))) {
/* flush, blocking when in callback */
result = ws_flush(data, ws, Curl_is_in_callback(data));
if(!result) {
DEBUGASSERT(payload_added <= buflen);
/* all buffered data sent. Try sending the rest if there is any. */
*sent += payload_added;
buffer = (const char *)buffer + payload_added;
buflen -= payload_added;
payload_added = 0;
if(buflen) {
n = ws_enc_write_payload(&ws->enc, data,
buffer, buflen, &ws->sendbuf, &result);
if(n < 0)
goto out;
payload_added = Curl_bufq_len(&ws->sendbuf);
}
}
else if(result == CURLE_AGAIN) {
/* partially sent. how much of the call data has been part of it? what
* should we report to out caller so it can retry/send the rest? */
if(payload_added < buflen) {
/* We did not add everything the caller wanted. Return just
* the partial write to our buffer. */
*sent = payload_added;
result = CURLE_OK;
goto out;
}
else if(!buflen) {
/* We have no payload to report a partial write. EAGAIN would make
* the caller repeat this and add the frame again.
* Flush blocking seems the only way out of this. */
*sent = (size_t)n;
result = ws_flush(data, ws, TRUE);
goto out;
}
/* We added the complete data to our sendbuf. Report one byte less as
* sent. This parital success should make the caller invoke us again
* with the last byte. */
*sent = payload_added - 1;
result = Curl_bufq_unwrite(&ws->sendbuf, 1);
if(!result)
result = CURLE_AGAIN;
}
}
out:
CURL_TRC_WS(data, "curl_ws_send(len=%zu, fragsize=%" CURL_FORMAT_CURL_OFF_T
", flags=%x, raw=%d) -> %d, %zu",
buflen, fragsize, flags, data->set.ws_raw_mode, result, *sent);
return result;
}
static void ws_free(struct connectdata *conn)