RTSP: improved RTP parser

- fix HTTP header parsing to report incomplete
  lines it buffers as consumed!
- re-implement the RTP parser for interleave RTP
  messages for robustness. It is now keeping its
  state at the connection
- RTSP protocol handler "readwrite" implementation
  now tracks if the response is before/in/after
  header parsing or "in" a bod by calling
  "Curl_http_readwrite_headers()" itself. This
  allows it to know when non-RTP bytes are "junk"
  or HEADER or BODY.
- tested with #12035 and various small receive
  sizes where current master fails

Closes #12052
This commit is contained in:
Stefan Eissing 2023-10-07 15:13:09 +02:00 committed by Daniel Stenberg
parent 117c9bd978
commit 7eb31c852d
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
4 changed files with 280 additions and 160 deletions

View File

@ -3986,6 +3986,7 @@ CURLcode Curl_http_readwrite_headers(struct Curl_easy *data,
char *end_ptr;
/* header line within buffer loop */
*stop_reading = FALSE;
do {
size_t rest_length;
size_t full_length;
@ -4023,7 +4024,7 @@ CURLcode Curl_http_readwrite_headers(struct Curl_easy *data,
break;
}
}
*nread = 0;
break; /* read more and try again */
}

View File

@ -45,8 +45,8 @@
#include "curl_memory.h"
#include "memdebug.h"
#define RTP_PKT_LENGTH(p) ((((int)((unsigned char)((p)[2]))) << 8) | \
((int)((unsigned char)((p)[3]))))
#define RTP_PKT_LENGTH(p) ((((unsigned int)((unsigned char)((p)[2]))) << 8) | \
((unsigned int)((unsigned char)((p)[3]))))
/* protocol-specific functions set up to be called by the main engine */
static CURLcode rtsp_do(struct Curl_easy *data, bool *done);
@ -88,7 +88,7 @@ static int rtsp_getsock_do(struct Curl_easy *data, struct connectdata *conn,
}
static
CURLcode rtp_client_write(struct Curl_easy *data, char *ptr, size_t len);
CURLcode rtp_client_write(struct Curl_easy *data, const char *ptr, size_t len);
static
CURLcode rtsp_parse_transport(struct Curl_easy *data, char *transport);
@ -585,153 +585,257 @@ static CURLcode rtsp_do(struct Curl_easy *data, bool *done)
return result;
}
static CURLcode rtsp_filter_rtp(struct Curl_easy *data,
struct connectdata *conn,
const char *buf,
size_t blen,
bool in_body,
size_t *pconsumed)
{
struct rtsp_conn *rtspc = &(conn->proto.rtspc);
CURLcode result = CURLE_OK;
*pconsumed = 0;
while(blen) {
switch(rtspc->state) {
case RTP_PARSE_SKIP: {
DEBUGASSERT(Curl_dyn_len(&rtspc->buf) == 0);
if(in_body && buf[0] != '$') {
/* in BODY and no valid start, do not consume and return */
goto out;
}
while(blen && buf[0] != '$') {
if(!in_body && buf[0] == 'R' &&
data->set.rtspreq != RTSPREQ_RECEIVE) {
if(strncmp(buf, "RTSP/", (blen < 5) ? blen : 5) == 0) {
/* This could be the next response, no consume and return */
if(*pconsumed) {
DEBUGF(infof(data, "RTP rtsp_filter_rtp[SKIP] RTSP/ prefix, "
"skipping %zd bytes of junk", *pconsumed));
}
rtspc->state = RTP_PARSE_SKIP;
rtspc->in_header = TRUE;
goto out;
}
}
/* junk, consume without buffering */
*pconsumed += 1;
++buf;
--blen;
}
if(blen && buf[0] == '$') {
/* possible start of an RTP message, buffer */
if(Curl_dyn_addn(&rtspc->buf, buf, 1)) {
result = CURLE_OUT_OF_MEMORY;
goto out;
}
*pconsumed += 1;
++buf;
--blen;
rtspc->state = RTP_PARSE_CHANNEL;
}
break;
}
case RTP_PARSE_CHANNEL: {
int idx = ((unsigned char)buf[0]) / 8;
int off = ((unsigned char)buf[0]) % 8;
DEBUGASSERT(Curl_dyn_len(&rtspc->buf) == 1);
if(!(data->state.rtp_channel_mask[idx] & (1 << off))) {
/* invalid channel number, junk or BODY data */
rtspc->state = RTP_PARSE_SKIP;
if(in_body) {
/* we do not consume this byte, it is BODY data */
DEBUGF(infof(data, "RTSP: invalid RTP channel %d in BODY, "
"treating as BODY data", idx));
if(*pconsumed == 0) {
/* We did not consume the initial '$' in our buffer, but had
* it from an earlier call. We cannot un-consume it and have
* to write it directly as BODY data */
result = Curl_client_write(data, CLIENTWRITE_BODY,
Curl_dyn_ptr(&rtspc->buf), 1);
++data->req.bytecount;
Curl_dyn_free(&rtspc->buf);
if(result)
goto out;
}
else {
/* un-consume the '$' and leave */
Curl_dyn_free(&rtspc->buf);
*pconsumed -= 1;
--buf;
++blen;
goto out;
}
}
else {
/* not BODY, forget the junk '$'. Do not consume this byte,
* it might be a start */
infof(data, "RTSP: invalid RTP channel %d, skipping", idx);
Curl_dyn_free(&rtspc->buf);
}
break;
}
/* a valid channel, so we expect this to be a real RTP message */
rtspc->rtp_channel = (unsigned char)buf[0];
if(Curl_dyn_addn(&rtspc->buf, buf, 1)) {
result = CURLE_OUT_OF_MEMORY;
goto out;
}
*pconsumed += 1;
++buf;
--blen;
rtspc->state = RTP_PARSE_LEN;
break;
}
case RTP_PARSE_LEN: {
size_t rtp_len = Curl_dyn_len(&rtspc->buf);
const char *rtp_buf;
DEBUGASSERT(rtp_len >= 2 && rtp_len < 4);
if(Curl_dyn_addn(&rtspc->buf, buf, 1)) {
result = CURLE_OUT_OF_MEMORY;
goto out;
}
*pconsumed += 1;
++buf;
--blen;
if(rtp_len == 2)
break;
rtp_buf = Curl_dyn_ptr(&rtspc->buf);
rtspc->rtp_len = RTP_PKT_LENGTH(rtp_buf) + 4;
rtspc->state = RTP_PARSE_DATA;
break;
}
case RTP_PARSE_DATA: {
size_t rtp_len = Curl_dyn_len(&rtspc->buf);
size_t needed;
DEBUGASSERT(rtp_len < rtspc->rtp_len);
needed = rtspc->rtp_len - rtp_len;
if(needed <= blen) {
if(Curl_dyn_addn(&rtspc->buf, buf, needed)) {
result = CURLE_OUT_OF_MEMORY;
goto out;
}
*pconsumed += needed;
buf += needed;
blen -= needed;
/* complete RTP message in buffer */
DEBUGF(infof(data, "RTP write channel %d rtp_len %zu",
rtspc->rtp_channel, rtspc->rtp_len));
result = rtp_client_write(data, Curl_dyn_ptr(&rtspc->buf),
rtspc->rtp_len);
Curl_dyn_free(&rtspc->buf);
rtspc->state = RTP_PARSE_SKIP;
if(result)
goto out;
}
else {
if(Curl_dyn_addn(&rtspc->buf, buf, blen)) {
result = CURLE_OUT_OF_MEMORY;
goto out;
}
*pconsumed += blen;
buf += blen;
blen = 0;
}
break;
}
default:
DEBUGASSERT(0);
return CURLE_RECV_ERROR;
}
}
out:
return result;
}
static CURLcode rtsp_rtp_readwrite(struct Curl_easy *data,
struct connectdata *conn,
ssize_t *nread,
bool *readmore) {
struct SingleRequest *k = &data->req;
bool *readmore)
{
struct rtsp_conn *rtspc = &(conn->proto.rtspc);
unsigned char *rtp_channel_mask = data->state.rtp_channel_mask;
CURLcode result = CURLE_OK;
size_t consumed = 0;
char *buf;
size_t blen;
bool in_body;
char *rtp; /* moving pointer to rtp data */
ssize_t rtp_dataleft; /* how much data left to parse in this round */
CURLcode result;
bool interleaved = false;
size_t skip_size = 0;
if(!data->req.header)
rtspc->in_header = FALSE;
in_body = (data->req.headerline && !rtspc->in_header) &&
(data->req.size >= 0) &&
(data->req.bytecount < data->req.size);
if(Curl_dyn_len(&rtspc->buf)) {
/* There was some leftover data the last time. Append new buffers */
if(Curl_dyn_addn(&rtspc->buf, k->str, *nread))
return CURLE_OUT_OF_MEMORY;
rtp = Curl_dyn_ptr(&rtspc->buf);
rtp_dataleft = Curl_dyn_len(&rtspc->buf);
}
else {
/* Just parse the request buffer directly */
rtp = k->str;
rtp_dataleft = *nread;
DEBUGASSERT(*nread >= 0);
blen = (size_t)(*nread);
buf = data->req.str;
*readmore = FALSE;
if(!blen) {
goto out;
}
while(rtp_dataleft > 0) {
if(rtp[0] == '$') {
if(rtp_dataleft > 4) {
unsigned char rtp_channel;
int rtp_length;
int idx;
int off;
/* If header parsing is not onging, extract RTP messsages */
if(!rtspc->in_header) {
result = rtsp_filter_rtp(data, conn, buf, blen, in_body, &consumed);
if(result)
goto out;
buf += consumed;
blen -= consumed;
}
/* Parse the header */
/* The channel identifier immediately follows and is 1 byte */
rtp_channel = (unsigned char)rtp[1];
idx = rtp_channel / 8;
off = rtp_channel % 8;
if(!(rtp_channel_mask[idx] & (1 << off))) {
/* invalid channel number, maybe not an RTP packet */
rtp++;
rtp_dataleft--;
skip_size++;
continue;
}
if(skip_size > 0) {
DEBUGF(infof(data, "Skip the malformed interleaved data %lu "
"bytes", skip_size));
}
skip_size = 0;
rtspc->rtp_channel = rtp_channel;
/* we want to parse headers, do so */
if(data->req.header && blen) {
bool stop_reading;
rtspc->in_header = TRUE;
data->req.str = buf;
*nread = blen;
result = Curl_http_readwrite_headers(data, conn, nread, &stop_reading);
if(result)
goto out;
/* The length is two bytes */
rtp_length = RTP_PKT_LENGTH(rtp);
DEBUGASSERT(*nread >= 0);
blen = (size_t)(*nread);
buf = data->req.str;
if(rtp_dataleft < rtp_length + 4) {
/* Need more - incomplete payload */
*readmore = TRUE;
break;
}
interleaved = true;
/* We have the full RTP interleaved packet
* Write out the header including the leading '$' */
DEBUGF(infof(data, "RTP write channel %d rtp_length %d",
rtspc->rtp_channel, rtp_length));
result = rtp_client_write(data, &rtp[0], rtp_length + 4);
if(result) {
*readmore = FALSE;
return result;
}
if(!data->req.header)
rtspc->in_header = FALSE;
/* Move forward in the buffer */
rtp_dataleft -= rtp_length + 4;
rtp += rtp_length + 4;
if(data->set.rtspreq == RTSPREQ_RECEIVE) {
/* If we are in a passive receive, give control back
* to the app as often as we can.
*/
k->keepon &= ~KEEP_RECV;
}
}
else {
/* Need more - incomplete header */
*readmore = TRUE;
break;
}
}
else {
/* If the following data begins with 'RTSP/', which might be an RTSP
message, we should stop skipping the data. */
/* If `k-> headerline> 0 && !interleaved` is true, we are maybe in the
middle of an RTSP message. It is difficult to determine this, so we
stop skipping. */
size_t prefix_len = (rtp_dataleft < 5) ? rtp_dataleft : 5;
if((k->headerline > 0 && !interleaved) ||
strncmp(rtp, "RTSP/", prefix_len) == 0) {
if(skip_size > 0) {
DEBUGF(infof(data, "Skip the malformed interleaved data %lu "
"bytes", skip_size));
}
break; /* maybe is an RTSP message */
}
/* Skip incorrect data util the next RTP packet or RTSP message */
do {
rtp++;
rtp_dataleft--;
skip_size++;
} while(rtp_dataleft > 0 && rtp[0] != '$' && rtp[0] != 'R');
if(!rtspc->in_header) {
/* If header parsing is done and data left, extract RTP messages */
in_body = (data->req.headerline && !rtspc->in_header) &&
(data->req.size >= 0) &&
(data->req.bytecount < data->req.size);
result = rtsp_filter_rtp(data, conn, buf, blen, in_body, &consumed);
if(result)
goto out;
buf += consumed;
blen -= consumed;
}
}
if(rtp_dataleft && rtp[0] == '$') {
DEBUGF(infof(data, "RTP Rewinding %zd %s", rtp_dataleft,
*readmore ? "(READMORE)" : ""));
data->req.str = buf;
*nread = blen;
if(rtspc->state != RTP_PARSE_SKIP)
*readmore = TRUE;
/* Store the incomplete RTP packet for a "rewind" */
if(!Curl_dyn_len(&rtspc->buf)) {
/* nothing was stored, add this data */
if(Curl_dyn_addn(&rtspc->buf, rtp, rtp_dataleft))
return CURLE_OUT_OF_MEMORY;
}
else {
/* keep the remainder */
Curl_dyn_tail(&rtspc->buf, rtp_dataleft);
}
/* As far as the transfer is concerned, this data is consumed */
*nread = 0;
return CURLE_OK;
out:
if(!*readmore && data->set.rtspreq == RTSPREQ_RECEIVE) {
/* In special mode RECEIVE, we just process one chunk of network
* data, so we stop the transfer here, if we have no incomplete
* RTP message pending. */
data->req.keepon &= ~KEEP_RECV;
}
/* Fix up k->str to point just after the last RTP packet */
k->str += *nread - rtp_dataleft;
*nread = rtp_dataleft;
/* If we get here, we have finished with the leftover/merge buffer */
Curl_dyn_free(&rtspc->buf);
return CURLE_OK;
return result;
}
static
CURLcode rtp_client_write(struct Curl_easy *data, char *ptr, size_t len)
CURLcode rtp_client_write(struct Curl_easy *data, const char *ptr, size_t len)
{
size_t wrote;
curl_write_callback writeit;
@ -756,7 +860,7 @@ CURLcode rtp_client_write(struct Curl_easy *data, char *ptr, size_t len)
}
Curl_set_in_callback(data, true);
wrote = writeit(ptr, 1, len, user_ptr);
wrote = writeit((char *)ptr, 1, len, user_ptr);
Curl_set_in_callback(data, false);
if(CURL_WRITEFUNC_PAUSE == wrote) {

View File

@ -39,6 +39,12 @@ CURLcode Curl_rtsp_parseheader(struct Curl_easy *data, char *header);
#endif /* CURL_DISABLE_RTSP */
typedef enum {
RTP_PARSE_SKIP,
RTP_PARSE_CHANNEL,
RTP_PARSE_LEN,
RTP_PARSE_DATA
} rtp_parse_st;
/*
* RTSP Connection data
*
@ -47,6 +53,9 @@ CURLcode Curl_rtsp_parseheader(struct Curl_easy *data, char *header);
struct rtsp_conn {
struct dynbuf buf;
int rtp_channel;
size_t rtp_len;
rtp_parse_st state;
BIT(in_header);
};
/****************************************************************************

View File

@ -423,7 +423,6 @@ static CURLcode readwrite_data(struct Curl_easy *data,
{
CURLcode result = CURLE_OK;
ssize_t nread; /* number of bytes read */
size_t excess = 0; /* excess bytes read */
bool readmore = FALSE; /* used by RTP to signal for more data */
int maxloops = 100;
curl_off_t max_recv = data->set.max_recv_speed?
@ -439,6 +438,7 @@ static CURLcode readwrite_data(struct Curl_easy *data,
read or we get a CURLE_AGAIN */
do {
bool is_empty_data = FALSE;
size_t excess = 0; /* excess bytes read */
size_t buffersize = data->set.buffer_size;
size_t bytestoread = buffersize;
/* For HTTP/2 and HTTP/3, read data without caring about the content
@ -555,9 +555,11 @@ static CURLcode readwrite_data(struct Curl_easy *data,
is non-headers. */
if(!k->header && (nread > 0 || is_empty_data)) {
if(data->req.no_body) {
if(data->req.no_body && nread > 0) {
/* data arrives although we want none, bail out */
streamclose(conn, "ignoring body");
DEBUGF(infof(data, "did not want a BODY, but seeing %zd bytes",
nread));
*done = TRUE;
result = CURLE_WEIRD_SERVER_REPLY;
goto out;
@ -642,17 +644,6 @@ static CURLcode readwrite_data(struct Curl_easy *data,
(k->bytecount + nread >= k->maxdownload)) {
excess = (size_t)(k->bytecount + nread - k->maxdownload);
if(excess > 0 && !k->ignorebody) {
infof(data,
"Excess found in a read:"
" excess = %zu"
", size = %" CURL_FORMAT_CURL_OFF_T
", maxdownload = %" CURL_FORMAT_CURL_OFF_T
", bytecount = %" CURL_FORMAT_CURL_OFF_T,
excess, k->size, k->maxdownload, k->bytecount);
connclose(conn, "excess found in a read");
}
nread = (ssize_t) (k->maxdownload - k->bytecount);
if(nread < 0) /* this should be unusual */
nread = 0;
@ -718,24 +709,39 @@ static CURLcode readwrite_data(struct Curl_easy *data,
} /* if(!header and data to read) */
if(conn->handler->readwrite && excess) {
/* Parse the excess data */
k->str += nread;
if(excess > 0 && !k->ignorebody) {
if(conn->handler->readwrite) {
/* Give protocol handler a chance to do something with it */
k->str += nread;
if(&k->str[excess] > &buf[data->set.buffer_size]) {
/* the excess amount was too excessive(!), make sure
it doesn't read out of buffer */
excess = &buf[data->set.buffer_size] - k->str;
}
nread = (ssize_t)excess;
result = conn->handler->readwrite(data, conn, &nread, &readmore);
if(result)
goto out;
if(&k->str[excess] > &buf[data->set.buffer_size]) {
/* the excess amount was too excessive(!), make sure
it doesn't read out of buffer */
excess = &buf[data->set.buffer_size] - k->str;
if(readmore) {
DEBUGASSERT(nread == 0);
k->keepon |= KEEP_RECV; /* we're not done reading */
}
else if(nread == 0)
break;
/* protocol handler did not consume all excess data */
excess = nread;
}
if(excess) {
infof(data,
"Excess found in a read:"
" excess = %zu"
", size = %" CURL_FORMAT_CURL_OFF_T
", maxdownload = %" CURL_FORMAT_CURL_OFF_T
", bytecount = %" CURL_FORMAT_CURL_OFF_T,
excess, k->size, k->maxdownload, k->bytecount);
connclose(conn, "excess found in a read");
}
nread = (ssize_t)excess;
result = conn->handler->readwrite(data, conn, &nread, &readmore);
if(result)
goto out;
if(readmore)
k->keepon |= KEEP_RECV; /* we're not done reading */
break;
}
if(is_empty_data) {