cf-socket: add socket recv buffering for most tcp cases
- use bufq as recv buffer, also for Windows pre-receive handling - catch small reads followed by larger ones in a single socket call. A common pattern on TLS connections. Closes #10787
This commit is contained in:
parent
4cfa5bcc9a
commit
24726a437e
245
lib/cf-socket.c
245
lib/cf-socket.c
@ -54,6 +54,7 @@
|
||||
#endif
|
||||
|
||||
#include "urldata.h"
|
||||
#include "bufq.h"
|
||||
#include "sendf.h"
|
||||
#include "if2ip.h"
|
||||
#include "strerror.h"
|
||||
@ -729,29 +730,20 @@ CURLcode Curl_socket_connect_result(struct Curl_easy *data,
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
|
||||
struct io_buffer {
|
||||
char *bufr;
|
||||
size_t allc; /* size of the current allocation */
|
||||
size_t head; /* bufr index for next read */
|
||||
size_t tail; /* bufr index for next write */
|
||||
};
|
||||
|
||||
static void io_buffer_reset(struct io_buffer *iob)
|
||||
{
|
||||
if(iob->bufr)
|
||||
free(iob->bufr);
|
||||
memset(iob, 0, sizeof(*iob));
|
||||
}
|
||||
#endif /* USE_RECV_BEFORE_SEND_WORKAROUND */
|
||||
/* We have a recv buffer to enhance reads with len < NW_SMALL_READS.
|
||||
* This happens often on TLS connections where the TLS implementation
|
||||
* tries to read the head of a TLS record, determine the length of the
|
||||
* full record and then make a subsequent read for that.
|
||||
* On large reads, we will not fill the buffer to avoid the double copy. */
|
||||
#define NW_RECV_CHUNK_SIZE (64 * 1024)
|
||||
#define NW_RECV_CHUNKS 1
|
||||
#define NW_SMALL_READS (1024)
|
||||
|
||||
struct cf_socket_ctx {
|
||||
int transport;
|
||||
struct Curl_sockaddr_ex addr; /* address to connect to */
|
||||
curl_socket_t sock; /* current attempt socket */
|
||||
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
|
||||
struct io_buffer recv_buffer;
|
||||
#endif
|
||||
struct bufq recvbuf; /* used when `buffer_recv` is set */
|
||||
char r_ip[MAX_IPADR_LEN]; /* remote IP as string */
|
||||
int r_port; /* remote port number */
|
||||
char l_ip[MAX_IPADR_LEN]; /* local IP as string */
|
||||
@ -763,6 +755,7 @@ struct cf_socket_ctx {
|
||||
BIT(got_first_byte); /* if first byte was received */
|
||||
BIT(accepted); /* socket was accepted, not connected */
|
||||
BIT(active);
|
||||
BIT(buffer_recv);
|
||||
};
|
||||
|
||||
static void cf_socket_ctx_init(struct cf_socket_ctx *ctx,
|
||||
@ -773,6 +766,56 @@ static void cf_socket_ctx_init(struct cf_socket_ctx *ctx,
|
||||
ctx->sock = CURL_SOCKET_BAD;
|
||||
ctx->transport = transport;
|
||||
Curl_sock_assign_addr(&ctx->addr, ai, transport);
|
||||
Curl_bufq_init(&ctx->recvbuf, NW_RECV_CHUNK_SIZE, NW_RECV_CHUNKS);
|
||||
}
|
||||
|
||||
/* Context passed to nw_in_read() through its opaque `reader_ctx` argument.
 * nw_in_read() reaches the socket via `cf->ctx` and uses `data` for
 * failure reporting and logging. */
struct reader_ctx {
|
||||
struct Curl_cfilter *cf;
|
||||
struct Curl_easy *data;
|
||||
};
|
||||
|
||||
/* Read up to `len` bytes from the filter's socket into `buf`.
 *
 * `reader_ctx` must point to a `struct reader_ctx`; the socket is taken
 * from the cf_socket_ctx found in `rctx->cf->ctx`.
 *
 * Returns whatever sread() returned when it did not fail, with *err set
 * to CURLE_OK. When sread() returns -1 the function returns -1 and sets
 * *err to:
 *  - CURLE_AGAIN when the socket error is WSAEWOULDBLOCK (builds where
 *    that macro is defined) or EWOULDBLOCK/EAGAIN/EINTR (all other builds),
 *  - CURLE_RECV_ERROR for any other socket error; in that case the error
 *    is also reported with failf() and stored in data->state.os_errno.
 */
static ssize_t nw_in_read(void *reader_ctx,
|
||||
unsigned char *buf, size_t len,
|
||||
CURLcode *err)
|
||||
{
|
||||
struct reader_ctx *rctx = reader_ctx;
|
||||
struct cf_socket_ctx *ctx = rctx->cf->ctx;
|
||||
ssize_t nread;
|
||||
|
||||
*err = CURLE_OK;
|
||||
nread = sread(ctx->sock, buf, len);
|
||||
|
||||
if(-1 == nread) {
|
||||
/* capture the socket error right away, before any other call */
int sockerr = SOCKERRNO;
|
||||
|
||||
if(
|
||||
#ifdef WSAEWOULDBLOCK
|
||||
/* This is how Windows does it */
|
||||
(WSAEWOULDBLOCK == sockerr)
|
||||
#else
|
||||
/* errno may be EWOULDBLOCK or on some systems EAGAIN when the call
|
||||
returned because no data could be received without blocking, or
|
||||
EINTR. All three are treated the same here */
|
||||
(EWOULDBLOCK == sockerr) || (EAGAIN == sockerr) || (EINTR == sockerr)
|
||||
#endif
|
||||
) {
|
||||
/* nothing to read right now: report CURLE_AGAIN, not a failure */
|
||||
*err = CURLE_AGAIN;
|
||||
nread = -1;
|
||||
}
|
||||
else {
|
||||
/* any other socket error: report it and remember the OS error code */
char buffer[STRERROR_LEN];
|
||||
|
||||
failf(rctx->data, "Recv failure: %s",
|
||||
Curl_strerror(sockerr, buffer, sizeof(buffer)));
|
||||
rctx->data->state.os_errno = sockerr;
|
||||
*err = CURLE_RECV_ERROR;
|
||||
nread = -1;
|
||||
}
|
||||
}
|
||||
DEBUGF(LOG_CF(rctx->data, rctx->cf, "nw_in_read(len=%zu) -> %d, err=%d",
|
||||
len, (int)nread, *err));
|
||||
return nread;
|
||||
}
|
||||
|
||||
static void cf_socket_close(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
@ -808,10 +851,9 @@ static void cf_socket_close(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
sclose(ctx->sock);
|
||||
ctx->sock = CURL_SOCKET_BAD;
|
||||
}
|
||||
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
|
||||
io_buffer_reset(&ctx->recv_buffer);
|
||||
#endif
|
||||
Curl_bufq_reset(&ctx->recvbuf);
|
||||
ctx->active = FALSE;
|
||||
ctx->buffer_recv = FALSE;
|
||||
memset(&ctx->started_at, 0, sizeof(ctx->started_at));
|
||||
memset(&ctx->connected_at, 0, sizeof(ctx->connected_at));
|
||||
}
|
||||
@ -825,6 +867,7 @@ static void cf_socket_destroy(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
|
||||
cf_socket_close(cf, data);
|
||||
DEBUGF(LOG_CF(data, cf, "destroy"));
|
||||
Curl_bufq_free(&ctx->recvbuf);
|
||||
free(ctx);
|
||||
cf->ctx = NULL;
|
||||
}
|
||||
@ -1153,89 +1196,16 @@ static int cf_socket_get_select_socks(struct Curl_cfilter *cf,
|
||||
return rc;
|
||||
}
|
||||
|
||||
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
|
||||
|
||||
static CURLcode pre_receive_plain(struct Curl_cfilter *cf,
|
||||
struct Curl_easy *data)
|
||||
{
|
||||
struct cf_socket_ctx *ctx = cf->ctx;
|
||||
struct io_buffer * const iob = &ctx->recv_buffer;
|
||||
|
||||
/* WinSock will destroy unread received data if send()
|
||||
fails.
|
||||
To avoid lossage of received data, recv() must be
|
||||
performed before every send() if any incoming data is
|
||||
available. However, skip this, if buffer is already full. */
|
||||
if((cf->conn->handler->protocol&PROTO_FAMILY_HTTP) != 0 &&
|
||||
cf->conn->recv[cf->sockindex] == Curl_conn_recv &&
|
||||
(!iob->bufr || (iob->allc > iob->tail))) {
|
||||
const int readymask = Curl_socket_check(ctx->sock, CURL_SOCKET_BAD,
|
||||
CURL_SOCKET_BAD, 0);
|
||||
if(readymask != -1 && (readymask & CURL_CSELECT_IN) != 0) {
|
||||
size_t bytestorecv = iob->allc - iob->tail;
|
||||
ssize_t nread;
|
||||
/* Have some incoming data */
|
||||
if(!iob->bufr) {
|
||||
/* Use buffer double default size for intermediate buffer */
|
||||
iob->allc = 2 * data->set.buffer_size;
|
||||
iob->bufr = malloc(iob->allc);
|
||||
if(!iob->bufr)
|
||||
return CURLE_OUT_OF_MEMORY;
|
||||
iob->tail = 0;
|
||||
iob->head = 0;
|
||||
bytestorecv = iob->allc;
|
||||
}
|
||||
|
||||
nread = sread(ctx->sock, iob->bufr + iob->tail, bytestorecv);
|
||||
if(nread > 0)
|
||||
iob->tail += (size_t)nread;
|
||||
}
|
||||
}
|
||||
return CURLE_OK;
|
||||
}
|
||||
|
||||
static ssize_t get_pre_recved(struct Curl_cfilter *cf, char *buf, size_t len)
|
||||
{
|
||||
struct cf_socket_ctx *ctx = cf->ctx;
|
||||
struct io_buffer * const iob = &ctx->recv_buffer;
|
||||
size_t copysize;
|
||||
if(!iob->bufr)
|
||||
return 0;
|
||||
|
||||
DEBUGASSERT(iob->allc > 0);
|
||||
DEBUGASSERT(iob->tail <= iob->allc);
|
||||
DEBUGASSERT(iob->head <= iob->tail);
|
||||
/* Check and process data that was already received and stored in the internal
|
||||
intermediate buffer */
|
||||
if(iob->tail > iob->head) {
|
||||
copysize = CURLMIN(len, iob->tail - iob->head);
|
||||
memcpy(buf, iob->bufr + iob->head, copysize);
|
||||
iob->head += copysize;
|
||||
}
|
||||
else
|
||||
copysize = 0; /* buffer was allocated, but nothing was received */
|
||||
|
||||
/* Free intermediate buffer if it has no unprocessed data */
|
||||
if(iob->head == iob->tail)
|
||||
io_buffer_reset(iob);
|
||||
|
||||
return (ssize_t)copysize;
|
||||
}
|
||||
#endif /* USE_RECV_BEFORE_SEND_WORKAROUND */
|
||||
|
||||
static bool cf_socket_data_pending(struct Curl_cfilter *cf,
|
||||
const struct Curl_easy *data)
|
||||
{
|
||||
struct cf_socket_ctx *ctx = cf->ctx;
|
||||
int readable;
|
||||
|
||||
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
|
||||
if(ctx->recv_buffer.bufr && ctx->recv_buffer.allc &&
|
||||
ctx->recv_buffer.tail > ctx->recv_buffer.head)
|
||||
return TRUE;
|
||||
#endif
|
||||
|
||||
(void)data;
|
||||
if(!Curl_bufq_is_empty(&ctx->recvbuf))
|
||||
return TRUE;
|
||||
|
||||
readable = SOCKET_READABLE(ctx->sock, 0);
|
||||
return (readable > 0 && (readable & CURL_CSELECT_IN));
|
||||
}
|
||||
@ -1247,20 +1217,21 @@ static ssize_t cf_socket_send(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
curl_socket_t fdsave;
|
||||
ssize_t nwritten;
|
||||
|
||||
*err = CURLE_OK;
|
||||
|
||||
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
|
||||
/* WinSock will destroy unread received data if send()
|
||||
fails.
|
||||
To avoid lossage of received data, recv() must be
|
||||
performed before every send() if any incoming data is
|
||||
available. */
|
||||
if(pre_receive_plain(cf, data)) {
|
||||
*err = CURLE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
if(ctx->buffer_recv && !Curl_bufq_is_full(&ctx->recvbuf)) {
|
||||
nwritten = Curl_bufq_slurp(&ctx->recvbuf, nw_in_read, &rctx, err);
|
||||
if(nwritten < 0 && *err != CURLE_AGAIN) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
*err = CURLE_OK;
|
||||
fdsave = cf->conn->sock[cf->sockindex];
|
||||
cf->conn->sock[cf->sockindex] = ctx->sock;
|
||||
|
||||
@ -1317,47 +1288,50 @@ static ssize_t cf_socket_recv(struct Curl_cfilter *cf, struct Curl_easy *data,
|
||||
|
||||
*err = CURLE_OK;
|
||||
|
||||
#ifdef USE_RECV_BEFORE_SEND_WORKAROUND
|
||||
/* Check and return data that was already received and stored in the internal
|
||||
intermediate buffer */
|
||||
nread = get_pre_recved(cf, buf, len);
|
||||
if(nread > 0) {
|
||||
*err = CURLE_OK;
|
||||
return nread;
|
||||
}
|
||||
#endif
|
||||
|
||||
fdsave = cf->conn->sock[cf->sockindex];
|
||||
cf->conn->sock[cf->sockindex] = ctx->sock;
|
||||
|
||||
nread = sread(ctx->sock, buf, len);
|
||||
if(ctx->buffer_recv && !Curl_bufq_is_empty(&ctx->recvbuf)) {
|
||||
DEBUGF(LOG_CF(data, cf, "recv from buffer"));
|
||||
nread = Curl_bufq_read(&ctx->recvbuf, (unsigned char *)buf, len, err);
|
||||
}
|
||||
else {
|
||||
struct reader_ctx rctx;
|
||||
|
||||
if(-1 == nread) {
|
||||
int sockerr = SOCKERRNO;
|
||||
rctx.cf = cf;
|
||||
rctx.data = data;
|
||||
|
||||
if(
|
||||
#ifdef WSAEWOULDBLOCK
|
||||
/* This is how Windows does it */
|
||||
(WSAEWOULDBLOCK == sockerr)
|
||||
#else
|
||||
/* errno may be EWOULDBLOCK or on some systems EAGAIN when it returned
|
||||
due to its inability to receive data without blocking. We therefore
|
||||
treat both error codes the same here */
|
||||
(EWOULDBLOCK == sockerr) || (EAGAIN == sockerr) || (EINTR == sockerr)
|
||||
#endif
|
||||
) {
|
||||
/* this is just a case of EWOULDBLOCK */
|
||||
*err = CURLE_AGAIN;
|
||||
/* "small" reads may trigger filling our buffer, "large" reads
|
||||
* are probably not worth the additional copy */
|
||||
if(ctx->buffer_recv && len < NW_SMALL_READS) {
|
||||
ssize_t nwritten;
|
||||
nwritten = Curl_bufq_slurp(&ctx->recvbuf, nw_in_read, &rctx, err);
|
||||
if(nwritten < 0 && !Curl_bufq_is_empty(&ctx->recvbuf)) {
|
||||
/* we have a partial read with an error. need to deliver
|
||||
* what we got, return the error later. */
|
||||
DEBUGF(LOG_CF(data, cf, "partial read: empty buffer first"));
|
||||
nread = Curl_bufq_read(&ctx->recvbuf, (unsigned char *)buf, len, err);
|
||||
}
|
||||
else if(nwritten < 0) {
|
||||
nread = -1;
|
||||
goto out;
|
||||
}
|
||||
else if(nwritten == 0) {
|
||||
/* eof */
|
||||
*err = CURLE_OK;
|
||||
nread = 0;
|
||||
}
|
||||
else {
|
||||
DEBUGF(LOG_CF(data, cf, "buffered %zd additional bytes", nwritten));
|
||||
nread = Curl_bufq_read(&ctx->recvbuf, (unsigned char *)buf, len, err);
|
||||
}
|
||||
}
|
||||
else {
|
||||
char buffer[STRERROR_LEN];
|
||||
failf(data, "Recv failure: %s",
|
||||
Curl_strerror(sockerr, buffer, sizeof(buffer)));
|
||||
data->state.os_errno = sockerr;
|
||||
*err = CURLE_RECV_ERROR;
|
||||
nread = nw_in_read(&rctx, (unsigned char *)buf, len, err);
|
||||
}
|
||||
}
|
||||
|
||||
out:
|
||||
DEBUGF(LOG_CF(data, cf, "recv(len=%zu) -> %d, err=%d", len, (int)nread,
|
||||
*err));
|
||||
if(nread > 0 && !ctx->got_first_byte) {
|
||||
@ -1413,6 +1387,11 @@ static void cf_socket_active(struct Curl_cfilter *cf, struct Curl_easy *data)
|
||||
conn_set_primary_ip(cf, data);
|
||||
set_local_ip(cf, data);
|
||||
Curl_persistconninfo(data, cf->conn, ctx->l_ip, ctx->l_port);
|
||||
/* We buffer only for TCP transfers that do not install their own
|
||||
* read function. Those may still have expectations about socket
|
||||
* behaviours from the past. */
|
||||
ctx->buffer_recv = (ctx->transport == TRNSPRT_TCP &&
|
||||
(cf->conn->recv[cf->sockindex] == Curl_conn_recv));
|
||||
}
|
||||
ctx->active = TRUE;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user