diff --git a/include/uv-win.h b/include/uv-win.h index d9f65ba5..12588e96 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -96,7 +96,6 @@ typedef struct uv_buf_t { struct uv_req_s accept_req; \ #define uv_pipe_server_fields \ - char* name; \ uv_pipe_accept_t accept_reqs[4]; \ uv_pipe_accept_t* pending_accepts; @@ -104,6 +103,7 @@ typedef struct uv_buf_t { HANDLE handle; #define UV_PIPE_PRIVATE_FIELDS \ + char* name; \ union { \ struct { uv_pipe_server_fields }; \ struct { uv_pipe_connection_fields }; \ diff --git a/src/uv-win.c b/src/uv-win.c index 838b6d31..8241340a 100644 --- a/src/uv-win.c +++ b/src/uv-win.c @@ -222,6 +222,7 @@ static char uv_zero_[] = ""; /* mark if IPv6 sockets are supported */ static BOOL uv_allow_ipv6 = FALSE; + /* * Subclass of uv_handle_t. Used for integration of c-ares. */ @@ -374,6 +375,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) { case ERROR_NO_UNICODE_TRANSLATION: return UV_ECHARSET; case ERROR_BROKEN_PIPE: return UV_EOF; case ERROR_PIPE_BUSY: return UV_EBUSY; + case ERROR_SEM_TIMEOUT: return UV_ETIMEDOUT; default: return UV_UNKNOWN; } } @@ -517,6 +519,7 @@ void uv_init() { static void uv_req_init(uv_req_t* req) { uv_counters()->req_init++; req->type = UV_UNKNOWN_REQ; + req->error = uv_ok_; } @@ -1028,10 +1031,14 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle, uv_pipe_accept_t* req) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); - if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && - GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { - /* Make this req pending reporting an error. */ - req->error = uv_new_sys_error(GetLastError()); + if (!ConnectNamedPipe(pipeHandle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING) { + if (GetLastError() == ERROR_PIPE_CONNECTED) { + req->pipeHandle = pipeHandle; + req->error = uv_ok_; + } else { + /* Make this req pending reporting an error. */ + req->error = uv_new_sys_error(GetLastError()); + } uv_insert_pending_req((uv_req_t*) req); handle->reqs_pending++; return; @@ -2314,9 +2321,7 @@ static void uv_poll() { /* Package was dequeued */ req = uv_overlapped_to_req(overlapped); - if (success) { - req->error = uv_ok_; - } else { + if (!success) { req->error = uv_new_sys_error(GetLastError()); } @@ -2970,6 +2975,7 @@ int uv_pipe_init(uv_pipe_t* handle) { handle->type = UV_NAMED_PIPE; handle->reqs_pending = 0; handle->pending_accepts = NULL; + handle->name = NULL; uv_counters()->pipe_init++; @@ -3033,48 +3039,120 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { return 0; } + +static int uv_set_pipe_handle(uv_pipe_t* handle, HANDLE pipeHandle) { + DWORD mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; + + if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { + return -1; + } + + if (CreateIoCompletionPort(pipeHandle, + uv_iocp_, + (ULONG_PTR)handle, + 0) == NULL) { + return -1; + } + + return 0; +} + + +static DWORD WINAPI pipe_connect_thread_proc(void* parameter) { + HANDLE pipeHandle = INVALID_HANDLE_VALUE; + int errno; + uv_pipe_t* handle; + uv_connect_t* req; + + req = (uv_connect_t*)parameter; + assert(req); + handle = (uv_pipe_t*)req->handle; + assert(handle); + + /* We're here because CreateFile on a pipe returned ERROR_PIPE_BUSY. We wait for the pipe to become available with WaitNamedPipe. */ + while (WaitNamedPipe(handle->name, 30000)) { + /* The pipe is now available, try to connect. */ + pipeHandle = CreateFile(handle->name, + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); + + if (pipeHandle != INVALID_HANDLE_VALUE) { + break; + } + } + + if (pipeHandle != INVALID_HANDLE_VALUE && !uv_set_pipe_handle(handle, pipeHandle)) { + handle->handle = pipeHandle; + req->error = uv_ok_; + } else { + req->error = uv_new_sys_error(GetLastError()); + } + + memset(&req->overlapped, 0, sizeof(req->overlapped)); + + /* Post completed */ + if (!PostQueuedCompletionStatus(uv_iocp_, + 0, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } + + return 0; +} + + /* TODO: make this work with UTF8 name */ -/* TODO: run this in the thread pool */ int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, const char* name, uv_connect_cb cb) { int errno; - DWORD mode; + HANDLE pipeHandle; + + handle->handle = INVALID_HANDLE_VALUE; uv_req_init((uv_req_t*) req); req->type = UV_CONNECT; req->handle = (uv_stream_t*) handle; req->cb = cb; - memset(&req->overlapped, 0, sizeof(req->overlapped)); + pipeHandle = CreateFile(name, + GENERIC_READ | GENERIC_WRITE, + 0, + NULL, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + NULL); - handle->handle = CreateFile(name, - GENERIC_READ | GENERIC_WRITE, - 0, - NULL, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - NULL); + if (pipeHandle == INVALID_HANDLE_VALUE) { + if (GetLastError() == ERROR_PIPE_BUSY) { + /* Wait for the server to make a pipe instance available. */ + handle->name = strdup(name); + if (!handle->name) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } - if (handle->handle == INVALID_HANDLE_VALUE && - GetLastError() != ERROR_IO_PENDING) { + if (!QueueUserWorkItem(&pipe_connect_thread_proc, req, WT_EXECUTELONGFUNCTION)) { + errno = GetLastError(); + goto error; + } + + return 0; + } + + errno = GetLastError(); + goto error; + } + + if (uv_set_pipe_handle((uv_pipe_t*)req->handle, pipeHandle)) { errno = GetLastError(); goto error; } - mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; - - if (!SetNamedPipeHandleState(handle->handle, &mode, NULL, NULL)) { - errno = GetLastError(); - goto error; - } - - if (CreateIoCompletionPort(handle->handle, - uv_iocp_, - (ULONG_PTR)handle, - 0) == NULL) { - errno = GetLastError(); - goto error; - } + handle->handle = pipeHandle; req->error = uv_ok_; uv_insert_pending_req((uv_req_t*) req); @@ -3082,13 +3160,11 @@ int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, return 0; error: - if (handle->handle != INVALID_HANDLE_VALUE) { - CloseHandle(handle->handle); + if (pipeHandle != INVALID_HANDLE_VALUE) { + CloseHandle(pipeHandle); } - req->error = uv_new_sys_error(errno); - uv_insert_pending_req((uv_req_t*) req); - handle->reqs_pending++; - return 0; + uv_set_sys_error(errno); + return -1; } @@ -3097,6 +3173,11 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { int i; HANDLE pipeHandle; + if (handle->name) { + free(handle->name); + handle->name; + } + if (handle->flags & UV_HANDLE_PIPESERVER) { for (i = 0; i < COUNTOF(handle->accept_reqs); i++) { pipeHandle = handle->accept_reqs[i].pipeHandle; @@ -3105,7 +3186,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { } } - } else { + } else if (handle->handle != INVALID_HANDLE_VALUE) { CloseHandle(handle->handle); } diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index d7524f74..1732e84f 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -261,13 +261,6 @@ static void maybe_connect_some() { req = (uv_connect_t*) req_alloc(); r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb); ASSERT(r == 0); - -#ifdef _WIN32 - /* HACK: This is temporary to give the pipes server enough time to create new handles. - * This will go away once uv_pipe_connect can deal with UV_EBUSY. - */ - Sleep(1); -#endif } } }