From b5e7798a89411a732e31b436731e17dbc94505d3 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Fri, 6 Dec 2013 19:10:47 +0400 Subject: [PATCH] stream: introduce uv_try_write(...) `uv_try_write(stream, buf, size)` acts like `uv_write()`, but without queueing actual write until UV_POLLOUT (or IOCP completion). This is useful for doing writes using on-stack `uv_write_t` requests. fix #1025 --- Makefile.am | 1 + include/uv.h | 10 +++ src/unix/stream.c | 49 +++++++++++++ src/win/stream.c | 6 ++ test/test-list.h | 3 + test/test-tcp-try-write.c | 143 ++++++++++++++++++++++++++++++++++++++ uv.gyp | 1 + 7 files changed, 213 insertions(+) create mode 100644 test/test-tcp-try-write.c diff --git a/Makefile.am b/Makefile.am index 4e41a6ef..2229e86f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -184,6 +184,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-tcp-unexpected-read.c \ test/test-tcp-write-to-half-open-connection.c \ test/test-tcp-writealot.c \ + test/test-tcp-try-write.c \ test/test-thread.c \ test/test-threadpool-cancel.c \ test/test-threadpool.c \ diff --git a/include/uv.h b/include/uv.h index 7f34ef20..d6485e52 100644 --- a/include/uv.h +++ b/include/uv.h @@ -673,6 +673,16 @@ UV_EXTERN int uv_write2(uv_write_t* req, uv_stream_t* send_handle, uv_write_cb cb); +/* + * Same as `uv_write()`, but won't queue write request if it can't be completed + * immediately. + * Will return either: + * - positive number of bytes written + * - zero - if queued write is needed + * - negative error code + */ +UV_EXTERN int uv_try_write(uv_stream_t* handle, const char* buf, size_t length); + /* uv_write_t is a subclass of uv_req_t */ struct uv_write_s { UV_REQ_FIELDS diff --git a/src/unix/stream.c b/src/unix/stream.c index 8fa8066e..afd2a051 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -1299,6 +1299,55 @@ int uv_write(uv_write_t* req, } +void uv_try_write_cb(uv_write_t* req, int status) { + /* Should not be called */ + abort(); +} + + +int uv_try_write(uv_stream_t* stream, const char* buf, size_t size) { + int r; + int has_pollout; + size_t written; + size_t req_size; + uv_write_t req; + uv_buf_t bufstruct; + + /* Connecting or already writing some data */ + if (stream->connect_req != NULL || stream->write_queue_size != 0) + return 0; + + has_pollout = uv__io_active(&stream->io_watcher, UV__POLLOUT); + + bufstruct = uv_buf_init((char*) buf, size); + r = uv_write(&req, stream, &bufstruct, 1, uv_try_write_cb); + if (r != 0) + return r; + + /* Remove not written bytes from write queue size */ + written = size; + if (req.bufs != NULL) + req_size = uv__write_req_size(&req); + else + req_size = 0; + written -= req_size; + stream->write_queue_size -= req_size; + + /* Unqueue request, regardless of immediateness */ + QUEUE_REMOVE(&req.queue); + uv__req_unregister(stream->loop, &req); + if (req.bufs != req.bufsml) + free(req.bufs); + req.bufs = NULL; + + /* Do not poll for writable, if we wasn't before calling this */ + if (!has_pollout) + uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); + + return (int) written; +} + + static int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb, diff --git a/src/win/stream.c b/src/win/stream.c index 5c792f25..da62883d 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -202,6 +202,12 @@ int uv_write2(uv_write_t* req, } +int uv_try_write(uv_stream_t* handle, const char* buf, size_t length) { + /* NOTE: Won't work with overlapped writes */ + return UV_ENOSYS; +} + + int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { uv_loop_t* loop = handle->loop; diff --git a/test/test-list.h b/test/test-list.h index 4be44bbc..f744a205 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -52,6 +52,7 @@ TEST_DECLARE (pipe_ping_pong) TEST_DECLARE (delayed_accept) TEST_DECLARE (multiple_listen) TEST_DECLARE (tcp_writealot) +TEST_DECLARE (tcp_try_write) TEST_DECLARE (tcp_open) TEST_DECLARE (tcp_connect_error_after_write) TEST_DECLARE (tcp_shutdown_after_write) @@ -294,6 +295,8 @@ TASK_LIST_START TEST_ENTRY (tcp_writealot) TEST_HELPER (tcp_writealot, tcp4_echo_server) + TEST_ENTRY (tcp_try_write) + TEST_ENTRY (tcp_open) TEST_HELPER (tcp_open, tcp4_echo_server) diff --git a/test/test-tcp-try-write.c b/test/test-tcp-try-write.c new file mode 100644 index 00000000..3fd61660 --- /dev/null +++ b/test/test-tcp-try-write.c @@ -0,0 +1,143 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include +#include + +#define MAX_BYTES 1024 * 1024 + +#ifdef _WIN32 + +TEST_IMPL(tcp_try_write) { + + MAKE_VALGRIND_HAPPY(); + return 0; +} + +#else /* !_WIN32 */ + +static uv_tcp_t server; +static uv_tcp_t client; +static uv_tcp_t incoming; +static int connect_cb_called; +static int close_cb_called; +static int connection_cb_called; +static int bytes_read; +static int bytes_written; + + +static void close_cb(uv_handle_t* handle) { + close_cb_called++; +} + + +static void connect_cb(uv_connect_t* req, int status) { + static char zeroes[1024]; + int r; + uv_buf_t buf; + ASSERT(status == 0); + connect_cb_called++; + + do { + r = uv_try_write((uv_stream_t*) &client, zeroes, sizeof(zeroes)); + ASSERT(r >= 0); + bytes_written += r; + + /* Partial write */ + if (r != (int) sizeof(zeroes)) + break; + } while (1); + uv_close((uv_handle_t*) &client, close_cb); +} + + +static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) { + static char base[1024]; + + buf->base = base; + buf->len = sizeof(base); +} + + +static void read_cb(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) { + if (nread < 0) { + uv_close((uv_handle_t*) tcp, close_cb); + uv_close((uv_handle_t*) &server, close_cb); + return; + } + + bytes_read += nread; +} + + +static void connection_cb(uv_stream_t* tcp, int status) { + ASSERT(status == 0); + + ASSERT(0 == uv_tcp_init(tcp->loop, &incoming)); + ASSERT(0 == uv_accept(tcp, (uv_stream_t*) &incoming)); + + connection_cb_called++; + ASSERT(0 == uv_read_start((uv_stream_t*) &incoming, alloc_cb, read_cb)); +} + + +static void start_server(void) { + struct sockaddr_in addr; + + ASSERT(0 == uv_ip4_addr("0.0.0.0", TEST_PORT, &addr)); + + ASSERT(0 == uv_tcp_init(uv_default_loop(), &server)); + ASSERT(0 == uv_tcp_bind(&server, (struct sockaddr*) &addr)); + ASSERT(0 == uv_listen((uv_stream_t*) &server, 128, connection_cb)); +} + + +TEST_IMPL(tcp_try_write) { + uv_connect_t connect_req; + struct sockaddr_in addr; + + start_server(); + + ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + ASSERT(0 == uv_tcp_init(uv_default_loop(), &client)); + ASSERT(0 == uv_tcp_connect(&connect_req, + &client, + (struct sockaddr*) &addr, + connect_cb)); + + ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT)); + + ASSERT(connect_cb_called == 1); + ASSERT(close_cb_called == 3); + ASSERT(connection_cb_called == 1); + ASSERT(bytes_read == bytes_written); + ASSERT(bytes_written > 0); + + MAKE_VALGRIND_HAPPY(); + return 0; +} + +#endif /* !_WIN32 */ diff --git a/uv.gyp b/uv.gyp index 712e59f1..962efacf 100644 --- a/uv.gyp +++ b/uv.gyp @@ -362,6 +362,7 @@ 'test/test-tcp-open.c', 'test/test-tcp-write-to-half-open-connection.c', 'test/test-tcp-writealot.c', + 'test/test-tcp-try-write.c', 'test/test-tcp-unexpected-read.c', 'test/test-tcp-read-stop.c', 'test/test-threadpool.c',