unix: implement uv_stream_set_blocking()

Commit 393c1c5 ("unix: set non-block mode in uv_{pipe,tcp,udp}_open")
causes a regression in the io.js cluster module.

The io.js documentation states that `worker.send()` and `process.send()`
are synchronous but they no longer were after upgrading to libuv v1.2.1.

The reason they are synchronous is because of backpressure - or rather,
lack of backpressure: a slow consumer eventually causes a fast producer
to run out of memory because the backlog of pending messages in the
producer can grow unchecked.

Ergo, implement uv_stream_set_blocking() on UNIX platforms to let io.js
enable the old blocking behavior for pipes again.

Refs: https://github.com/iojs/io.js/issues/760
PR-URL: https://github.com/libuv/libuv/pull/187
Reviewed-By: Saúl Ibarra Corretgé <saghul@gmail.com>
This commit is contained in:
Ben Noordhuis 2015-02-09 12:53:52 +01:00
parent e5bdea8ed6
commit b36d4ff930
6 changed files with 111 additions and 3 deletions

View File

@ -189,6 +189,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
test/test-pipe-sendmsg.c \
test/test-pipe-server-close.c \
test/test-pipe-close-stdout-read-stdin.c \
test/test-pipe-set-non-blocking.c \
test/test-platform-output.c \
test/test-poll-close.c \
test/test-poll-close-doesnt-corrupt-stack.c \

View File

@ -206,12 +206,14 @@ API
Relying too much on this API is not recommended. It is likely to change
significantly in the future.
Currently this only works on Windows and only for
:c:type:`uv_pipe_t` handles.
Currently only works on Windows for :c:type:`uv_pipe_t` handles.
On UNIX platforms, all :c:type:`uv_stream_t` handles are supported.
Also libuv currently makes no ordering guarantee when the blocking mode
is changed after write requests have already been submitted. Therefore it is
recommended to set the blocking mode immediately after opening or creating
the stream.
.. versionchanged:: 1.4.0 UNIX implementation added.
.. seealso:: The :c:type:`uv_handle_t` API functions also apply.

View File

@ -1573,5 +1573,8 @@ void uv__stream_close(uv_stream_t* handle) {
int uv_stream_set_blocking(uv_stream_t* handle, int blocking) {
return UV_ENOSYS;
/* Don't need to check the file descriptor, uv__nonblock()
* will fail with EBADF if it's not valid.
*/
return uv__nonblock(uv__stream_fd(handle), !blocking);
}

View File

@ -167,6 +167,7 @@ TEST_DECLARE (pipe_ref4)
#ifndef _WIN32
TEST_DECLARE (pipe_close_stdout_read_stdin)
#endif
TEST_DECLARE (pipe_set_non_blocking)
TEST_DECLARE (process_ref)
TEST_DECLARE (has_ref)
TEST_DECLARE (active)
@ -339,6 +340,7 @@ TASK_LIST_START
#ifndef _WIN32
TEST_ENTRY (pipe_close_stdout_read_stdin)
#endif
TEST_ENTRY (pipe_set_non_blocking)
TEST_ENTRY (tty)
TEST_ENTRY (stdio_over_pipes)
TEST_ENTRY (ip6_pton)

View File

@ -0,0 +1,99 @@
/* Copyright (c) 2015, Ben Noordhuis <info@bnoordhuis.nl>
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#include "uv.h"
#include "task.h"
#ifdef _WIN32
TEST_IMPL(pipe_set_non_blocking) {
RETURN_SKIP("Test not implemented on Windows.");
}
#else /* !_WIN32 */
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
struct thread_ctx {
uv_barrier_t barrier;
int fd;
};
static void thread_main(void* arg) {
struct thread_ctx* ctx;
char buf[4096];
ssize_t n;
ctx = arg;
uv_barrier_wait(&ctx->barrier);
do
n = read(ctx->fd, buf, sizeof(buf));
while (n > 0 || (n == -1 && errno == EINTR));
ASSERT(n == 0);
}
TEST_IMPL(pipe_set_non_blocking) {
struct thread_ctx ctx;
uv_pipe_t pipe_handle;
uv_thread_t thread;
size_t nwritten;
char data[4096];
uv_buf_t buf;
int fd[2];
int n;
ASSERT(0 == uv_pipe_init(uv_default_loop(), &pipe_handle, 0));
ASSERT(0 == socketpair(AF_UNIX, SOCK_STREAM, 0, fd));
ASSERT(0 == uv_pipe_open(&pipe_handle, fd[0]));
ASSERT(0 == uv_stream_set_blocking((uv_stream_t*) &pipe_handle, 1));
ctx.fd = fd[1];
ASSERT(0 == uv_barrier_init(&ctx.barrier, 2));
ASSERT(0 == uv_thread_create(&thread, thread_main, &ctx));
uv_barrier_wait(&ctx.barrier);
buf.len = sizeof(data);
buf.base = data;
memset(data, '.', sizeof(data));
nwritten = 0;
while (nwritten < 10 << 20) {
/* The stream is in blocking mode so uv_try_write() should always succeed
* with the exact number of bytes that we wanted written.
*/
n = uv_try_write((uv_stream_t*) &pipe_handle, &buf, 1);
ASSERT(n == sizeof(data));
nwritten += n;
}
uv_close((uv_handle_t*) &pipe_handle, NULL);
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
ASSERT(0 == close(fd[1])); /* fd[0] is closed by uv_close(). */
ASSERT(0 == uv_thread_join(&thread));
uv_barrier_destroy(&ctx.barrier);
MAKE_VALGRIND_HAPPY();
return 0;
}
#endif /* !_WIN32 */

1
uv.gyp
View File

@ -329,6 +329,7 @@
'test/test-pipe-sendmsg.c',
'test/test-pipe-server-close.c',
'test/test-pipe-close-stdout-read-stdin.c',
'test/test-pipe-set-non-blocking.c',
'test/test-platform-output.c',
'test/test-poll.c',
'test/test-poll-close.c',