This is an attempt to fix some resource management issues on Windows. Win32 sockets have an issue where it sends an RST packet if there is an outstanding overlapped calls. We can avoid that by being certain to explicitly cancel our read and write requests first. This also removes some conditional cleanup code, since we might as well clean it up eagerly (like unix). Otherwise, it looks to me like these might cause the accept callbacks to be run after the endgame had freed the memory for them. The comment here seems mixed up between send and recv buffers. The default behavior on calling `closesocket` is already to do a graceful shutdown (see https://docs.microsoft.com/en-us/windows/win32/api/winsock/nf-winsock-closesocket with default l_onoff=zero) if it is the last open handle. The expected behavior if there are pending reads in flight is to send an RST packet, notifying the client that the server connection was destroyed before acknowledging the EOF. Additionally, we need to cancel writes explicitly: we need to notify Win32 that it is okay to cancel these writes (so it doesn't also generate an RST packet on the wire). Refs: https://github.com/libuv/libuv/pull/3035 Refs: https://github.com/nodejs/node/pull/35946 Refs: https://github.com/nodejs/node/issues/35904 Fixes: https://github.com/libuv/libuv/issues/3034 PR-URL: https://github.com/libuv/libuv/pull/3036 Reviewed-By: Santiago Gimeno <santiago.gimeno@gmail.com>
985 lines
24 KiB
C
985 lines
24 KiB
C
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
|
|
*
|
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
* of this software and associated documentation files (the "Software"), to
|
|
* deal in the Software without restriction, including without limitation the
|
|
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
|
* sell copies of the Software, and to permit persons to whom the Software is
|
|
* furnished to do so, subject to the following conditions:
|
|
*
|
|
* The above copyright notice and this permission notice shall be included in
|
|
* all copies or substantial portions of the Software.
|
|
*
|
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
* IN THE SOFTWARE.
|
|
*/
|
|
|
|
#include "uv.h"
|
|
#include "task.h"
|
|
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
|
|
static uv_pipe_t channel;
|
|
static uv_tcp_t tcp_server;
|
|
static uv_tcp_t tcp_server2;
|
|
static uv_tcp_t tcp_connection;
|
|
|
|
static int exit_cb_called;
|
|
static int read_cb_called;
|
|
static int tcp_write_cb_called;
|
|
static int tcp_read_cb_called;
|
|
static int on_pipe_read_called;
|
|
static int local_conn_accepted;
|
|
static int remote_conn_accepted;
|
|
static int tcp_server_listening;
|
|
static uv_write_t write_req;
|
|
static uv_write_t write_req2;
|
|
static uv_write_t conn_notify_req;
|
|
static int close_cb_called;
|
|
static int connection_accepted;
|
|
static int tcp_conn_read_cb_called;
|
|
static int tcp_conn_write_cb_called;
|
|
static int closed_handle_data_read;
|
|
static int closed_handle_write;
|
|
static int send_zero_write;
|
|
|
|
typedef struct {
|
|
uv_connect_t conn_req;
|
|
uv_write_t tcp_write_req;
|
|
uv_tcp_t conn;
|
|
} tcp_conn;
|
|
|
|
#define CONN_COUNT 100
|
|
#define BACKLOG 128
|
|
#define LARGE_SIZE 100000
|
|
|
|
static uv_buf_t large_buf;
|
|
static char buffer[LARGE_SIZE];
|
|
static uv_write_t write_reqs[300];
|
|
static int write_reqs_completed;
|
|
|
|
static unsigned int write_until_data_queued(void);
|
|
static void send_handle_and_close(void);
|
|
|
|
|
|
static void close_server_conn_cb(uv_handle_t* handle) {
|
|
free(handle);
|
|
}
|
|
|
|
|
|
static void on_connection(uv_stream_t* server, int status) {
|
|
uv_tcp_t* conn;
|
|
int r;
|
|
|
|
if (!local_conn_accepted) {
|
|
/* Accept the connection and close it. Also and close the server. */
|
|
ASSERT_EQ(status, 0);
|
|
ASSERT_PTR_EQ(&tcp_server, server);
|
|
|
|
conn = malloc(sizeof(*conn));
|
|
ASSERT_NOT_NULL(conn);
|
|
r = uv_tcp_init(server->loop, conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_accept(server, (uv_stream_t*)conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_close((uv_handle_t*)conn, close_server_conn_cb);
|
|
uv_close((uv_handle_t*)server, NULL);
|
|
local_conn_accepted = 1;
|
|
}
|
|
}
|
|
|
|
|
|
static void exit_cb(uv_process_t* process,
|
|
int64_t exit_status,
|
|
int term_signal) {
|
|
printf("exit_cb\n");
|
|
exit_cb_called++;
|
|
ASSERT_EQ(exit_status, 0);
|
|
ASSERT_EQ(term_signal, 0);
|
|
uv_close((uv_handle_t*)process, NULL);
|
|
}
|
|
|
|
|
|
static void on_alloc(uv_handle_t* handle,
|
|
size_t suggested_size,
|
|
uv_buf_t* buf) {
|
|
buf->base = malloc(suggested_size);
|
|
buf->len = suggested_size;
|
|
}
|
|
|
|
|
|
static void close_client_conn_cb(uv_handle_t* handle) {
|
|
tcp_conn* p = (tcp_conn*)handle->data;
|
|
free(p);
|
|
}
|
|
|
|
|
|
static void connect_cb(uv_connect_t* req, int status) {
|
|
uv_close((uv_handle_t*)req->handle, close_client_conn_cb);
|
|
}
|
|
|
|
|
|
static void make_many_connections(void) {
|
|
tcp_conn* conn;
|
|
struct sockaddr_in addr;
|
|
int r, i;
|
|
|
|
for (i = 0; i < CONN_COUNT; i++) {
|
|
conn = malloc(sizeof(*conn));
|
|
ASSERT_NOT_NULL(conn);
|
|
|
|
r = uv_tcp_init(uv_default_loop(), &conn->conn);
|
|
ASSERT_EQ(r, 0);
|
|
ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
|
|
|
|
r = uv_tcp_connect(&conn->conn_req,
|
|
(uv_tcp_t*) &conn->conn,
|
|
(const struct sockaddr*) &addr,
|
|
connect_cb);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
conn->conn.data = conn;
|
|
}
|
|
}
|
|
|
|
|
|
static void on_read(uv_stream_t* handle,
|
|
ssize_t nread,
|
|
const uv_buf_t* buf) {
|
|
int r;
|
|
uv_pipe_t* pipe;
|
|
uv_handle_type pending;
|
|
uv_buf_t outbuf;
|
|
|
|
pipe = (uv_pipe_t*) handle;
|
|
|
|
if (nread == 0) {
|
|
/* Everything OK, but nothing read. */
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
if (nread < 0) {
|
|
if (nread == UV_EOF) {
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
printf("error recving on channel: %s\n", uv_strerror(nread));
|
|
abort();
|
|
}
|
|
|
|
fprintf(stderr, "got %d bytes\n", (int)nread);
|
|
|
|
pending = uv_pipe_pending_type(pipe);
|
|
if (!tcp_server_listening) {
|
|
ASSERT_EQ(1, uv_pipe_pending_count(pipe));
|
|
ASSERT_GT(nread, 0);
|
|
ASSERT_NOT_NULL(buf->base);
|
|
ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
|
|
read_cb_called++;
|
|
|
|
/* Accept the pending TCP server, and start listening on it. */
|
|
ASSERT_EQ(pending, UV_TCP);
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
tcp_server_listening = 1;
|
|
|
|
/* Make sure that the expected data is correctly multiplexed. */
|
|
ASSERT_MEM_EQ("hello\n", buf->base, nread);
|
|
|
|
outbuf = uv_buf_init("foobar\n", 7);
|
|
r = uv_write(&write_req, (uv_stream_t*)pipe, &outbuf, 1, NULL);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
/* Create a bunch of connections to get both servers to accept. */
|
|
make_many_connections();
|
|
} else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
|
|
/* Remote server has accepted a connection. Close the channel. */
|
|
ASSERT_EQ(0, uv_pipe_pending_count(pipe));
|
|
ASSERT_EQ(pending, UV_UNKNOWN_HANDLE);
|
|
remote_conn_accepted = 1;
|
|
uv_close((uv_handle_t*)&channel, NULL);
|
|
}
|
|
|
|
free(buf->base);
|
|
}
|
|
|
|
#ifdef _WIN32
|
|
static void on_read_listen_after_bound_twice(uv_stream_t* handle,
|
|
ssize_t nread,
|
|
const uv_buf_t* buf) {
|
|
int r;
|
|
uv_pipe_t* pipe;
|
|
uv_handle_type pending;
|
|
|
|
pipe = (uv_pipe_t*) handle;
|
|
|
|
if (nread == 0) {
|
|
/* Everything OK, but nothing read. */
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
if (nread < 0) {
|
|
if (nread == UV_EOF) {
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
printf("error recving on channel: %s\n", uv_strerror(nread));
|
|
abort();
|
|
}
|
|
|
|
fprintf(stderr, "got %d bytes\n", (int)nread);
|
|
|
|
ASSERT_GT(uv_pipe_pending_count(pipe), 0);
|
|
pending = uv_pipe_pending_type(pipe);
|
|
ASSERT_GT(nread, 0);
|
|
ASSERT_NOT_NULL(buf->base);
|
|
ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
|
|
read_cb_called++;
|
|
|
|
if (read_cb_called == 1) {
|
|
/* Accept the first TCP server, and start listening on it. */
|
|
ASSERT_EQ(pending, UV_TCP);
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, on_connection);
|
|
ASSERT_EQ(r, 0);
|
|
} else if (read_cb_called == 2) {
|
|
/* Accept the second TCP server, and start listening on it. */
|
|
ASSERT_EQ(pending, UV_TCP);
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server2);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_server2);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_listen((uv_stream_t*)&tcp_server2, BACKLOG, on_connection);
|
|
ASSERT_EQ(r, UV_EADDRINUSE);
|
|
|
|
uv_close((uv_handle_t*)&tcp_server, NULL);
|
|
uv_close((uv_handle_t*)&tcp_server2, NULL);
|
|
ASSERT_EQ(0, uv_pipe_pending_count(pipe));
|
|
uv_close((uv_handle_t*)&channel, NULL);
|
|
}
|
|
|
|
free(buf->base);
|
|
}
|
|
#endif
|
|
|
|
void spawn_helper(uv_pipe_t* channel,
|
|
uv_process_t* process,
|
|
const char* helper) {
|
|
uv_process_options_t options;
|
|
size_t exepath_size;
|
|
char exepath[1024];
|
|
char* args[3];
|
|
int r;
|
|
uv_stdio_container_t stdio[3];
|
|
|
|
r = uv_pipe_init(uv_default_loop(), channel, 1);
|
|
ASSERT_EQ(r, 0);
|
|
ASSERT_NE(channel->ipc, 0);
|
|
|
|
exepath_size = sizeof(exepath);
|
|
r = uv_exepath(exepath, &exepath_size);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
exepath[exepath_size] = '\0';
|
|
args[0] = exepath;
|
|
args[1] = (char*)helper;
|
|
args[2] = NULL;
|
|
|
|
memset(&options, 0, sizeof(options));
|
|
options.file = exepath;
|
|
options.args = args;
|
|
options.exit_cb = exit_cb;
|
|
options.stdio = stdio;
|
|
options.stdio_count = ARRAY_SIZE(stdio);
|
|
|
|
stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
|
|
stdio[0].data.stream = (uv_stream_t*) channel;
|
|
stdio[1].flags = UV_INHERIT_FD;
|
|
stdio[1].data.fd = 1;
|
|
stdio[2].flags = UV_INHERIT_FD;
|
|
stdio[2].data.fd = 2;
|
|
|
|
r = uv_spawn(uv_default_loop(), process, &options);
|
|
ASSERT_EQ(r, 0);
|
|
}
|
|
|
|
|
|
static void on_tcp_write(uv_write_t* req, int status) {
|
|
ASSERT_EQ(status, 0);
|
|
ASSERT_PTR_EQ(req->handle, &tcp_connection);
|
|
tcp_write_cb_called++;
|
|
}
|
|
|
|
|
|
static void on_read_alloc(uv_handle_t* handle,
|
|
size_t suggested_size,
|
|
uv_buf_t* buf) {
|
|
buf->base = malloc(suggested_size);
|
|
buf->len = suggested_size;
|
|
}
|
|
|
|
|
|
static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
|
|
ASSERT_GT(nread, 0);
|
|
ASSERT_MEM_EQ("hello again\n", buf->base, nread);
|
|
ASSERT_PTR_EQ(tcp, &tcp_connection);
|
|
free(buf->base);
|
|
|
|
tcp_read_cb_called++;
|
|
|
|
uv_close((uv_handle_t*)tcp, NULL);
|
|
uv_close((uv_handle_t*)&channel, NULL);
|
|
}
|
|
|
|
|
|
static void on_read_connection(uv_stream_t* handle,
|
|
ssize_t nread,
|
|
const uv_buf_t* buf) {
|
|
int r;
|
|
uv_buf_t outbuf;
|
|
uv_pipe_t* pipe;
|
|
uv_handle_type pending;
|
|
|
|
pipe = (uv_pipe_t*) handle;
|
|
if (nread == 0) {
|
|
/* Everything OK, but nothing read. */
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
if (nread < 0) {
|
|
if (nread == UV_EOF) {
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
printf("error recving on channel: %s\n", uv_strerror(nread));
|
|
abort();
|
|
}
|
|
|
|
fprintf(stderr, "got %d bytes\n", (int)nread);
|
|
|
|
ASSERT_EQ(1, uv_pipe_pending_count(pipe));
|
|
pending = uv_pipe_pending_type(pipe);
|
|
|
|
ASSERT_GT(nread, 0);
|
|
ASSERT_NOT_NULL(buf->base);
|
|
ASSERT_NE(pending, UV_UNKNOWN_HANDLE);
|
|
read_cb_called++;
|
|
|
|
/* Accept the pending TCP connection */
|
|
ASSERT_EQ(pending, UV_TCP);
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_connection);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
/* Make sure that the expected data is correctly multiplexed. */
|
|
ASSERT_MEM_EQ("hello\n", buf->base, nread);
|
|
|
|
/* Write/read to/from the connection */
|
|
outbuf = uv_buf_init("world\n", 6);
|
|
r = uv_write(&write_req, (uv_stream_t*)&tcp_connection, &outbuf, 1,
|
|
on_tcp_write);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_read_start((uv_stream_t*)&tcp_connection, on_read_alloc, on_tcp_read);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
free(buf->base);
|
|
}
|
|
|
|
|
|
#ifndef _WIN32
|
|
static void on_read_closed_handle(uv_stream_t* handle,
|
|
ssize_t nread,
|
|
const uv_buf_t* buf) {
|
|
if (nread == 0 || nread == UV_EOF) {
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
if (nread < 0) {
|
|
printf("error recving on channel: %s\n", uv_strerror(nread));
|
|
abort();
|
|
}
|
|
|
|
closed_handle_data_read += nread;
|
|
free(buf->base);
|
|
}
|
|
#endif
|
|
|
|
|
|
static void on_read_send_zero(uv_stream_t* handle,
|
|
ssize_t nread,
|
|
const uv_buf_t* buf) {
|
|
ASSERT(nread == 0 || nread == UV_EOF);
|
|
free(buf->base);
|
|
}
|
|
|
|
|
|
static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
|
|
uv_process_t process;
|
|
int r;
|
|
|
|
spawn_helper(&channel, &process, helper);
|
|
uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
|
|
|
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|
|
|
|
|
|
TEST_IMPL(ipc_listen_before_write) {
|
|
#if defined(NO_SEND_HANDLE_ON_PIPE)
|
|
RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
|
|
#endif
|
|
int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
|
|
ASSERT_EQ(local_conn_accepted, 1);
|
|
ASSERT_EQ(remote_conn_accepted, 1);
|
|
ASSERT_EQ(read_cb_called, 1);
|
|
ASSERT_EQ(exit_cb_called, 1);
|
|
return r;
|
|
}
|
|
|
|
|
|
TEST_IMPL(ipc_listen_after_write) {
|
|
#if defined(NO_SEND_HANDLE_ON_PIPE)
|
|
RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
|
|
#endif
|
|
int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
|
|
ASSERT_EQ(local_conn_accepted, 1);
|
|
ASSERT_EQ(remote_conn_accepted, 1);
|
|
ASSERT_EQ(read_cb_called, 1);
|
|
ASSERT_EQ(exit_cb_called, 1);
|
|
return r;
|
|
}
|
|
|
|
|
|
TEST_IMPL(ipc_tcp_connection) {
|
|
#if defined(NO_SEND_HANDLE_ON_PIPE)
|
|
RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
|
|
#endif
|
|
int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
|
|
ASSERT_EQ(read_cb_called, 1);
|
|
ASSERT_EQ(tcp_write_cb_called, 1);
|
|
ASSERT_EQ(tcp_read_cb_called, 1);
|
|
ASSERT_EQ(exit_cb_called, 1);
|
|
return r;
|
|
}
|
|
|
|
#ifndef _WIN32
|
|
TEST_IMPL(ipc_closed_handle) {
|
|
int r;
|
|
r = run_ipc_test("ipc_helper_closed_handle", on_read_closed_handle);
|
|
ASSERT_EQ(r, 0);
|
|
return 0;
|
|
}
|
|
#endif
|
|
|
|
|
|
#ifdef _WIN32
|
|
TEST_IMPL(listen_with_simultaneous_accepts) {
|
|
uv_tcp_t server;
|
|
int r;
|
|
struct sockaddr_in addr;
|
|
|
|
ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
|
|
|
|
r = uv_tcp_init(uv_default_loop(), &server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_tcp_simultaneous_accepts(&server, 1);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
|
|
ASSERT_EQ(r, 0);
|
|
ASSERT_EQ(server.reqs_pending, 32);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|
|
|
|
|
|
TEST_IMPL(listen_no_simultaneous_accepts) {
|
|
uv_tcp_t server;
|
|
int r;
|
|
struct sockaddr_in addr;
|
|
|
|
ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
|
|
|
|
r = uv_tcp_init(uv_default_loop(), &server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_tcp_bind(&server, (const struct sockaddr*) &addr, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_tcp_simultaneous_accepts(&server, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL);
|
|
ASSERT_EQ(r, 0);
|
|
ASSERT_EQ(server.reqs_pending, 1);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|
|
|
|
TEST_IMPL(ipc_listen_after_bind_twice) {
|
|
#if defined(NO_SEND_HANDLE_ON_PIPE)
|
|
RETURN_SKIP(NO_SEND_HANDLE_ON_PIPE);
|
|
#endif
|
|
int r = run_ipc_test("ipc_helper_bind_twice", on_read_listen_after_bound_twice);
|
|
ASSERT_EQ(read_cb_called, 2);
|
|
ASSERT_EQ(exit_cb_called, 1);
|
|
return r;
|
|
}
|
|
#endif
|
|
|
|
TEST_IMPL(ipc_send_zero) {
|
|
int r;
|
|
r = run_ipc_test("ipc_helper_send_zero", on_read_send_zero);
|
|
ASSERT_EQ(r, 0);
|
|
return 0;
|
|
}
|
|
|
|
|
|
/* Everything here runs in a child process. */
|
|
|
|
static tcp_conn conn;
|
|
|
|
|
|
static void close_cb(uv_handle_t* handle) {
|
|
close_cb_called++;
|
|
}
|
|
|
|
|
|
static void conn_notify_write_cb(uv_write_t* req, int status) {
|
|
uv_close((uv_handle_t*)&tcp_server, close_cb);
|
|
uv_close((uv_handle_t*)&channel, close_cb);
|
|
}
|
|
|
|
|
|
static void tcp_connection_write_cb(uv_write_t* req, int status) {
|
|
ASSERT_PTR_EQ(&conn.conn, req->handle);
|
|
uv_close((uv_handle_t*)req->handle, close_cb);
|
|
uv_close((uv_handle_t*)&channel, close_cb);
|
|
uv_close((uv_handle_t*)&tcp_server, close_cb);
|
|
tcp_conn_write_cb_called++;
|
|
}
|
|
|
|
|
|
static void closed_handle_large_write_cb(uv_write_t* req, int status) {
|
|
ASSERT_EQ(status, 0);
|
|
ASSERT(closed_handle_data_read = LARGE_SIZE);
|
|
if (++write_reqs_completed == ARRAY_SIZE(write_reqs)) {
|
|
write_reqs_completed = 0;
|
|
if (write_until_data_queued() > 0)
|
|
send_handle_and_close();
|
|
}
|
|
}
|
|
|
|
|
|
static void closed_handle_write_cb(uv_write_t* req, int status) {
|
|
ASSERT_EQ(status, UV_EBADF);
|
|
closed_handle_write = 1;
|
|
}
|
|
|
|
|
|
static void send_zero_write_cb(uv_write_t* req, int status) {
|
|
ASSERT_EQ(status, 0);
|
|
send_zero_write++;
|
|
}
|
|
|
|
static void on_tcp_child_process_read(uv_stream_t* tcp,
|
|
ssize_t nread,
|
|
const uv_buf_t* buf) {
|
|
uv_buf_t outbuf;
|
|
int r;
|
|
|
|
if (nread < 0) {
|
|
if (nread == UV_EOF) {
|
|
free(buf->base);
|
|
return;
|
|
}
|
|
|
|
printf("error recving on tcp connection: %s\n", uv_strerror(nread));
|
|
abort();
|
|
}
|
|
|
|
ASSERT_GT(nread, 0);
|
|
ASSERT_MEM_EQ("world\n", buf->base, nread);
|
|
on_pipe_read_called++;
|
|
free(buf->base);
|
|
|
|
/* Write to the socket */
|
|
outbuf = uv_buf_init("hello again\n", 12);
|
|
r = uv_write(&conn.tcp_write_req, tcp, &outbuf, 1, tcp_connection_write_cb);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
tcp_conn_read_cb_called++;
|
|
}
|
|
|
|
|
|
static void connect_child_process_cb(uv_connect_t* req, int status) {
|
|
int r;
|
|
|
|
ASSERT_EQ(status, 0);
|
|
r = uv_read_start(req->handle, on_read_alloc, on_tcp_child_process_read);
|
|
ASSERT_EQ(r, 0);
|
|
}
|
|
|
|
|
|
static void ipc_on_connection(uv_stream_t* server, int status) {
|
|
int r;
|
|
uv_buf_t buf;
|
|
|
|
if (!connection_accepted) {
|
|
/*
|
|
* Accept the connection and close it. Also let the other
|
|
* side know.
|
|
*/
|
|
ASSERT_EQ(status, 0);
|
|
ASSERT_PTR_EQ(&tcp_server, server);
|
|
|
|
r = uv_tcp_init(server->loop, &conn.conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_accept(server, (uv_stream_t*)&conn.conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_close((uv_handle_t*)&conn.conn, close_cb);
|
|
|
|
buf = uv_buf_init("accepted_connection\n", 20);
|
|
r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
|
|
NULL, conn_notify_write_cb);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
connection_accepted = 1;
|
|
}
|
|
}
|
|
|
|
|
|
static void close_and_free_cb(uv_handle_t* handle) {
|
|
close_cb_called++;
|
|
free(handle);
|
|
}
|
|
|
|
static void ipc_on_connection_tcp_conn(uv_stream_t* server, int status) {
|
|
int r;
|
|
uv_buf_t buf;
|
|
uv_tcp_t* conn;
|
|
|
|
ASSERT_EQ(status, 0);
|
|
ASSERT_PTR_EQ(&tcp_server, server);
|
|
|
|
conn = malloc(sizeof(*conn));
|
|
ASSERT_NOT_NULL(conn);
|
|
|
|
r = uv_tcp_init(server->loop, conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_accept(server, (uv_stream_t*)conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
/* Send the accepted connection to the other process */
|
|
buf = uv_buf_init("hello\n", 6);
|
|
r = uv_write2(&conn_notify_req, (uv_stream_t*)&channel, &buf, 1,
|
|
(uv_stream_t*)conn, NULL);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_read_start((uv_stream_t*) conn,
|
|
on_read_alloc,
|
|
on_tcp_child_process_read);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_close((uv_handle_t*)conn, close_and_free_cb);
|
|
}
|
|
|
|
|
|
int ipc_helper(int listen_after_write) {
|
|
/*
|
|
* This is launched from test-ipc.c. stdin is a duplex channel that we
|
|
* over which a handle will be transmitted.
|
|
*/
|
|
struct sockaddr_in addr;
|
|
int r;
|
|
uv_buf_t buf;
|
|
|
|
ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
|
|
|
|
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_pipe_open(&channel, 0);
|
|
|
|
ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
|
|
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
if (!listen_after_write) {
|
|
r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
|
|
ASSERT_EQ(r, 0);
|
|
}
|
|
|
|
buf = uv_buf_init("hello\n", 6);
|
|
r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
|
|
(uv_stream_t*)&tcp_server, NULL);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
if (listen_after_write) {
|
|
r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection);
|
|
ASSERT_EQ(r, 0);
|
|
}
|
|
|
|
notify_parent_process();
|
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
ASSERT_EQ(connection_accepted, 1);
|
|
ASSERT_EQ(close_cb_called, 3);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|
|
|
|
|
|
int ipc_helper_tcp_connection(void) {
|
|
/*
|
|
* This is launched from test-ipc.c. stdin is a duplex channel
|
|
* over which a handle will be transmitted.
|
|
*/
|
|
|
|
int r;
|
|
struct sockaddr_in addr;
|
|
|
|
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_pipe_open(&channel, 0);
|
|
|
|
ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
|
|
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
|
|
|
|
r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_listen((uv_stream_t*)&tcp_server, BACKLOG, ipc_on_connection_tcp_conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
/* Make a connection to the server */
|
|
r = uv_tcp_init(uv_default_loop(), &conn.conn);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
|
|
|
|
r = uv_tcp_connect(&conn.conn_req,
|
|
(uv_tcp_t*) &conn.conn,
|
|
(const struct sockaddr*) &addr,
|
|
connect_child_process_cb);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
ASSERT_EQ(tcp_conn_read_cb_called, 1);
|
|
ASSERT_EQ(tcp_conn_write_cb_called, 1);
|
|
ASSERT_EQ(close_cb_called, 4);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|
|
|
|
static unsigned int write_until_data_queued() {
|
|
unsigned int i;
|
|
int r;
|
|
|
|
i = 0;
|
|
do {
|
|
r = uv_write(&write_reqs[i],
|
|
(uv_stream_t*)&channel,
|
|
&large_buf,
|
|
1,
|
|
closed_handle_large_write_cb);
|
|
ASSERT_EQ(r, 0);
|
|
i++;
|
|
} while (channel.write_queue_size == 0 &&
|
|
i < ARRAY_SIZE(write_reqs));
|
|
|
|
return channel.write_queue_size;
|
|
}
|
|
|
|
static void send_handle_and_close() {
|
|
int r;
|
|
struct sockaddr_in addr;
|
|
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
|
|
|
|
r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_write2(&write_req,
|
|
(uv_stream_t*)&channel,
|
|
&large_buf,
|
|
1,
|
|
(uv_stream_t*)&tcp_server,
|
|
closed_handle_write_cb);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_close((uv_handle_t*)&tcp_server, NULL);
|
|
}
|
|
|
|
int ipc_helper_closed_handle(void) {
|
|
int r;
|
|
|
|
memset(buffer, '.', LARGE_SIZE);
|
|
large_buf = uv_buf_init(buffer, LARGE_SIZE);
|
|
|
|
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_pipe_open(&channel, 0);
|
|
|
|
ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
|
|
|
|
if (write_until_data_queued() > 0)
|
|
send_handle_and_close();
|
|
|
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
ASSERT_EQ(closed_handle_write, 1);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|
|
|
|
|
|
int ipc_helper_bind_twice(void) {
|
|
/*
|
|
* This is launched from test-ipc.c. stdin is a duplex channel
|
|
* over which two handles will be transmitted.
|
|
*/
|
|
struct sockaddr_in addr;
|
|
int r;
|
|
uv_buf_t buf;
|
|
|
|
ASSERT_EQ(0, uv_ip4_addr("0.0.0.0", TEST_PORT, &addr));
|
|
|
|
r = uv_pipe_init(uv_default_loop(), &channel, 1);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_pipe_open(&channel, 0);
|
|
|
|
ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
|
|
|
|
buf = uv_buf_init("hello\n", 6);
|
|
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server);
|
|
ASSERT_EQ(r, 0);
|
|
r = uv_tcp_init(uv_default_loop(), &tcp_server2);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_tcp_bind(&tcp_server, (const struct sockaddr*) &addr, 0);
|
|
ASSERT_EQ(r, 0);
|
|
r = uv_tcp_bind(&tcp_server2, (const struct sockaddr*) &addr, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_write2(&write_req, (uv_stream_t*)&channel, &buf, 1,
|
|
(uv_stream_t*)&tcp_server, NULL);
|
|
ASSERT_EQ(r, 0);
|
|
r = uv_write2(&write_req2, (uv_stream_t*)&channel, &buf, 1,
|
|
(uv_stream_t*)&tcp_server2, NULL);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|
|
|
|
int ipc_helper_send_zero(void) {
|
|
int r;
|
|
uv_buf_t zero_buf;
|
|
|
|
zero_buf = uv_buf_init(0, 0);
|
|
|
|
r = uv_pipe_init(uv_default_loop(), &channel, 0);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
uv_pipe_open(&channel, 0);
|
|
|
|
ASSERT_EQ(1, uv_is_readable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(1, uv_is_writable((uv_stream_t*) &channel));
|
|
ASSERT_EQ(0, uv_is_closing((uv_handle_t*) &channel));
|
|
|
|
r = uv_write(&write_req,
|
|
(uv_stream_t*)&channel,
|
|
&zero_buf,
|
|
1,
|
|
send_zero_write_cb);
|
|
|
|
ASSERT_EQ(r, 0);
|
|
|
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
|
ASSERT_EQ(r, 0);
|
|
|
|
ASSERT_EQ(send_zero_write, 1);
|
|
|
|
MAKE_VALGRIND_HAPPY();
|
|
return 0;
|
|
}
|