diff --git a/include/uv.h b/include/uv.h index f0ec376b..9c24c789 100644 --- a/include/uv.h +++ b/include/uv.h @@ -851,7 +851,8 @@ enum { * uv_pipe_t is a subclass of uv_stream_t. * * Representing a pipe stream or pipe server. On Windows this is a Named - * Pipe. On Unix this is a Unix domain socket. + * Pipe or a Unix domain socket depends on the path name. + * On Unix this is a Unix domain socket. */ struct uv_pipe_s { UV_HANDLE_FIELDS diff --git a/include/uv/win.h b/include/uv/win.h index 58d10b8d..b6d54198 100644 --- a/include/uv/win.h +++ b/include/uv/win.h @@ -468,7 +468,8 @@ typedef struct { #define uv_pipe_server_fields \ int pending_instances; \ uv_pipe_accept_t* accept_reqs; \ - uv_pipe_accept_t* pending_accepts; + uv_pipe_accept_t* pending_accepts; \ + LPFN_ACCEPTEX func_acceptex; #define uv_pipe_connection_fields \ uv_timer_t* eof_timer; \ @@ -482,11 +483,15 @@ typedef struct { int ipc_xfer_queue_length; \ uv_write_t* non_overlapped_writes_tail; \ CRITICAL_SECTION readfile_thread_lock; \ - volatile HANDLE readfile_thread_handle; + volatile HANDLE readfile_thread_handle; \ + LPFN_CONNECTEX func_connectex; #define UV_PIPE_PRIVATE_FIELDS \ HANDLE handle; \ - WCHAR* name; \ + union { \ + char* pathname; \ + WCHAR* name; \ + }; \ union { \ struct { uv_pipe_server_fields } serv; \ struct { uv_pipe_connection_fields } conn; \ diff --git a/src/uv-common.h b/src/uv-common.h index 372f0c4b..5c7177e3 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -122,6 +122,7 @@ enum { /* Only used by uv_pipe_t handles. */ UV_HANDLE_NON_OVERLAPPED_PIPE = 0x01000000, UV_HANDLE_PIPESERVER = 0x02000000, + UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET = 0x04000000, /* Only used by uv_tty_t handles. */ UV_HANDLE_TTY_READABLE = 0x01000000, diff --git a/src/win/pipe.c b/src/win/pipe.c index d05bfd28..8cc98f63 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -34,6 +34,7 @@ #include #include +#include /* A zero-size buffer for use by uv_pipe_read */ static char uv_zero_[] = ""; @@ -98,6 +99,25 @@ static void eof_timer_destroy(uv_pipe_t* pipe); static void eof_timer_close_cb(uv_handle_t* handle); +static int uv__is_named_pipe_prefix(const char *s) { + /* Tell if the name is started by the named pipe prefix */ + return strstr(s, pipe_prefix) == s; +} + + +static void uv__close_pipe_handle(uv_pipe_t* handle, HANDLE h) { + /* Microsoft says don't close socket with CloseHandle, so when we use + * unix domain socket we should use closesocket instead of CloseHandle. + * https://learn.microsoft.com/en-us/windows/win32/api/handleapi/nf-handleapi-closehandle + */ + if (handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + closesocket((SOCKET) h); + } else { + CloseHandle(h); + } +} + + /* Does the file path contain embedded nul bytes? */ static int includes_nul(const char *s, size_t n) { if (n == 0) @@ -123,6 +143,8 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->pipe.conn.ipc_xfer_queue_length = 0; handle->ipc = ipc; handle->pipe.conn.non_overlapped_writes_tail = NULL; + handle->pipe.serv.func_acceptex = NULL; + handle->pipe.conn.func_connectex = NULL; return 0; } @@ -197,7 +219,7 @@ static HANDLE open_named_pipe(const WCHAR* name, DWORD* duplex_flags) { static void close_pipe(uv_pipe_t* pipe) { assert(pipe->u.fd == -1 || pipe->u.fd > 2); if (pipe->u.fd == -1) - CloseHandle(pipe->handle); + uv__close_pipe_handle(pipe, pipe->handle); else _close(pipe->u.fd); @@ -205,7 +227,7 @@ static void close_pipe(uv_pipe_t* pipe) { pipe->handle = INVALID_HANDLE_VALUE; } - +/* Not yet support unix domain socket */ static int uv__pipe_server( HANDLE* pipeHandle_ptr, DWORD access, char* name, size_t nameSize, unsigned long long random) { @@ -246,6 +268,7 @@ static int uv__pipe_server( } +/* Not yet support unix domain socket */ static int uv__create_pipe_pair( HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr, unsigned int server_flags, unsigned int client_flags, @@ -346,6 +369,7 @@ static int uv__create_pipe_pair( } +/* Not yet support unix domain socket */ int uv_pipe(uv_file fds[2], int read_flags, int write_flags) { uv_file temp[2]; int err; @@ -391,6 +415,7 @@ int uv_pipe(uv_file fds[2], int read_flags, int write_flags) { } +/* Not yet support unix domain socket */ int uv__create_stdio_pipe_pair(uv_loop_t* loop, uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) { /* The parent_pipe is always the server_pipe and kept by libuv. @@ -479,7 +504,9 @@ static int uv__set_pipe_handle(uv_loop_t* loop, if (handle->handle != INVALID_HANDLE_VALUE) return UV_EBUSY; - if (!SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { + /* Skip if the handle is a unix domain socket */ + if (!(handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) && + !SetNamedPipeHandleState(pipeHandle, &mode, NULL, NULL)) { err = GetLastError(); if (err == ERROR_ACCESS_DENIED) { /* @@ -526,6 +553,9 @@ static int uv__set_pipe_handle(uv_loop_t* loop, loop->iocp, (ULONG_PTR) handle, 0) == NULL) { + /* Unix domain socket should always support IOCP. */ + assert(!(handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET)); + handle->flags |= UV_HANDLE_EMULATE_IOCP; } } @@ -538,7 +568,54 @@ static int uv__set_pipe_handle(uv_loop_t* loop, } -static int pipe_alloc_accept(uv_loop_t* loop, uv_pipe_t* handle, +static int pipe_alloc_accept_unix_domain_socket(uv_loop_t* loop, uv_pipe_t* handle, + uv_pipe_accept_t* req, const char * name, int* err, BOOL firstInstance) { + assert(req->pipeHandle == INVALID_HANDLE_VALUE); + + /* Create a non bound socket for AcceptEx second parameter. */ + SOCKET accept_fd = socket(AF_UNIX, SOCK_STREAM, IPPROTO_IP); + if (accept_fd == INVALID_SOCKET) { + *err = WSAGetLastError(); + return 0; + } + req->pipeHandle = (HANDLE) accept_fd; + + if (firstInstance) { + /* First instance, only possible at bind, create the server socket. */ + SOCKET server_fd = socket(AF_UNIX, SOCK_STREAM, IPPROTO_IP); + if (server_fd == INVALID_SOCKET) { + *err = WSAGetLastError(); + return 0; + } + + struct sockaddr_un addr = {0}; + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, name); + + int ret = bind(server_fd, (const struct sockaddr*)&addr, sizeof(struct sockaddr_un)); + if (ret == SOCKET_ERROR) { + *err = WSAGetLastError(); + closesocket(server_fd); + return 0; + } + + /* Associate it with IOCP so we can get events. */ + if (CreateIoCompletionPort((HANDLE) server_fd, + loop->iocp, + (ULONG_PTR) handle, + 0) == NULL) { + uv_fatal_error(GetLastError(), "CreateIoCompletionPort"); + } + + /* First instance, save for AcceptEx first parameter. */ + handle->handle = (HANDLE) server_fd; + } + + return 1; +} + + +static int pipe_alloc_accept_named_pipe(uv_loop_t* loop, uv_pipe_t* handle, uv_pipe_accept_t* req, BOOL firstInstance) { assert(req->pipeHandle == INVALID_HANDLE_VALUE); @@ -711,7 +788,7 @@ int uv_pipe_bind2(uv_pipe_t* handle, size_t namelen, unsigned int flags) { uv_loop_t* loop = handle->loop; - int i, err; + int i, err = 0; uv_pipe_accept_t* req; char* name_copy; @@ -731,6 +808,18 @@ int uv_pipe_bind2(uv_pipe_t* handle, return UV_EINVAL; } + int use_win_named_pipe = uv__is_named_pipe_prefix(name); + + if (!use_win_named_pipe) { + if (flags & UV_PIPE_NO_TRUNCATE) + if (namelen > UNIX_PATH_MAX) + return UV_EINVAL; + + if (namelen > UNIX_PATH_MAX) + namelen = UNIX_PATH_MAX; + } + + /* Already bound? */ if (handle->flags & UV_HANDLE_BOUND) { return UV_EINVAL; } @@ -751,10 +840,17 @@ int uv_pipe_bind2(uv_pipe_t* handle, handle->pipe.serv.pending_instances = default_pending_pipe_instances; } - err = UV_ENOMEM; + if (handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + /* Only use 1 pending instance when use unix domain socket, cause + * call AcceptEx multiple times seems result in multiple accept events. + * Not the expected queue behavior, that only one of them is triggered. */ + handle->pipe.serv.pending_instances = 1; + } + handle->pipe.serv.accept_reqs = (uv_pipe_accept_t*) uv__malloc(sizeof(uv_pipe_accept_t) * handle->pipe.serv.pending_instances); if (handle->pipe.serv.accept_reqs == NULL) { + err = UV_ENOMEM; goto error; } @@ -766,32 +862,51 @@ int uv_pipe_bind2(uv_pipe_t* handle, req->next_pending = NULL; } - /* TODO(bnoordhuis) Add converters that take a |length| parameter. */ - err = uv__convert_utf8_to_utf16(name_copy, &handle->name); - uv__free(name_copy); - name_copy = NULL; + if (use_win_named_pipe) { + /* TODO(bnoordhuis) Add converters that take a |length| parameter. */ + err = uv__convert_utf8_to_utf16(name_copy, &handle->name); + uv__free(name_copy); + name_copy = NULL; + } else { + /* Use unix domain socket we save the original path name copy. */ + handle->pathname = name_copy; + name_copy = NULL; + } if (err) { goto error; } - /* - * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE. - * If this fails then there's already a pipe server for the given pipe name. - */ - if (!pipe_alloc_accept(loop, - handle, - &handle->pipe.serv.accept_reqs[0], - TRUE)) { - err = GetLastError(); - if (err == ERROR_ACCESS_DENIED) { - err = UV_EADDRINUSE; - } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) { - err = UV_EACCES; - } else { - err = uv_translate_sys_error(err); + if (!use_win_named_pipe) { + /* + * Prefix not match named pipe, use unix domain socket. + */ + int uds_err = 0; + handle->flags |= UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET; + if (!pipe_alloc_accept_unix_domain_socket( + loop, handle, &handle->pipe.serv.accept_reqs[0], name, &uds_err, TRUE)) { + err = uv_translate_sys_error(uds_err); + goto error; + } + } else { + /* + * Attempt to create the first pipe with FILE_FLAG_FIRST_PIPE_INSTANCE. + * If this fails then there's already a pipe server for the given pipe name. + */ + if (!pipe_alloc_accept_named_pipe(loop, + handle, + &handle->pipe.serv.accept_reqs[0], + TRUE)) { + err = GetLastError(); + if (err == ERROR_ACCESS_DENIED) { + err = UV_EADDRINUSE; + } else if (err == ERROR_PATH_NOT_FOUND || err == ERROR_INVALID_NAME) { + err = UV_EACCES; + } else { + err = uv_translate_sys_error(err); + } + goto error; } - goto error; } handle->pipe.serv.pending_accepts = NULL; @@ -880,7 +995,7 @@ int uv_pipe_connect2(uv_connect_t* req, unsigned int flags, uv_connect_cb cb) { uv_loop_t* loop; - int err; + int err = 0; size_t nameSize; HANDLE pipeHandle = INVALID_HANDLE_VALUE; DWORD duplex_flags; @@ -893,6 +1008,7 @@ int uv_pipe_connect2(uv_connect_t* req, req->u.connect.pipeHandle = INVALID_HANDLE_VALUE; req->u.connect.duplex_flags = 0; req->u.connect.name = NULL; + memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped)); if (flags & ~UV_PIPE_NO_TRUNCATE) { return UV_EINVAL; @@ -910,6 +1026,8 @@ int uv_pipe_connect2(uv_connect_t* req, return UV_EINVAL; } + int use_win_named_pipe = uv__is_named_pipe_prefix(name); + name_copy = uv__malloc(namelen + 1); if (name_copy == NULL) { return UV_ENOMEM; @@ -928,16 +1046,79 @@ int uv_pipe_connect2(uv_connect_t* req, } uv__pipe_connection_init(handle); - /* TODO(bnoordhuis) Add converters that take a |length| parameter. */ - err = uv__convert_utf8_to_utf16(name_copy, &handle->name); - uv__free(name_copy); - name_copy = NULL; + if (use_win_named_pipe) { + /* TODO(bnoordhuis) Add converters that take a |length| parameter. */ + err = uv__convert_utf8_to_utf16(name_copy, &handle->name); + uv__free(name_copy); + name_copy = NULL; + } else { + /* Use unix domain socket we save the original path name copy. */ + handle->pathname = name_copy; + name_copy = NULL; + } if (err) { err = ERROR_NO_UNICODE_TRANSLATION; goto error; } + if (!use_win_named_pipe) { + /* + * If prefix is not "\\.\pipe", we assume it is a Unix Domain Socket. + * Although "NamedPipe" is a Windows concept, however in libuv, it has + * been abstracted to be a general concept that can be used on all platforms. + * Thus, we use Unix Domain Socket as a "named_pipe" backend when the prefix + * of the pipe is not matched. + */ + + SOCKET client_fd = socket(AF_UNIX, SOCK_STREAM, IPPROTO_IP); + if (client_fd == INVALID_SOCKET) { + err = WSAGetLastError(); + goto error; + } + + struct sockaddr_un addr = {0}; + addr.sun_family = AF_UNIX; + strcpy(addr.sun_path, name); + + /* Load function ConnectEx */ + if (!handle->pipe.conn.func_connectex) { + if (!uv__get_connectex_function((SOCKET) handle->handle, &handle->pipe.conn.func_connectex)) { + err = WSAEAFNOSUPPORT; + goto error; + } + } + + DWORD bytes_sent; + int ret = handle->pipe.conn.func_connectex(client_fd, + (const struct sockaddr*)&addr, + sizeof(struct sockaddr_un), + NULL, + 0, + &bytes_sent, + &req->u.io.overlapped); + if (!ret) { + err = WSAGetLastError(); + if (err != ERROR_IO_PENDING) { + closesocket(client_fd); + goto error; + } + } + + // Set flag indicates it is a unix domain socket; + handle->flags |= UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET; + req->u.connect.pipeHandle = pipeHandle; + req->u.connect.duplex_flags = UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; + + /* The req will be processed with IOCP. */ + handle->reqs_pending++; + REGISTER_HANDLE_REQ(loop, handle); + return 0; + } + + /* + * When matched with named pipe prefix, use named pipe as backend. + */ pipeHandle = open_named_pipe(handle->name, &duplex_flags); if (pipeHandle == INVALID_HANDLE_VALUE) { if (GetLastError() == ERROR_PIPE_BUSY) { @@ -1082,7 +1263,7 @@ void uv__pipe_close(uv_loop_t* loop, uv_pipe_t* handle) { for (i = 0; i < handle->pipe.serv.pending_instances; i++) { pipeHandle = handle->pipe.serv.accept_reqs[i].pipeHandle; if (pipeHandle != INVALID_HANDLE_VALUE) { - CloseHandle(pipeHandle); + uv__close_pipe_handle(handle, pipeHandle); handle->pipe.serv.accept_reqs[i].pipeHandle = INVALID_HANDLE_VALUE; } } @@ -1108,11 +1289,23 @@ static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, uv_pipe_accept_t* req, BOOL firstInstance) { assert(handle->flags & UV_HANDLE_LISTENING); - if (!firstInstance && !pipe_alloc_accept(loop, handle, req, FALSE)) { - SET_REQ_ERROR(req, GetLastError()); - uv__insert_pending_req(loop, (uv_req_t*) req); - handle->reqs_pending++; - return; + if (!firstInstance) { + if (handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + int uds_err = 0; + if (!pipe_alloc_accept_unix_domain_socket(loop, handle, req, handle->pathname, &uds_err, FALSE)) { + SET_REQ_ERROR(req, uds_err); + uv__insert_pending_req(loop, (uv_req_t*) req); + handle->reqs_pending++; + return; + } + } else { + if (!pipe_alloc_accept_named_pipe(loop, handle, req, FALSE)) { + SET_REQ_ERROR(req, GetLastError()); + uv__insert_pending_req(loop, (uv_req_t*) req); + handle->reqs_pending++; + return; + } + } } assert(req->pipeHandle != INVALID_HANDLE_VALUE); @@ -1120,19 +1313,43 @@ static void uv__pipe_queue_accept(uv_loop_t* loop, uv_pipe_t* handle, /* Prepare the overlapped structure. */ memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped)); - if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) && - GetLastError() != ERROR_IO_PENDING) { - if (GetLastError() == ERROR_PIPE_CONNECTED) { - SET_REQ_SUCCESS(req); - } else { - CloseHandle(req->pipeHandle); - req->pipeHandle = INVALID_HANDLE_VALUE; - /* Make this req pending reporting an error. */ - SET_REQ_ERROR(req, GetLastError()); + if (handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + DWORD bytes_received; + CHAR accept_buf[2 * (sizeof(SOCKADDR_STORAGE) + 16)]; + if (!handle->pipe.serv.func_acceptex((SOCKET)handle->handle, + (SOCKET)req->pipeHandle, + accept_buf, + 0, + sizeof(SOCKADDR_STORAGE) + 16, + sizeof(SOCKADDR_STORAGE) + 16, + &bytes_received, + &req->u.io.overlapped)) { + int wsa_err = WSAGetLastError(); + if (wsa_err != ERROR_IO_PENDING) { + closesocket((SOCKET) req->pipeHandle); + req->pipeHandle = INVALID_HANDLE_VALUE; + + SET_REQ_ERROR(req, wsa_err); + uv__insert_pending_req(loop, (uv_req_t*) req); + handle->reqs_pending++; + return; + } + } + } else { + if (!ConnectNamedPipe(req->pipeHandle, &req->u.io.overlapped) && + GetLastError() != ERROR_IO_PENDING) { + if (GetLastError() == ERROR_PIPE_CONNECTED) { + SET_REQ_SUCCESS(req); + } else { + CloseHandle(req->pipeHandle); + req->pipeHandle = INVALID_HANDLE_VALUE; + /* Make this req pending reporting an error. */ + SET_REQ_ERROR(req, GetLastError()); + } + uv__insert_pending_req(loop, (uv_req_t*) req); + handle->reqs_pending++; + return; } - uv__insert_pending_req(loop, (uv_req_t*) req); - handle->reqs_pending++; - return; } /* Wait for completion via IOCP */ @@ -1184,12 +1401,32 @@ int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) { pipe_client->handle = req->pipeHandle; pipe_client->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE; + /* A unix domain socket server */ + if (server->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + /* Associate it with the I/O completion port. Use uv_handle_t pointer as + * completion key. */ + if (CreateIoCompletionPort(req->pipeHandle, + pipe_client->loop->iocp, + (ULONG_PTR)req->pipeHandle, + 0) == NULL) { + return GetLastError(); + } + + /* AcceptEx() implicitly binds the accepted socket. */ + pipe_client->flags |= UV_HANDLE_BOUND; + pipe_client->flags |= UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET; + } + /* Prepare the req to pick up a new connection */ server->pipe.serv.pending_accepts = req->next_pending; req->next_pending = NULL; req->pipeHandle = INVALID_HANDLE_VALUE; - server->handle = INVALID_HANDLE_VALUE; + if (!(server->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET)) { + /* Unix domain socket doesn't transfer to client ownership, so do not reset here.*/ + server->handle = INVALID_HANDLE_VALUE; + } + if (!(server->flags & UV_HANDLE_CLOSING)) { uv__pipe_queue_accept(loop, server, req, FALSE); } @@ -1224,6 +1461,20 @@ int uv__pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { return WSAEINVAL; } + if (handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + /* Load function AcceptEx */ + if (!handle->pipe.serv.func_acceptex) { + if (!uv__get_acceptex_function((SOCKET)handle->handle, &handle->pipe.serv.func_acceptex)) { + return WSAEAFNOSUPPORT; + } + } + + int err = listen((SOCKET) handle->handle, backlog); + if (err) { + return err; + } + } + handle->flags |= UV_HANDLE_LISTENING; INCREASE_ACTIVE_COUNT(loop, handle); handle->stream.serv.connection_cb = cb; @@ -2236,6 +2487,15 @@ void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, return; } + if (handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + /* If it is unix domain handle, the event comes from AcceptEx IOCP. */ + setsockopt((SOCKET)req->pipeHandle, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&handle->handle, + sizeof(handle->handle)); + } + if (REQ_SUCCESS(req)) { assert(req->pipeHandle != INVALID_HANDLE_VALUE); req->next_pending = handle->pipe.serv.pending_accepts; @@ -2246,7 +2506,7 @@ void uv__process_pipe_accept_req(uv_loop_t* loop, uv_pipe_t* handle, } } else { if (req->pipeHandle != INVALID_HANDLE_VALUE) { - CloseHandle(req->pipeHandle); + uv__close_pipe_handle(handle, req->pipeHandle); req->pipeHandle = INVALID_HANDLE_VALUE; } if (!(handle->flags & UV_HANDLE_CLOSING)) { @@ -2266,6 +2526,15 @@ void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, assert(handle->type == UV_NAMED_PIPE); + if (handle->flags & UV_HANDLE_WIN_UNIX_DOMAIN_SOCKET) { + /* If it is unix domain handle, the event comes from ConnectEx IOCP. */ + setsockopt((SOCKET)req->u.connect.pipeHandle, + SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + NULL, + 0); + } + UNREGISTER_HANDLE_REQ(loop, handle); err = 0; @@ -2277,7 +2546,7 @@ void uv__process_pipe_connect_req(uv_loop_t* loop, uv_pipe_t* handle, else err = uv__set_pipe_handle(loop, handle, pipeHandle, -1, duplex_flags); if (err) - CloseHandle(pipeHandle); + uv__close_pipe_handle(handle, pipeHandle); } else { err = uv_translate_sys_error(GET_REQ_ERROR(req)); }