unix: refactor uv_try_write

This simplifies the code, for better clarify (and performance)!

PR-URL: https://github.com/libuv/libuv/pull/2874
Reviewed-By: Jameson Nash <vtjnash@gmail.com>
This commit is contained in:
twosee 2021-05-11 07:44:31 +08:00 committed by GitHub
parent 614bdbc56f
commit 23bebf015b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -799,33 +799,21 @@ static int uv__handle_fd(uv_handle_t* handle) {
}
}
static void uv__write(uv_stream_t* stream) {
static int uv__try_write(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle) {
struct iovec* iov;
QUEUE* q;
uv_write_t* req;
int iovmax;
int iovcnt;
ssize_t n;
int err;
start:
assert(uv__stream_fd(stream) >= 0);
if (QUEUE_EMPTY(&stream->write_queue))
return;
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
assert(req->handle == stream);
/*
* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
*/
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->nbufs - req->write_index;
iov = (struct iovec*) bufs;
iovcnt = nbufs;
iovmax = uv__getiovmax();
@ -837,8 +825,7 @@ start:
* Now do the actual writev. Note that we've been updating the pointers
* inside the iov each time we write. So there is no need to offset it.
*/
if (req->send_handle) {
if (send_handle != NULL) {
int fd_to_send;
struct msghdr msg;
struct cmsghdr *cmsg;
@ -847,12 +834,10 @@ start:
struct cmsghdr alias;
} scratch;
if (uv__is_closing(req->send_handle)) {
err = UV_EBADF;
goto error;
}
if (uv__is_closing(send_handle))
return UV_EBADF;
fd_to_send = uv__handle_fd((uv_handle_t*) req->send_handle);
fd_to_send = uv__handle_fd((uv_handle_t*) send_handle);
memset(&scratch, 0, sizeof(scratch));
@ -882,40 +867,65 @@ start:
do
n = sendmsg(uv__stream_fd(stream), &msg, 0);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0)
req->send_handle = NULL;
} else {
do
n = uv__writev(uv__stream_fd(stream), iov, iovcnt);
while (n == -1 && RETRY_ON_WRITE_ERROR(errno));
}
if (n == -1 && !IS_TRANSIENT_WRITE_ERROR(errno, req->send_handle)) {
err = UV__ERR(errno);
goto error;
if (n >= 0)
return n;
if (IS_TRANSIENT_WRITE_ERROR(errno, send_handle))
return UV_EAGAIN;
return UV__ERR(errno);
}
static void uv__write(uv_stream_t* stream) {
QUEUE* q;
uv_write_t* req;
ssize_t n;
assert(uv__stream_fd(stream) >= 0);
for (;;) {
if (QUEUE_EMPTY(&stream->write_queue))
return;
q = QUEUE_HEAD(&stream->write_queue);
req = QUEUE_DATA(q, uv_write_t, queue);
assert(req->handle == stream);
n = uv__try_write(stream,
&(req->bufs[req->write_index]),
req->nbufs - req->write_index,
req->send_handle);
/* Ensure the handle isn't sent again in case this is a partial write. */
if (n >= 0) {
req->send_handle = NULL;
if (uv__write_req_update(stream, req, n)) {
uv__write_req_finish(req);
return; /* TODO(bnoordhuis) Start trying to write the next request. */
}
} else if (n != UV_EAGAIN)
break;
/* If this is a blocking stream, try again. */
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
continue;
/* We're not done. */
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
/* Notify select() thread about state change */
uv__stream_osx_interrupt_select(stream);
return;
}
if (n >= 0 && uv__write_req_update(stream, req, n)) {
uv__write_req_finish(req);
return; /* TODO(bnoordhuis) Start trying to write the next request. */
}
/* If this is a blocking stream, try again. */
if (stream->flags & UV_HANDLE_BLOCKING_WRITES)
goto start;
/* We're not done. */
uv__io_start(stream->loop, &stream->io_watcher, POLLOUT);
/* Notify select() thread about state change */
uv__stream_osx_interrupt_select(stream);
return;
error:
req->error = err;
req->error = n;
uv__write_req_finish(req);
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
if (!uv__io_active(&stream->io_watcher, POLLIN))
@ -1390,14 +1400,9 @@ static void uv__stream_connect(uv_stream_t* stream) {
}
int uv_write2(uv_write_t* req,
uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle,
uv_write_cb cb) {
int empty_queue;
static int uv__check_before_write(uv_stream_t* stream,
unsigned int nbufs,
uv_stream_t* send_handle) {
assert(nbufs > 0);
assert((stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
@ -1410,7 +1415,7 @@ int uv_write2(uv_write_t* req,
if (!(stream->flags & UV_HANDLE_WRITABLE))
return UV_EPIPE;
if (send_handle) {
if (send_handle != NULL) {
if (stream->type != UV_NAMED_PIPE || !((uv_pipe_t*)stream)->ipc)
return UV_EINVAL;
@ -1430,6 +1435,22 @@ int uv_write2(uv_write_t* req,
#endif
}
return 0;
}
int uv_write2(uv_write_t* req,
uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs,
uv_stream_t* send_handle,
uv_write_cb cb) {
int empty_queue;
int err;
err = uv__check_before_write(stream, nbufs, send_handle);
if (err < 0)
return err;
/* It's legal for write_queue_size > 0 even when the write_queue is empty;
* it means there are error-state requests in the write_completed_queue that
* will touch up write_queue_size later, see also uv__write_req_finish().
@ -1498,57 +1519,20 @@ int uv_write(uv_write_t* req,
}
void uv_try_write_cb(uv_write_t* req, int status) {
/* Should not be called */
abort();
}
int uv_try_write(uv_stream_t* stream,
const uv_buf_t bufs[],
unsigned int nbufs) {
int r;
int has_pollout;
size_t written;
size_t req_size;
uv_write_t req;
int err;
/* Connecting or already writing some data */
if (stream->connect_req != NULL || stream->write_queue_size != 0)
return UV_EAGAIN;
has_pollout = uv__io_active(&stream->io_watcher, POLLOUT);
err = uv__check_before_write(stream, nbufs, NULL);
if (err < 0)
return err;
r = uv_write(&req, stream, bufs, nbufs, uv_try_write_cb);
if (r != 0)
return r;
/* Remove not written bytes from write queue size */
written = uv__count_bufs(bufs, nbufs);
if (req.bufs != NULL)
req_size = uv__write_req_size(&req);
else
req_size = 0;
written -= req_size;
stream->write_queue_size -= req_size;
/* Unqueue request, regardless of immediateness */
QUEUE_REMOVE(&req.queue);
uv__req_unregister(stream->loop, &req);
if (req.bufs != req.bufsml)
uv__free(req.bufs);
req.bufs = NULL;
/* Do not poll for writable, if we wasn't before calling this */
if (!has_pollout) {
uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
if (written == 0 && req_size != 0)
return req.error < 0 ? req.error : UV_EAGAIN;
else
return written;
return uv__try_write(stream, bufs, nbufs, NULL);
}