diff --git a/include/uv-win.h b/include/uv-win.h index dda36fe6..e48b8a9a 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -468,8 +468,8 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); int queue_len; \ } pending_ipc_info; \ uv_write_t* non_overlapped_writes_tail; \ - uv_mutex_t readfile_mutex; \ - volatile HANDLE readfile_thread; + CRITICAL_SECTION readfile_thread_lock; \ + volatile HANDLE readfile_thread_handle; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ diff --git a/src/win/internal.h b/src/win/internal.h index cce4e204..0b185f55 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -99,7 +99,6 @@ extern UV_THREAD_LOCAL int uv__crt_assert_enabled; /* Only used by uv_pipe_t handles. */ #define UV_HANDLE_NON_OVERLAPPED_PIPE 0x01000000 #define UV_HANDLE_PIPESERVER 0x02000000 -#define UV_HANDLE_PIPE_READ_CANCELABLE 0x04000000 /* Only used by uv_tty_t handles. */ #define UV_HANDLE_TTY_READABLE 0x01000000 @@ -178,14 +177,12 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); +void uv__pipe_read_stop(uv_pipe_t* handle); int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb); int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle, const uv_buf_t bufs[], unsigned int nbufs, uv_stream_t* send_handle, uv_write_cb cb); -void uv__pipe_pause_read(uv_pipe_t* handle); -void uv__pipe_unpause_read(uv_pipe_t* handle); -void uv__pipe_stop_read(uv_pipe_t* handle); void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, uv_req_t* req); diff --git a/src/win/pipe.c b/src/win/pipe.c index 177d4127..0ef18773 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -110,7 +110,6 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->pipe.conn.pending_ipc_info.queue_len = 0; handle->ipc = ipc; handle->pipe.conn.non_overlapped_writes_tail = NULL; - handle->pipe.conn.readfile_thread = NULL; UV_REQ_INIT(&handle->pipe.conn.ipc_header_write_req, UV_UNKNOWN_REQ); @@ -123,10 +122,9 @@ static void uv_pipe_connection_init(uv_pipe_t* handle) { handle->read_req.data = handle; handle->pipe.conn.eof_timer = NULL; assert(!(handle->flags & UV_HANDLE_PIPESERVER)); - if (pCancelSynchronousIo && - handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { - uv_mutex_init(&handle->pipe.conn.readfile_mutex); - handle->flags |= UV_HANDLE_PIPE_READ_CANCELABLE; + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + handle->pipe.conn.readfile_thread_handle = NULL; + InitializeCriticalSection(&handle->pipe.conn.readfile_thread_lock); } } @@ -355,11 +353,6 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { FILE_PIPE_LOCAL_INFORMATION pipe_info; uv__ipc_queue_item_t* item; - if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { - handle->flags &= ~UV_HANDLE_PIPE_READ_CANCELABLE; - uv_mutex_destroy(&handle->pipe.conn.readfile_mutex); - } - if ((handle->flags & UV_HANDLE_CONNECTION) && handle->stream.conn.shutdown_req != NULL && handle->stream.conn.write_reqs_pending == 0) { @@ -467,6 +460,9 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { handle->read_req.event_handle = NULL; } } + + if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) + DeleteCriticalSection(&handle->pipe.conn.readfile_thread_lock); } if (handle->flags & UV_HANDLE_PIPESERVER) { @@ -712,45 +708,58 @@ error: } -void uv__pipe_pause_read(uv_pipe_t* handle) { - if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { - /* Pause the ReadFile task briefly, to work - around the Windows kernel bug that causes - any access to a NamedPipe to deadlock if - any process has called ReadFile */ - HANDLE h; - uv_mutex_lock(&handle->pipe.conn.readfile_mutex); - h = handle->pipe.conn.readfile_thread; - while (h) { - /* spinlock: we expect this to finish quickly, - or we are probably about to deadlock anyways - (in the kernel), so it doesn't matter */ - pCancelSynchronousIo(h); - SwitchToThread(); /* yield thread control briefly */ - h = handle->pipe.conn.readfile_thread; +void uv__pipe_interrupt_read(uv_pipe_t* handle) { + BOOL r; + + if (!(handle->flags & UV_HANDLE_READ_PENDING)) + return; /* No pending reads. */ + if (handle->flags & UV_HANDLE_CANCELLATION_PENDING) + return; /* Already cancelled. */ + if (handle->handle == INVALID_HANDLE_VALUE) + return; /* Pipe handle closed. */ + + if (!(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE)) { + /* Cancel asynchronous read. */ + r = CancelIoEx(handle->handle, &handle->read_req.u.io.overlapped); + assert(r || GetLastError() == ERROR_NOT_FOUND); + + } else { + /* Cancel synchronous read (which is happening in the thread pool). */ + HANDLE thread; + volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle; + + EnterCriticalSection(&handle->pipe.conn.readfile_thread_lock); + + thread = *thread_ptr; + if (thread == NULL) { + /* The thread pool thread has not yet reached the point of blocking, we + * can pre-empt it by setting thread_handle to INVALID_HANDLE_VALUE. */ + *thread_ptr = INVALID_HANDLE_VALUE; + + } else { + /* Spin until the thread has acknowledged (by setting the thread to + * INVALID_HANDLE_VALUE) that it is past the point of blocking. */ + while (thread != INVALID_HANDLE_VALUE) { + r = CancelSynchronousIo(thread); + assert(r || GetLastError() == ERROR_NOT_FOUND); + SwitchToThread(); /* Yield thread. */ + thread = *thread_ptr; } + } + + LeaveCriticalSection(&handle->pipe.conn.readfile_thread_lock); } + + /* Set flag to indicate that read has been cancelled. */ + handle->flags |= UV_HANDLE_CANCELLATION_PENDING; } -void uv__pipe_unpause_read(uv_pipe_t* handle) { - if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { - uv_mutex_unlock(&handle->pipe.conn.readfile_mutex); - } -} - - -void uv__pipe_stop_read(uv_pipe_t* handle) { - if (pCancelIoEx && - !(handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) && - !(handle->flags & UV_HANDLE_EMULATE_IOCP) && - handle->flags & UV_HANDLE_READING && - handle->read_req.type == UV_READ) { - pCancelIoEx(handle->handle, &handle->read_req.u.io.overlapped); - } +void uv__pipe_read_stop(uv_pipe_t* handle) { handle->flags &= ~UV_HANDLE_READING; - uv__pipe_pause_read((uv_pipe_t*)handle); - uv__pipe_unpause_read((uv_pipe_t*)handle); + DECREASE_ACTIVE_COUNT(handle->loop, handle); + + uv__pipe_interrupt_read(handle); } @@ -760,7 +769,7 @@ void uv_pipe_cleanup(uv_loop_t* loop, uv_pipe_t* handle) { int i; HANDLE pipeHandle; - uv__pipe_stop_read(handle); + uv__pipe_interrupt_read(handle); if (handle->name) { uv__free(handle->name); @@ -959,74 +968,75 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { } -static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* parameter) { - int result; - DWORD bytes; - uv_read_t* req = (uv_read_t*) parameter; +static DWORD WINAPI uv_pipe_zero_readfile_thread_proc(void* arg) { + uv_read_t* req = (uv_read_t*) arg; uv_pipe_t* handle = (uv_pipe_t*) req->data; uv_loop_t* loop = handle->loop; - HANDLE hThread = NULL; + volatile HANDLE* thread_ptr = &handle->pipe.conn.readfile_thread_handle; + CRITICAL_SECTION* lock = &handle->pipe.conn.readfile_thread_lock; + HANDLE prev, thread; + DWORD bytes; DWORD err; - uv_mutex_t *m = &handle->pipe.conn.readfile_mutex; - assert(req != NULL); assert(req->type == UV_READ); assert(handle->type == UV_NAMED_PIPE); - if (handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { - uv_mutex_lock(m); /* mutex controls *setting* of readfile_thread */ - if (DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), - GetCurrentProcess(), &hThread, - 0, FALSE, DUPLICATE_SAME_ACCESS)) { - handle->pipe.conn.readfile_thread = hThread; - } else { - hThread = NULL; - } - uv_mutex_unlock(m); + err = 0; + + /* Create a handle to the current thread. */ + if (!DuplicateHandle(GetCurrentProcess(), + GetCurrentThread(), + GetCurrentProcess(), + &thread, + 0, + FALSE, + DUPLICATE_SAME_ACCESS)) { + err = GetLastError(); + goto out1; } -restart_readfile: - if (handle->flags & UV_HANDLE_READING) { - result = ReadFile(handle->handle, - &uv_zero_, - 0, - &bytes, - NULL); - if (!result) { - err = GetLastError(); - if (err == ERROR_OPERATION_ABORTED && - handle->flags & UV_HANDLE_PIPE_READ_CANCELABLE) { - if (handle->flags & UV_HANDLE_READING) { - /* just a brief break to do something else */ - handle->pipe.conn.readfile_thread = NULL; - /* resume after it is finished */ - uv_mutex_lock(m); - handle->pipe.conn.readfile_thread = hThread; - uv_mutex_unlock(m); - goto restart_readfile; - } else { - result = 1; /* successfully stopped reading */ - } - } - } + + /* The lock needs to be held when thread handle is modified. */ + EnterCriticalSection(lock); + if (*thread_ptr == INVALID_HANDLE_VALUE) { + /* uv__pipe_interrupt_read() cancelled reading before we got here. */ + err = ERROR_OPERATION_ABORTED; } else { - result = 1; /* successfully aborted read before it even started */ - } - if (hThread) { - assert(hThread == handle->pipe.conn.readfile_thread); - /* mutex does not control clearing readfile_thread */ - handle->pipe.conn.readfile_thread = NULL; - uv_mutex_lock(m); - /* only when we hold the mutex lock is it safe to - open or close the handle */ - CloseHandle(hThread); - uv_mutex_unlock(m); + /* Let main thread know which worker thread is doing the blocking read. */ + assert(*thread_ptr == NULL); + *thread_ptr = thread; } + LeaveCriticalSection(lock); - if (!result) { + if (err) + goto out2; + + /* Block the thread until data is available on the pipe, or the read is + * cancelled. */ + if (!ReadFile(handle->handle, &uv_zero_, 0, &bytes, NULL)) + err = GetLastError(); + + /* Let the main thread know the worker is past the point of blocking. */ + assert(thread == *thread_ptr); + *thread_ptr = INVALID_HANDLE_VALUE; + + /* Briefly acquire the mutex. Since the main thread holds the lock while it + * is spinning trying to cancel this thread's I/O, we will block here until + * it stops doing that. */ + EnterCriticalSection(lock); + LeaveCriticalSection(lock); + +out2: + /* Close the handle to the current thread. */ + CloseHandle(thread); + +out1: + /* Set request status and post a completion record to the IOCP. */ + if (err) SET_REQ_ERROR(req, err); - } - + else + SET_REQ_SUCCESS(req); POST_COMPLETION_FOR_REQ(loop, req); + return 0; } @@ -1108,6 +1118,7 @@ static void uv_pipe_queue_read(uv_loop_t* loop, uv_pipe_t* handle) { req = &handle->read_req; if (handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + handle->pipe.conn.readfile_thread_handle = NULL; /* Reset cancellation. */ if (!QueueUserWorkItem(&uv_pipe_zero_readfile_thread_proc, req, WT_EXECUTELONGFUNCTION)) { @@ -1608,10 +1619,7 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error, static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle, int error, uv_buf_t buf) { - if (error == ERROR_OPERATION_ABORTED) { - /* do nothing (equivalent to EINTR) */ - } - else if (error == ERROR_BROKEN_PIPE) { + if (error == ERROR_BROKEN_PIPE) { uv_pipe_read_eof(loop, handle, buf); } else { uv_pipe_read_error(loop, handle, error, buf); @@ -1643,12 +1651,17 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, assert(handle->type == UV_NAMED_PIPE); - handle->flags &= ~UV_HANDLE_READ_PENDING; + handle->flags &= ~(UV_HANDLE_READ_PENDING | UV_HANDLE_CANCELLATION_PENDING); eof_timer_stop(handle); if (!REQ_SUCCESS(req)) { - /* An error occurred doing the 0-read. */ - if (handle->flags & UV_HANDLE_READING) { + /* An error occurred doing the zero-read. + * Note that if the read was cancelled by uv__pipe_interrupt_read(), the + * request may indicate an ERROR_OPERATION_ABORTED error. This error isn't + * relevant to the user; we'll restart the read by queueing a new read + * request below. */ + if (handle->flags & UV_HANDLE_READING && + GET_REQ_ERROR(req) != ERROR_OPERATION_ABORTED) { uv_pipe_read_error_or_eof(loop, handle, GET_REQ_ERROR(req), @@ -1756,12 +1769,12 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle, break; } } + } - /* Post another 0-read if still reading and not closing. */ - if ((handle->flags & UV_HANDLE_READING) && - !(handle->flags & UV_HANDLE_READ_PENDING)) { - uv_pipe_queue_read(loop, handle); - } + /* Start another zero-read request if necessary. */ + if ((handle->flags & UV_HANDLE_READING) && + !(handle->flags & UV_HANDLE_READ_PENDING)) { + uv_pipe_queue_read(loop, handle); } DECREASE_PENDING_REQ_COUNT(handle); @@ -2093,7 +2106,15 @@ static int uv__pipe_getname(const uv_pipe_t* handle, char* buffer, size_t* size) return UV_EINVAL; } - uv__pipe_pause_read((uv_pipe_t*)handle); /* cast away const warning */ + /* NtQueryInformationFile will block if another thread is performing a + * blocking operation on the queried handle. If the pipe handle is + * synchronous, there may be a worker thread currently calling ReadFile() on + * the pipe handle, which could cause a deadlock. To avoid this, interrupt + * the read. */ + if (handle->flags & UV_HANDLE_CONNECTION && + handle->flags & UV_HANDLE_NON_OVERLAPPED_PIPE) { + uv__pipe_interrupt_read((uv_pipe_t*) handle); /* cast away const warning */ + } nt_status = pNtQueryInformationFile(handle->handle, &io_status, @@ -2184,7 +2205,6 @@ error: uv__free(name_info); cleanup: - uv__pipe_unpause_read((uv_pipe_t*)handle); /* cast away const warning */ return err; } diff --git a/src/win/stream.c b/src/win/stream.c index 13cbfdcb..6539dc9a 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -105,12 +105,10 @@ int uv_read_stop(uv_stream_t* handle) { err = 0; if (handle->type == UV_TTY) { err = uv_tty_read_stop((uv_tty_t*) handle); + } else if (handle->type == UV_NAMED_PIPE) { + uv__pipe_read_stop((uv_pipe_t*) handle); } else { - if (handle->type == UV_NAMED_PIPE) { - uv__pipe_stop_read((uv_pipe_t*) handle); - } else { - handle->flags &= ~UV_HANDLE_READING; - } + handle->flags &= ~UV_HANDLE_READING; DECREASE_ACTIVE_COUNT(handle->loop, handle); } diff --git a/src/win/winapi.c b/src/win/winapi.c index c3307861..c2097373 100644 --- a/src/win/winapi.c +++ b/src/win/winapi.c @@ -39,13 +39,11 @@ sNtQuerySystemInformation pNtQuerySystemInformation; sGetQueuedCompletionStatusEx pGetQueuedCompletionStatusEx; sSetFileCompletionNotificationModes pSetFileCompletionNotificationModes; sCreateSymbolicLinkW pCreateSymbolicLinkW; -sCancelIoEx pCancelIoEx; sInitializeConditionVariable pInitializeConditionVariable; sSleepConditionVariableCS pSleepConditionVariableCS; sSleepConditionVariableSRW pSleepConditionVariableSRW; sWakeAllConditionVariable pWakeAllConditionVariable; sWakeConditionVariable pWakeConditionVariable; -sCancelSynchronousIo pCancelSynchronousIo; sGetFinalPathNameByHandleW pGetFinalPathNameByHandleW; @@ -133,9 +131,6 @@ void uv_winapi_init(void) { pCreateSymbolicLinkW = (sCreateSymbolicLinkW) GetProcAddress(kernel32_module, "CreateSymbolicLinkW"); - pCancelIoEx = (sCancelIoEx) - GetProcAddress(kernel32_module, "CancelIoEx"); - pInitializeConditionVariable = (sInitializeConditionVariable) GetProcAddress(kernel32_module, "InitializeConditionVariable"); @@ -151,9 +146,6 @@ void uv_winapi_init(void) { pWakeConditionVariable = (sWakeConditionVariable) GetProcAddress(kernel32_module, "WakeConditionVariable"); - pCancelSynchronousIo = (sCancelSynchronousIo) - GetProcAddress(kernel32_module, "CancelSynchronousIo"); - pGetFinalPathNameByHandleW = (sGetFinalPathNameByHandleW) GetProcAddress(kernel32_module, "GetFinalPathNameByHandleW"); diff --git a/src/win/winapi.h b/src/win/winapi.h index 374b1a96..be3529ff 100644 --- a/src/win/winapi.h +++ b/src/win/winapi.h @@ -4659,10 +4659,6 @@ typedef BOOLEAN (WINAPI* sCreateSymbolicLinkW) LPCWSTR lpTargetFileName, DWORD dwFlags); -typedef BOOL (WINAPI* sCancelIoEx) - (HANDLE hFile, - LPOVERLAPPED lpOverlapped); - typedef VOID (WINAPI* sInitializeConditionVariable) (PCONDITION_VARIABLE ConditionVariable); @@ -4683,9 +4679,6 @@ typedef VOID (WINAPI* sWakeAllConditionVariable) typedef VOID (WINAPI* sWakeConditionVariable) (PCONDITION_VARIABLE ConditionVariable); -typedef BOOL (WINAPI* sCancelSynchronousIo) - (HANDLE hThread); - typedef DWORD (WINAPI* sGetFinalPathNameByHandleW) (HANDLE hFile, LPWSTR lpszFilePath, @@ -4759,13 +4752,11 @@ extern sNtQuerySystemInformation pNtQuerySystemInformation; extern sGetQueuedCompletionStatusEx pGetQueuedCompletionStatusEx; extern sSetFileCompletionNotificationModes pSetFileCompletionNotificationModes; extern sCreateSymbolicLinkW pCreateSymbolicLinkW; -extern sCancelIoEx pCancelIoEx; extern sInitializeConditionVariable pInitializeConditionVariable; extern sSleepConditionVariableCS pSleepConditionVariableCS; extern sSleepConditionVariableSRW pSleepConditionVariableSRW; extern sWakeAllConditionVariable pWakeAllConditionVariable; extern sWakeConditionVariable pWakeConditionVariable; -extern sCancelSynchronousIo pCancelSynchronousIo; extern sGetFinalPathNameByHandleW pGetFinalPathNameByHandleW;