diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 1b6d86df..72709c8a 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -91,8 +91,6 @@ typedef int uv_file; #define UV_STREAM_PRIVATE_FIELDS \ - uv_read_cb read_cb; \ - uv_alloc_cb alloc_cb; \ uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ ev_io read_watcher; \ diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 6610e016..656e85df 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -132,8 +132,6 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define UV_STREAM_PRIVATE_FIELDS \ unsigned int reqs_pending; \ - uv_alloc_cb alloc_cb; \ - uv_read_cb read_cb; \ uv_req_t read_req; \ union { \ struct { uv_stream_connection_fields }; \ diff --git a/include/uv.h b/include/uv.h index 747f3851..1358e4b9 100644 --- a/include/uv.h +++ b/include/uv.h @@ -338,6 +338,9 @@ uv_buf_t uv_buf_init(char* base, size_t len); #define UV_STREAM_FIELDS \ /* number of bytes queued for writing */ \ size_t write_queue_size; \ + uv_alloc_cb alloc_cb; \ + uv_read_cb read_cb; \ + uv_read2_cb read2_cb; \ /* private */ \ UV_STREAM_PRIVATE_FIELDS @@ -425,8 +428,8 @@ int uv_write2(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, struct uv_write_s { UV_REQ_FIELDS uv_write_cb cb; - uv_stream_t* handle; uv_stream_t* send_handle; + uv_stream_t* handle; UV_WRITE_PRIVATE_FIELDS }; diff --git a/src/unix/stream.c b/src/unix/stream.c index 2cd98867..a2580888 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -355,6 +355,8 @@ static void uv__write(uv_stream_t* stream) { struct cmsghdr *cmsg; int fd_to_send = req->send_handle->fd; + assert(fd_to_send >= 0); + msg.msg_name = NULL; msg.msg_namelen = 0; msg.msg_iov = iov; @@ -778,7 +780,8 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { +int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -810,6 +813,16 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) } +int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read2_cb read_cb) { + int r; + r = uv_read_start(stream, alloc_cb, NULL); + assert(stream->read_cb == NULL); + stream->read2_cb = read_cb; + return r; +} + + int uv_read_stop(uv_stream_t* stream) { uv_tcp_t* tcp = (uv_tcp_t*)stream; @@ -817,6 +830,7 @@ int uv_read_stop(uv_stream_t* stream) { ev_io_stop(tcp->loop->ev, &tcp->read_watcher); tcp->read_cb = NULL; + tcp->read2_cb = NULL; tcp->alloc_cb = NULL; return 0; } diff --git a/test/run-tests.c b/test/run-tests.c index 4595d4df..f80dfbcb 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -83,7 +83,8 @@ static int ipc_helper() { ASSERT(r == 0); buf = uv_buf_init("hello\n", 6); - r = uv_write(&write_req, (uv_stream_t*)&channel, &buf, 1, NULL); + r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1, + (uv_stream_t*)&server, NULL); ASSERT(r == 0); r = uv_run(uv_default_loop()); diff --git a/test/test-ipc.c b/test/test-ipc.c index 26c84ff7..c16f9ce7 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -29,11 +29,18 @@ static char exepath[1024]; static size_t exepath_size = 1024; static char* args[3]; static uv_pipe_t channel; +static uv_tcp_t tcp_server; static int exit_cb_called; +static int read2_cb_called; static uv_write_t write_req; +static void ipc_on_connection(uv_stream_t* server, int status) { + ASSERT(status == 0); + ASSERT((uv_stream_t*)&tcp_server == server); +} + static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { printf("exit_cb\n"); @@ -49,22 +56,40 @@ static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { } -static void on_read(uv_stream_t* pipe, ssize_t nread, uv_buf_t buf) { +static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, + uv_handle_type pending) { int r; uv_buf_t outbuf; - /* listen on the handle provided.... */ - if (nread) { - outbuf = uv_buf_init("world\n", 6); - r = uv_write(&write_req, pipe, &outbuf, 1, NULL); - ASSERT(r == 0); - - fprintf(stderr, "got %d bytes\n", (int)nread); - } - - if (buf.base) { + if (nread == 0) { + /* Everything OK, but nothing read. */ free(buf.base); + return; } + + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); + read2_cb_called++; + + /* Accept the pending TCP server, and start listening on it. */ + ASSERT(pending == UV_TCP); + r = uv_tcp_init(uv_default_loop(), &tcp_server); + ASSERT(r == 0); + + r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server); + ASSERT(r == 0); + + r = uv_listen((uv_stream_t*)&tcp_server, 12, ipc_on_connection); + ASSERT(r == 0); + + /* Make sure that the expected data is correctly multiplexed. */ + ASSERT(memcmp("hello\n", buf.base, buf.len) == 0); + fprintf(stderr, "got %d bytes\n", (int)nread); + + outbuf = uv_buf_init("world\n", 6); + r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL); + ASSERT(r == 0); + + free(buf.base); } @@ -92,11 +117,12 @@ TEST_IMPL(ipc) { r = uv_spawn(uv_default_loop(), &process, options); ASSERT(r == 0); - uv_read_start((uv_stream_t*)&channel, on_alloc, on_read); + uv_read2_start((uv_stream_t*)&channel, on_alloc, on_read); r = uv_run(uv_default_loop()); ASSERT(r == 0); + ASSERT(read2_cb_called == 1); ASSERT(exit_cb_called == 1); return 0; } diff --git a/test/test-spawn.c b/test/test-spawn.c index 5bb6ed62..238e6c9c 100644 --- a/test/test-spawn.c +++ b/test/test-spawn.c @@ -229,7 +229,7 @@ TEST_IMPL(spawn_detect_pipe_name_collisions_on_windows) { init_process_options("spawn_helper2", exit_cb); - uv_pipe_init(uv_default_loop(), &out); + uv_pipe_init(uv_default_loop(), &out, 0); options.stdout_stream = &out; /* Create a pipe that'll cause a collision. */ diff --git a/uv.gyp b/uv.gyp index e85927aa..0db92a93 100644 --- a/uv.gyp +++ b/uv.gyp @@ -260,6 +260,7 @@ 'test/test-getsockname.c', 'test/test-hrtime.c', 'test/test-idle.c', + 'test/test-ipc.c', 'test/test-list.h', 'test/test-loop-handles.c', 'test/test-pass-always.c',