websockets: refactor decode chain

- use client writer stack for decoding frames
- move websocket protocol handler to ws.c

Closes #12713
This commit is contained in:
Stefan Eissing 2024-01-16 12:06:57 +01:00 committed by Daniel Stenberg
parent 49ca84144e
commit 3378d2bd09
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
5 changed files with 249 additions and 217 deletions

View File

@ -100,29 +100,14 @@
* Forward declarations.
*/
static int http_getsock_do(struct Curl_easy *data,
struct connectdata *conn,
curl_socket_t *socks);
static bool http_should_fail(struct Curl_easy *data);
static CURLcode http_setup_conn(struct Curl_easy *data,
struct connectdata *conn);
static CURLcode http_write_resp(struct Curl_easy *data,
const char *buf, size_t blen,
bool is_eos,
bool *done);
#ifdef USE_WEBSOCKETS
static CURLcode ws_setup_conn(struct Curl_easy *data,
struct connectdata *conn);
#endif
/*
* HTTP handler interface.
*/
const struct Curl_handler Curl_handler_http = {
"HTTP", /* scheme */
http_setup_conn, /* setup_connection */
Curl_http_setup_conn, /* setup_connection */
Curl_http, /* do_it */
Curl_http_done, /* done */
ZERO_NULL, /* do_more */
@ -130,11 +115,11 @@ const struct Curl_handler Curl_handler_http = {
ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */
ZERO_NULL, /* proto_getsock */
http_getsock_do, /* doing_getsock */
Curl_http_getsock_do, /* doing_getsock */
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
ZERO_NULL, /* disconnect */
http_write_resp, /* write_resp */
Curl_http_write_resp, /* write_resp */
ZERO_NULL, /* connection_check */
ZERO_NULL, /* attach connection */
PORT_HTTP, /* defport */
@ -144,39 +129,13 @@ const struct Curl_handler Curl_handler_http = {
PROTOPT_USERPWDCTRL
};
#ifdef USE_WEBSOCKETS
const struct Curl_handler Curl_handler_ws = {
"WS", /* scheme */
ws_setup_conn, /* setup_connection */
Curl_http, /* do_it */
Curl_http_done, /* done */
ZERO_NULL, /* do_more */
Curl_http_connect, /* connect_it */
ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */
ZERO_NULL, /* proto_getsock */
http_getsock_do, /* doing_getsock */
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
Curl_ws_disconnect, /* disconnect */
http_write_resp, /* write_resp */
ZERO_NULL, /* connection_check */
ZERO_NULL, /* attach connection */
PORT_HTTP, /* defport */
CURLPROTO_WS, /* protocol */
CURLPROTO_HTTP, /* family */
PROTOPT_CREDSPERREQUEST | /* flags */
PROTOPT_USERPWDCTRL
};
#endif
#ifdef USE_SSL
/*
* HTTPS handler interface.
*/
const struct Curl_handler Curl_handler_https = {
"HTTPS", /* scheme */
http_setup_conn, /* setup_connection */
Curl_http_setup_conn, /* setup_connection */
Curl_http, /* do_it */
Curl_http_done, /* done */
ZERO_NULL, /* do_more */
@ -184,11 +143,11 @@ const struct Curl_handler Curl_handler_https = {
NULL, /* connecting */
ZERO_NULL, /* doing */
NULL, /* proto_getsock */
http_getsock_do, /* doing_getsock */
Curl_http_getsock_do, /* doing_getsock */
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
ZERO_NULL, /* disconnect */
http_write_resp, /* write_resp */
Curl_http_write_resp, /* write_resp */
ZERO_NULL, /* connection_check */
ZERO_NULL, /* attach connection */
PORT_HTTPS, /* defport */
@ -198,36 +157,10 @@ const struct Curl_handler Curl_handler_https = {
PROTOPT_USERPWDCTRL
};
#ifdef USE_WEBSOCKETS
const struct Curl_handler Curl_handler_wss = {
"WSS", /* scheme */
ws_setup_conn, /* setup_connection */
Curl_http, /* do_it */
Curl_http_done, /* done */
ZERO_NULL, /* do_more */
Curl_http_connect, /* connect_it */
NULL, /* connecting */
ZERO_NULL, /* doing */
NULL, /* proto_getsock */
http_getsock_do, /* doing_getsock */
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
Curl_ws_disconnect, /* disconnect */
http_write_resp, /* write_resp */
ZERO_NULL, /* connection_check */
ZERO_NULL, /* attach connection */
PORT_HTTPS, /* defport */
CURLPROTO_WSS, /* protocol */
CURLPROTO_HTTP, /* family */
PROTOPT_SSL | PROTOPT_CREDSPERREQUEST | /* flags */
PROTOPT_USERPWDCTRL
};
#endif
#endif
static CURLcode http_setup_conn(struct Curl_easy *data,
struct connectdata *conn)
CURLcode Curl_http_setup_conn(struct Curl_easy *data,
struct connectdata *conn)
{
/* allocate the HTTP-specific struct for the Curl_easy, only to survive
during this request */
@ -250,16 +183,6 @@ static CURLcode http_setup_conn(struct Curl_easy *data,
return CURLE_OK;
}
#ifdef USE_WEBSOCKETS
static CURLcode ws_setup_conn(struct Curl_easy *data,
struct connectdata *conn)
{
/* websockets is 1.1 only (for now) */
data->state.httpwant = CURL_HTTP_VERSION_1_1;
return http_setup_conn(data, conn);
}
#endif
#ifndef CURL_DISABLE_PROXY
/*
* checkProxyHeaders() checks the linked list of custom proxy headers
@ -1594,9 +1517,9 @@ CURLcode Curl_http_connect(struct Curl_easy *data, bool *done)
/* this returns the socket to wait for in the DO and DOING state for the multi
interface and then we're always _sending_ a request and thus we wait for
the single socket to become writable only */
static int http_getsock_do(struct Curl_easy *data,
struct connectdata *conn,
curl_socket_t *socks)
int Curl_http_getsock_do(struct Curl_easy *data,
struct connectdata *conn,
curl_socket_t *socks)
{
/* write mode */
(void)conn;
@ -4133,11 +4056,10 @@ static CURLcode http_rw_headers(struct Curl_easy *data,
if(result)
return result;
k->header = FALSE; /* no more header to parse! */
if(data->set.connect_only) {
*pconsumed += blen; /* ws accept handled the data */
blen = 0;
if(data->set.connect_only)
k->keepon &= ~KEEP_RECV; /* read no more content */
*pconsumed += blen;
blen = 0;
}
}
#endif
else {
@ -4611,10 +4533,10 @@ CURLcode Curl_http_write_resp_hds(struct Curl_easy *data,
}
}
static CURLcode http_write_resp(struct Curl_easy *data,
const char *buf, size_t blen,
bool is_eos,
bool *done)
CURLcode Curl_http_write_resp(struct Curl_easy *data,
const char *buf, size_t blen,
bool is_eos,
bool *done)
{
CURLcode result;
size_t consumed;

View File

@ -54,14 +54,6 @@ extern const struct Curl_handler Curl_handler_http;
extern const struct Curl_handler Curl_handler_https;
#endif
#ifdef USE_WEBSOCKETS
extern const struct Curl_handler Curl_handler_ws;
#ifdef USE_SSL
extern const struct Curl_handler Curl_handler_wss;
#endif
#endif /* websockets */
struct dynhds;
CURLcode Curl_bump_headersize(struct Curl_easy *data,
@ -147,9 +139,17 @@ CURLcode Curl_http_firstwrite(struct Curl_easy *data,
bool *done);
/* protocol-specific functions set up to be called by the main engine */
CURLcode Curl_http_setup_conn(struct Curl_easy *data,
struct connectdata *conn);
CURLcode Curl_http(struct Curl_easy *data, bool *done);
CURLcode Curl_http_done(struct Curl_easy *data, CURLcode, bool premature);
CURLcode Curl_http_connect(struct Curl_easy *data, bool *done);
int Curl_http_getsock_do(struct Curl_easy *data, struct connectdata *conn,
curl_socket_t *socks);
CURLcode Curl_http_write_resp(struct Curl_easy *data,
const char *buf, size_t blen,
bool is_eos,
bool *done);
/* These functions are in http.c */
CURLcode Curl_http_input_auth(struct Curl_easy *data, bool proxy,

View File

@ -296,13 +296,6 @@ static CURLcode chop_write(struct Curl_easy *data,
if(!skip_body_write &&
((type & CLIENTWRITE_BODY) ||
((type & CLIENTWRITE_HEADER) && data->set.include_header))) {
#ifdef USE_WEBSOCKETS
if(conn->handler->protocol & (CURLPROTO_WS|CURLPROTO_WSS)) {
writebody = Curl_ws_writecb;
writebody_ptr = data;
}
else
#endif
writebody = data->set.fwrite_func;
}
if((type & (CLIENTWRITE_HEADER|CLIENTWRITE_INFO)) &&

316
lib/ws.c
View File

@ -24,7 +24,7 @@
#include "curl_setup.h"
#include <curl/curl.h>
#ifdef USE_WEBSOCKETS
#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
#include "urldata.h"
#include "bufq.h"
@ -354,6 +354,136 @@ static void update_meta(struct websocket *ws,
ws->frame.bytesleft = (payload_len - payload_offset - cur_len);
}
/* WebSockets decoding client writer */
struct ws_cw_ctx {
struct Curl_cwriter super;
struct bufq buf;
};
static CURLcode ws_cw_init(struct Curl_easy *data,
struct Curl_cwriter *writer)
{
struct ws_cw_ctx *ctx = (struct ws_cw_ctx *)writer;
(void)data;
Curl_bufq_init2(&ctx->buf, WS_CHUNK_SIZE, 1, BUFQ_OPT_SOFT_LIMIT);
return CURLE_OK;
}
static void ws_cw_close(struct Curl_easy *data, struct Curl_cwriter *writer)
{
struct ws_cw_ctx *ctx = (struct ws_cw_ctx *)writer;
(void) data;
Curl_bufq_free(&ctx->buf);
}
struct ws_cw_dec_ctx {
struct Curl_easy *data;
struct websocket *ws;
struct Curl_cwriter *next_writer;
int cw_type;
};
static ssize_t ws_cw_dec_next(const unsigned char *buf, size_t buflen,
int frame_age, int frame_flags,
curl_off_t payload_offset,
curl_off_t payload_len,
void *user_data,
CURLcode *err)
{
struct ws_cw_dec_ctx *ctx = user_data;
struct Curl_easy *data = ctx->data;
struct websocket *ws = ctx->ws;
curl_off_t remain = (payload_len - (payload_offset + buflen));
(void)frame_age;
if((frame_flags & CURLWS_PING) && !remain) {
/* auto-respond to PINGs, only works for single-frame payloads atm */
size_t bytes;
infof(data, "WS: auto-respond to PING with a PONG");
/* send back the exact same content as a PONG */
*err = curl_ws_send(data, buf, buflen, &bytes, 0, CURLWS_PONG);
if(*err)
return -1;
}
else if(buflen || !remain) {
/* forward the decoded frame to the next client writer. */
update_meta(ws, frame_age, frame_flags, payload_offset,
payload_len, buflen);
*err = Curl_cwriter_write(data, ctx->next_writer, ctx->cw_type,
(const char *)buf, buflen);
if(*err)
return -1;
}
*err = CURLE_OK;
return (ssize_t)buflen;
}
static CURLcode ws_cw_write(struct Curl_easy *data,
struct Curl_cwriter *writer, int type,
const char *buf, size_t nbytes)
{
struct ws_cw_ctx *ctx = (struct ws_cw_ctx *)writer;
struct websocket *ws;
CURLcode result;
if(!(type & CLIENTWRITE_BODY) || data->set.ws_raw_mode)
return Curl_cwriter_write(data, writer->next, type, buf, nbytes);
ws = data->conn->proto.ws;
if(!ws) {
failf(data, "WS: not a websocket transfer");
return CURLE_FAILED_INIT;
}
if(nbytes) {
ssize_t nwritten;
nwritten = Curl_bufq_write(&ctx->buf, (const unsigned char *)buf,
nbytes, &result);
if(nwritten < 0) {
infof(data, "WS: error adding data to buffer %d", result);
return result;
}
}
while(!Curl_bufq_is_empty(&ctx->buf)) {
struct ws_cw_dec_ctx pass_ctx;
pass_ctx.data = data;
pass_ctx.ws = ws;
pass_ctx.next_writer = writer->next;
pass_ctx.cw_type = type;
result = ws_dec_pass(&ws->dec, data, &ctx->buf,
ws_cw_dec_next, &pass_ctx);
if(result == CURLE_AGAIN)
/* insufficient amount of data, keep it for later.
* we pretend to have written all since we have a copy */
return CURLE_OK;
else if(result) {
infof(data, "WS: decode error %d", (int)result);
return result;
}
}
if((type & CLIENTWRITE_EOS) && !Curl_bufq_is_empty(&ctx->buf)) {
infof(data, "WS: decode ending with %zd frame bytes remaining",
Curl_bufq_len(&ctx->buf));
return CURLE_RECV_ERROR;
}
return CURLE_OK;
}
/* WebSocket payload decoding client writer. */
static const struct Curl_cwtype ws_cw_decode = {
"ws-decode",
NULL,
ws_cw_init,
ws_cw_write,
ws_cw_close,
sizeof(struct ws_cw_ctx)
};
static void ws_enc_info(struct ws_encoder *enc, struct Curl_easy *data,
const char *msg)
{
@ -618,6 +748,7 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
{
struct SingleRequest *k = &data->req;
struct websocket *ws;
struct Curl_cwriter *ws_dec_writer;
CURLcode result;
DEBUGASSERT(data->conn);
@ -627,7 +758,8 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
if(!ws)
return CURLE_OUT_OF_MEMORY;
data->conn->proto.ws = ws;
Curl_bufq_init(&ws->recvbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT);
Curl_bufq_init2(&ws->recvbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT,
BUFQ_OPT_SOFT_LIMIT);
Curl_bufq_init2(&ws->sendbuf, WS_CHUNK_SIZE, WS_CHUNK_COUNT,
BUFQ_OPT_SOFT_LIMIT);
ws_dec_init(&ws->dec);
@ -666,6 +798,18 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
infof(data, "Received 101, switch to WebSocket; mask %02x%02x%02x%02x",
ws->enc.mask[0], ws->enc.mask[1], ws->enc.mask[2], ws->enc.mask[3]);
/* Install our client writer that decodes WS frames payload */
result = Curl_cwriter_create(&ws_dec_writer, data, &ws_cw_decode,
CURL_CW_CONTENT_DECODE);
if(result)
return result;
result = Curl_cwriter_add(data, ws_dec_writer);
if(result) {
Curl_cwriter_free(data, ws_dec_writer);
return result;
}
if(data->set.connect_only) {
ssize_t nwritten;
/* In CONNECT_ONLY setup, the payloads from `mem` need to be received
@ -677,107 +821,17 @@ CURLcode Curl_ws_accept(struct Curl_easy *data,
return result;
infof(data, "%zu bytes websocket payload", nread);
}
else { /* !connect_only */
/* And pass any additional data to the writers */
if(nread) {
result = Curl_client_write(data, CLIENTWRITE_BODY, (char *)mem, nread);
}
}
k->upgr101 = UPGR101_RECEIVED;
return result;
}
static ssize_t ws_client_write(const unsigned char *buf, size_t buflen,
int frame_age, int frame_flags,
curl_off_t payload_offset,
curl_off_t payload_len,
void *userp,
CURLcode *err)
{
struct Curl_easy *data = userp;
struct websocket *ws;
size_t wrote;
curl_off_t remain = (payload_len - (payload_offset + buflen));
(void)frame_age;
if(!data->conn || !data->conn->proto.ws) {
*err = CURLE_FAILED_INIT;
return -1;
}
ws = data->conn->proto.ws;
if((frame_flags & CURLWS_PING) && !remain) {
/* auto-respond to PINGs, only works for single-frame payloads atm */
size_t bytes;
infof(data, "WS: auto-respond to PING with a PONG");
/* send back the exact same content as a PONG */
*err = curl_ws_send(data, buf, buflen, &bytes, 0, CURLWS_PONG);
if(*err)
return -1;
}
else if(buflen || !remain) {
/* deliver the decoded frame to the user callback. The application
* may invoke curl_ws_meta() to access frame information. */
update_meta(ws, frame_age, frame_flags, payload_offset,
payload_len, buflen);
Curl_set_in_callback(data, true);
wrote = data->set.fwrite_func((char *)buf, 1,
buflen, data->set.out);
Curl_set_in_callback(data, false);
if(wrote != buflen) {
*err = CURLE_RECV_ERROR;
return -1;
}
}
*err = CURLE_OK;
return (ssize_t)buflen;
}
/* Curl_ws_writecb() is the write callback for websocket traffic. The
websocket data is provided to this raw, in chunks. This function should
handle/decode the data and call the "real" underlying callback accordingly.
*/
size_t Curl_ws_writecb(char *buffer, size_t size /* 1 */,
size_t nitems, void *userp)
{
struct Curl_easy *data = userp;
if(data->set.ws_raw_mode)
return data->set.fwrite_func(buffer, size, nitems, data->set.out);
else if(nitems) {
struct websocket *ws;
CURLcode result;
if(!data->conn || !data->conn->proto.ws) {
failf(data, "WS: not a websocket transfer");
return nitems - 1;
}
ws = data->conn->proto.ws;
if(buffer) {
ssize_t nwritten;
nwritten = Curl_bufq_write(&ws->recvbuf, (const unsigned char *)buffer,
nitems, &result);
if(nwritten < 0) {
infof(data, "WS: error adding data to buffer %d", (int)result);
return nitems - 1;
}
buffer = NULL;
}
while(!Curl_bufq_is_empty(&ws->recvbuf)) {
result = ws_dec_pass(&ws->dec, data, &ws->recvbuf,
ws_client_write, data);
if(result == CURLE_AGAIN)
/* insufficient amount of data, keep it for later.
* we pretend to have written all since we have a copy */
return nitems;
else if(result) {
infof(data, "WS: decode error %d", (int)result);
return nitems - 1;
}
}
}
return nitems;
}
struct ws_collect {
struct Curl_easy *data;
void *buffer;
@ -1085,14 +1139,23 @@ static void ws_free(struct connectdata *conn)
}
}
static CURLcode ws_setup_conn(struct Curl_easy *data,
struct connectdata *conn)
{
/* websockets is 1.1 only (for now) */
data->state.httpwant = CURL_HTTP_VERSION_1_1;
return Curl_http_setup_conn(data, conn);
}
void Curl_ws_done(struct Curl_easy *data)
{
(void)data;
}
CURLcode Curl_ws_disconnect(struct Curl_easy *data,
struct connectdata *conn,
bool dead_connection)
static CURLcode ws_disconnect(struct Curl_easy *data,
struct connectdata *conn,
bool dead_connection)
{
(void)data;
(void)dead_connection;
@ -1110,6 +1173,57 @@ CURL_EXTERN const struct curl_ws_frame *curl_ws_meta(struct Curl_easy *data)
return NULL;
}
const struct Curl_handler Curl_handler_ws = {
"WS", /* scheme */
ws_setup_conn, /* setup_connection */
Curl_http, /* do_it */
Curl_http_done, /* done */
ZERO_NULL, /* do_more */
Curl_http_connect, /* connect_it */
ZERO_NULL, /* connecting */
ZERO_NULL, /* doing */
ZERO_NULL, /* proto_getsock */
Curl_http_getsock_do, /* doing_getsock */
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
ws_disconnect, /* disconnect */
Curl_http_write_resp, /* write_resp */
ZERO_NULL, /* connection_check */
ZERO_NULL, /* attach connection */
PORT_HTTP, /* defport */
CURLPROTO_WS, /* protocol */
CURLPROTO_HTTP, /* family */
PROTOPT_CREDSPERREQUEST | /* flags */
PROTOPT_USERPWDCTRL
};
#ifdef USE_SSL
const struct Curl_handler Curl_handler_wss = {
"WSS", /* scheme */
ws_setup_conn, /* setup_connection */
Curl_http, /* do_it */
Curl_http_done, /* done */
ZERO_NULL, /* do_more */
Curl_http_connect, /* connect_it */
NULL, /* connecting */
ZERO_NULL, /* doing */
NULL, /* proto_getsock */
Curl_http_getsock_do, /* doing_getsock */
ZERO_NULL, /* domore_getsock */
ZERO_NULL, /* perform_getsock */
ws_disconnect, /* disconnect */
Curl_http_write_resp, /* write_resp */
ZERO_NULL, /* connection_check */
ZERO_NULL, /* attach connection */
PORT_HTTPS, /* defport */
CURLPROTO_WSS, /* protocol */
CURLPROTO_HTTP, /* family */
PROTOPT_SSL | PROTOPT_CREDSPERREQUEST | /* flags */
PROTOPT_USERPWDCTRL
};
#endif
#else
CURL_EXTERN CURLcode curl_ws_recv(CURL *curl, void *buffer, size_t buflen,

View File

@ -25,7 +25,7 @@
***************************************************************************/
#include "curl_setup.h"
#ifdef USE_WEBSOCKETS
#if defined(USE_WEBSOCKETS) && !defined(CURL_DISABLE_HTTP)
#ifdef USE_HYPER
#define REQTYPE void
@ -75,11 +75,14 @@ struct websocket {
CURLcode Curl_ws_request(struct Curl_easy *data, REQTYPE *req);
CURLcode Curl_ws_accept(struct Curl_easy *data, const char *mem, size_t len);
size_t Curl_ws_writecb(char *buffer, size_t size, size_t nitems, void *userp);
void Curl_ws_done(struct Curl_easy *data);
CURLcode Curl_ws_disconnect(struct Curl_easy *data,
struct connectdata *conn,
bool dead_connection);
extern const struct Curl_handler Curl_handler_ws;
#ifdef USE_SSL
extern const struct Curl_handler Curl_handler_wss;
#endif
#else
#define Curl_ws_request(x,y) CURLE_OK
#define Curl_ws_done(x) Curl_nop_stmt