windows: stdio over non-overlapped pipes

This commit is contained in:
Igor Zinkovsky 2011-10-19 00:48:38 -07:00
parent cb474b24c1
commit 54982a23ef
12 changed files with 575 additions and 58 deletions

View File

@ -190,7 +190,11 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
struct uv_req_s* next_req;
#define UV_WRITE_PRIVATE_FIELDS \
int ipc_header;
int ipc_header; \
uv_buf_t* write_buffer; \
HANDLE event_handle; \
HANDLE wait_handle; \
uv_write_t* next_non_overlapped_write;
#define UV_CONNECT_PRIVATE_FIELDS \
/* empty */
@ -215,7 +219,13 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
HANDLE event_handle; \
HANDLE wait_handle; \
struct uv_tcp_accept_s* next_pending; \
} uv_tcp_accept_t;
} uv_tcp_accept_t; \
\
typedef struct uv_read_s { \
UV_REQ_FIELDS \
HANDLE event_handle; \
HANDLE wait_handle; \
} uv_read_t;
#define uv_stream_connection_fields \
unsigned int write_reqs_pending; \
@ -226,7 +236,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
#define UV_STREAM_PRIVATE_FIELDS \
unsigned int reqs_pending; \
uv_req_t read_req; \
uv_read_t read_req; \
union { \
struct { uv_stream_connection_fields }; \
struct { uv_stream_server_fields }; \
@ -270,7 +280,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
uv_write_t ipc_header_write_req; \
int ipc_pid; \
uint64_t remaining_ipc_rawdata_bytes; \
WSAPROTOCOL_INFOW* pending_socket_info;
WSAPROTOCOL_INFOW* pending_socket_info; \
uv_write_t* non_overlapped_writes_tail;
#define UV_PIPE_PRIVATE_FIELDS \
HANDLE handle; \

View File

@ -65,6 +65,7 @@ void uv_process_timers(uv_loop_t* loop);
#define UV_HANDLE_ZERO_READ 0x40000
#define UV_HANDLE_TTY_RAW 0x80000
#define UV_HANDLE_EMULATE_IOCP 0x100000
#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x200000
void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle);
void uv_process_endgames(uv_loop_t* loop);

View File

@ -79,6 +79,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
handle->remaining_ipc_rawdata_bytes = 0;
handle->pending_socket_info = NULL;
handle->ipc = ipc;
handle->non_overlapped_writes_tail = NULL;
uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req);
@ -90,6 +91,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
static void uv_pipe_connection_init(uv_pipe_t* handle) {
uv_connection_init((uv_stream_t*) handle);
handle->read_req.data = handle;
handle->eof_timer = NULL;
}
@ -149,19 +151,39 @@ done:
static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle,
HANDLE pipeHandle) {
NTSTATUS nt_status;
IO_STATUS_BLOCK io_status;
FILE_MODE_INFORMATION mode_info;
DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) {
return -1;
}
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR)handle,
0) == NULL) {
/* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */
nt_status = pNtQueryInformationFile(pipeHandle,
&io_status,
&mode_info,
sizeof(mode_info),
FileModeInformation);
if (nt_status != STATUS_SUCCESS) {
return -1;
}
if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT ||
mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) {
/* Non-overlapped pipe. */
handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE;
} else {
/* Overlapped pipe. Try to associate with IOCP. */
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR)handle,
0) == NULL) {
handle->flags |= UV_HANDLE_EMULATE_IOCP;
}
}
return 0;
}
@ -258,6 +280,17 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
free(handle->pending_socket_info);
handle->pending_socket_info = NULL;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(handle->read_req.wait_handle);
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
}
if (handle->read_req.event_handle) {
CloseHandle(handle->read_req.event_handle);
handle->read_req.event_handle = NULL;
}
}
}
/* Remember the state of this flag because the close callback is */
@ -657,8 +690,99 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
}
static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) {
int result;
DWORD bytes;
uv_read_t* req = (uv_read_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->data;
uv_loop_t* loop = handle->loop;
assert(req != NULL);
assert(req->type == UV_READ);
assert(handle->type == UV_NAMED_PIPE);
result = ReadFile(handle->handle,
&uv_zero_,
0,
&bytes,
NULL);
if (!result) {
SET_REQ_ERROR(req, GetLastError());
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) {
int result;
DWORD bytes;
uv_write_t* req = (uv_write_t*) parameter;
uv_pipe_t* handle = (uv_pipe_t*) req->handle;
uv_loop_t* loop = handle->loop;
assert(req != NULL);
assert(req->type == UV_WRITE);
assert(handle->type == UV_NAMED_PIPE);
assert(req->write_buffer);
result = WriteFile(handle->handle,
req->write_buffer->base,
req->write_buffer->len,
&bytes,
NULL);
if (!result) {
SET_REQ_ERROR(req, GetLastError());
}
POST_COMPLETION_FOR_REQ(loop, req);
return 0;
}
static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) {
uv_read_t* req;
uv_tcp_t* handle;
req = (uv_read_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->data;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->overlapped.InternalHigh,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) {
uv_write_t* req;
uv_tcp_t* handle;
req = (uv_write_t*) context;
assert(req != NULL);
handle = (uv_tcp_t*)req->handle;
assert(handle != NULL);
assert(!timed_out);
if (!PostQueuedCompletionStatus(handle->loop->iocp,
req->overlapped.InternalHigh,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
uv_req_t* req;
uv_read_t* req;
int result;
assert(handle->flags & UV_HANDLE_READING);
@ -667,28 +791,60 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) {
assert(handle->handle != INVALID_HANDLE_VALUE);
req = &handle->read_req;
memset(&req->overlapped, 0, sizeof(req->overlapped));
/* Do 0-read */
result = ReadFile(handle->handle,
&uv_zero_,
0,
NULL,
&req->overlapped);
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, GetLastError());
goto error;
}
} else {
memset(&req->overlapped, 0, sizeof(req->overlapped));
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1);
}
if (!result && GetLastError() != ERROR_IO_PENDING) {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, req);
/* Do 0-read */
result = ReadFile(handle->handle,
&uv_zero_,
0,
NULL,
&req->overlapped);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
return;
if (!result && GetLastError() != ERROR_IO_PENDING) {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, GetLastError());
goto error;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (!req->event_handle) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
}
if (req->wait_handle == INVALID_HANDLE_VALUE) {
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->overlapped.hEvent, post_completion_read_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
goto error;
}
}
}
}
/* Start the eof timer if there is one */
eof_timer_start(handle);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
return;
error:
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->flags |= UV_HANDLE_READ_PENDING;
handle->reqs_pending++;
}
@ -739,6 +895,54 @@ int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
}
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
uv_write_t* req) {
req->next_non_overlapped_write = NULL;
if (handle->non_overlapped_writes_tail) {
req->next_non_overlapped_write =
handle->non_overlapped_writes_tail->next_non_overlapped_write;
handle->non_overlapped_writes_tail->next_non_overlapped_write = req;
handle->non_overlapped_writes_tail = req;
} else {
req->next_non_overlapped_write = req;
handle->non_overlapped_writes_tail = req;
}
}
static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) {
uv_write_t* req;
if (handle->non_overlapped_writes_tail) {
req = handle->non_overlapped_writes_tail->next_non_overlapped_write;
if (req == handle->non_overlapped_writes_tail) {
handle->non_overlapped_writes_tail = NULL;
} else {
handle->non_overlapped_writes_tail->next_non_overlapped_write =
req->next_non_overlapped_write;
}
return req;
} else {
/* queue empty */
return NULL;
}
}
static void uv_queue_non_overlapped_write(uv_pipe_t* handle) {
uv_write_t* req = uv_remove_non_overlapped_write_req(handle);
if (req) {
if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc,
req,
WT_EXECUTELONGFUNCTION)) {
uv_fatal_error(GetLastError(), "QueueUserWorkItem");
}
}
}
static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt,
uv_stream_t* send_handle, uv_write_cb cb) {
@ -775,9 +979,12 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
req->handle = (uv_stream_t*) handle;
req->cb = cb;
req->ipc_header = 0;
req->event_handle = NULL;
req->wait_handle = INVALID_HANDLE_VALUE;
memset(&req->overlapped, 0, sizeof(req->overlapped));
if (handle->ipc) {
assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
ipc_frame.header.flags = 0;
/* Use the IPC framing protocol. */
@ -856,24 +1063,49 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req,
}
}
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
&req->overlapped);
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) {
req->write_buffer = &bufs[0];
uv_insert_non_overlapped_write_req(handle, req);
if (handle->write_reqs_pending == 0) {
uv_queue_non_overlapped_write(handle);
}
if (!result && GetLastError() != ERROR_IO_PENDING) {
uv__set_sys_error(loop, GetLastError());
return -1;
}
if (result) {
/* Request completed immediately. */
req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->write_queue_size += req->queued_bytes;
} else {
result = WriteFile(handle->handle,
bufs[0].base,
bufs[0].len,
NULL,
&req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) {
uv__set_sys_error(loop, GetLastError());
return -1;
}
if (result) {
/* Request completed immediately. */
req->queued_bytes = 0;
} else {
/* Request queued by the kernel. */
req->queued_bytes = uv_count_bufs(bufs, bufcnt);
handle->write_queue_size += req->queued_bytes;
}
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
req->event_handle = CreateEvent(NULL, 0, 0, NULL);
if (!req->event_handle) {
uv_fatal_error(GetLastError(), "CreateEvent");
}
if (!RegisterWaitForSingleObject(&req->wait_handle,
req->overlapped.hEvent, post_completion_write_wait, (void*) req,
INFINITE, WT_EXECUTEINWAITTHREAD)) {
uv__set_sys_error(loop, GetLastError());
return -1;
}
}
}
handle->reqs_pending++;
@ -999,7 +1231,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
}
assert(bytes == sizeof(ipc_frame.header));
assert(ipc_frame.header.flags <= UV_IPC_UV_STREAM | UV_IPC_RAW_DATA);
assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA));
if (ipc_frame.header.flags & UV_IPC_UV_STREAM) {
assert(avail - sizeof(ipc_frame.header) >=
@ -1094,6 +1326,17 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
handle->write_queue_size -= req->queued_bytes;
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
if (req->wait_handle != INVALID_HANDLE_VALUE) {
UnregisterWait(req->wait_handle);
req->wait_handle = INVALID_HANDLE_VALUE;
}
if (req->event_handle) {
CloseHandle(req->event_handle);
req->event_handle = NULL;
}
}
if (req->ipc_header) {
if (req == &handle->ipc_header_write_req) {
req->type = UV_UNKNOWN_REQ;
@ -1112,6 +1355,13 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle,
}
handle->write_reqs_pending--;
if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE &&
handle->non_overlapped_writes_tail) {
assert(handle->write_reqs_pending > 0);
uv_queue_non_overlapped_write(handle);
}
if (handle->write_reqs_pending == 0 &&
handle->flags & UV_HANDLE_SHUTTING) {
uv_want_endgame(loop, (uv_handle_t*)handle);
@ -1277,21 +1527,19 @@ static void eof_timer_close_cb(uv_handle_t* handle) {
void uv_pipe_open(uv_pipe_t* pipe, uv_file file) {
HANDLE os_handle;
/* Special-case stdin with ipc. */
if (file == 0 && pipe->ipc) {
os_handle = (HANDLE)_get_osfhandle(file);
HANDLE os_handle = (HANDLE)_get_osfhandle(file);
if (os_handle == INVALID_HANDLE_VALUE ||
uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) {
return;
}
if (os_handle == INVALID_HANDLE_VALUE ||
uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) {
return;
}
uv_pipe_connection_init(pipe);
uv_pipe_connection_init(pipe);
pipe->handle = os_handle;
if (pipe->ipc) {
assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE));
pipe->ipc_pid = uv_parent_pid();
assert(pipe->ipc_pid != -1);
pipe->handle = os_handle;
}
}

View File

@ -43,6 +43,8 @@ void uv_connection_init(uv_stream_t* handle) {
handle->write_reqs_pending = 0;
uv_req_init(handle->loop, (uv_req_t*) &(handle->read_req));
handle->read_req.event_handle = NULL;
handle->read_req.wait_handle = INVALID_HANDLE_VALUE;
handle->read_req.type = UV_READ;
handle->read_req.data = handle;
}

View File

@ -316,6 +316,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
INFINITE, WT_EXECUTEINWAITTHREAD)) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
return;
}
} else {
@ -335,7 +336,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) {
static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
uv_req_t* req;
uv_read_t* req;
uv_buf_t buf;
int result;
DWORD bytes, flags;
@ -375,7 +376,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
handle->flags |= UV_HANDLE_READ_PENDING;
req->overlapped.InternalHigh = bytes;
handle->reqs_pending++;
uv_insert_pending_req(loop, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
} else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
/* The req will be processed with IOCP. */
handle->flags |= UV_HANDLE_READ_PENDING;
@ -383,7 +384,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
} else {
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, WSAGetLastError());
uv_insert_pending_req(loop, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
handle->reqs_pending++;
}
}

View File

@ -239,7 +239,7 @@ static void CALLBACK uv_tty_post_raw_read(void* data, BOOLEAN didTimeout) {
static void uv_tty_queue_read_raw(uv_loop_t* loop, uv_tty_t* handle) {
uv_req_t* req;
uv_read_t* req;
BOOL r;
assert(handle->flags & UV_HANDLE_READING);
@ -261,7 +261,7 @@ static void uv_tty_queue_read_raw(uv_loop_t* loop, uv_tty_t* handle) {
if (!r) {
handle->read_raw_wait = NULL;
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
}
handle->flags |= UV_HANDLE_READ_PENDING;
@ -309,7 +309,7 @@ static DWORD CALLBACK uv_tty_line_read_thread(void* data) {
static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) {
uv_req_t* req;
uv_read_t* req;
BOOL r;
assert(handle->flags & UV_HANDLE_READING);
@ -337,7 +337,7 @@ static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) {
if (!r) {
handle->read_line_handle = NULL;
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
goto out;
}
}
@ -347,7 +347,7 @@ static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) {
WT_EXECUTELONGFUNCTION);
if (!r) {
SET_REQ_ERROR(req, GetLastError());
uv_insert_pending_req(loop, req);
uv_insert_pending_req(loop, (uv_req_t*)req);
}
out:

View File

@ -4137,6 +4137,13 @@ typedef struct _FILE_BASIC_INFORMATION {
DWORD FileAttributes;
} FILE_BASIC_INFORMATION, *PFILE_BASIC_INFORMATION;
typedef struct _FILE_MODE_INFORMATION {
ULONG Mode;
} FILE_MODE_INFORMATION, *PFILE_MODE_INFORMATION;
#define FILE_SYNCHRONOUS_IO_ALERT 0x00000010
#define FILE_SYNCHRONOUS_IO_NONALERT 0x00000020
typedef enum _FILE_INFORMATION_CLASS {
FileDirectoryInformation = 1,
FileFullDirectoryInformation,

View File

@ -55,6 +55,11 @@ static uv_write_t conn_notify_req;
static int close_cb_called;
static int connection_accepted;
static uv_pipe_t stdin_pipe;
static uv_pipe_t stdout_pipe;
static int on_pipe_read_called;
static int after_write_called;
static void close_cb(uv_handle_t* handle) {
close_cb_called++;
@ -148,6 +153,85 @@ static int ipc_helper() {
}
void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
ASSERT(nread > 0);
ASSERT(memcmp("hello world\n", buf.base, nread) == 0);
on_pipe_read_called++;
free(buf.base);
uv_close((uv_handle_t*)&stdin_pipe, close_cb);
uv_close((uv_handle_t*)&stdout_pipe, close_cb);
}
static uv_buf_t on_pipe_read_alloc(uv_handle_t* handle,
size_t suggested_size) {
uv_buf_t buf;
buf.base = (char*)malloc(suggested_size);
buf.len = suggested_size;
return buf;
}
static void after_pipe_write(uv_write_t* req, int status) {
ASSERT(status == 0);
after_write_called++;
}
static int stdio_over_pipes_helper() {
/* Write several buffers to test that the write order is preserved. */
char* buffers[] = {
"he",
"ll",
"o ",
"wo",
"rl",
"d",
"\n"
};
uv_write_t write_req[COUNTOF(buffers)];
uv_buf_t buf[COUNTOF(buffers)];
int r, i;
uv_loop_t* loop = uv_default_loop();
ASSERT(UV_NAMED_PIPE == uv_guess_handle(0));
ASSERT(UV_NAMED_PIPE == uv_guess_handle(1));
r = uv_pipe_init(loop, &stdin_pipe, 0);
ASSERT(r == 0);
r = uv_pipe_init(loop, &stdout_pipe, 0);
ASSERT(r == 0);
uv_pipe_open(&stdin_pipe, 0);
uv_pipe_open(&stdout_pipe, 1);
r = uv_read_start((uv_stream_t*)&stdin_pipe, on_pipe_read_alloc,
on_pipe_read);
ASSERT(r == 0);
for (i = 0; i < COUNTOF(buffers); i++) {
buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i]));
}
for (i = 0; i < COUNTOF(buffers); i++) {
r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1,
after_pipe_write);
ASSERT(r == 0);
}
uv_run(loop);
ASSERT(after_write_called == 7);
ASSERT(on_pipe_read_called == 1);
ASSERT(close_cb_called == 2);
return 0;
}
static int maybe_run_test(int argc, char **argv) {
if (strcmp(argv[1], "--list") == 0) {
print_tests(stdout);
@ -158,6 +242,10 @@ static int maybe_run_test(int argc, char **argv) {
return ipc_helper();
}
if (strcmp(argv[1], "stdio_over_pipes_helper") == 0) {
return stdio_over_pipes_helper();
}
if (strcmp(argv[1], "spawn_helper1") == 0) {
return 1;
}

View File

@ -38,6 +38,8 @@
# define TEST_PIPENAME_2 "/tmp/uv-test-sock2"
#endif
#define COUNTOF(a) (sizeof(a) / sizeof(a[0]))
typedef enum {
TCP = 0,
PIPE

View File

@ -20,6 +20,7 @@
*/
TEST_DECLARE (tty)
TEST_DECLARE (stdio_over_pipes)
TEST_DECLARE (ipc)
TEST_DECLARE (tcp_ping_pong)
TEST_DECLARE (tcp_ping_pong_v6)
@ -117,6 +118,7 @@ HELPER_DECLARE (pipe_echo_server)
TASK_LIST_START
TEST_ENTRY (tty)
TEST_ENTRY (stdio_over_pipes)
TEST_ENTRY (ipc)

View File

@ -0,0 +1,154 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
static char exepath[1024];
static size_t exepath_size = 1024;
static char* args[3];
static uv_process_options_t options;
static int close_cb_called;
static int exit_cb_called;
static int on_read_cb_called;
static int after_write_cb_called;
uv_pipe_t out, in;
static uv_loop_t* loop;
#define OUTPUT_SIZE 1024
static char output[OUTPUT_SIZE];
static int output_used;
typedef struct {
uv_write_t req;
uv_buf_t buf;
} write_req_t;
static void close_cb(uv_handle_t* handle) {
printf("close_cb\n");
close_cb_called++;
}
static void exit_cb(uv_process_t* process, int exit_status, int term_signal) {
printf("exit_cb\n");
exit_cb_called++;
ASSERT(exit_status == 0);
ASSERT(term_signal == 0);
uv_close((uv_handle_t*)process, close_cb);
uv_close((uv_handle_t*)&in, close_cb);
uv_close((uv_handle_t*)&out, close_cb);
}
static void init_process_options(char* test, uv_exit_cb exit_cb) {
int r = uv_exepath(exepath, &exepath_size);
ASSERT(r == 0);
exepath[exepath_size] = '\0';
args[0] = exepath;
args[1] = test;
args[2] = NULL;
options.file = exepath;
options.args = args;
options.exit_cb = exit_cb;
}
static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = output + output_used;
buf.len = OUTPUT_SIZE - output_used;
return buf;
}
static void after_write(uv_write_t* req, int status) {
write_req_t* wr;
if (status) {
uv_err_t err = uv_last_error(loop);
fprintf(stderr, "uv_write error: %s\n", uv_strerror(err));
ASSERT(0);
}
wr = (write_req_t*) req;
/* Free the read/write buffer and the request */
free(wr);
after_write_cb_called++;
}
static void on_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
write_req_t* write_req;
int r;
uv_err_t err = uv_last_error(uv_default_loop());
ASSERT(nread > 0 || err.code == UV_EOF);
if (nread > 0) {
output_used += nread;
if (output_used == 12) {
ASSERT(memcmp("hello world\n", output, 12) == 0);
write_req = (write_req_t*)malloc(sizeof(*write_req));
write_req->buf = uv_buf_init(output, output_used);
r = uv_write(&write_req->req, (uv_stream_t*)&in, &write_req->buf, 1, after_write);
ASSERT(r == 0);
}
}
on_read_cb_called++;
}
TEST_IMPL(stdio_over_pipes) {
int r;
uv_process_t process;
loop = uv_default_loop();
init_process_options("stdio_over_pipes_helper", exit_cb);
uv_pipe_init(loop, &out, 0);
options.stdout_stream = &out;
uv_pipe_init(loop, &in, 0);
options.stdin_stream = &in;
r = uv_spawn(loop, &process, options);
ASSERT(r == 0);
r = uv_read_start((uv_stream_t*) &out, on_alloc, on_read);
ASSERT(r == 0);
r = uv_run(uv_default_loop());
ASSERT(r == 0);
ASSERT(on_read_cb_called > 1);
ASSERT(after_write_cb_called == 1);
ASSERT(exit_cb_called == 1);
ASSERT(close_cb_called == 3);
ASSERT(memcmp("hello world\n", output, 12) == 0);
ASSERT(output_used == 12);
return 0;
}

1
uv.gyp
View File

@ -286,6 +286,7 @@
'test/test-ref.c',
'test/test-shutdown-eof.c',
'test/test-spawn.c',
'test/test-stdio-over-pipes.c',
'test/test-tcp-bind-error.c',
'test/test-tcp-bind6-error.c',
'test/test-tcp-close.c',