stream: add uv_pipe and uv_socketpair to the API

Equivalents of `pipe` and `socketpair` for cross-platform use.

PR-URL: https://github.com/libuv/libuv/pull/2953
Reviewed-By: Santiago Gimeno <santiago.gimeno@gmail.com>
This commit is contained in:
Jameson Nash 2020-11-09 21:50:09 -05:00 committed by GitHub
parent cbcd0cfc82
commit 4ddc292774
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 822 additions and 339 deletions

View File

@ -118,3 +118,21 @@ API
function is blocking.
.. versionadded:: 1.16.0
.. c:function:: int uv_pipe(uv_file fds[2], int read_flags, int write_flags)
Create a pair of connected pipe handles.
Data may be written to `fds[1]` and read from `fds[0]`.
The resulting handles can be passed to `uv_pipe_open`, used with `uv_spawn`,
or for any other purpose.
Valid values for `flags` are:
- UV_NONBLOCK_PIPE: Opens the specified socket handle for `OVERLAPPED`
or `FIONBIO`/`O_NONBLOCK` I/O usage.
This is recommended for handles that will be used by libuv,
and not usually recommended otherwise.
Equivalent to :man:`pipe(2)` with the `O_CLOEXEC` flag set.
.. versionadded:: 1.x.0

View File

@ -119,12 +119,14 @@ Data types
* flags may be specified to create a duplex data stream.
*/
UV_READABLE_PIPE = 0x10,
UV_WRITABLE_PIPE = 0x20
UV_WRITABLE_PIPE = 0x20,
/*
* Open the child pipe handle in overlapped mode on Windows.
* On Unix it is silently ignored.
*/
UV_OVERLAPPED_PIPE = 0x40
* When UV_CREATE_PIPE is specified, specifying UV_NONBLOCK_PIPE opens the
* handle in non-blocking mode in the child. This may cause loss of data,
* if the child is not designed to handle to encounter this mode,
* but can also be significantly more efficient.
*/
UV_NONBLOCK_PIPE = 0x40
} uv_stdio_flags;

View File

@ -127,3 +127,20 @@ API
:c:func:`uv_tcp_close_reset` calls is not allowed.
.. versionadded:: 1.32.0
.. c:function:: int uv_socketpair(int type, int protocol, uv_os_sock_t socket_vector[2], int flags0, int flags1)
Create a pair of connected sockets with the specified properties.
The resulting handles can be passed to `uv_tcp_open`, used with `uv_spawn`,
or for any other purpose.
Valid values for `flags0` and `flags1` are:
- UV_NONBLOCK_PIPE: Opens the specified socket handle for `OVERLAPPED`
or `FIONBIO`/`O_NONBLOCK` I/O usage.
This is recommended for handles that will be used by libuv,
and not usually recommended otherwise.
Equivalent to :man:`socketpair(2)` with a domain of AF_UNIX.
.. versionadded:: 1.x.0

View File

@ -475,6 +475,12 @@ UV_EXTERN int uv_fileno(const uv_handle_t* handle, uv_os_fd_t* fd);
UV_EXTERN uv_buf_t uv_buf_init(char* base, unsigned int len);
UV_EXTERN int uv_pipe(uv_file fds[2], int read_flags, int write_flags);
UV_EXTERN int uv_socketpair(int type,
int protocol,
uv_os_sock_t socket_vector[2],
int flags0,
int flags1);
#define UV_STREAM_FIELDS \
/* number of bytes queued for writing */ \
@ -933,10 +939,13 @@ typedef enum {
UV_WRITABLE_PIPE = 0x20,
/*
* Open the child pipe handle in overlapped mode on Windows.
* On Unix it is silently ignored.
* When UV_CREATE_PIPE is specified, specifying UV_NONBLOCK_PIPE opens the
* handle in non-blocking mode in the child. This may cause loss of data,
* if the child is not designed to handle to encounter this mode,
* but can also be significantly more efficient.
*/
UV_OVERLAPPED_PIPE = 0x40
UV_NONBLOCK_PIPE = 0x40,
UV_OVERLAPPED_PIPE = 0x40 /* old name, for compatibility */
} uv_stdio_flags;
typedef struct uv_stdio_container_s {

View File

@ -214,7 +214,7 @@ static int uv__async_start(uv_loop_t* loop) {
pipefd[0] = err;
pipefd[1] = -1;
#else
err = uv__make_pipe(pipefd, UV__F_NONBLOCK);
err = uv__make_pipe(pipefd, UV_NONBLOCK_PIPE);
if (err < 0)
return err;
#endif

View File

@ -282,12 +282,6 @@ int uv___stream_fd(const uv_stream_t* handle);
#define uv__stream_fd(handle) ((handle)->io_watcher.fd)
#endif /* defined(__APPLE__) */
#ifdef O_NONBLOCK
# define UV__F_NONBLOCK O_NONBLOCK
#else
# define UV__F_NONBLOCK 1
#endif
int uv__make_pipe(int fds[2], int flags);
#if defined(__APPLE__)

View File

@ -379,3 +379,57 @@ int uv_pipe_chmod(uv_pipe_t* handle, int mode) {
return r != -1 ? 0 : UV__ERR(errno);
}
int uv_pipe(uv_os_fd_t fds[2], int read_flags, int write_flags) {
uv_os_fd_t temp[2];
int err;
#if defined(__FreeBSD__) || defined(__linux__)
int flags = O_CLOEXEC;
if ((read_flags & UV_NONBLOCK_PIPE) && (write_flags & UV_NONBLOCK_PIPE))
flags |= UV_FS_O_NONBLOCK;
if (pipe2(temp, flags))
return UV__ERR(errno);
if (flags & UV_FS_O_NONBLOCK) {
fds[0] = temp[0];
fds[1] = temp[1];
return 0;
}
#else
if (pipe(temp))
return UV__ERR(errno);
if ((err = uv__cloexec(temp[0], 1)))
goto fail;
if ((err = uv__cloexec(temp[1], 1)))
goto fail;
#endif
if (read_flags & UV_NONBLOCK_PIPE)
if ((err = uv__nonblock(temp[0], 1)))
goto fail;
if (write_flags & UV_NONBLOCK_PIPE)
if ((err = uv__nonblock(temp[1], 1)))
goto fail;
fds[0] = temp[0];
fds[1] = temp[1];
return 0;
fail:
uv__close(temp[0]);
uv__close(temp[1]);
return err;
}
int uv__make_pipe(int fds[2], int flags) {
return uv_pipe(fds,
flags & UV_NONBLOCK_PIPE,
flags & UV_NONBLOCK_PIPE);
}

View File

@ -111,68 +111,6 @@ static void uv__chld(uv_signal_t* handle, int signum) {
assert(QUEUE_EMPTY(&pending));
}
static int uv__make_socketpair(int fds[2]) {
#if defined(__FreeBSD__) || defined(__linux__)
if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fds))
return UV__ERR(errno);
return 0;
#else
int err;
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds))
return UV__ERR(errno);
err = uv__cloexec(fds[0], 1);
if (err == 0)
err = uv__cloexec(fds[1], 1);
if (err != 0) {
uv__close(fds[0]);
uv__close(fds[1]);
return UV__ERR(errno);
}
return 0;
#endif
}
int uv__make_pipe(int fds[2], int flags) {
#if defined(__FreeBSD__) || defined(__linux__)
if (pipe2(fds, flags | O_CLOEXEC))
return UV__ERR(errno);
return 0;
#else
if (pipe(fds))
return UV__ERR(errno);
if (uv__cloexec(fds[0], 1))
goto fail;
if (uv__cloexec(fds[1], 1))
goto fail;
if (flags & UV__F_NONBLOCK) {
if (uv__nonblock(fds[0], 1))
goto fail;
if (uv__nonblock(fds[1], 1))
goto fail;
}
return 0;
fail:
uv__close(fds[0]);
uv__close(fds[1]);
return UV__ERR(errno);
#endif
}
/*
* Used for initializing stdio streams like options.stdin_stream. Returns
* zero on success. See also the cleanup section in uv_spawn().
@ -192,7 +130,7 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) {
if (container->data.stream->type != UV_NAMED_PIPE)
return UV_EINVAL;
else
return uv__make_socketpair(fds);
return uv_socketpair(SOCK_STREAM, 0, fds, 0, 0);
case UV_INHERIT_FD:
case UV_INHERIT_STREAM:

View File

@ -265,7 +265,7 @@ static int uv__signal_loop_once_init(uv_loop_t* loop) {
if (loop->signal_pipefd[0] != -1)
return 0;
err = uv__make_pipe(loop->signal_pipefd, UV__F_NONBLOCK);
err = uv__make_pipe(loop->signal_pipefd, UV_NONBLOCK_PIPE);
if (err)
return err;

View File

@ -462,3 +462,49 @@ int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
void uv__tcp_close(uv_tcp_t* handle) {
uv__stream_close((uv_stream_t*)handle);
}
int uv_socketpair(int type, int protocol, uv_os_sock_t fds[2], int flags0, int flags1) {
uv_os_sock_t temp[2];
int err;
#if defined(__FreeBSD__) || defined(__linux__)
int flags;
flags = type | SOCK_CLOEXEC;
if ((flags0 & UV_NONBLOCK_PIPE) && (flags1 & UV_NONBLOCK_PIPE))
flags |= SOCK_NONBLOCK;
if (socketpair(AF_UNIX, flags, protocol, temp))
return UV__ERR(errno);
if (flags & UV_FS_O_NONBLOCK) {
fds[0] = temp[0];
fds[1] = temp[1];
return 0;
}
#else
if (socketpair(AF_UNIX, type, protocol, temp))
return UV__ERR(errno);
if ((err = uv__cloexec(temp[0], 1)))
goto fail;
if ((err = uv__cloexec(temp[1], 1)))
goto fail;
#endif
if (flags0 & UV_NONBLOCK_PIPE)
if ((err = uv__nonblock(temp[0], 1)))
goto fail;
if (flags1 & UV_NONBLOCK_PIPE)
if ((err = uv__nonblock(temp[1], 1)))
goto fail;
fds[0] = temp[0];
fds[1] = temp[1];
return 0;
fail:
uv__close(temp[0]);
uv__close(temp[1]);
return err;
}

View File

@ -115,8 +115,8 @@ void uv_udp_endgame(uv_loop_t* loop, uv_udp_t* handle);
/*
* Pipes
*/
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
char* name, size_t nameSize);
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags);
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client);

View File

@ -202,17 +202,17 @@ static void close_pipe(uv_pipe_t* pipe) {
}
int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
char* name, size_t nameSize) {
static int uv__pipe_server(
HANDLE* pipeHandle_ptr, DWORD access,
char* name, size_t nameSize, char* random) {
HANDLE pipeHandle;
int err;
char* ptr = (char*)handle;
for (;;) {
uv_unique_pipe_name(ptr, name, nameSize);
uv_unique_pipe_name(random, name, nameSize);
pipeHandle = CreateNamedPipeA(name,
access | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC,
access | FILE_FLAG_FIRST_PIPE_INSTANCE,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT, 1, 65536, 65536, 0,
NULL);
@ -226,20 +226,11 @@ int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
goto error;
}
/* Pipe name collision. Increment the pointer and try again. */
ptr++;
/* Pipe name collision. Increment the random number and try again. */
random++;
}
if (CreateIoCompletionPort(pipeHandle,
loop->iocp,
(ULONG_PTR)handle,
0) == NULL) {
err = GetLastError();
goto error;
}
uv_pipe_connection_init(handle);
handle->handle = pipeHandle;
*pipeHandle_ptr = pipeHandle;
return 0;
@ -251,6 +242,214 @@ int uv_stdio_pipe_server(uv_loop_t* loop, uv_pipe_t* handle, DWORD access,
}
static int uv__create_pipe_pair(
HANDLE* server_pipe_ptr, HANDLE* client_pipe_ptr,
unsigned int server_flags, unsigned int client_flags,
int inherit_client, char* random) {
/* allowed flags are: UV_READABLE_PIPE | UV_WRITABLE_PIPE | UV_NONBLOCK_PIPE */
char pipe_name[64];
SECURITY_ATTRIBUTES sa;
DWORD server_access;
DWORD client_access;
HANDLE server_pipe;
HANDLE client_pipe;
int err;
server_pipe = INVALID_HANDLE_VALUE;
client_pipe = INVALID_HANDLE_VALUE;
server_access = 0;
if (server_flags & UV_READABLE_PIPE)
server_access |= PIPE_ACCESS_INBOUND;
if (server_flags & UV_WRITABLE_PIPE)
server_access |= PIPE_ACCESS_OUTBOUND;
if (server_flags & UV_NONBLOCK_PIPE)
server_access |= FILE_FLAG_OVERLAPPED;
server_access |= WRITE_DAC;
client_access = 0;
if (client_flags & UV_READABLE_PIPE)
client_access |= GENERIC_READ;
else
client_access |= FILE_READ_ATTRIBUTES;
if (client_flags & UV_WRITABLE_PIPE)
client_access |= GENERIC_WRITE;
else
client_access |= FILE_WRITE_ATTRIBUTES;
client_access |= WRITE_DAC;
/* Create server pipe handle. */
err = uv__pipe_server(&server_pipe,
server_access,
pipe_name,
sizeof(pipe_name),
random);
if (err)
goto error;
/* Create client pipe handle. */
sa.nLength = sizeof sa;
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = inherit_client;
client_pipe = CreateFileA(pipe_name,
client_access,
0,
&sa,
OPEN_EXISTING,
(client_flags & UV_NONBLOCK_PIPE) ? FILE_FLAG_OVERLAPPED : 0,
NULL);
if (client_pipe == INVALID_HANDLE_VALUE) {
err = GetLastError();
goto error;
}
#ifndef NDEBUG
/* Validate that the pipe was opened in the right mode. */
{
DWORD mode;
BOOL r;
r = GetNamedPipeHandleState(client_pipe, &mode, NULL, NULL, NULL, NULL, 0);
if (r == TRUE) {
assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
} else {
fprintf(stderr, "libuv assertion failure: GetNamedPipeHandleState failed\n");
}
}
#endif
/* Do a blocking ConnectNamedPipe. This should not block because we have
* both ends of the pipe created. */
if (!ConnectNamedPipe(server_pipe, NULL)) {
if (GetLastError() != ERROR_PIPE_CONNECTED) {
err = GetLastError();
goto error;
}
}
*client_pipe_ptr = client_pipe;
*server_pipe_ptr = server_pipe;
return 0;
error:
if (server_pipe != INVALID_HANDLE_VALUE)
CloseHandle(server_pipe);
if (client_pipe != INVALID_HANDLE_VALUE)
CloseHandle(client_pipe);
return err;
}
int uv_pipe(uv_file fds[2], int read_flags, int write_flags) {
uv_file temp[2];
int err;
HANDLE readh;
HANDLE writeh;
/* Make the server side the inbound (read) end, */
/* so that both ends will have FILE_READ_ATTRIBUTES permission. */
/* TODO: better source of local randomness than &fds? */
read_flags |= UV_READABLE_PIPE;
write_flags |= UV_WRITABLE_PIPE;
err = uv__create_pipe_pair(&readh, &writeh, read_flags, write_flags, 0, (char*) &fds[0]);
if (err != 0)
return err;
temp[0] = _open_osfhandle((intptr_t) readh, 0);
if (temp[0] == -1) {
if (errno == UV_EMFILE)
err = UV_EMFILE;
else
err = UV_UNKNOWN;
CloseHandle(readh);
CloseHandle(writeh);
return err;
}
temp[1] = _open_osfhandle((intptr_t) writeh, 0);
if (temp[1] == -1) {
if (errno == UV_EMFILE)
err = UV_EMFILE;
else
err = UV_UNKNOWN;
_close(temp[0]);
CloseHandle(writeh);
return err;
}
fds[0] = temp[0];
fds[1] = temp[1];
return 0;
}
int uv__create_stdio_pipe_pair(uv_loop_t* loop,
uv_pipe_t* parent_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
/* The parent_pipe is always the server_pipe and kept by libuv.
* The child_pipe is always the client_pipe and is passed to the child.
* The flags are specified with respect to their usage in the child. */
HANDLE server_pipe;
HANDLE client_pipe;
unsigned int server_flags;
unsigned int client_flags;
int err;
server_pipe = INVALID_HANDLE_VALUE;
client_pipe = INVALID_HANDLE_VALUE;
server_flags = 0;
client_flags = 0;
if (flags & UV_READABLE_PIPE) {
/* The server needs inbound (read) access too, otherwise CreateNamedPipe()
* won't give us the FILE_READ_ATTRIBUTES permission. We need that to probe
* the state of the write buffer when we're trying to shutdown the pipe. */
server_flags |= UV_READABLE_PIPE | UV_WRITABLE_PIPE;
client_flags |= UV_READABLE_PIPE;
}
if (flags & UV_WRITABLE_PIPE) {
server_flags |= UV_READABLE_PIPE;
client_flags |= UV_WRITABLE_PIPE;
}
server_flags |= UV_NONBLOCK_PIPE;
if (flags & UV_NONBLOCK_PIPE || parent_pipe->ipc) {
client_flags |= UV_NONBLOCK_PIPE;
}
err = uv__create_pipe_pair(&server_pipe, &client_pipe,
server_flags, client_flags, 1, (char*) server_pipe);
if (err)
goto error;
if (CreateIoCompletionPort(server_pipe,
loop->iocp,
(ULONG_PTR) parent_pipe,
0) == NULL) {
err = GetLastError();
goto error;
}
uv_pipe_connection_init(parent_pipe);
parent_pipe->handle = server_pipe;
*child_pipe_ptr = client_pipe;
/* The server end is now readable and/or writable. */
if (flags & UV_READABLE_PIPE)
parent_pipe->flags |= UV_HANDLE_WRITABLE;
if (flags & UV_WRITABLE_PIPE)
parent_pipe->flags |= UV_HANDLE_READABLE;
return 0;
error:
if (server_pipe != INVALID_HANDLE_VALUE)
CloseHandle(server_pipe);
if (client_pipe != INVALID_HANDLE_VALUE)
CloseHandle(client_pipe);
return err;
}
static int uv_set_pipe_handle(uv_loop_t* loop,
uv_pipe_t* handle,
HANDLE pipeHandle,
@ -712,9 +911,8 @@ error:
handle->name = NULL;
}
if (pipeHandle != INVALID_HANDLE_VALUE) {
if (pipeHandle != INVALID_HANDLE_VALUE)
CloseHandle(pipeHandle);
}
/* Make this req pending reporting an error. */
SET_REQ_ERROR(req, err);

View File

@ -95,102 +95,6 @@ void uv_disable_stdio_inheritance(void) {
}
static int uv__create_stdio_pipe_pair(uv_loop_t* loop,
uv_pipe_t* server_pipe, HANDLE* child_pipe_ptr, unsigned int flags) {
char pipe_name[64];
SECURITY_ATTRIBUTES sa;
DWORD server_access = 0;
DWORD client_access = 0;
HANDLE child_pipe = INVALID_HANDLE_VALUE;
int err;
int overlap;
if (flags & UV_READABLE_PIPE) {
/* The server needs inbound access too, otherwise CreateNamedPipe() won't
* give us the FILE_READ_ATTRIBUTES permission. We need that to probe the
* state of the write buffer when we're trying to shutdown the pipe. */
server_access |= PIPE_ACCESS_OUTBOUND | PIPE_ACCESS_INBOUND;
client_access |= GENERIC_READ | FILE_WRITE_ATTRIBUTES;
}
if (flags & UV_WRITABLE_PIPE) {
server_access |= PIPE_ACCESS_INBOUND;
client_access |= GENERIC_WRITE | FILE_READ_ATTRIBUTES;
}
/* Create server pipe handle. */
err = uv_stdio_pipe_server(loop,
server_pipe,
server_access,
pipe_name,
sizeof(pipe_name));
if (err)
goto error;
/* Create child pipe handle. */
sa.nLength = sizeof sa;
sa.lpSecurityDescriptor = NULL;
sa.bInheritHandle = TRUE;
overlap = server_pipe->ipc || (flags & UV_OVERLAPPED_PIPE);
child_pipe = CreateFileA(pipe_name,
client_access,
0,
&sa,
OPEN_EXISTING,
overlap ? FILE_FLAG_OVERLAPPED : 0,
NULL);
if (child_pipe == INVALID_HANDLE_VALUE) {
err = GetLastError();
goto error;
}
#ifndef NDEBUG
/* Validate that the pipe was opened in the right mode. */
{
DWORD mode;
BOOL r = GetNamedPipeHandleState(child_pipe,
&mode,
NULL,
NULL,
NULL,
NULL,
0);
assert(r == TRUE);
assert(mode == (PIPE_READMODE_BYTE | PIPE_WAIT));
}
#endif
/* Do a blocking ConnectNamedPipe. This should not block because we have both
* ends of the pipe created. */
if (!ConnectNamedPipe(server_pipe->handle, NULL)) {
if (GetLastError() != ERROR_PIPE_CONNECTED) {
err = GetLastError();
goto error;
}
}
/* The server end is now readable and/or writable. */
if (flags & UV_READABLE_PIPE)
server_pipe->flags |= UV_HANDLE_WRITABLE;
if (flags & UV_WRITABLE_PIPE)
server_pipe->flags |= UV_HANDLE_READABLE;
*child_pipe_ptr = child_pipe;
return 0;
error:
if (server_pipe->handle != INVALID_HANDLE_VALUE) {
uv_pipe_cleanup(loop, server_pipe);
}
if (child_pipe != INVALID_HANDLE_VALUE) {
CloseHandle(child_pipe);
}
return err;
}
static int uv__duplicate_handle(uv_loop_t* loop, HANDLE handle, HANDLE* dup) {
HANDLE current_process;

View File

@ -1587,3 +1587,118 @@ int uv__tcp_connect(uv_connect_t* req,
return 0;
}
#ifndef WSA_FLAG_NO_HANDLE_INHERIT
/* Added in Windows 7 SP1. Specify this to avoid race conditions, */
/* but also manually clear the inherit flag in case this failed. */
#define WSA_FLAG_NO_HANDLE_INHERIT 0x80
#endif
int uv_socketpair(int type, int protocol, uv_os_sock_t fds[2], int flags0, int flags1) {
SOCKET server = INVALID_SOCKET;
SOCKET client0 = INVALID_SOCKET;
SOCKET client1 = INVALID_SOCKET;
SOCKADDR_IN name;
LPFN_ACCEPTEX func_acceptex;
WSAOVERLAPPED overlap;
char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
int namelen;
int err;
DWORD bytes;
DWORD flags;
DWORD client0_flags = WSA_FLAG_NO_HANDLE_INHERIT;
DWORD client1_flags = WSA_FLAG_NO_HANDLE_INHERIT;
if (flags0 & UV_NONBLOCK_PIPE)
client0_flags |= WSA_FLAG_OVERLAPPED;
if (flags1 & UV_NONBLOCK_PIPE)
client1_flags |= WSA_FLAG_OVERLAPPED;
server = WSASocketW(AF_INET, type, protocol, NULL, 0,
WSA_FLAG_OVERLAPPED | WSA_FLAG_NO_HANDLE_INHERIT);
if (server == INVALID_SOCKET)
goto wsaerror;
if (!SetHandleInformation((HANDLE) server, HANDLE_FLAG_INHERIT, 0))
goto error;
name.sin_family = AF_INET;
name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
name.sin_port = 0;
if (bind(server, (SOCKADDR*) &name, sizeof(name)) != 0)
goto wsaerror;
if (listen(server, 1) != 0)
goto wsaerror;
namelen = sizeof(name);
if (getsockname(server, (SOCKADDR*) &name, &namelen) != 0)
goto wsaerror;
client0 = WSASocketW(AF_INET, type, protocol, NULL, 0, client0_flags);
if (client0 == INVALID_SOCKET)
goto wsaerror;
if (!SetHandleInformation((HANDLE) client0, HANDLE_FLAG_INHERIT, 0))
goto error;
if (connect(client0, (SOCKADDR*) &name, sizeof(name)) != 0)
goto wsaerror;
client1 = WSASocketW(AF_INET, type, protocol, NULL, 0, client1_flags);
if (client1 == INVALID_SOCKET)
goto wsaerror;
if (!SetHandleInformation((HANDLE) client1, HANDLE_FLAG_INHERIT, 0))
goto error;
if (!uv_get_acceptex_function(server, &func_acceptex)) {
err = WSAEAFNOSUPPORT;
goto cleanup;
}
memset(&overlap, 0, sizeof(overlap));
if (!func_acceptex(server,
client1,
accept_buffer,
0,
sizeof(struct sockaddr_storage),
sizeof(struct sockaddr_storage),
&bytes,
&overlap)) {
err = WSAGetLastError();
if (err == ERROR_IO_PENDING) {
/* Result should complete immediately, since we already called connect,
* but emperically, we sometimes have to poll the kernel a couple times
* until it notices that. */
while (!WSAGetOverlappedResult(client1, &overlap, &bytes, FALSE, &flags)) {
err = WSAGetLastError();
if (err != WSA_IO_INCOMPLETE)
goto cleanup;
SwitchToThread();
}
}
else {
goto cleanup;
}
}
if (setsockopt(client1, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char*) &server, sizeof(server)) != 0) {
goto wsaerror;
}
closesocket(server);
fds[0] = client0;
fds[1] = client1;
return 0;
wsaerror:
err = WSAGetLastError();
goto cleanup;
error:
err = GetLastError();
goto cleanup;
cleanup:
if (server != INVALID_SOCKET)
closesocket(server);
if (client0 != INVALID_SOCKET)
closesocket(client0);
if (client1 != INVALID_SOCKET)
closesocket(client1);
assert(err);
return uv_translate_sys_error(err);
}

View File

@ -50,6 +50,10 @@ int spawn_tcp_server_helper(void);
static int maybe_run_test(int argc, char **argv);
#ifdef _WIN32
typedef BOOL (WINAPI *sCompareObjectHandles)(_In_ HANDLE, _In_ HANDLE);
#endif
int main(int argc, char **argv) {
#ifndef _WIN32
@ -202,22 +206,36 @@ static int maybe_run_test(int argc, char **argv) {
return 1;
}
#ifndef _WIN32
if (strcmp(argv[1], "spawn_helper8") == 0) {
int fd;
uv_os_fd_t closed_fd;
uv_os_fd_t open_fd;
#ifdef _WIN32
DWORD flags;
HMODULE kernelbase_module;
sCompareObjectHandles pCompareObjectHandles; /* function introduced in Windows 10 */
#endif
notify_parent_process();
ASSERT(sizeof(fd) == read(0, &fd, sizeof(fd)));
ASSERT(fd > 2);
ASSERT(sizeof(closed_fd) == read(0, &closed_fd, sizeof(closed_fd)));
ASSERT(sizeof(open_fd) == read(0, &open_fd, sizeof(open_fd)));
#ifdef _WIN32
ASSERT((intptr_t) closed_fd > 0);
ASSERT((intptr_t) open_fd > 0);
ASSERT(0 != GetHandleInformation(open_fd, &flags));
kernelbase_module = GetModuleHandleA("kernelbase.dll");
pCompareObjectHandles = (sCompareObjectHandles)
GetProcAddress(kernelbase_module, "CompareObjectHandles");
ASSERT(pCompareObjectHandles == NULL || !pCompareObjectHandles(open_fd, closed_fd));
#else
ASSERT(open_fd > 2);
ASSERT(closed_fd > 2);
# if defined(__PASE__) /* On IBMi PASE, write() returns 1 */
ASSERT(1 == write(fd, "x", 1));
ASSERT(1 == write(closed_fd, "x", 1));
# else
ASSERT(-1 == write(fd, "x", 1));
ASSERT(-1 == write(closed_fd, "x", 1));
# endif /* !__PASE__ */
#endif
return 1;
}
#endif /* !_WIN32 */
if (strcmp(argv[1], "spawn_helper9") == 0) {
notify_parent_process();

View File

@ -19,12 +19,11 @@
* IN THE SOFTWARE.
*/
#if !defined(_WIN32)
#include "uv.h"
#include "task.h"
#include <fcntl.h>
#ifndef _WIN32
#include <unistd.h>
#endif
static unsigned int read_cb_called;
@ -51,14 +50,25 @@ static void read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
TEST_IMPL(close_fd) {
uv_pipe_t pipe_handle;
int fd[2];
uv_fs_t req;
uv_buf_t bufs[1];
uv_file fd[2];
bufs[0] = uv_buf_init("", 1);
ASSERT(0 == pipe(fd));
ASSERT(0 == uv_pipe(fd, 0, 0));
ASSERT(0 == uv_pipe_init(uv_default_loop(), &pipe_handle, 0));
ASSERT(0 == uv_pipe_open(&pipe_handle, fd[0]));
fd[0] = -1; /* uv_pipe_open() takes ownership of the file descriptor. */
ASSERT(1 == write(fd[1], "", 1));
/* uv_pipe_open() takes ownership of the file descriptor. */
fd[0] = -1;
ASSERT(1 == uv_fs_write(NULL, &req, fd[1], bufs, 1, -1, NULL));
ASSERT(1 == req.result);
uv_fs_req_cleanup(&req);
#ifdef _WIN32
ASSERT(0 == _close(fd[1]));
#else
ASSERT(0 == close(fd[1]));
#endif
fd[1] = -1;
ASSERT(0 == uv_read_start((uv_stream_t *) &pipe_handle, alloc_cb, read_cb));
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
@ -72,9 +82,3 @@ TEST_IMPL(close_fd) {
MAKE_VALGRIND_HAPPY();
return 0;
}
#else
typedef int file_has_no_tests; /* ISO C forbids an empty translation unit. */
#endif /* !_WIN32 */

View File

@ -458,6 +458,9 @@ TEST_DECLARE (ip6_addr_link_local)
TEST_DECLARE (poll_close_doesnt_corrupt_stack)
TEST_DECLARE (poll_closesocket)
TEST_DECLARE (close_fd)
TEST_DECLARE (closed_fd_events)
TEST_DECLARE (spawn_fs_open)
#ifdef _WIN32
TEST_DECLARE (spawn_detect_pipe_name_collisions_on_windows)
#if !defined(USING_UV_SHARED)
@ -472,8 +475,6 @@ TEST_DECLARE (ipc_listen_after_bind_twice)
TEST_DECLARE (win32_signum_number)
#else
TEST_DECLARE (emfile)
TEST_DECLARE (close_fd)
TEST_DECLARE (spawn_fs_open)
TEST_DECLARE (spawn_setuid_setgid)
TEST_DECLARE (we_get_signal)
TEST_DECLARE (we_get_signals)
@ -482,7 +483,6 @@ TEST_DECLARE (we_get_signals_mixed)
TEST_DECLARE (signal_multiple_loops)
TEST_DECLARE (signal_pending_on_close)
TEST_DECLARE (signal_close_loop_alive)
TEST_DECLARE (closed_fd_events)
#endif
#ifdef __APPLE__
TEST_DECLARE (osx_select)
@ -568,7 +568,8 @@ TASK_LIST_START
#ifndef _WIN32
TEST_ENTRY (pipe_close_stdout_read_stdin)
#endif
TEST_ENTRY (pipe_set_non_blocking)
/* Seems to be either about 0.5s or 5s, depending on the OS. */
TEST_ENTRY_CUSTOM (pipe_set_non_blocking, 0, 0, 20000)
TEST_ENTRY (pipe_set_chmod)
TEST_ENTRY (tty)
#ifdef _WIN32
@ -942,6 +943,9 @@ TASK_LIST_START
TEST_ENTRY (poll_close_doesnt_corrupt_stack)
TEST_ENTRY (poll_closesocket)
TEST_ENTRY (close_fd)
TEST_ENTRY (closed_fd_events)
TEST_ENTRY (spawn_fs_open)
#ifdef _WIN32
TEST_ENTRY (spawn_detect_pipe_name_collisions_on_windows)
#if !defined(USING_UV_SHARED)
@ -956,8 +960,6 @@ TASK_LIST_START
TEST_ENTRY (win32_signum_number)
#else
TEST_ENTRY (emfile)
TEST_ENTRY (close_fd)
TEST_ENTRY (spawn_fs_open)
TEST_ENTRY (spawn_setuid_setgid)
TEST_ENTRY (we_get_signal)
TEST_ENTRY (we_get_signals)
@ -966,7 +968,6 @@ TASK_LIST_START
TEST_ENTRY (signal_multiple_loops)
TEST_ENTRY (signal_pending_on_close)
TEST_ENTRY (signal_close_loop_alive)
TEST_ENTRY (closed_fd_events)
#endif
#ifdef __APPLE__

View File

@ -24,6 +24,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h> /* strlen */
static int completed_pingers = 0;
@ -33,23 +34,21 @@ static int completed_pingers = 0;
#define NUM_PINGS 1000
#endif
/* 64 bytes is enough for a pinger */
#define BUFSIZE 10240
static char PING[] = "PING\n";
static char PONG[] = "PONG\n";
static int pinger_on_connect_count;
typedef struct {
int vectored_writes;
int pongs;
int state;
unsigned pongs;
unsigned state;
union {
uv_tcp_t tcp;
uv_pipe_t pipe;
} stream;
uv_connect_t connect_req;
char read_buffer[BUFSIZE];
char* pong;
} pinger_t;
@ -59,28 +58,44 @@ static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
}
static void ponger_on_close(uv_handle_t* handle) {
if (handle->data)
free(handle->data);
else
free(handle);
}
static void pinger_on_close(uv_handle_t* handle) {
pinger_t* pinger = (pinger_t*)handle->data;
pinger_t* pinger = (pinger_t*) handle->data;
ASSERT(NUM_PINGS == pinger->pongs);
ASSERT_EQ(NUM_PINGS, pinger->pongs);
free(pinger);
if (handle == (uv_handle_t*) &pinger->stream.tcp) {
free(pinger); /* also frees handle */
} else {
uv_close((uv_handle_t*) &pinger->stream.tcp, ponger_on_close);
free(handle);
}
completed_pingers++;
}
static void pinger_after_write(uv_write_t* req, int status) {
ASSERT(status == 0);
ASSERT_EQ(status, 0);
free(req);
}
static void pinger_write_ping(pinger_t* pinger) {
uv_stream_t* stream;
uv_write_t* req;
uv_buf_t bufs[sizeof PING - 1];
int i, nbufs;
stream = (uv_stream_t*) &pinger->stream.tcp;
if (!pinger->vectored_writes) {
/* Write a single buffer. */
nbufs = 1;
@ -94,13 +109,8 @@ static void pinger_write_ping(pinger_t* pinger) {
}
req = malloc(sizeof(*req));
if (uv_write(req,
(uv_stream_t*) &pinger->stream.tcp,
bufs,
nbufs,
pinger_after_write)) {
FATAL("uv_write failed");
}
ASSERT_NOT_NULL(req);
ASSERT_EQ(0, uv_write(req, stream, bufs, nbufs, pinger_after_write));
puts("PING");
}
@ -115,20 +125,20 @@ static void pinger_read_cb(uv_stream_t* stream,
pinger = (pinger_t*) stream->data;
if (nread < 0) {
ASSERT(nread == UV_EOF);
ASSERT_EQ(nread, UV_EOF);
puts("got EOF");
free(buf->base);
uv_close((uv_handle_t*)(&pinger->stream.tcp), pinger_on_close);
uv_close((uv_handle_t*) stream, pinger_on_close);
return;
}
/* Now we count the pings */
/* Now we count the pongs */
for (i = 0; i < nread; i++) {
ASSERT(buf->base[i] == PING[pinger->state]);
pinger->state = (pinger->state + 1) % (sizeof(PING) - 1);
ASSERT_EQ(buf->base[i], pinger->pong[pinger->state]);
pinger->state = (pinger->state + 1) % strlen(pinger->pong);
if (pinger->state != 0)
continue;
@ -139,7 +149,7 @@ static void pinger_read_cb(uv_stream_t* stream,
if (pinger->pongs < NUM_PINGS) {
pinger_write_ping(pinger);
} else {
uv_close((uv_handle_t*)(&pinger->stream.tcp), pinger_on_close);
uv_close((uv_handle_t*) stream, pinger_on_close);
break;
}
}
@ -148,20 +158,53 @@ static void pinger_read_cb(uv_stream_t* stream,
}
static void ponger_read_cb(uv_stream_t* stream,
ssize_t nread,
const uv_buf_t* buf) {
uv_buf_t writebuf;
uv_write_t* req;
int i;
if (nread < 0) {
ASSERT_EQ(nread, UV_EOF);
puts("got EOF");
free(buf->base);
uv_close((uv_handle_t*) stream, ponger_on_close);
return;
}
/* Echo back */
for (i = 0; i < nread; i++) {
if (buf->base[i] == 'I')
buf->base[i] = 'O';
}
writebuf = uv_buf_init(buf->base, nread);
req = malloc(sizeof(*req));
ASSERT_NOT_NULL(req);
ASSERT_EQ(0, uv_write(req, stream, &writebuf, 1, pinger_after_write));
}
static void pinger_on_connect(uv_connect_t* req, int status) {
pinger_t* pinger = (pinger_t*)req->handle->data;
pinger_t* pinger = (pinger_t*) req->handle->data;
pinger_on_connect_count++;
ASSERT(status == 0);
ASSERT_EQ(status, 0);
ASSERT(1 == uv_is_readable(req->handle));
ASSERT(1 == uv_is_writable(req->handle));
ASSERT(0 == uv_is_closing((uv_handle_t *) req->handle));
ASSERT_EQ(1, uv_is_readable(req->handle));
ASSERT_EQ(1, uv_is_writable(req->handle));
ASSERT_EQ(0, uv_is_closing((uv_handle_t *) req->handle));
pinger_write_ping(pinger);
uv_read_start((uv_stream_t*)(req->handle), alloc_cb, pinger_read_cb);
ASSERT_EQ(0, uv_read_start((uv_stream_t*) req->handle,
alloc_cb,
pinger_read_cb));
}
@ -172,17 +215,18 @@ static void tcp_pinger_v6_new(int vectored_writes) {
pinger_t* pinger;
ASSERT(0 == uv_ip6_addr("::1", TEST_PORT, &server_addr));
ASSERT_EQ(0, uv_ip6_addr("::1", TEST_PORT, &server_addr));
pinger = malloc(sizeof(*pinger));
ASSERT(pinger != NULL);
ASSERT_NOT_NULL(pinger);
pinger->vectored_writes = vectored_writes;
pinger->state = 0;
pinger->pongs = 0;
pinger->pong = PING;
/* Try to connect to the server and do NUM_PINGS ping-pongs. */
r = uv_tcp_init(uv_default_loop(), &pinger->stream.tcp);
pinger->stream.tcp.data = pinger;
ASSERT(!r);
ASSERT_EQ(0, r);
/* We are never doing multiple reads/connects at a time anyway, so these
* handles can be pre-initialized. */
@ -190,10 +234,10 @@ static void tcp_pinger_v6_new(int vectored_writes) {
&pinger->stream.tcp,
(const struct sockaddr*) &server_addr,
pinger_on_connect);
ASSERT(!r);
ASSERT_EQ(0, r);
/* Synchronous connect callbacks are not allowed. */
ASSERT(pinger_on_connect_count == 0);
ASSERT_EQ(pinger_on_connect_count, 0);
}
@ -202,17 +246,18 @@ static void tcp_pinger_new(int vectored_writes) {
struct sockaddr_in server_addr;
pinger_t* pinger;
ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &server_addr));
pinger = malloc(sizeof(*pinger));
ASSERT(pinger != NULL);
ASSERT_NOT_NULL(pinger);
pinger->vectored_writes = vectored_writes;
pinger->state = 0;
pinger->pongs = 0;
pinger->pong = PING;
/* Try to connect to the server and do NUM_PINGS ping-pongs. */
r = uv_tcp_init(uv_default_loop(), &pinger->stream.tcp);
pinger->stream.tcp.data = pinger;
ASSERT(!r);
ASSERT_EQ(0, r);
/* We are never doing multiple reads/connects at a time anyway, so these
* handles can be pre-initialized. */
@ -220,10 +265,10 @@ static void tcp_pinger_new(int vectored_writes) {
&pinger->stream.tcp,
(const struct sockaddr*) &server_addr,
pinger_on_connect);
ASSERT(!r);
ASSERT_EQ(0, r);
/* Synchronous connect callbacks are not allowed. */
ASSERT(pinger_on_connect_count == 0);
ASSERT_EQ(pinger_on_connect_count, 0);
}
@ -232,15 +277,16 @@ static void pipe_pinger_new(int vectored_writes) {
pinger_t* pinger;
pinger = malloc(sizeof(*pinger));
ASSERT(pinger != NULL);
ASSERT_NOT_NULL(pinger);
pinger->vectored_writes = vectored_writes;
pinger->state = 0;
pinger->pongs = 0;
pinger->pong = PING;
/* Try to connect to the server and do NUM_PINGS ping-pongs. */
r = uv_pipe_init(uv_default_loop(), &pinger->stream.pipe, 0);
pinger->stream.pipe.data = pinger;
ASSERT(!r);
ASSERT_EQ(0, r);
/* We are never doing multiple reads/connects at a time anyway, so these
* handles can be pre-initialized. */
@ -248,13 +294,86 @@ static void pipe_pinger_new(int vectored_writes) {
pinger_on_connect);
/* Synchronous connect callbacks are not allowed. */
ASSERT(pinger_on_connect_count == 0);
ASSERT_EQ(pinger_on_connect_count, 0);
}
static void socketpair_pinger_new(int vectored_writes) {
pinger_t* pinger;
uv_os_sock_t fds[2];
uv_tcp_t* ponger;
pinger = malloc(sizeof(*pinger));
ASSERT_NOT_NULL(pinger);
pinger->vectored_writes = vectored_writes;
pinger->state = 0;
pinger->pongs = 0;
pinger->pong = PONG;
/* Try to make a socketpair and do NUM_PINGS ping-pongs. */
(void)uv_default_loop(); /* ensure WSAStartup has been performed */
ASSERT_EQ(0, uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE));
#ifndef _WIN32
/* On Windows, this is actually a UV_TCP, but libuv doesn't detect that. */
ASSERT_EQ(uv_guess_handle((uv_file) fds[0]), UV_NAMED_PIPE);
ASSERT_EQ(uv_guess_handle((uv_file) fds[1]), UV_NAMED_PIPE);
#endif
ASSERT_EQ(0, uv_tcp_init(uv_default_loop(), &pinger->stream.tcp));
pinger->stream.pipe.data = pinger;
ASSERT_EQ(0, uv_tcp_open(&pinger->stream.tcp, fds[1]));
ponger = malloc(sizeof(*ponger));
ASSERT_NOT_NULL(ponger);
ponger->data = NULL;
ASSERT_EQ(0, uv_tcp_init(uv_default_loop(), ponger));
ASSERT_EQ(0, uv_tcp_open(ponger, fds[0]));
pinger_write_ping(pinger);
ASSERT_EQ(0, uv_read_start((uv_stream_t*) &pinger->stream.tcp,
alloc_cb,
pinger_read_cb));
ASSERT_EQ(0, uv_read_start((uv_stream_t*) ponger,
alloc_cb,
ponger_read_cb));
}
static void pipe2_pinger_new(int vectored_writes) {
uv_file fds[2];
pinger_t* pinger;
uv_pipe_t* ponger;
/* Try to make a pipe and do NUM_PINGS pings. */
ASSERT_EQ(0, uv_pipe(fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE));
ASSERT_EQ(uv_guess_handle(fds[0]), UV_NAMED_PIPE);
ASSERT_EQ(uv_guess_handle(fds[1]), UV_NAMED_PIPE);
ponger = malloc(sizeof(*ponger));
ASSERT_NOT_NULL(ponger);
ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), ponger, 0));
ASSERT_EQ(0, uv_pipe_open(ponger, fds[0]));
pinger = malloc(sizeof(*pinger));
ASSERT_NOT_NULL(pinger);
pinger->vectored_writes = vectored_writes;
pinger->state = 0;
pinger->pongs = 0;
pinger->pong = PING;
ASSERT_EQ(0, uv_pipe_init(uv_default_loop(), &pinger->stream.pipe, 0));
ASSERT_EQ(0, uv_pipe_open(&pinger->stream.pipe, fds[1]));
pinger->stream.pipe.data = pinger; /* record for close_cb */
ponger->data = pinger; /* record for read_cb */
pinger_write_ping(pinger);
ASSERT_EQ(0, uv_read_start((uv_stream_t*) ponger, alloc_cb, pinger_read_cb));
}
static int run_ping_pong_test(void) {
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
ASSERT(completed_pingers == 1);
ASSERT_EQ(completed_pingers, 1);
MAKE_VALGRIND_HAPPY();
return 0;
@ -263,12 +382,20 @@ static int run_ping_pong_test(void) {
TEST_IMPL(tcp_ping_pong) {
tcp_pinger_new(0);
run_ping_pong_test();
completed_pingers = 0;
socketpair_pinger_new(0);
return run_ping_pong_test();
}
TEST_IMPL(tcp_ping_pong_vec) {
tcp_pinger_new(1);
run_ping_pong_test();
completed_pingers = 0;
socketpair_pinger_new(1);
return run_ping_pong_test();
}
@ -291,11 +418,19 @@ TEST_IMPL(tcp6_ping_pong_vec) {
TEST_IMPL(pipe_ping_pong) {
pipe_pinger_new(0);
run_ping_pong_test();
completed_pingers = 0;
pipe2_pinger_new(0);
return run_ping_pong_test();
}
TEST_IMPL(pipe_ping_pong_vec) {
pipe_pinger_new(1);
run_ping_pong_test();
completed_pingers = 0;
pipe2_pinger_new(1);
return run_ping_pong_test();
}

View File

@ -16,18 +16,10 @@
#include "uv.h"
#include "task.h"
#ifdef _WIN32
TEST_IMPL(pipe_set_non_blocking) {
RETURN_SKIP("Test not implemented on Windows.");
}
#else /* !_WIN32 */
#include <string.h> /* memset */
#ifndef _WIN32
#include <unistd.h> /* close */
#include <sys/types.h>
#include <sys/socket.h>
#endif
struct thread_ctx {
uv_barrier_t barrier;
@ -54,9 +46,28 @@ static void thread_main(void* arg) {
uv_fs_req_cleanup(&req);
} while (n > 0 || (n == -1 && uv_errno == UV_EINTR));
#ifdef _WIN32
ASSERT(n == UV_EOF);
#else
ASSERT(n == 0);
#endif
}
#ifdef _WIN32
static void write_cb(uv_write_t* req, int status) {
ASSERT(status == 0);
req->handle = NULL; /* signal completion of write_cb */
}
#endif
#ifdef _WIN32
#define NWRITES (10 << 16)
#else
#define NWRITES (10 << 20)
#endif
TEST_IMPL(pipe_set_non_blocking) {
struct thread_ctx ctx;
uv_pipe_t pipe_handle;
@ -66,9 +77,12 @@ TEST_IMPL(pipe_set_non_blocking) {
uv_buf_t buf;
uv_file fd[2];
int n;
#ifdef _WIN32
uv_write_t write_req;
#endif
ASSERT(0 == uv_pipe_init(uv_default_loop(), &pipe_handle, 0));
ASSERT(0 == socketpair(AF_UNIX, SOCK_STREAM, 0, fd));
ASSERT(0 == uv_pipe(fd, 0, 0));
ASSERT(0 == uv_pipe_open(&pipe_handle, fd[1]));
ASSERT(0 == uv_stream_set_blocking((uv_stream_t*) &pipe_handle, 1));
fd[1] = -1; /* fd[1] is owned by pipe_handle now. */
@ -83,11 +97,20 @@ TEST_IMPL(pipe_set_non_blocking) {
memset(data, '.', sizeof(data));
nwritten = 0;
while (nwritten < 10 << 20) {
while (nwritten < NWRITES) {
/* The stream is in blocking mode so uv_try_write() should always succeed
* with the exact number of bytes that we wanted written.
*/
n = uv_try_write((uv_stream_t*) &pipe_handle, &buf, 1);
#ifdef _WIN32
ASSERT(n == UV_EAGAIN); /* E_NOTIMPL */
ASSERT(0 == uv_write(&write_req, (uv_stream_t*) &pipe_handle, &buf, 1, write_cb));
ASSERT(write_req.handle != NULL);
ASSERT(1 == uv_run(uv_default_loop(), UV_RUN_ONCE)); /* queue write_cb */
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_ONCE)); /* process write_cb */
ASSERT(write_req.handle == NULL); /* check for signaled completion of write_cb */
n = buf.len;
#endif
ASSERT(n == sizeof(data));
nwritten += n;
}
@ -96,12 +119,14 @@ TEST_IMPL(pipe_set_non_blocking) {
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
ASSERT(0 == uv_thread_join(&thread));
#ifdef _WIN32
ASSERT(0 == _close(fd[0])); /* fd[1] is closed by uv_close(). */
#else
ASSERT(0 == close(fd[0])); /* fd[1] is closed by uv_close(). */
#endif
fd[0] = -1;
uv_barrier_destroy(&ctx.barrier);
MAKE_VALGRIND_HAPPY();
return 0;
}
#endif /* !_WIN32 */

View File

@ -29,11 +29,9 @@
#include <string.h>
#ifdef _WIN32
# if defined(__MINGW32__)
# include <basetyps.h>
# endif
# include <shellapi.h>
# include <wchar.h>
typedef BOOL (WINAPI *sCompareObjectHandles)(_In_ HANDLE, _In_ HANDLE);
#else
# include <unistd.h>
# include <sys/wait.h>
@ -49,9 +47,7 @@ static char exepath[1024];
static size_t exepath_size = 1024;
static char* args[5];
static int no_term_signal;
#ifndef _WIN32
static int timer_counter;
#endif
static uv_tcp_t tcp_server;
#define OUTPUT_SIZE 1024
@ -140,12 +136,10 @@ static void on_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
}
#ifndef _WIN32
static void on_read_once(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
uv_read_stop(tcp);
on_read(tcp, nread, buf);
}
#endif
static void write_cb(uv_write_t* req, int status) {
@ -154,6 +148,11 @@ static void write_cb(uv_write_t* req, int status) {
}
static void write_null_cb(uv_write_t* req, int status) {
ASSERT(status == 0);
}
static void init_process_options(char* test, uv_exit_cb exit_cb) {
/* Note spawn_helper1 defined in test/run-tests.c */
int r = uv_exepath(exepath, &exepath_size);
@ -177,11 +176,9 @@ static void timer_cb(uv_timer_t* handle) {
}
#ifndef _WIN32
static void timer_counter_cb(uv_timer_t* handle) {
++timer_counter;
}
#endif
TEST_IMPL(spawn_fails) {
@ -1579,17 +1576,27 @@ TEST_IMPL(spawn_auto_unref) {
}
#ifndef _WIN32
TEST_IMPL(spawn_fs_open) {
int fd;
int r;
uv_os_fd_t fd;
uv_os_fd_t dup_fd;
uv_fs_t fs_req;
uv_pipe_t in;
uv_write_t write_req;
uv_write_t write_req2;
uv_buf_t buf;
uv_stdio_container_t stdio[1];
#ifdef _WIN32
const char dev_null[] = "NUL";
HMODULE kernelbase_module;
sCompareObjectHandles pCompareObjectHandles; /* function introduced in Windows 10 */
#else
const char dev_null[] = "/dev/null";
#endif
fd = uv_fs_open(NULL, &fs_req, "/dev/null", O_RDWR, 0, NULL);
ASSERT(fd >= 0);
r = uv_fs_open(NULL, &fs_req, dev_null, O_RDWR, 0, NULL);
ASSERT(r != -1);
fd = uv_get_osfhandle((uv_file) fs_req.result);
uv_fs_req_cleanup(&fs_req);
init_process_options("spawn_helper8", exit_cb);
@ -1601,13 +1608,28 @@ TEST_IMPL(spawn_fs_open) {
options.stdio[0].data.stream = (uv_stream_t*) &in;
options.stdio_count = 1;
/* make an inheritable copy */
#ifdef _WIN32
ASSERT(0 != DuplicateHandle(GetCurrentProcess(), fd, GetCurrentProcess(), &dup_fd,
0, /* inherit */ TRUE, DUPLICATE_SAME_ACCESS));
kernelbase_module = GetModuleHandleA("kernelbase.dll");
pCompareObjectHandles = (sCompareObjectHandles)
GetProcAddress(kernelbase_module, "CompareObjectHandles");
ASSERT(pCompareObjectHandles == NULL || pCompareObjectHandles(fd, dup_fd));
#else
dup_fd = dup(fd);
#endif
ASSERT(0 == uv_spawn(uv_default_loop(), &process, &options));
buf = uv_buf_init((char*) &fd, sizeof(fd));
ASSERT(0 == uv_write(&write_req, (uv_stream_t*) &in, &buf, 1, write_cb));
ASSERT(0 == uv_write(&write_req, (uv_stream_t*) &in, &buf, 1, write_null_cb));
buf = uv_buf_init((char*) &dup_fd, sizeof(fd));
ASSERT(0 == uv_write(&write_req2, (uv_stream_t*) &in, &buf, 1, write_cb));
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_DEFAULT));
ASSERT(0 == uv_fs_close(NULL, &fs_req, fd, NULL));
ASSERT(0 == uv_fs_close(NULL, &fs_req, r, NULL));
ASSERT(exit_cb_called == 1);
ASSERT(close_cb_called == 2); /* One for `in`, one for process */
@ -1615,17 +1637,20 @@ TEST_IMPL(spawn_fs_open) {
MAKE_VALGRIND_HAPPY();
return 0;
}
#endif /* !_WIN32 */
#ifndef _WIN32
TEST_IMPL(closed_fd_events) {
uv_stdio_container_t stdio[3];
uv_pipe_t pipe_handle;
int fd[2];
uv_fs_t req;
uv_buf_t bufs[1];
uv_file fd[2];
bufs[0] = uv_buf_init("", 1);
/* create a pipe and share it with a child process */
ASSERT(0 == pipe(fd));
ASSERT(0 == uv_pipe(fd, 0, 0));
ASSERT(fd[0] > 2);
ASSERT(fd[1] > 2);
/* spawn_helper4 blocks indefinitely. */
init_process_options("spawn_helper4", exit_cb);
@ -1642,12 +1667,18 @@ TEST_IMPL(closed_fd_events) {
/* read from the pipe with uv */
ASSERT(0 == uv_pipe_init(uv_default_loop(), &pipe_handle, 0));
ASSERT(0 == uv_pipe_open(&pipe_handle, fd[0]));
/* uv_pipe_open() takes ownership of the file descriptor. */
fd[0] = -1;
ASSERT(0 == uv_read_start((uv_stream_t*) &pipe_handle, on_alloc, on_read_once));
ASSERT(1 == write(fd[1], "", 1));
ASSERT(1 == uv_fs_write(NULL, &req, fd[1], bufs, 1, -1, NULL));
ASSERT(req.result == 1);
uv_fs_req_cleanup(&req);
#ifdef _WIN32
ASSERT(1 == uv_run(uv_default_loop(), UV_RUN_ONCE));
#endif
ASSERT(0 == uv_run(uv_default_loop(), UV_RUN_ONCE));
/* should have received just one byte */
@ -1656,7 +1687,9 @@ TEST_IMPL(closed_fd_events) {
/* close the pipe and see if we still get events */
uv_close((uv_handle_t*) &pipe_handle, close_cb);
ASSERT(1 == write(fd[1], "", 1));
ASSERT(1 == uv_fs_write(NULL, &req, fd[1], bufs, 1, -1, NULL));
ASSERT(req.result == 1);
uv_fs_req_cleanup(&req);
ASSERT(0 == uv_timer_init(uv_default_loop(), &timer));
ASSERT(0 == uv_timer_start(&timer, timer_counter_cb, 10, 0));
@ -1669,13 +1702,17 @@ TEST_IMPL(closed_fd_events) {
ASSERT(timer_counter == 1);
/* cleanup */
ASSERT(0 == uv_process_kill(&process, /* SIGTERM */ 15));
ASSERT(0 == uv_process_kill(&process, SIGTERM));
#ifdef _WIN32
ASSERT(0 == _close(fd[1]));
#else
ASSERT(0 == close(fd[1]));
#endif
MAKE_VALGRIND_HAPPY();
return 0;
}
#endif /* !_WIN32 */
TEST_IMPL(spawn_reads_child_path) {
int r;
@ -1746,38 +1783,6 @@ TEST_IMPL(spawn_reads_child_path) {
return 0;
}
#ifndef _WIN32
static int mpipe(int *fds) {
if (pipe(fds) == -1)
return -1;
if (fcntl(fds[0], F_SETFD, FD_CLOEXEC) == -1 ||
fcntl(fds[1], F_SETFD, FD_CLOEXEC) == -1) {
close(fds[0]);
close(fds[1]);
return -1;
}
return 0;
}
#else
static int mpipe(int *fds) {
SECURITY_ATTRIBUTES attr;
HANDLE readh, writeh;
attr.nLength = sizeof(attr);
attr.lpSecurityDescriptor = NULL;
attr.bInheritHandle = FALSE;
if (!CreatePipe(&readh, &writeh, &attr, 0))
return -1;
fds[0] = _open_osfhandle((intptr_t)readh, 0);
fds[1] = _open_osfhandle((intptr_t)writeh, 0);
if (fds[0] == -1 || fds[1] == -1) {
CloseHandle(readh);
CloseHandle(writeh);
return -1;
}
return 0;
}
#endif /* !_WIN32 */
TEST_IMPL(spawn_inherit_streams) {
uv_process_t child_req;
uv_stdio_container_t child_stdio[2];
@ -1803,8 +1808,8 @@ TEST_IMPL(spawn_inherit_streams) {
ASSERT(uv_pipe_init(loop, &pipe_stdin_parent, 0) == 0);
ASSERT(uv_pipe_init(loop, &pipe_stdout_parent, 0) == 0);
ASSERT(mpipe(fds_stdin) != -1);
ASSERT(mpipe(fds_stdout) != -1);
ASSERT(uv_pipe(fds_stdin, 0, 0) == 0);
ASSERT(uv_pipe(fds_stdout, 0, 0) == 0);
ASSERT(uv_pipe_open(&pipe_stdin_child, fds_stdin[0]) == 0);
ASSERT(uv_pipe_open(&pipe_stdout_child, fds_stdout[1]) == 0);