diff --git a/include/uv-win.h b/include/uv-win.h index 136b0b45..c6c8394a 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -443,6 +443,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); int queue_len; \ } pending_ipc_info; \ uv_write_t* non_overlapped_writes_tail; \ + uv_mutex_t readfile_mutex; \ + volatile HANDLE readfile_thread; \ void* reserved; #define UV_PIPE_PRIVATE_FIELDS \ diff --git a/src/win/internal.h b/src/win/internal.h index 9eadb712..d7aae53e 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -100,6 +100,7 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled; /* Only used by uv_pipe_t handles. */ #define UV_HANDLE_NON_OVERLAPPED_PIPE 0x01000000 #define UV_HANDLE_PIPESERVER 0x02000000 +#define UV_HANDLE_PIPE_READ_CANCELABLE 0x04000000 /* Only used by uv_tty_t handles. */ #define UV_HANDLE_TTY_READABLE 0x01000000 @@ -181,6 +182,9 @@ int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t* send_handle, uv_write_cb cb); +void uv__pipe_pause_read(uv_pipe_t* handle); +void uv__pipe_unpause_read(uv_pipe_t* handle); +void uv__pipe_stop_read(uv_pipe_t* handle); void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req); diff --git a/src/win/pipe.c b/src/win/pipe.c index 3bf2a220..c78051db 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -101,6 +101,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->pending_ipc_info.queue_len = 0; handle->ipc = ipc; handle->non_overlapped_writes_tail = NULL; + handle->readfile_thread = NULL; uv_req_init(loop, (uv_req_t*) &handle->ipc_header_write_req); @@ -112,6 +113,12 @@ static void uv_pipe_connection_init(uv_pipe_t* handle) { uv_connection_init((uv_stream_t*) handle); handle->read_req.data = handle; handle->eof_timer = NULL; + assert(!(handle->flags & UV_HANDLE_PIPESERVER)); + if (pCancelSynchronousIo && + handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + uv_mutex_init(&handle->readfile_mutex); + handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE; + } } @@ -321,6 +328,11 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { FILE_PIPE_LOCAL_INFORMATION pipe_info; uv__ipc_queue_item_t* item; + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE; + uv_mutex_destroy(&handle->readfile_mutex); + } + if ((handle->flags & UV_HANDLE_CONNECTION) && handle->shutdown_req != NULL && handle->write_reqs_pending == 0) { @@ -658,12 +670,49 @@ error: } +void uv__pipe_pause_read(uv_pipe_t* handle) { + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + /* Pause the ReadFile task briefly, to work + around the Windows kernel bug that causes + any access to a NamedPipe to deadlock if + any process has called ReadFile */ + HANDLE h; + uv_mutex_lock(&handle->readfile_mutex); + h = handle->readfile_thread; + while (h) { + /* spinlock: we expect this to finish quickly, + or we are probably about to deadlock anyways + (in the kernel), so it doesn't matter */ + pCancelSynchronousIo(h); + SwitchToThread(); /* yield thread control briefly */ + h = handle->readfile_thread; + } + } +} + + +void uv__pipe_unpause_read(uv_pipe_t* handle) { + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + uv_mutex_unlock(&handle->readfile_mutex); + } +} + + +void uv__pipe_stop_read(uv_pipe_t* handle) { + handle->flags &= ~UV_HANDLE_READING; + uv__pipe_pause_read((uv_pipe_t*)handle); + uv__pipe_unpause_read((uv_pipe_t*)handle); +} + + /* Cleans up uv_pipe_t (server or connection) and all resources associated */ /* with it. */ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { int i; HANDLE pipeHandle; + uv__pipe_stop_read(handle); + if (handle->name) { free(handle->name); handle->name = NULL; @@ -689,6 +738,7 @@ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { CloseHandle(handle->handle); handle->handle = INVALID_HANDLE_VALUE; } + } @@ -867,19 +917,61 @@ static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) { uv_read_t* req = (uv_read_t*) parameter; uv_pipe_t* handle = (uv_pipe_t*) req->data; uv_loop_t* loop = handle->loop; + HANDLE hThread = NULL; + DWORD err; + uv_mutex_t *m = &handle->readfile_mutex; assert(req != NULL); assert(req->type == UV_READ); assert(handle->type == UV_NAMED_PIPE); + if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */ + if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), + GetCurrentProcess(), &hThread, + 0, TRUE, DUPLICATE_SAME_ACCESS)) { + handle->readfile_thread = hThread; + } else { + hThread = NULL; + } + uv_mutex_unlock(m); + } +restart_readfile: result = ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL); + if (!result) { + err = GetLastError(); + if (err == ERROR_OPERATION_ABORTED && + handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { + if (handle->flags & UV_HANDLE_READING) { + /* just a brief break to do something else */ + handle->readfile_thread = NULL; + /* resume after it is finished */ + uv_mutex_lock(m); + handle->readfile_thread = hThread; + uv_mutex_unlock(m); + goto restart_readfile; + } else { + result = 1; /* successfully stopped reading */ + } + } + } + if (hThread) { + assert(hThread == handle->readfile_thread); + /* mutex does not control clearing readfile_thread */ + handle->readfile_thread = NULL; + uv_mutex_lock(m); + /* only when we hold the mutex lock is it safe to + open or close the handle */ + CloseHandle(hThread); + uv_mutex_unlock(m); + } if (!result) { - SET_REQ_ERROR(req, GetLastError()); + SET_REQ_ERROR(req, err); } POST_COMPLETION_FOR_REQ(loop, req); @@ -1836,6 +1928,8 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) { return UV_EINVAL; } + uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */ + nt_status = pNtQueryInformationFile(handle->handle, &io_status, &tmp_name_info, @@ -1846,7 +1940,8 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) { name_info = malloc(name_size); if (!name_info) { *len = 0; - return UV_ENOMEM; + err = UV_ENOMEM; + goto cleanup; } nt_status = pNtQueryInformationFile(handle->handle, @@ -1918,10 +2013,14 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) { buf[addrlen++] = '\0'; *len = addrlen; - return 0; + err = 0; + goto cleanup; error: free(name_info); + +cleanup: + uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */ return err; } diff --git a/src/win/stream.c b/src/win/stream.c index 6553ab11..057f72ec 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -106,7 +106,11 @@ int uv_read_stop(uv_stream_t* handle) { if (handle->type == UV_TTY) { err = uv_tty_read_stop((uv_tty_t*) handle); } else { - handle->flags &= ~UV_HANDLE_READING; + if (handle->type == UV_NAMED_PIPE) { + uv__pipe_stop_read((uv_pipe_t*) handle); + } else { + handle->flags &= ~UV_HANDLE_READING; + } DECREASE_ACTIVE_COUNT(handle->loop, handle); } diff --git a/src/win/winapi.c b/src/win/winapi.c index 3e439ea5..84ce73e3 100644 --- a/src/win/winapi.c +++ b/src/win/winapi.c @@ -51,6 +51,7 @@ sSleepConditionVariableCS pSleepConditionVariableCS; sSleepConditionVariableSRW pSleepConditionVariableSRW; sWakeAllConditionVariable pWakeAllConditionVariable; sWakeConditionVariable pWakeConditionVariable; +sCancelSynchronousIo pCancelSynchronousIo; void uv_winapi_init() { @@ -156,4 +157,7 @@ void uv_winapi_init() { pWakeConditionVariable = (sWakeConditionVariable) GetProcAddress(kernel32_module, "WakeConditionVariable"); + + pCancelSynchronousIo = (sCancelSynchronousIo) + GetProcAddress(kernel32_module, "CancelSynchronousIo"); } diff --git a/src/win/winapi.h b/src/win/winapi.h index 21d7fe4a..1bb0e9aa 100644 --- a/src/win/winapi.h +++ b/src/win/winapi.h @@ -4617,6 +4617,8 @@ typedef VOID (WINAPI* sWakeAllConditionVariable) typedef VOID (WINAPI* sWakeConditionVariable) (PCONDITION_VARIABLE ConditionVariable); +typedef BOOL (WINAPI* sCancelSynchronousIo) + (HANDLE hThread); /* Ntdll function pointers */ extern sRtlNtStatusToDosError pRtlNtStatusToDosError; @@ -4644,5 +4646,6 @@ extern sSleepConditionVariableCS pSleepConditionVariableCS; extern sSleepConditionVariableSRW pSleepConditionVariableSRW; extern sWakeAllConditionVariable pWakeAllConditionVariable; extern sWakeConditionVariable pWakeConditionVariable; +extern sCancelSynchronousIo pCancelSynchronousIo; #endif /* UV_WIN_WINAPI_H_ */ diff --git a/test/test-list.h b/test/test-list.h index 6dbe2230..0ce4b4a6 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -109,6 +109,7 @@ TEST_DECLARE (pipe_connect_bad_name) TEST_DECLARE (pipe_connect_to_file) TEST_DECLARE (pipe_getsockname) TEST_DECLARE (pipe_getsockname_abstract) +TEST_DECLARE (pipe_getsockname_blocking) TEST_DECLARE (pipe_sendmsg) TEST_DECLARE (pipe_server_close) TEST_DECLARE (connection_fail) @@ -402,6 +403,7 @@ TASK_LIST_START TEST_ENTRY (pipe_listen_without_bind) TEST_ENTRY (pipe_getsockname) TEST_ENTRY (pipe_getsockname_abstract) + TEST_ENTRY (pipe_getsockname_blocking) TEST_ENTRY (pipe_sendmsg) TEST_ENTRY (connection_fail) diff --git a/test/test-pipe-getsockname.c b/test/test-pipe-getsockname.c index 396f7257..d4010f3b 100644 --- a/test/test-pipe-getsockname.c +++ b/test/test-pipe-getsockname.c @@ -32,6 +32,8 @@ #ifndef _WIN32 # include /* close */ +#else +# include #endif @@ -120,3 +122,59 @@ TEST_IMPL(pipe_getsockname_abstract) { #endif } +TEST_IMPL(pipe_getsockname_blocking) { +#ifdef _WIN32 + uv_pipe_t reader; + HANDLE readh, writeh; + int readfd; + char buf1[1024], buf2[1024]; + size_t len1, len2; + int r; + + r = CreatePipe(&readh, &writeh, NULL, 65536); + ASSERT(r != 0); + + r = uv_pipe_init(uv_default_loop(), &reader, 0); + ASSERT(r == 0); + readfd = _open_osfhandle((intptr_t)readh, _O_RDONLY); + ASSERT(r != -1); + r = uv_pipe_open(&reader, readfd); + ASSERT(r == 0); + r = uv_read_start((uv_stream_t*)&reader, NULL, NULL); + ASSERT(r == 0); + Sleep(100); + r = uv_read_stop((uv_stream_t*)&reader); + ASSERT(r == 0); + + len1 = sizeof buf1; + r = uv_pipe_getsockname(&reader, buf1, &len1); + ASSERT(r == 0); + + r = uv_read_start((uv_stream_t*)&reader, NULL, NULL); + ASSERT(r == 0); + Sleep(100); + + len2 = sizeof buf2; + r = uv_pipe_getsockname(&reader, buf2, &len2); + ASSERT(r == 0); + + r = uv_read_stop((uv_stream_t*)&reader); + ASSERT(r == 0); + + ASSERT(len1 == len2); + ASSERT(memcmp(buf1, buf2, len1) == 0); + + close_cb_called = 0; + uv_close((uv_handle_t*)&reader, close_cb); + + uv_run(uv_default_loop(), UV_RUN_DEFAULT); + + ASSERT(close_cb_called == 1); + + _close(readfd); + CloseHandle(writeh); +#endif + + MAKE_VALGRIND_HAPPY(); + return 0; +}