diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 86a05115..4f8790c3 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -190,7 +190,11 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); struct uv_req_s* next_req; #define UV_WRITE_PRIVATE_FIELDS \ - int ipc_header; + int ipc_header; \ + uv_buf_t* write_buffer; \ + HANDLE event_handle; \ + HANDLE wait_handle; \ + uv_write_t* next_non_overlapped_write; #define UV_CONNECT_PRIVATE_FIELDS \ /* empty */ @@ -215,7 +219,13 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); HANDLE event_handle; \ HANDLE wait_handle; \ struct uv_tcp_accept_s* next_pending; \ - } uv_tcp_accept_t; + } uv_tcp_accept_t; \ + \ + typedef struct uv_read_s { \ + UV_REQ_FIELDS \ + HANDLE event_handle; \ + HANDLE wait_handle; \ + } uv_read_t; #define uv_stream_connection_fields \ unsigned int write_reqs_pending; \ @@ -226,7 +236,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); #define UV_STREAM_PRIVATE_FIELDS \ unsigned int reqs_pending; \ - uv_req_t read_req; \ + uv_read_t read_req; \ union { \ struct { uv_stream_connection_fields }; \ struct { uv_stream_server_fields }; \ @@ -270,7 +280,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_write_t ipc_header_write_req; \ int ipc_pid; \ uint64_t remaining_ipc_rawdata_bytes; \ - WSAPROTOCOL_INFOW* pending_socket_info; + WSAPROTOCOL_INFOW* pending_socket_info; \ + uv_write_t* non_overlapped_writes_tail; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/src/win/internal.h b/src/win/internal.h index 1f8eb837..ea0867c1 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -65,6 +65,7 @@ void uv_process_timers(uv_loop_t* loop); #define UV_HANDLE_ZERO_READ 0x40000 #define UV_HANDLE_TTY_RAW 0x80000 #define UV_HANDLE_EMULATE_IOCP 0x100000 +#define UV_HANDLE_NON_OVERLAPPED_PIPE 0x200000 void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); diff --git a/src/win/pipe.c b/src/win/pipe.c index 9c90fda0..bf74e0af 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -79,6 +79,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->remaining_ipc_rawdata_bytes = 0; handle->pending_socket_info = NULL; handle->ipc = ipc; + handle->non_overlapped_writes_tail = NULL; uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); @@ -90,6 +91,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); + handle->read_req.data = handle; handle->eof_timer = NULL; } @@ -149,19 +151,39 @@ done: static int uv_set_pipe_handle(uv_loop_t* loop, uv_pipe_t* handle, HANDLE pipeHandle) { + NTSTATUS nt_status; + IO_STATUS_BLOCK io_status; + FILE_MODE_INFORMATION mode_info; DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { return -1; } - if (CreateIoCompletionPort(pipeHandle, - loop->iocp, - (ULONG_PTR)handle, - 0) == NULL) { + /* Check if the pipe was created with FILE_FLAG_OVERLAPPED. */ + nt_status = pNtQueryInformationFile(pipeHandle, + &io_status, + &mode_info, + sizeof(mode_info), + FileModeInformation); + if (nt_status != STATUS_SUCCESS) { return -1; } + if (mode_info.Mode & FILE_SYNCHRONOUS_IO_ALERT || + mode_info.Mode & FILE_SYNCHRONOUS_IO_NONALERT) { + /* Non-overlapped pipe. */ + handle->flags |= UV_HANDLE_NON_OVERLAPPED_PIPE; + } else { + /* Overlapped pipe. Try to associate with IOCP. */ + if (CreateIoCompletionPort(pipeHandle, + loop->iocp, + (ULONG_PTR)handle, + 0) == NULL) { + handle->flags |= UV_HANDLE_EMULATE_IOCP; + } + } + return 0; } @@ -258,6 +280,17 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { free(handle->pending_socket_info); handle->pending_socket_info = NULL; } + + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(handle->read_req.wait_handle); + handle->read_req.wait_handle = INVALID_HANDLE_VALUE; + } + if (handle->read_req.event_handle) { + CloseHandle(handle->read_req.event_handle); + handle->read_req.event_handle = NULL; + } + } } /* Remember the state of this flag because the close callback is */ @@ -657,8 +690,99 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { } +static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) { + int result; + DWORD bytes; + uv_read_t* req = (uv_read_t*) parameter; + uv_pipe_t* handle = (uv_pipe_t*) req->data; + uv_loop_t* loop = handle->loop; + + assert(req != NULL); + assert(req->type == UV_READ); + assert(handle->type == UV_NAMED_PIPE); + + result = ReadFile(handle->handle, + &uv_zero_, + 0, + &bytes, + NULL); + + if (!result) { + SET_REQ_ERROR(req, GetLastError()); + } + + POST_COMPLETION_FOR_REQ(loop, req); + return 0; +} + + +static DWORD WINAPI uv_pipe_writefile_thread_proc(void* parameter) { + int result; + DWORD bytes; + uv_write_t* req = (uv_write_t*) parameter; + uv_pipe_t* handle = (uv_pipe_t*) req->handle; + uv_loop_t* loop = handle->loop; + + assert(req != NULL); + assert(req->type == UV_WRITE); + assert(handle->type == UV_NAMED_PIPE); + assert(req->write_buffer); + + result = WriteFile(handle->handle, + req->write_buffer->base, + req->write_buffer->len, + &bytes, + NULL); + + if (!result) { + SET_REQ_ERROR(req, GetLastError()); + } + + POST_COMPLETION_FOR_REQ(loop, req); + return 0; +} + + +static void CALLBACK post_completion_read_wait(void* context, BOOLEAN timed_out) { + uv_read_t* req; + uv_tcp_t* handle; + + req = (uv_read_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->data; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + +static void CALLBACK post_completion_write_wait(void* context, BOOLEAN timed_out) { + uv_write_t* req; + uv_tcp_t* handle; + + req = (uv_write_t*) context; + assert(req != NULL); + handle = (uv_tcp_t*)req->handle; + assert(handle != NULL); + assert(!timed_out); + + if (!PostQueuedCompletionStatus(handle->loop->iocp, + req->overlapped.InternalHigh, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } +} + + static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { - uv_req_t* req; + uv_read_t* req; int result; assert(handle->flags & UV_HANDLE_READING); @@ -667,28 +791,60 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { assert(handle->handle != INVALID_HANDLE_VALUE); req = &handle->read_req; - memset(&req->overlapped, 0, sizeof(req->overlapped)); - /* Do 0-read */ - result = ReadFile(handle->handle, - &uv_zero_, - 0, - NULL, - &req->overlapped); + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc, + req, + WT_EXECUTELONGFUNCTION)) { + /* Make this req pending reporting an error. */ + SET_REQ_ERROR(req, GetLastError()); + goto error; + } + } else { + memset(&req->overlapped, 0, sizeof(req->overlapped)); + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->overlapped.hEvent = (HANDLE) ((DWORD) req->event_handle | 1); + } - if (!result && GetLastError() != ERROR_IO_PENDING) { - /* Make this req pending reporting an error. */ - SET_REQ_ERROR(req, WSAGetLastError()); - uv_insert_pending_req(loop, req); + /* Do 0-read */ + result = ReadFile(handle->handle, + &uv_zero_, + 0, + NULL, + &req->overlapped); - handle->flags |= UV_HANDLE_READ_PENDING; - handle->reqs_pending++; - return; + if (!result && GetLastError() != ERROR_IO_PENDING) { + /* Make this req pending reporting an error. */ + SET_REQ_ERROR(req, GetLastError()); + goto error; + } + + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (!req->event_handle) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + } + if (req->wait_handle == INVALID_HANDLE_VALUE) { + if (!RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion_read_wait, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + SET_REQ_ERROR(req, GetLastError()); + goto error; + } + } + } } /* Start the eof timer if there is one */ eof_timer_start(handle); + handle->flags |= UV_HANDLE_READ_PENDING; + handle->reqs_pending++; + return; +error: + uv_insert_pending_req(loop, (uv_req_t*)req); handle->flags |= UV_HANDLE_READ_PENDING; handle->reqs_pending++; } @@ -739,6 +895,54 @@ int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, } +static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle, + uv_write_t* req) { + req->next_non_overlapped_write = NULL; + if (handle->non_overlapped_writes_tail) { + req->next_non_overlapped_write = + handle->non_overlapped_writes_tail->next_non_overlapped_write; + handle->non_overlapped_writes_tail->next_non_overlapped_write = req; + handle->non_overlapped_writes_tail = req; + } else { + req->next_non_overlapped_write = req; + handle->non_overlapped_writes_tail = req; + } +} + + +static uv_write_t* uv_remove_non_overlapped_write_req(uv_pipe_t* handle) { + uv_write_t* req; + + if (handle->non_overlapped_writes_tail) { + req = handle->non_overlapped_writes_tail->next_non_overlapped_write; + + if (req == handle->non_overlapped_writes_tail) { + handle->non_overlapped_writes_tail = NULL; + } else { + handle->non_overlapped_writes_tail->next_non_overlapped_write = + req->next_non_overlapped_write; + } + + return req; + } else { + /* queue empty */ + return NULL; + } +} + + +static void uv_queue_non_overlapped_write(uv_pipe_t* handle) { + uv_write_t* req = uv_remove_non_overlapped_write_req(handle); + if (req) { + if (!QueueUserWorkItem(&uv_pipe_writefile_thread_proc, + req, + WT_EXECUTELONGFUNCTION)) { + uv_fatal_error(GetLastError(), "QueueUserWorkItem"); + } + } +} + + static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, uv_buf_t bufs[], int bufcnt, uv_stream_t* send_handle, uv_write_cb cb) { @@ -775,9 +979,12 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, req->handle = (uv_stream_t*) handle; req->cb = cb; req->ipc_header = 0; + req->event_handle = NULL; + req->wait_handle = INVALID_HANDLE_VALUE; memset(&req->overlapped, 0, sizeof(req->overlapped)); if (handle->ipc) { + assert(!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); ipc_frame.header.flags = 0; /* Use the IPC framing protocol. */ @@ -856,24 +1063,49 @@ static int uv_pipe_write_impl(uv_loop_t* loop, uv_write_t* req, } } - result = WriteFile(handle->handle, - bufs[0].base, - bufs[0].len, - NULL, - &req->overlapped); + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + req->write_buffer = &bufs[0]; + uv_insert_non_overlapped_write_req(handle, req); + if (handle->write_reqs_pending == 0) { + uv_queue_non_overlapped_write(handle); + } - if (!result && GetLastError() != ERROR_IO_PENDING) { - uv__set_sys_error(loop, GetLastError()); - return -1; - } - - if (result) { - /* Request completed immediately. */ - req->queued_bytes = 0; - } else { /* Request queued by the kernel. */ req->queued_bytes = uv_count_bufs(bufs, bufcnt); handle->write_queue_size += req->queued_bytes; + } else { + result = WriteFile(handle->handle, + bufs[0].base, + bufs[0].len, + NULL, + &req->overlapped); + + if (!result && GetLastError() != ERROR_IO_PENDING) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + + if (result) { + /* Request completed immediately. */ + req->queued_bytes = 0; + } else { + /* Request queued by the kernel. */ + req->queued_bytes = uv_count_bufs(bufs, bufcnt); + handle->write_queue_size += req->queued_bytes; + } + + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + req->event_handle = CreateEvent(NULL, 0, 0, NULL); + if (!req->event_handle) { + uv_fatal_error(GetLastError(), "CreateEvent"); + } + if (!RegisterWaitForSingleObject(&req->wait_handle, + req->overlapped.hEvent, post_completion_write_wait, (void*) req, + INFINITE, WT_EXECUTEINWAITTHREAD)) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + } } handle->reqs_pending++; @@ -999,7 +1231,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, } assert(bytes == sizeof(ipc_frame.header)); - assert(ipc_frame.header.flags <= UV_IPC_UV_STREAM | UV_IPC_RAW_DATA); + assert(ipc_frame.header.flags <= (UV_IPC_UV_STREAM | UV_IPC_RAW_DATA)); if (ipc_frame.header.flags & UV_IPC_UV_STREAM) { assert(avail - sizeof(ipc_frame.header) >= @@ -1094,6 +1326,17 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, handle->write_queue_size -= req->queued_bytes; + if (handle->flags & UV_HANDLE_EMULATE_IOCP) { + if (req->wait_handle != INVALID_HANDLE_VALUE) { + UnregisterWait(req->wait_handle); + req->wait_handle = INVALID_HANDLE_VALUE; + } + if (req->event_handle) { + CloseHandle(req->event_handle); + req->event_handle = NULL; + } + } + if (req->ipc_header) { if (req == &handle->ipc_header_write_req) { req->type = UV_UNKNOWN_REQ; @@ -1112,6 +1355,13 @@ void uv_process_pipe_write_req(uv_loop_t* loop, uv_pipe_t* handle, } handle->write_reqs_pending--; + + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE && + handle->non_overlapped_writes_tail) { + assert(handle->write_reqs_pending > 0); + uv_queue_non_overlapped_write(handle); + } + if (handle->write_reqs_pending == 0 && handle->flags & UV_HANDLE_SHUTTING) { uv_want_endgame(loop, (uv_handle_t*)handle); @@ -1277,21 +1527,19 @@ static void eof_timer_close_cb(uv_handle_t* handle) { void uv_pipe_open(uv_pipe_t* pipe, uv_file file) { - HANDLE os_handle; - - /* Special-case stdin with ipc. */ - if (file == 0 && pipe->ipc) { - os_handle = (HANDLE)_get_osfhandle(file); + HANDLE os_handle = (HANDLE)_get_osfhandle(file); - if (os_handle == INVALID_HANDLE_VALUE || - uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { - return; - } + if (os_handle == INVALID_HANDLE_VALUE || + uv_set_pipe_handle(pipe->loop, pipe, os_handle) == -1) { + return; + } - uv_pipe_connection_init(pipe); + uv_pipe_connection_init(pipe); + pipe->handle = os_handle; + + if (pipe->ipc) { + assert(!(pipe->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)); pipe->ipc_pid = uv_parent_pid(); assert(pipe->ipc_pid != -1); - - pipe->handle = os_handle; } } diff --git a/src/win/stream.c b/src/win/stream.c index f1211784..c2354eec 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -43,6 +43,8 @@ void uv_connection_init(uv_stream_t* handle) { handle->write_reqs_pending = 0; uv_req_init(handle->loop, (uv_req_t*) &(handle->read_req)); + handle->read_req.event_handle = NULL; + handle->read_req.wait_handle = INVALID_HANDLE_VALUE; handle->read_req.type = UV_READ; handle->read_req.data = handle; } diff --git a/src/win/tcp.c b/src/win/tcp.c index 897ea5e9..ee0591e7 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -316,6 +316,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { INFINITE, WT_EXECUTEINWAITTHREAD)) { SET_REQ_ERROR(req, GetLastError()); uv_insert_pending_req(loop, (uv_req_t*)req); + handle->reqs_pending++; return; } } else { @@ -335,7 +336,7 @@ static void uv_tcp_queue_accept(uv_tcp_t* handle, uv_tcp_accept_t* req) { static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { - uv_req_t* req; + uv_read_t* req; uv_buf_t buf; int result; DWORD bytes, flags; @@ -375,7 +376,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { handle->flags |= UV_HANDLE_READ_PENDING; req->overlapped.InternalHigh = bytes; handle->reqs_pending++; - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { /* The req will be processed with IOCP. */ handle->flags |= UV_HANDLE_READ_PENDING; @@ -383,7 +384,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { } else { /* Make this req pending reporting an error. */ SET_REQ_ERROR(req, WSAGetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); handle->reqs_pending++; } } diff --git a/src/win/tty.c b/src/win/tty.c index 16064eed..c02c102f 100644 --- a/src/win/tty.c +++ b/src/win/tty.c @@ -239,7 +239,7 @@ static void CALLBACK uv_tty_post_raw_read(void* data, BOOLEAN didTimeout) { static void uv_tty_queue_read_raw(uv_loop_t* loop, uv_tty_t* handle) { - uv_req_t* req; + uv_read_t* req; BOOL r; assert(handle->flags & UV_HANDLE_READING); @@ -261,7 +261,7 @@ static void uv_tty_queue_read_raw(uv_loop_t* loop, uv_tty_t* handle) { if (!r) { handle->read_raw_wait = NULL; SET_REQ_ERROR(req, GetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); } handle->flags |= UV_HANDLE_READ_PENDING; @@ -309,7 +309,7 @@ static DWORD CALLBACK uv_tty_line_read_thread(void* data) { static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) { - uv_req_t* req; + uv_read_t* req; BOOL r; assert(handle->flags & UV_HANDLE_READING); @@ -337,7 +337,7 @@ static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) { if (!r) { handle->read_line_handle = NULL; SET_REQ_ERROR(req, GetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); goto out; } } @@ -347,7 +347,7 @@ static void uv_tty_queue_read_line(uv_loop_t* loop, uv_tty_t* handle) { WT_EXECUTELONGFUNCTION); if (!r) { SET_REQ_ERROR(req, GetLastError()); - uv_insert_pending_req(loop, req); + uv_insert_pending_req(loop, (uv_req_t*)req); } out: diff --git a/src/win/winapi.h b/src/win/winapi.h index 8434363e..9ed808ea 100644 --- a/src/win/winapi.h +++ b/src/win/winapi.h @@ -4137,6 +4137,13 @@ typedef struct _FILE_BASIC_INFORMATION { DWORD FileAttributes; } FILE_BASIC_INFORMATION, *PFILE_BASIC_INFORMATION; +typedef struct _FILE_MODE_INFORMATION { + ULONG Mode; +} FILE_MODE_INFORMATION, *PFILE_MODE_INFORMATION; + +#define FILE_SYNCHRONOUS_IO_ALERT 0x00000010 +#define FILE_SYNCHRONOUS_IO_NONALERT 0x00000020 + typedef enum _FILE_INFORMATION_CLASS { FileDirectoryInformation = 1, FileFullDirectoryInformation, diff --git a/test/run-tests.c b/test/run-tests.c index fa7b8b8f..84ee85f6 100644 --- a/test/run-tests.c +++ b/test/run-tests.c @@ -55,6 +55,11 @@ static uv_write_t conn_notify_req; static int close_cb_called; static int connection_accepted; +static uv_pipe_t stdin_pipe; +static uv_pipe_t stdout_pipe; +static int on_pipe_read_called; +static int after_write_called; + static void close_cb(uv_handle_t* handle) { close_cb_called++; @@ -148,6 +153,85 @@ static int ipc_helper() { } +void on_pipe_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { + ASSERT(nread > 0); + ASSERT(memcmp("hello world\n", buf.base, nread) == 0); + on_pipe_read_called++; + + free(buf.base); + + uv_close((uv_handle_t*)&stdin_pipe, close_cb); + uv_close((uv_handle_t*)&stdout_pipe, close_cb); +} + + +static uv_buf_t on_pipe_read_alloc(uv_handle_t* handle, + size_t suggested_size) { + uv_buf_t buf; + buf.base = (char*)malloc(suggested_size); + buf.len = suggested_size; + return buf; +} + + +static void after_pipe_write(uv_write_t* req, int status) { + ASSERT(status == 0); + after_write_called++; +} + + +static int stdio_over_pipes_helper() { + /* Write several buffers to test that the write order is preserved. */ + char* buffers[] = { + "he", + "ll", + "o ", + "wo", + "rl", + "d", + "\n" + }; + + uv_write_t write_req[COUNTOF(buffers)]; + uv_buf_t buf[COUNTOF(buffers)]; + int r, i; + uv_loop_t* loop = uv_default_loop(); + + ASSERT(UV_NAMED_PIPE == uv_guess_handle(0)); + ASSERT(UV_NAMED_PIPE == uv_guess_handle(1)); + + r = uv_pipe_init(loop, &stdin_pipe, 0); + ASSERT(r == 0); + r = uv_pipe_init(loop, &stdout_pipe, 0); + ASSERT(r == 0); + + uv_pipe_open(&stdin_pipe, 0); + uv_pipe_open(&stdout_pipe, 1); + + r = uv_read_start((uv_stream_t*)&stdin_pipe, on_pipe_read_alloc, + on_pipe_read); + ASSERT(r == 0); + + for (i = 0; i < COUNTOF(buffers); i++) { + buf[i] = uv_buf_init((char*)buffers[i], strlen(buffers[i])); + } + + for (i = 0; i < COUNTOF(buffers); i++) { + r = uv_write(&write_req[i], (uv_stream_t*)&stdout_pipe, &buf[i], 1, + after_pipe_write); + ASSERT(r == 0); + } + + uv_run(loop); + + ASSERT(after_write_called == 7); + ASSERT(on_pipe_read_called == 1); + ASSERT(close_cb_called == 2); + + return 0; +} + + static int maybe_run_test(int argc, char **argv) { if (strcmp(argv[1], "--list") == 0) { print_tests(stdout); @@ -158,6 +242,10 @@ static int maybe_run_test(int argc, char **argv) { return ipc_helper(); } + if (strcmp(argv[1], "stdio_over_pipes_helper") == 0) { + return stdio_over_pipes_helper(); + } + if (strcmp(argv[1], "spawn_helper1") == 0) { return 1; } diff --git a/test/task.h b/test/task.h index 76c69033..e28b393b 100644 --- a/test/task.h +++ b/test/task.h @@ -38,6 +38,8 @@ # define TEST_PIPENAME_2 "/tmp/uv-test-sock2" #endif +#define COUNTOF(a) (sizeof(a) / sizeof(a[0])) + typedef enum { TCP = 0, PIPE diff --git a/test/test-list.h b/test/test-list.h index 92535847..17b98c21 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -20,6 +20,7 @@ */ TEST_DECLARE (tty) +TEST_DECLARE (stdio_over_pipes) TEST_DECLARE (ipc) TEST_DECLARE (tcp_ping_pong) TEST_DECLARE (tcp_ping_pong_v6) @@ -117,6 +118,7 @@ HELPER_DECLARE (pipe_echo_server) TASK_LIST_START TEST_ENTRY (tty) + TEST_ENTRY (stdio_over_pipes) TEST_ENTRY (ipc) diff --git a/test/test-stdio-over-pipes.c b/test/test-stdio-over-pipes.c new file mode 100644 index 00000000..fd96fc2d --- /dev/null +++ b/test/test-stdio-over-pipes.c @@ -0,0 +1,154 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + + +static char exepath[1024]; +static size_t exepath_size = 1024; +static char* args[3]; +static uv_process_options_t options; +static int close_cb_called; +static int exit_cb_called; +static int on_read_cb_called; +static int after_write_cb_called; +uv_pipe_t out, in; +static uv_loop_t* loop; +#define OUTPUT_SIZE 1024 +static char output[OUTPUT_SIZE]; +static int output_used; + +typedef struct { + uv_write_t req; + uv_buf_t buf; +} write_req_t; + + +static void close_cb(uv_handle_t* handle) { + printf("close_cb\n"); + close_cb_called++; +} + + +static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { + printf("exit_cb\n"); + exit_cb_called++; + ASSERT(exit_status == 0); + ASSERT(term_signal == 0); + uv_close((uv_handle_t*)process, close_cb); + uv_close((uv_handle_t*)&in, close_cb); + uv_close((uv_handle_t*)&out, close_cb); +} + + +static void init_process_options(char* test, uv_exit_cb exit_cb) { + int r = uv_exepath(exepath, &exepath_size); + ASSERT(r == 0); + exepath[exepath_size] = '\0'; + args[0] = exepath; + args[1] = test; + args[2] = NULL; + options.file = exepath; + options.args = args; + options.exit_cb = exit_cb; +} + + +static uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { + uv_buf_t buf; + buf.base = output + output_used; + buf.len = OUTPUT_SIZE - output_used; + return buf; +} + + +static void after_write(uv_write_t* req, int status) { + write_req_t* wr; + + if (status) { + uv_err_t err = uv_last_error(loop); + fprintf(stderr, "uv_write error: %s\n", uv_strerror(err)); + ASSERT(0); + } + + wr = (write_req_t*) req; + + /* Free the read/write buffer and the request */ + free(wr); + + after_write_cb_called++; +} + + +static void on_read(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { + write_req_t* write_req; + int r; + uv_err_t err = uv_last_error(uv_default_loop()); + + ASSERT(nread > 0 || err.code == UV_EOF); + + if (nread > 0) { + output_used += nread; + if (output_used == 12) { + ASSERT(memcmp("hello world\n", output, 12) == 0); + write_req = (write_req_t*)malloc(sizeof(*write_req)); + write_req->buf = uv_buf_init(output, output_used); + r = uv_write(&write_req->req, (uv_stream_t*)&in, &write_req->buf, 1, after_write); + ASSERT(r == 0); + } + } + + on_read_cb_called++; +} + + +TEST_IMPL(stdio_over_pipes) { + int r; + uv_process_t process; + loop = uv_default_loop(); + + init_process_options("stdio_over_pipes_helper", exit_cb); + + uv_pipe_init(loop, &out, 0); + options.stdout_stream = &out; + uv_pipe_init(loop, &in, 0); + options.stdin_stream = ∈ + + r = uv_spawn(loop, &process, options); + ASSERT(r == 0); + + r = uv_read_start((uv_stream_t*) &out, on_alloc, on_read); + ASSERT(r == 0); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + ASSERT(on_read_cb_called > 1); + ASSERT(after_write_cb_called == 1); + ASSERT(exit_cb_called == 1); + ASSERT(close_cb_called == 3); + ASSERT(memcmp("hello world\n", output, 12) == 0); + ASSERT(output_used == 12); + + return 0; +} + diff --git a/uv.gyp b/uv.gyp index 3503f27f..9fe0867b 100644 --- a/uv.gyp +++ b/uv.gyp @@ -286,6 +286,7 @@ 'test/test-ref.c', 'test/test-shutdown-eof.c', 'test/test-spawn.c', + 'test/test-stdio-over-pipes.c', 'test/test-tcp-bind-error.c', 'test/test-tcp-bind6-error.c', 'test/test-tcp-close.c',