MQTT: improve receive of ACKs

- add `mq->recvbuf` to provide buffering of incomplete
  ACK responses
- continue ACK reading until sufficient bytes available
- fixes test failures on low network receives

Closes #12071
This commit is contained in:
Stefan Eissing 2023-10-09 11:36:37 +02:00 committed by Daniel Stenberg
parent 5032f04ee9
commit b0f3d71c1f
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
3 changed files with 63 additions and 31 deletions

View File

@ -89,4 +89,5 @@ int Curl_dyn_vprintf(struct dynbuf *dyn, const char *format, va_list ap_save);
#define DYN_H1_TRAILER 4096
#define DYN_PINGPPONG_CMD (64*1024)
#define DYN_IMAP_CMD (64*1024)
#define DYN_MQTT_RECV (64*1024)
#endif

View File

@ -109,6 +109,7 @@ static CURLcode mqtt_setup_conn(struct Curl_easy *data,
mq = calloc(1, sizeof(struct MQTT));
if(!mq)
return CURLE_OUT_OF_MEMORY;
Curl_dyn_init(&mq->recvbuf, DYN_MQTT_RECV);
data->req.p.mqtt = mq;
return CURLE_OK;
}
@ -350,36 +351,66 @@ static CURLcode mqtt_disconnect(struct Curl_easy *data)
struct MQTT *mq = data->req.p.mqtt;
result = mqtt_send(data, (char *)"\xe0\x00", 2);
Curl_safefree(mq->sendleftovers);
Curl_dyn_free(&mq->recvbuf);
return result;
}
static CURLcode mqtt_recv_atleast(struct Curl_easy *data, size_t nbytes)
{
struct MQTT *mq = data->req.p.mqtt;
size_t rlen = Curl_dyn_len(&mq->recvbuf);
CURLcode result;
if(rlen < nbytes) {
unsigned char readbuf[1024];
ssize_t nread;
DEBUGASSERT(nbytes - rlen < sizeof(readbuf));
result = Curl_read(data, data->conn->sock[FIRSTSOCKET],
(char *)readbuf, nbytes - rlen, &nread);
if(result)
return result;
DEBUGASSERT(nread >= 0);
if(Curl_dyn_addn(&mq->recvbuf, readbuf, (size_t)nread))
return CURLE_OUT_OF_MEMORY;
rlen = Curl_dyn_len(&mq->recvbuf);
}
return (rlen >= nbytes)? CURLE_OK : CURLE_AGAIN;
}
static void mqtt_recv_consume(struct Curl_easy *data, size_t nbytes)
{
struct MQTT *mq = data->req.p.mqtt;
size_t rlen = Curl_dyn_len(&mq->recvbuf);
if(rlen <= nbytes)
Curl_dyn_reset(&mq->recvbuf);
else
Curl_dyn_tail(&mq->recvbuf, rlen - nbytes);
}
static CURLcode mqtt_verify_connack(struct Curl_easy *data)
{
struct MQTT *mq = data->req.p.mqtt;
CURLcode result;
struct connectdata *conn = data->conn;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
unsigned char readbuf[MQTT_CONNACK_LEN];
ssize_t nread;
char *ptr;
result = Curl_read(data, sockfd, (char *)readbuf, MQTT_CONNACK_LEN, &nread);
result = mqtt_recv_atleast(data, MQTT_CONNACK_LEN);
if(result)
goto fail;
Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
/* verify CONNACK */
DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_CONNACK_LEN);
ptr = Curl_dyn_ptr(&mq->recvbuf);
Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_CONNACK_LEN);
/* fixme */
if(nread < MQTT_CONNACK_LEN) {
if(ptr[0] != 0x00 || ptr[1] != 0x00) {
failf(data, "Expected %02x%02x but got %02x%02x",
0x00, 0x00, ptr[0], ptr[1]);
Curl_dyn_reset(&mq->recvbuf);
result = CURLE_WEIRD_SERVER_REPLY;
goto fail;
}
/* verify CONNACK */
if(readbuf[0] != 0x00 || readbuf[1] != 0x00) {
failf(data, "Expected %02x%02x but got %02x%02x",
0x00, 0x00, readbuf[0], readbuf[1]);
result = CURLE_WEIRD_SERVER_REPLY;
}
mqtt_recv_consume(data, MQTT_CONNACK_LEN);
fail:
return result;
}
@ -452,31 +483,29 @@ fail:
*/
static CURLcode mqtt_verify_suback(struct Curl_easy *data)
{
CURLcode result;
struct MQTT *mq = data->req.p.mqtt;
struct connectdata *conn = data->conn;
curl_socket_t sockfd = conn->sock[FIRSTSOCKET];
unsigned char readbuf[MQTT_SUBACK_LEN];
ssize_t nread;
struct mqtt_conn *mqtt = &conn->proto.mqtt;
CURLcode result;
char *ptr;
result = Curl_read(data, sockfd, (char *)readbuf, MQTT_SUBACK_LEN, &nread);
result = mqtt_recv_atleast(data, MQTT_SUBACK_LEN);
if(result)
goto fail;
Curl_debug(data, CURLINFO_HEADER_IN, (char *)readbuf, (size_t)nread);
/* verify SUBACK */
DEBUGASSERT(Curl_dyn_len(&mq->recvbuf) >= MQTT_SUBACK_LEN);
ptr = Curl_dyn_ptr(&mq->recvbuf);
Curl_debug(data, CURLINFO_HEADER_IN, ptr, MQTT_SUBACK_LEN);
/* fixme */
if(nread < MQTT_SUBACK_LEN) {
if(((unsigned char)ptr[0]) != ((mqtt->packetid >> 8) & 0xff) ||
((unsigned char)ptr[1]) != (mqtt->packetid & 0xff) ||
ptr[2] != 0x00) {
Curl_dyn_reset(&mq->recvbuf);
result = CURLE_WEIRD_SERVER_REPLY;
goto fail;
}
/* verify SUBACK */
if(readbuf[0] != ((mqtt->packetid >> 8) & 0xff) ||
readbuf[1] != (mqtt->packetid & 0xff) ||
readbuf[2] != 0x00)
result = CURLE_WEIRD_SERVER_REPLY;
mqtt_recv_consume(data, MQTT_SUBACK_LEN);
fail:
return result;
}
@ -713,6 +742,7 @@ static CURLcode mqtt_done(struct Curl_easy *data,
(void)status;
(void)premature;
Curl_safefree(mq->sendleftovers);
Curl_dyn_free(&mq->recvbuf);
return CURLE_OK;
}

View File

@ -56,6 +56,7 @@ struct MQTT {
size_t npacket; /* byte counter */
unsigned char firstbyte;
size_t remaining_length;
struct dynbuf recvbuf;
};
#endif /* HEADER_CURL_MQTT_H */