pipe: allow queueing pending handles
Introduce `int uv_pipe_pending_count(uv_pipe_t*)` and
`uv_handle_type uv_pipe_pending_type(uv_pipe_t*)`. They should be
used in IPC pipe's read cb to accept incoming handles:
int count = uv_pipe_pending_count(pipe);
int i;
for (i = 0; i < count; i++) {
uv_handle_type type = uv_pipe_pending_type(pipe);
/* ... */
uv_accept(...);
}
This commit is contained in:
parent
2930d04e5b
commit
b05a3ee4d1
@ -158,6 +158,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
|
|||||||
test/test-pipe-bind-error.c \
|
test/test-pipe-bind-error.c \
|
||||||
test/test-pipe-connect-error.c \
|
test/test-pipe-connect-error.c \
|
||||||
test/test-pipe-getsockname.c \
|
test/test-pipe-getsockname.c \
|
||||||
|
test/test-pipe-sendmsg.c \
|
||||||
test/test-pipe-server-close.c \
|
test/test-pipe-server-close.c \
|
||||||
test/test-platform-output.c \
|
test/test-platform-output.c \
|
||||||
test/test-poll-close.c \
|
test/test-poll-close.c \
|
||||||
|
|||||||
@ -115,6 +115,7 @@ test/test-pass-always.c
|
|||||||
test/test-ping-pong.c
|
test/test-ping-pong.c
|
||||||
test/test-pipe-bind-error.c
|
test/test-pipe-bind-error.c
|
||||||
test/test-pipe-connect-error.c
|
test/test-pipe-connect-error.c
|
||||||
|
test/test-pipe-sendmsg.c
|
||||||
test/test-pipe-server-close.c
|
test/test-pipe-server-close.c
|
||||||
test/test-platform-output.c
|
test/test-platform-output.c
|
||||||
test/test-poll-close.c
|
test/test-poll-close.c
|
||||||
|
|||||||
@ -230,6 +230,7 @@ typedef struct {
|
|||||||
uv_connection_cb connection_cb; \
|
uv_connection_cb connection_cb; \
|
||||||
int delayed_error; \
|
int delayed_error; \
|
||||||
int accepted_fd; \
|
int accepted_fd; \
|
||||||
|
void* queued_fds; \
|
||||||
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
|
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
|
||||||
|
|
||||||
#define UV_TCP_PRIVATE_FIELDS /* empty */
|
#define UV_TCP_PRIVATE_FIELDS /* empty */
|
||||||
|
|||||||
@ -424,10 +424,9 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s);
|
|||||||
uv_write_t ipc_header_write_req; \
|
uv_write_t ipc_header_write_req; \
|
||||||
int ipc_pid; \
|
int ipc_pid; \
|
||||||
uint64_t remaining_ipc_rawdata_bytes; \
|
uint64_t remaining_ipc_rawdata_bytes; \
|
||||||
unsigned char reserved[sizeof(void*)]; \
|
|
||||||
struct { \
|
struct { \
|
||||||
WSAPROTOCOL_INFOW* socket_info; \
|
void* queue[2]; \
|
||||||
int tcp_connection; \
|
int queue_len; \
|
||||||
} pending_ipc_info; \
|
} pending_ipc_info; \
|
||||||
uv_write_t* non_overlapped_writes_tail;
|
uv_write_t* non_overlapped_writes_tail;
|
||||||
|
|
||||||
|
|||||||
28
include/uv.h
28
include/uv.h
@ -398,17 +398,6 @@ typedef void (*uv_alloc_cb)(uv_handle_t* handle,
|
|||||||
typedef void (*uv_read_cb)(uv_stream_t* stream,
|
typedef void (*uv_read_cb)(uv_stream_t* stream,
|
||||||
ssize_t nread,
|
ssize_t nread,
|
||||||
const uv_buf_t* buf);
|
const uv_buf_t* buf);
|
||||||
|
|
||||||
/*
|
|
||||||
* Just like the uv_read_cb except that if the pending parameter is true
|
|
||||||
* then you can use uv_accept() to pull the new handle into the process.
|
|
||||||
* If no handle is pending then pending will be UV_UNKNOWN_HANDLE.
|
|
||||||
*/
|
|
||||||
typedef void (*uv_read2_cb)(uv_pipe_t* pipe,
|
|
||||||
ssize_t nread,
|
|
||||||
const uv_buf_t* buf,
|
|
||||||
uv_handle_type pending);
|
|
||||||
|
|
||||||
typedef void (*uv_write_cb)(uv_write_t* req, int status);
|
typedef void (*uv_write_cb)(uv_write_t* req, int status);
|
||||||
typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
|
typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
|
||||||
typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
|
typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
|
||||||
@ -611,7 +600,6 @@ UV_EXTERN uv_buf_t uv_buf_init(char* base, unsigned int len);
|
|||||||
size_t write_queue_size; \
|
size_t write_queue_size; \
|
||||||
uv_alloc_cb alloc_cb; \
|
uv_alloc_cb alloc_cb; \
|
||||||
uv_read_cb read_cb; \
|
uv_read_cb read_cb; \
|
||||||
uv_read2_cb read2_cb; \
|
|
||||||
/* private */ \
|
/* private */ \
|
||||||
UV_STREAM_PRIVATE_FIELDS
|
UV_STREAM_PRIVATE_FIELDS
|
||||||
|
|
||||||
@ -660,13 +648,6 @@ UV_EXTERN int uv_read_start(uv_stream_t*,
|
|||||||
|
|
||||||
UV_EXTERN int uv_read_stop(uv_stream_t*);
|
UV_EXTERN int uv_read_stop(uv_stream_t*);
|
||||||
|
|
||||||
/*
|
|
||||||
* Extended read methods for receiving handles over a pipe. The pipe must be
|
|
||||||
* initialized with ipc == 1.
|
|
||||||
*/
|
|
||||||
UV_EXTERN int uv_read2_start(uv_stream_t*, uv_alloc_cb alloc_cb,
|
|
||||||
uv_read2_cb read_cb);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Write data to stream. Buffers are written in order. Example:
|
* Write data to stream. Buffers are written in order. Example:
|
||||||
@ -1213,6 +1194,15 @@ UV_EXTERN int uv_pipe_getsockname(const uv_pipe_t* handle,
|
|||||||
*/
|
*/
|
||||||
UV_EXTERN void uv_pipe_pending_instances(uv_pipe_t* handle, int count);
|
UV_EXTERN void uv_pipe_pending_instances(uv_pipe_t* handle, int count);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Used to receive handles over ipc pipes.
|
||||||
|
*
|
||||||
|
* First - call `uv_pipe_pending_count`, if it is > 0 - initialize handle
|
||||||
|
* using type, returned by `uv_pipe_pending_type` and call
|
||||||
|
* `uv_accept(pipe, handle)`.
|
||||||
|
*/
|
||||||
|
UV_EXTERN int uv_pipe_pending_count(uv_pipe_t* handle);
|
||||||
|
UV_EXTERN uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* uv_poll_t is a subclass of uv_handle_t.
|
* uv_poll_t is a subclass of uv_handle_t.
|
||||||
|
|||||||
@ -120,6 +120,8 @@
|
|||||||
# define O_CLOEXEC 0x00100000
|
# define O_CLOEXEC 0x00100000
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct uv__stream_queued_fds_s uv__stream_queued_fds_t;
|
||||||
|
|
||||||
/* handle flags */
|
/* handle flags */
|
||||||
enum {
|
enum {
|
||||||
UV_CLOSING = 0x01, /* uv_close() called but not finished. */
|
UV_CLOSING = 0x01, /* uv_close() called but not finished. */
|
||||||
@ -142,6 +144,13 @@ typedef enum {
|
|||||||
UV_CLOCK_FAST = 1 /* Use the fastest clock with <= 1ms granularity. */
|
UV_CLOCK_FAST = 1 /* Use the fastest clock with <= 1ms granularity. */
|
||||||
} uv_clocktype_t;
|
} uv_clocktype_t;
|
||||||
|
|
||||||
|
struct uv__stream_queued_fds_s {
|
||||||
|
unsigned int size;
|
||||||
|
unsigned int offset;
|
||||||
|
int fds[1];
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/* core */
|
/* core */
|
||||||
int uv__nonblock(int fd, int set);
|
int uv__nonblock(int fd, int set);
|
||||||
int uv__close(int fd);
|
int uv__close(int fd);
|
||||||
@ -226,6 +235,7 @@ void uv__tcp_close(uv_tcp_t* handle);
|
|||||||
void uv__timer_close(uv_timer_t* handle);
|
void uv__timer_close(uv_timer_t* handle);
|
||||||
void uv__udp_close(uv_udp_t* handle);
|
void uv__udp_close(uv_udp_t* handle);
|
||||||
void uv__udp_finish_close(uv_udp_t* handle);
|
void uv__udp_finish_close(uv_udp_t* handle);
|
||||||
|
uv_handle_type uv__handle_type(int fd);
|
||||||
|
|
||||||
#if defined(__APPLE__)
|
#if defined(__APPLE__)
|
||||||
int uv___stream_fd(uv_stream_t* handle);
|
int uv___stream_fd(uv_stream_t* handle);
|
||||||
|
|||||||
@ -246,3 +246,31 @@ int uv_pipe_getsockname(const uv_pipe_t* handle, char* buf, size_t* len) {
|
|||||||
|
|
||||||
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
|
void uv_pipe_pending_instances(uv_pipe_t* handle, int count) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int uv_pipe_pending_count(uv_pipe_t* handle) {
|
||||||
|
uv__stream_queued_fds_t* queued_fds;
|
||||||
|
|
||||||
|
if (!handle->ipc)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (handle->accepted_fd == -1)
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (handle->queued_fds == NULL)
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
queued_fds = handle->queued_fds;
|
||||||
|
return queued_fds->offset + 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
|
||||||
|
if (!handle->ipc)
|
||||||
|
return UV_UNKNOWN_HANDLE;
|
||||||
|
|
||||||
|
if (handle->accepted_fd == -1)
|
||||||
|
return UV_UNKNOWN_HANDLE;
|
||||||
|
else
|
||||||
|
return uv__handle_type(handle->accepted_fd);
|
||||||
|
}
|
||||||
|
|||||||
@ -112,13 +112,13 @@ void uv__stream_init(uv_loop_t* loop,
|
|||||||
|
|
||||||
uv__handle_init(loop, (uv_handle_t*)stream, type);
|
uv__handle_init(loop, (uv_handle_t*)stream, type);
|
||||||
stream->read_cb = NULL;
|
stream->read_cb = NULL;
|
||||||
stream->read2_cb = NULL;
|
|
||||||
stream->alloc_cb = NULL;
|
stream->alloc_cb = NULL;
|
||||||
stream->close_cb = NULL;
|
stream->close_cb = NULL;
|
||||||
stream->connection_cb = NULL;
|
stream->connection_cb = NULL;
|
||||||
stream->connect_req = NULL;
|
stream->connect_req = NULL;
|
||||||
stream->shutdown_req = NULL;
|
stream->shutdown_req = NULL;
|
||||||
stream->accepted_fd = -1;
|
stream->accepted_fd = -1;
|
||||||
|
stream->queued_fds = NULL;
|
||||||
stream->delayed_error = 0;
|
stream->delayed_error = 0;
|
||||||
QUEUE_INIT(&stream->write_queue);
|
QUEUE_INIT(&stream->write_queue);
|
||||||
QUEUE_INIT(&stream->write_completed_queue);
|
QUEUE_INIT(&stream->write_completed_queue);
|
||||||
@ -570,6 +570,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
|
|||||||
if (server->accepted_fd == -1)
|
if (server->accepted_fd == -1)
|
||||||
return -EAGAIN;
|
return -EAGAIN;
|
||||||
|
|
||||||
|
err = 0;
|
||||||
switch (client->type) {
|
switch (client->type) {
|
||||||
case UV_NAMED_PIPE:
|
case UV_NAMED_PIPE:
|
||||||
case UV_TCP:
|
case UV_TCP:
|
||||||
@ -579,8 +580,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
|
|||||||
if (err) {
|
if (err) {
|
||||||
/* TODO handle error */
|
/* TODO handle error */
|
||||||
uv__close(server->accepted_fd);
|
uv__close(server->accepted_fd);
|
||||||
server->accepted_fd = -1;
|
goto done;
|
||||||
return err;
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -588,8 +588,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
|
|||||||
err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
|
err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
|
||||||
if (err) {
|
if (err) {
|
||||||
uv__close(server->accepted_fd);
|
uv__close(server->accepted_fd);
|
||||||
server->accepted_fd = -1;
|
goto done;
|
||||||
return err;
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
@ -597,9 +596,33 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
|
|||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
|
done:
|
||||||
server->accepted_fd = -1;
|
/* Process queued fds */
|
||||||
return 0;
|
if (server->queued_fds != NULL) {
|
||||||
|
uv__stream_queued_fds_t* queued_fds;
|
||||||
|
|
||||||
|
queued_fds = server->queued_fds;
|
||||||
|
|
||||||
|
/* Read first */
|
||||||
|
server->accepted_fd = queued_fds->fds[0];
|
||||||
|
|
||||||
|
/* All read, free */
|
||||||
|
assert(queued_fds->offset > 0);
|
||||||
|
if (--queued_fds->offset == 0) {
|
||||||
|
free(queued_fds);
|
||||||
|
server->queued_fds = NULL;
|
||||||
|
} else {
|
||||||
|
/* Shift rest */
|
||||||
|
memmove(queued_fds->fds,
|
||||||
|
queued_fds->fds + 1,
|
||||||
|
queued_fds->offset * sizeof(*queued_fds->fds));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
server->accepted_fd = -1;
|
||||||
|
if (err == 0)
|
||||||
|
uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
|
||||||
|
}
|
||||||
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -777,12 +800,12 @@ start:
|
|||||||
msg.msg_flags = 0;
|
msg.msg_flags = 0;
|
||||||
|
|
||||||
msg.msg_control = (void*) scratch;
|
msg.msg_control = (void*) scratch;
|
||||||
msg.msg_controllen = CMSG_LEN(sizeof(fd_to_send));
|
msg.msg_controllen = CMSG_SPACE(sizeof(fd_to_send));
|
||||||
|
|
||||||
cmsg = CMSG_FIRSTHDR(&msg);
|
cmsg = CMSG_FIRSTHDR(&msg);
|
||||||
cmsg->cmsg_level = SOL_SOCKET;
|
cmsg->cmsg_level = SOL_SOCKET;
|
||||||
cmsg->cmsg_type = SCM_RIGHTS;
|
cmsg->cmsg_type = SCM_RIGHTS;
|
||||||
cmsg->cmsg_len = msg.msg_controllen;
|
cmsg->cmsg_len = CMSG_LEN(sizeof(fd_to_send));
|
||||||
|
|
||||||
/* silence aliasing warning */
|
/* silence aliasing warning */
|
||||||
{
|
{
|
||||||
@ -913,7 +936,7 @@ static void uv__write_callbacks(uv_stream_t* stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static uv_handle_type uv__handle_type(int fd) {
|
uv_handle_type uv__handle_type(int fd) {
|
||||||
struct sockaddr_storage ss;
|
struct sockaddr_storage ss;
|
||||||
socklen_t len;
|
socklen_t len;
|
||||||
int type;
|
int type;
|
||||||
@ -947,24 +970,106 @@ static uv_handle_type uv__handle_type(int fd) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void uv__stream_read_cb(uv_stream_t* stream,
|
|
||||||
int status,
|
|
||||||
const uv_buf_t* buf,
|
|
||||||
uv_handle_type type) {
|
|
||||||
if (stream->read_cb != NULL)
|
|
||||||
stream->read_cb(stream, status, buf);
|
|
||||||
else
|
|
||||||
stream->read2_cb((uv_pipe_t*) stream, status, buf, type);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
|
static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
|
||||||
stream->flags |= UV_STREAM_READ_EOF;
|
stream->flags |= UV_STREAM_READ_EOF;
|
||||||
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
|
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
|
||||||
if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
|
if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
|
||||||
uv__handle_stop(stream);
|
uv__handle_stop(stream);
|
||||||
uv__stream_osx_interrupt_select(stream);
|
uv__stream_osx_interrupt_select(stream);
|
||||||
uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE);
|
stream->read_cb(stream, UV_EOF, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
|
||||||
|
uv__stream_queued_fds_t* queued_fds;
|
||||||
|
unsigned int queue_size;
|
||||||
|
|
||||||
|
queued_fds = stream->queued_fds;
|
||||||
|
if (queued_fds == NULL) {
|
||||||
|
queue_size = 8;
|
||||||
|
queued_fds = malloc((queue_size - 1) * sizeof(*queued_fds->fds) +
|
||||||
|
sizeof(*queued_fds));
|
||||||
|
if (queued_fds == NULL)
|
||||||
|
return -ENOMEM;
|
||||||
|
queued_fds->size = queue_size;
|
||||||
|
queued_fds->offset = 0;
|
||||||
|
stream->queued_fds = queued_fds;
|
||||||
|
|
||||||
|
/* Grow */
|
||||||
|
} else if (queued_fds->size == queued_fds->offset) {
|
||||||
|
queue_size = queued_fds->size + 8;
|
||||||
|
queued_fds = realloc(queued_fds,
|
||||||
|
(queue_size - 1) * sizeof(*queued_fds->fds) +
|
||||||
|
sizeof(*queued_fds));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Allocation failure, report back.
|
||||||
|
* NOTE: if it is fatal - sockets will be closed in uv__stream_close
|
||||||
|
*/
|
||||||
|
if (queued_fds == NULL)
|
||||||
|
return -ENOMEM;
|
||||||
|
queued_fds->size = queue_size;
|
||||||
|
stream->queued_fds = queued_fds;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Put fd in a queue */
|
||||||
|
queued_fds->fds[queued_fds->offset++] = fd;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#define UV__CMSG_FD_COUNT 64
|
||||||
|
#define UV__CMSG_FD_SIZE (UV__CMSG_FD_COUNT * sizeof(int))
|
||||||
|
|
||||||
|
|
||||||
|
static int uv__stream_recv_cmsg(uv_stream_t* stream, struct msghdr* msg) {
|
||||||
|
struct cmsghdr* cmsg;
|
||||||
|
|
||||||
|
for (cmsg = CMSG_FIRSTHDR(msg); cmsg != NULL; cmsg = CMSG_NXTHDR(msg, cmsg)) {
|
||||||
|
char* start;
|
||||||
|
char* end;
|
||||||
|
int err;
|
||||||
|
void* pv;
|
||||||
|
int* pi;
|
||||||
|
unsigned int i;
|
||||||
|
unsigned int count;
|
||||||
|
|
||||||
|
if (cmsg->cmsg_type != SCM_RIGHTS) {
|
||||||
|
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
|
||||||
|
cmsg->cmsg_type);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* silence aliasing warning */
|
||||||
|
pv = CMSG_DATA(cmsg);
|
||||||
|
pi = pv;
|
||||||
|
|
||||||
|
/* Count available fds */
|
||||||
|
start = (char*) cmsg;
|
||||||
|
end = (char*) cmsg + cmsg->cmsg_len;
|
||||||
|
count = 0;
|
||||||
|
while (start + CMSG_LEN(count * sizeof(*pi)) < end)
|
||||||
|
count++;
|
||||||
|
assert(start + CMSG_LEN(count * sizeof(*pi)) == end);
|
||||||
|
|
||||||
|
for (i = 0; i < count; i++) {
|
||||||
|
/* Already has accepted fd, queue now */
|
||||||
|
if (stream->accepted_fd != -1) {
|
||||||
|
err = uv__stream_queue_fd(stream, pi[i]);
|
||||||
|
if (err != 0) {
|
||||||
|
/* Close rest */
|
||||||
|
for (; i < count; i++)
|
||||||
|
uv__close(pi[i]);
|
||||||
|
return err;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stream->accepted_fd = pi[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -972,9 +1077,10 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
uv_buf_t buf;
|
uv_buf_t buf;
|
||||||
ssize_t nread;
|
ssize_t nread;
|
||||||
struct msghdr msg;
|
struct msghdr msg;
|
||||||
struct cmsghdr* cmsg;
|
char cmsg_space[CMSG_SPACE(UV__CMSG_FD_SIZE)];
|
||||||
char cmsg_space[64];
|
|
||||||
int count;
|
int count;
|
||||||
|
int err;
|
||||||
|
int is_ipc;
|
||||||
|
|
||||||
stream->flags &= ~UV_STREAM_READ_PARTIAL;
|
stream->flags &= ~UV_STREAM_READ_PARTIAL;
|
||||||
|
|
||||||
@ -983,10 +1089,12 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
*/
|
*/
|
||||||
count = 32;
|
count = 32;
|
||||||
|
|
||||||
|
is_ipc = stream->type == UV_NAMED_PIPE && ((uv_pipe_t*) stream)->ipc;
|
||||||
|
|
||||||
/* XXX: Maybe instead of having UV_STREAM_READING we just test if
|
/* XXX: Maybe instead of having UV_STREAM_READING we just test if
|
||||||
* tcp->read_cb is NULL or not?
|
* tcp->read_cb is NULL or not?
|
||||||
*/
|
*/
|
||||||
while ((stream->read_cb || stream->read2_cb)
|
while (stream->read_cb
|
||||||
&& (stream->flags & UV_STREAM_READING)
|
&& (stream->flags & UV_STREAM_READING)
|
||||||
&& (count-- > 0)) {
|
&& (count-- > 0)) {
|
||||||
assert(stream->alloc_cb != NULL);
|
assert(stream->alloc_cb != NULL);
|
||||||
@ -994,29 +1102,28 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
|
stream->alloc_cb((uv_handle_t*)stream, 64 * 1024, &buf);
|
||||||
if (buf.len == 0) {
|
if (buf.len == 0) {
|
||||||
/* User indicates it can't or won't handle the read. */
|
/* User indicates it can't or won't handle the read. */
|
||||||
uv__stream_read_cb(stream, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
|
stream->read_cb(stream, UV_ENOBUFS, &buf);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(buf.base != NULL);
|
assert(buf.base != NULL);
|
||||||
assert(uv__stream_fd(stream) >= 0);
|
assert(uv__stream_fd(stream) >= 0);
|
||||||
|
|
||||||
if (stream->read_cb) {
|
if (!is_ipc) {
|
||||||
do {
|
do {
|
||||||
nread = read(uv__stream_fd(stream), buf.base, buf.len);
|
nread = read(uv__stream_fd(stream), buf.base, buf.len);
|
||||||
}
|
}
|
||||||
while (nread < 0 && errno == EINTR);
|
while (nread < 0 && errno == EINTR);
|
||||||
} else {
|
} else {
|
||||||
assert(stream->read2_cb);
|
/* ipc uses recvmsg */
|
||||||
/* read2_cb uses recvmsg */
|
|
||||||
msg.msg_flags = 0;
|
msg.msg_flags = 0;
|
||||||
msg.msg_iov = (struct iovec*) &buf;
|
msg.msg_iov = (struct iovec*) &buf;
|
||||||
msg.msg_iovlen = 1;
|
msg.msg_iovlen = 1;
|
||||||
msg.msg_name = NULL;
|
msg.msg_name = NULL;
|
||||||
msg.msg_namelen = 0;
|
msg.msg_namelen = 0;
|
||||||
/* Set up to receive a descriptor even if one isn't in the message */
|
/* Set up to receive a descriptor even if one isn't in the message */
|
||||||
msg.msg_controllen = 64;
|
msg.msg_controllen = sizeof(cmsg_space);
|
||||||
msg.msg_control = (void*) cmsg_space;
|
msg.msg_control = cmsg_space;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
|
nread = uv__recvmsg(uv__stream_fd(stream), &msg, 0);
|
||||||
@ -1032,10 +1139,10 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
|
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
|
||||||
uv__stream_osx_interrupt_select(stream);
|
uv__stream_osx_interrupt_select(stream);
|
||||||
}
|
}
|
||||||
uv__stream_read_cb(stream, 0, &buf, UV_UNKNOWN_HANDLE);
|
stream->read_cb(stream, 0, &buf);
|
||||||
} else {
|
} else {
|
||||||
/* Error. User should call uv_close(). */
|
/* Error. User should call uv_close(). */
|
||||||
uv__stream_read_cb(stream, -errno, &buf, UV_UNKNOWN_HANDLE);
|
stream->read_cb(stream, -errno, &buf);
|
||||||
assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) &&
|
assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) &&
|
||||||
"stream->read_cb(status=-1) did not call uv_close()");
|
"stream->read_cb(status=-1) did not call uv_close()");
|
||||||
}
|
}
|
||||||
@ -1047,50 +1154,14 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
/* Successful read */
|
/* Successful read */
|
||||||
ssize_t buflen = buf.len;
|
ssize_t buflen = buf.len;
|
||||||
|
|
||||||
if (stream->read_cb) {
|
if (is_ipc) {
|
||||||
stream->read_cb(stream, nread, &buf);
|
err = uv__stream_recv_cmsg(stream, &msg);
|
||||||
} else {
|
if (err != 0) {
|
||||||
assert(stream->read2_cb);
|
stream->read_cb(stream, err, NULL);
|
||||||
|
return;
|
||||||
/*
|
|
||||||
* XXX: Some implementations can send multiple file descriptors in a
|
|
||||||
* single message. We should be using CMSG_NXTHDR() to walk the
|
|
||||||
* chain to get at them all. This would require changing the API to
|
|
||||||
* hand these back up the caller, is a pain.
|
|
||||||
*/
|
|
||||||
|
|
||||||
for (cmsg = CMSG_FIRSTHDR(&msg);
|
|
||||||
msg.msg_controllen > 0 && cmsg != NULL;
|
|
||||||
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
|
|
||||||
|
|
||||||
if (cmsg->cmsg_type == SCM_RIGHTS) {
|
|
||||||
if (stream->accepted_fd != -1) {
|
|
||||||
fprintf(stderr, "(libuv) ignoring extra FD received\n");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* silence aliasing warning */
|
|
||||||
{
|
|
||||||
void* pv = CMSG_DATA(cmsg);
|
|
||||||
int* pi = pv;
|
|
||||||
stream->accepted_fd = *pi;
|
|
||||||
}
|
|
||||||
|
|
||||||
} else {
|
|
||||||
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
|
|
||||||
cmsg->cmsg_type);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (stream->accepted_fd >= 0) {
|
|
||||||
stream->read2_cb((uv_pipe_t*) stream,
|
|
||||||
nread,
|
|
||||||
&buf,
|
|
||||||
uv__handle_type(stream->accepted_fd));
|
|
||||||
} else {
|
|
||||||
stream->read2_cb((uv_pipe_t*) stream, nread, &buf, UV_UNKNOWN_HANDLE);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
stream->read_cb(stream, nread, &buf);
|
||||||
|
|
||||||
/* Return if we didn't fill the buffer, there is no more data to read. */
|
/* Return if we didn't fill the buffer, there is no more data to read. */
|
||||||
if (nread < buflen) {
|
if (nread < buflen) {
|
||||||
@ -1102,6 +1173,10 @@ static void uv__read(uv_stream_t* stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#undef UV__CMSG_FD_COUNT
|
||||||
|
#undef UV__CMSG_FD_SIZE
|
||||||
|
|
||||||
|
|
||||||
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
|
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
|
||||||
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
|
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
|
||||||
"uv_shutdown (unix) only supports uv_handle_t right now");
|
"uv_shutdown (unix) only supports uv_handle_t right now");
|
||||||
@ -1371,10 +1446,9 @@ int uv_try_write(uv_stream_t* stream,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int uv__read_start_common(uv_stream_t* stream,
|
int uv_read_start(uv_stream_t* stream,
|
||||||
uv_alloc_cb alloc_cb,
|
uv_alloc_cb alloc_cb,
|
||||||
uv_read_cb read_cb,
|
uv_read_cb read_cb) {
|
||||||
uv_read2_cb read2_cb) {
|
|
||||||
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
|
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
|
||||||
stream->type == UV_TTY);
|
stream->type == UV_TTY);
|
||||||
|
|
||||||
@ -1394,7 +1468,6 @@ static int uv__read_start_common(uv_stream_t* stream,
|
|||||||
assert(alloc_cb);
|
assert(alloc_cb);
|
||||||
|
|
||||||
stream->read_cb = read_cb;
|
stream->read_cb = read_cb;
|
||||||
stream->read2_cb = read2_cb;
|
|
||||||
stream->alloc_cb = alloc_cb;
|
stream->alloc_cb = alloc_cb;
|
||||||
|
|
||||||
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
|
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
|
||||||
@ -1405,18 +1478,6 @@ static int uv__read_start_common(uv_stream_t* stream,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
|
||||||
uv_read_cb read_cb) {
|
|
||||||
return uv__read_start_common(stream, alloc_cb, read_cb, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
|
|
||||||
uv_read2_cb read_cb) {
|
|
||||||
return uv__read_start_common(stream, alloc_cb, NULL, read_cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int uv_read_stop(uv_stream_t* stream) {
|
int uv_read_stop(uv_stream_t* stream) {
|
||||||
/* Sanity check. We're going to stop the handle unless it's primed for
|
/* Sanity check. We're going to stop the handle unless it's primed for
|
||||||
* writing but that means there should be some kind of write action in
|
* writing but that means there should be some kind of write action in
|
||||||
@ -1435,7 +1496,6 @@ int uv_read_stop(uv_stream_t* stream) {
|
|||||||
uv__stream_osx_interrupt_select(stream);
|
uv__stream_osx_interrupt_select(stream);
|
||||||
|
|
||||||
stream->read_cb = NULL;
|
stream->read_cb = NULL;
|
||||||
stream->read2_cb = NULL;
|
|
||||||
stream->alloc_cb = NULL;
|
stream->alloc_cb = NULL;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -1469,6 +1529,9 @@ int uv___stream_fd(uv_stream_t* handle) {
|
|||||||
|
|
||||||
|
|
||||||
void uv__stream_close(uv_stream_t* handle) {
|
void uv__stream_close(uv_stream_t* handle) {
|
||||||
|
unsigned int i;
|
||||||
|
uv__stream_queued_fds_t* queued_fds;
|
||||||
|
|
||||||
#if defined(__APPLE__)
|
#if defined(__APPLE__)
|
||||||
/* Terminate select loop first */
|
/* Terminate select loop first */
|
||||||
if (handle->select != NULL) {
|
if (handle->select != NULL) {
|
||||||
@ -1506,6 +1569,15 @@ void uv__stream_close(uv_stream_t* handle) {
|
|||||||
handle->accepted_fd = -1;
|
handle->accepted_fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Close all queued fds */
|
||||||
|
if (handle->queued_fds != NULL) {
|
||||||
|
queued_fds = handle->queued_fds;
|
||||||
|
for (i = 0; i < queued_fds->offset; i++)
|
||||||
|
uv__close(queued_fds->fds[i]);
|
||||||
|
free(handle->queued_fds);
|
||||||
|
handle->queued_fds = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
|
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -166,8 +166,6 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
|
|||||||
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client);
|
int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client);
|
||||||
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
||||||
uv_read_cb read_cb);
|
uv_read_cb read_cb);
|
||||||
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
|
||||||
uv_read2_cb read_cb);
|
|
||||||
int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
|
int uv_pipe_write(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
|
||||||
const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);
|
const uv_buf_t bufs[], unsigned int nbufs, uv_write_cb cb);
|
||||||
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
|
int uv_pipe_write2(uv_loop_t* loop, uv_write_t* req, uv_pipe_t* handle,
|
||||||
|
|||||||
161
src/win/pipe.c
161
src/win/pipe.c
@ -31,6 +31,17 @@
|
|||||||
#include "stream-inl.h"
|
#include "stream-inl.h"
|
||||||
#include "req-inl.h"
|
#include "req-inl.h"
|
||||||
|
|
||||||
|
typedef struct uv__ipc_queue_item_s uv__ipc_queue_item_t;
|
||||||
|
|
||||||
|
struct uv__ipc_queue_item_s {
|
||||||
|
/*
|
||||||
|
* NOTE: It is important for socket_info to be the first field,
|
||||||
|
* because we will we assigning it to the pending_ipc_info.socket_info
|
||||||
|
*/
|
||||||
|
WSAPROTOCOL_INFOW socket_info;
|
||||||
|
QUEUE member;
|
||||||
|
int tcp_connection;
|
||||||
|
};
|
||||||
|
|
||||||
/* A zero-size buffer for use by uv_pipe_read */
|
/* A zero-size buffer for use by uv_pipe_read */
|
||||||
static char uv_zero_[] = "";
|
static char uv_zero_[] = "";
|
||||||
@ -86,8 +97,8 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
|
|||||||
handle->name = NULL;
|
handle->name = NULL;
|
||||||
handle->ipc_pid = 0;
|
handle->ipc_pid = 0;
|
||||||
handle->remaining_ipc_rawdata_bytes = 0;
|
handle->remaining_ipc_rawdata_bytes = 0;
|
||||||
handle->pending_ipc_info.socket_info = NULL;
|
QUEUE_INIT(&handle->pending_ipc_info.queue);
|
||||||
handle->pending_ipc_info.tcp_connection = 0;
|
handle->pending_ipc_info.queue_len = 0;
|
||||||
handle->ipc = ipc;
|
handle->ipc = ipc;
|
||||||
handle->non_overlapped_writes_tail = NULL;
|
handle->non_overlapped_writes_tail = NULL;
|
||||||
|
|
||||||
@ -287,6 +298,7 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
|
|||||||
NTSTATUS nt_status;
|
NTSTATUS nt_status;
|
||||||
IO_STATUS_BLOCK io_status;
|
IO_STATUS_BLOCK io_status;
|
||||||
FILE_PIPE_LOCAL_INFORMATION pipe_info;
|
FILE_PIPE_LOCAL_INFORMATION pipe_info;
|
||||||
|
uv__ipc_queue_item_t* item;
|
||||||
|
|
||||||
if ((handle->flags & UV_HANDLE_CONNECTION) &&
|
if ((handle->flags & UV_HANDLE_CONNECTION) &&
|
||||||
handle->shutdown_req != NULL &&
|
handle->shutdown_req != NULL &&
|
||||||
@ -362,10 +374,28 @@ void uv_pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) {
|
|||||||
assert(!(handle->flags & UV_HANDLE_CLOSED));
|
assert(!(handle->flags & UV_HANDLE_CLOSED));
|
||||||
|
|
||||||
if (handle->flags & UV_HANDLE_CONNECTION) {
|
if (handle->flags & UV_HANDLE_CONNECTION) {
|
||||||
if (handle->pending_ipc_info.socket_info) {
|
/* Free pending sockets */
|
||||||
free(handle->pending_ipc_info.socket_info);
|
while (!QUEUE_EMPTY(&handle->pending_ipc_info.queue)) {
|
||||||
handle->pending_ipc_info.socket_info = NULL;
|
QUEUE* q;
|
||||||
|
SOCKET socket;
|
||||||
|
|
||||||
|
q = QUEUE_HEAD(&handle->pending_ipc_info.queue);
|
||||||
|
QUEUE_REMOVE(q);
|
||||||
|
item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
|
||||||
|
|
||||||
|
/* Materialize socket and close it */
|
||||||
|
socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||||
|
FROM_PROTOCOL_INFO,
|
||||||
|
FROM_PROTOCOL_INFO,
|
||||||
|
&item->socket_info,
|
||||||
|
0,
|
||||||
|
WSA_FLAG_OVERLAPPED);
|
||||||
|
free(item);
|
||||||
|
|
||||||
|
if (socket != INVALID_SOCKET)
|
||||||
|
closesocket(socket);
|
||||||
}
|
}
|
||||||
|
handle->pending_ipc_info.queue_len = 0;
|
||||||
|
|
||||||
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
|
||||||
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
|
if (handle->read_req.wait_handle != INVALID_HANDLE_VALUE) {
|
||||||
@ -720,15 +750,29 @@ int uv_pipe_accept(uv_pipe_t* server, uv_stream_t* client) {
|
|||||||
uv_loop_t* loop = server->loop;
|
uv_loop_t* loop = server->loop;
|
||||||
uv_pipe_t* pipe_client;
|
uv_pipe_t* pipe_client;
|
||||||
uv_pipe_accept_t* req;
|
uv_pipe_accept_t* req;
|
||||||
|
QUEUE* q;
|
||||||
|
uv__ipc_queue_item_t* item;
|
||||||
|
int err;
|
||||||
|
|
||||||
if (server->ipc) {
|
if (server->ipc) {
|
||||||
if (!server->pending_ipc_info.socket_info) {
|
if (QUEUE_EMPTY(&server->pending_ipc_info.queue)) {
|
||||||
/* No valid pending sockets. */
|
/* No valid pending sockets. */
|
||||||
return WSAEWOULDBLOCK;
|
return WSAEWOULDBLOCK;
|
||||||
}
|
}
|
||||||
|
|
||||||
return uv_tcp_import((uv_tcp_t*)client, server->pending_ipc_info.socket_info,
|
q = QUEUE_HEAD(&server->pending_ipc_info.queue);
|
||||||
server->pending_ipc_info.tcp_connection);
|
QUEUE_REMOVE(q);
|
||||||
|
server->pending_ipc_info.queue_len--;
|
||||||
|
item = QUEUE_DATA(q, uv__ipc_queue_item_t, member);
|
||||||
|
|
||||||
|
err = uv_tcp_import((uv_tcp_t*)client,
|
||||||
|
&item->socket_info,
|
||||||
|
item->tcp_connection);
|
||||||
|
if (err != 0)
|
||||||
|
return err;
|
||||||
|
|
||||||
|
free(item);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
pipe_client = (uv_pipe_t*)client;
|
pipe_client = (uv_pipe_t*)client;
|
||||||
|
|
||||||
@ -956,14 +1000,14 @@ error:
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
int uv_pipe_read_start(uv_pipe_t* handle,
|
||||||
uv_read_cb read_cb, uv_read2_cb read2_cb) {
|
uv_alloc_cb alloc_cb,
|
||||||
|
uv_read_cb read_cb) {
|
||||||
uv_loop_t* loop = handle->loop;
|
uv_loop_t* loop = handle->loop;
|
||||||
|
|
||||||
handle->flags |= UV_HANDLE_READING;
|
handle->flags |= UV_HANDLE_READING;
|
||||||
INCREASE_ACTIVE_COUNT(loop, handle);
|
INCREASE_ACTIVE_COUNT(loop, handle);
|
||||||
handle->read_cb = read_cb;
|
handle->read_cb = read_cb;
|
||||||
handle->read2_cb = read2_cb;
|
|
||||||
handle->alloc_cb = alloc_cb;
|
handle->alloc_cb = alloc_cb;
|
||||||
|
|
||||||
/* If reading was stopped and then started again, there could still be a */
|
/* If reading was stopped and then started again, there could still be a */
|
||||||
@ -975,18 +1019,6 @@ static int uv_pipe_read_start_impl(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
|
||||||
uv_read_cb read_cb) {
|
|
||||||
return uv_pipe_read_start_impl(handle, alloc_cb, read_cb, NULL);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int uv_pipe_read2_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb,
|
|
||||||
uv_read2_cb read_cb) {
|
|
||||||
return uv_pipe_read_start_impl(handle, alloc_cb, NULL, read_cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
|
static void uv_insert_non_overlapped_write_req(uv_pipe_t* handle,
|
||||||
uv_write_t* req) {
|
uv_write_t* req) {
|
||||||
req->next_req = NULL;
|
req->next_req = NULL;
|
||||||
@ -1314,11 +1346,7 @@ static void uv_pipe_read_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
|||||||
handle->flags &= ~UV_HANDLE_READABLE;
|
handle->flags &= ~UV_HANDLE_READABLE;
|
||||||
uv_read_stop((uv_stream_t*) handle);
|
uv_read_stop((uv_stream_t*) handle);
|
||||||
|
|
||||||
if (handle->read2_cb) {
|
handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_);
|
||||||
handle->read2_cb(handle, UV_EOF, &uv_null_buf_, UV_UNKNOWN_HANDLE);
|
|
||||||
} else {
|
|
||||||
handle->read_cb((uv_stream_t*) handle, UV_EOF, &uv_null_buf_);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1330,14 +1358,7 @@ static void uv_pipe_read_error(uv_loop_t* loop, uv_pipe_t* handle, int error,
|
|||||||
|
|
||||||
uv_read_stop((uv_stream_t*) handle);
|
uv_read_stop((uv_stream_t*) handle);
|
||||||
|
|
||||||
if (handle->read2_cb) {
|
handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
|
||||||
handle->read2_cb(handle,
|
|
||||||
uv_translate_sys_error(error),
|
|
||||||
&buf,
|
|
||||||
UV_UNKNOWN_HANDLE);
|
|
||||||
} else {
|
|
||||||
handle->read_cb((uv_stream_t*)handle, uv_translate_sys_error(error), &buf);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -1351,6 +1372,23 @@ static void uv_pipe_read_error_or_eof(uv_loop_t* loop, uv_pipe_t* handle,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void uv__pipe_insert_pending_socket(uv_pipe_t* handle,
|
||||||
|
WSAPROTOCOL_INFOW* info,
|
||||||
|
int tcp_connection) {
|
||||||
|
WSAPROTOCOL_INFOW* socket_info;
|
||||||
|
uv__ipc_queue_item_t* item;
|
||||||
|
|
||||||
|
item = (uv__ipc_queue_item_t*) malloc(sizeof(*item));
|
||||||
|
if (item == NULL)
|
||||||
|
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
|
||||||
|
|
||||||
|
memcpy(&item->socket_info, info, sizeof(item->socket_info));
|
||||||
|
item->tcp_connection = tcp_connection;
|
||||||
|
QUEUE_INSERT_TAIL(&handle->pending_ipc_info.queue, &item->member);
|
||||||
|
handle->pending_ipc_info.queue_len++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
||||||
uv_req_t* req) {
|
uv_req_t* req) {
|
||||||
DWORD bytes, avail;
|
DWORD bytes, avail;
|
||||||
@ -1426,16 +1464,10 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|||||||
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
|
assert(bytes == sizeof(ipc_frame) - sizeof(ipc_frame.header));
|
||||||
|
|
||||||
/* Store the pending socket info. */
|
/* Store the pending socket info. */
|
||||||
assert(!handle->pending_ipc_info.socket_info);
|
uv__pipe_insert_pending_socket(
|
||||||
handle->pending_ipc_info.socket_info =
|
handle,
|
||||||
(WSAPROTOCOL_INFOW*)malloc(sizeof(*(handle->pending_ipc_info.socket_info)));
|
&ipc_frame.socket_info,
|
||||||
if (!handle->pending_ipc_info.socket_info) {
|
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION);
|
||||||
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
|
|
||||||
}
|
|
||||||
|
|
||||||
*(handle->pending_ipc_info.socket_info) = ipc_frame.socket_info;
|
|
||||||
handle->pending_ipc_info.tcp_connection =
|
|
||||||
ipc_frame.header.flags & UV_IPC_TCP_CONNECTION;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
|
if (ipc_frame.header.flags & UV_IPC_RAW_DATA) {
|
||||||
@ -1450,11 +1482,7 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|||||||
|
|
||||||
handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
|
handle->alloc_cb((uv_handle_t*) handle, avail, &buf);
|
||||||
if (buf.len == 0) {
|
if (buf.len == 0) {
|
||||||
if (handle->read2_cb) {
|
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
|
||||||
handle->read2_cb(handle, UV_ENOBUFS, &buf, UV_UNKNOWN_HANDLE);
|
|
||||||
} else if (handle->read_cb) {
|
|
||||||
handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
assert(buf.base != NULL);
|
assert(buf.base != NULL);
|
||||||
@ -1469,20 +1497,8 @@ void uv_process_pipe_read_req(uv_loop_t* loop, uv_pipe_t* handle,
|
|||||||
assert(handle->remaining_ipc_rawdata_bytes >= bytes);
|
assert(handle->remaining_ipc_rawdata_bytes >= bytes);
|
||||||
handle->remaining_ipc_rawdata_bytes =
|
handle->remaining_ipc_rawdata_bytes =
|
||||||
handle->remaining_ipc_rawdata_bytes - bytes;
|
handle->remaining_ipc_rawdata_bytes - bytes;
|
||||||
if (handle->read2_cb) {
|
|
||||||
handle->read2_cb(handle, bytes, &buf,
|
|
||||||
handle->pending_ipc_info.socket_info ? UV_TCP : UV_UNKNOWN_HANDLE);
|
|
||||||
} else if (handle->read_cb) {
|
|
||||||
handle->read_cb((uv_stream_t*)handle, bytes, &buf);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (handle->pending_ipc_info.socket_info) {
|
|
||||||
free(handle->pending_ipc_info.socket_info);
|
|
||||||
handle->pending_ipc_info.socket_info = NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
handle->read_cb((uv_stream_t*)handle, bytes, &buf);
|
|
||||||
}
|
}
|
||||||
|
handle->read_cb((uv_stream_t*)handle, bytes, &buf);
|
||||||
|
|
||||||
/* Read again only if bytes == buf.len */
|
/* Read again only if bytes == buf.len */
|
||||||
if (bytes <= buf.len) {
|
if (bytes <= buf.len) {
|
||||||
@ -1859,3 +1875,20 @@ error:
|
|||||||
free(name_info);
|
free(name_info);
|
||||||
return err;
|
return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int uv_pipe_pending_count(uv_pipe_t* handle) {
|
||||||
|
if (!handle->ipc)
|
||||||
|
return 0;
|
||||||
|
return handle->pending_ipc_info.queue_len;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
uv_handle_type uv_pipe_pending_type(uv_pipe_t* handle) {
|
||||||
|
if (!handle->ipc)
|
||||||
|
return UV_UNKNOWN_HANDLE;
|
||||||
|
if (handle->pending_ipc_info.queue_len == 0)
|
||||||
|
return UV_UNKNOWN_HANDLE;
|
||||||
|
else
|
||||||
|
return UV_TCP;
|
||||||
|
}
|
||||||
|
|||||||
@ -96,31 +96,6 @@ int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int uv_read2_start(uv_stream_t* handle, uv_alloc_cb alloc_cb,
|
|
||||||
uv_read2_cb read_cb) {
|
|
||||||
int err;
|
|
||||||
|
|
||||||
if (handle->flags & UV_HANDLE_READING) {
|
|
||||||
return UV_EALREADY;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(handle->flags & UV_HANDLE_READABLE)) {
|
|
||||||
return UV_ENOTCONN;
|
|
||||||
}
|
|
||||||
|
|
||||||
err = ERROR_INVALID_PARAMETER;
|
|
||||||
switch (handle->type) {
|
|
||||||
case UV_NAMED_PIPE:
|
|
||||||
err = uv_pipe_read2_start((uv_pipe_t*)handle, alloc_cb, read_cb);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
assert(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return uv_translate_sys_error(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int uv_read_stop(uv_stream_t* handle) {
|
int uv_read_stop(uv_stream_t* handle) {
|
||||||
int err;
|
int err;
|
||||||
|
|
||||||
|
|||||||
@ -1099,9 +1099,9 @@ int uv_tcp_import(uv_tcp_t* tcp, WSAPROTOCOL_INFOW* socket_protocol_info,
|
|||||||
int tcp_connection) {
|
int tcp_connection) {
|
||||||
int err;
|
int err;
|
||||||
|
|
||||||
SOCKET socket = WSASocketW(AF_INET,
|
SOCKET socket = WSASocketW(FROM_PROTOCOL_INFO,
|
||||||
SOCK_STREAM,
|
FROM_PROTOCOL_INFO,
|
||||||
IPPROTO_IP,
|
FROM_PROTOCOL_INFO,
|
||||||
socket_protocol_info,
|
socket_protocol_info,
|
||||||
0,
|
0,
|
||||||
WSA_FLAG_OVERLAPPED);
|
WSA_FLAG_OVERLAPPED);
|
||||||
|
|||||||
@ -83,10 +83,9 @@ static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
|
|||||||
static void ipc_write_cb(uv_write_t* req, int status);
|
static void ipc_write_cb(uv_write_t* req, int status);
|
||||||
static void ipc_close_cb(uv_handle_t* handle);
|
static void ipc_close_cb(uv_handle_t* handle);
|
||||||
static void ipc_connect_cb(uv_connect_t* req, int status);
|
static void ipc_connect_cb(uv_connect_t* req, int status);
|
||||||
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
|
static void ipc_read_cb(uv_stream_t* handle,
|
||||||
ssize_t nread,
|
ssize_t nread,
|
||||||
const uv_buf_t* buf,
|
const uv_buf_t* buf);
|
||||||
uv_handle_type type);
|
|
||||||
static void ipc_alloc_cb(uv_handle_t* handle,
|
static void ipc_alloc_cb(uv_handle_t* handle,
|
||||||
size_t suggested_size,
|
size_t suggested_size,
|
||||||
uv_buf_t* buf);
|
uv_buf_t* buf);
|
||||||
@ -155,9 +154,9 @@ static void ipc_connect_cb(uv_connect_t* req, int status) {
|
|||||||
struct ipc_client_ctx* ctx;
|
struct ipc_client_ctx* ctx;
|
||||||
ctx = container_of(req, struct ipc_client_ctx, connect_req);
|
ctx = container_of(req, struct ipc_client_ctx, connect_req);
|
||||||
ASSERT(0 == status);
|
ASSERT(0 == status);
|
||||||
ASSERT(0 == uv_read2_start((uv_stream_t*) &ctx->ipc_pipe,
|
ASSERT(0 == uv_read_start((uv_stream_t*) &ctx->ipc_pipe,
|
||||||
ipc_alloc_cb,
|
ipc_alloc_cb,
|
||||||
ipc_read2_cb));
|
ipc_read_cb));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -171,16 +170,20 @@ static void ipc_alloc_cb(uv_handle_t* handle,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
|
static void ipc_read_cb(uv_stream_t* handle,
|
||||||
ssize_t nread,
|
ssize_t nread,
|
||||||
const uv_buf_t* buf,
|
const uv_buf_t* buf) {
|
||||||
uv_handle_type type) {
|
|
||||||
struct ipc_client_ctx* ctx;
|
struct ipc_client_ctx* ctx;
|
||||||
uv_loop_t* loop;
|
uv_loop_t* loop;
|
||||||
|
uv_handle_type type;
|
||||||
|
uv_pipe_t* ipc_pipe;
|
||||||
|
|
||||||
|
ipc_pipe = (uv_pipe_t*) handle;
|
||||||
ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
|
ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
|
||||||
loop = ipc_pipe->loop;
|
loop = ipc_pipe->loop;
|
||||||
|
|
||||||
|
ASSERT(1 == uv_pipe_pending_count(ipc_pipe));
|
||||||
|
type = uv_pipe_pending_type(ipc_pipe);
|
||||||
if (type == UV_TCP)
|
if (type == UV_TCP)
|
||||||
ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
|
ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
|
||||||
else if (type == UV_NAMED_PIPE)
|
else if (type == UV_NAMED_PIPE)
|
||||||
@ -188,7 +191,7 @@ static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
|
|||||||
else
|
else
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
||||||
ASSERT(0 == uv_accept((uv_stream_t*) &ctx->ipc_pipe, ctx->server_handle));
|
ASSERT(0 == uv_accept(handle, ctx->server_handle));
|
||||||
uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
|
uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -60,15 +60,20 @@ static void alloc_cb(uv_handle_t* handle,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void recv_cb(uv_pipe_t* handle,
|
static void recv_cb(uv_stream_t* handle,
|
||||||
ssize_t nread,
|
ssize_t nread,
|
||||||
const uv_buf_t* buf,
|
const uv_buf_t* buf) {
|
||||||
uv_handle_type pending) {
|
uv_handle_type pending;
|
||||||
|
uv_pipe_t* pipe;
|
||||||
int r;
|
int r;
|
||||||
|
|
||||||
ASSERT(pending == ctx.expected_type);
|
pipe = (uv_pipe_t*) handle;
|
||||||
ASSERT(handle == &ctx.channel);
|
ASSERT(pipe == &ctx.channel);
|
||||||
ASSERT(nread >= 0);
|
ASSERT(nread >= 0);
|
||||||
|
ASSERT(1 == uv_pipe_pending_count(pipe));
|
||||||
|
|
||||||
|
pending = uv_pipe_pending_type(pipe);
|
||||||
|
ASSERT(pending == ctx.expected_type);
|
||||||
|
|
||||||
if (pending == UV_NAMED_PIPE)
|
if (pending == UV_NAMED_PIPE)
|
||||||
r = uv_pipe_init(ctx.channel.loop, &ctx.recv.pipe, 0);
|
r = uv_pipe_init(ctx.channel.loop, &ctx.recv.pipe, 0);
|
||||||
@ -78,7 +83,7 @@ static void recv_cb(uv_pipe_t* handle,
|
|||||||
abort();
|
abort();
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
r = uv_accept((uv_stream_t*)&ctx.channel, &ctx.recv.stream);
|
r = uv_accept(handle, &ctx.recv.stream);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
uv_close((uv_handle_t*)&ctx.channel, NULL);
|
uv_close((uv_handle_t*)&ctx.channel, NULL);
|
||||||
@ -103,7 +108,7 @@ static int run_test(void) {
|
|||||||
NULL);
|
NULL);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
r = uv_read2_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb);
|
r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, recv_cb);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
||||||
@ -165,16 +170,21 @@ static void write2_cb(uv_write_t* req, int status) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void read2_cb(uv_pipe_t* handle,
|
static void read_cb(uv_stream_t* handle,
|
||||||
ssize_t nread,
|
ssize_t nread,
|
||||||
const uv_buf_t* rdbuf,
|
const uv_buf_t* rdbuf) {
|
||||||
uv_handle_type pending) {
|
|
||||||
uv_buf_t wrbuf;
|
uv_buf_t wrbuf;
|
||||||
|
uv_pipe_t* pipe;
|
||||||
|
uv_handle_type pending;
|
||||||
int r;
|
int r;
|
||||||
|
|
||||||
ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP);
|
pipe = (uv_pipe_t*) handle;
|
||||||
ASSERT(handle == &ctx.channel);
|
ASSERT(pipe == &ctx.channel);
|
||||||
ASSERT(nread >= 0);
|
ASSERT(nread >= 0);
|
||||||
|
ASSERT(1 == uv_pipe_pending_count(pipe));
|
||||||
|
|
||||||
|
pending = uv_pipe_pending_type(pipe);
|
||||||
|
ASSERT(pending == UV_NAMED_PIPE || pending == UV_TCP);
|
||||||
|
|
||||||
wrbuf = uv_buf_init(".", 1);
|
wrbuf = uv_buf_init(".", 1);
|
||||||
|
|
||||||
@ -186,7 +196,7 @@ static void read2_cb(uv_pipe_t* handle,
|
|||||||
abort();
|
abort();
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
r = uv_accept((uv_stream_t*)handle, &ctx.recv.stream);
|
r = uv_accept(handle, &ctx.recv.stream);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
r = uv_write2(&ctx.write_req,
|
r = uv_write2(&ctx.write_req,
|
||||||
@ -215,7 +225,7 @@ int ipc_send_recv_helper(void) {
|
|||||||
ASSERT(1 == uv_is_writable((uv_stream_t*)&ctx.channel));
|
ASSERT(1 == uv_is_writable((uv_stream_t*)&ctx.channel));
|
||||||
ASSERT(0 == uv_is_closing((uv_handle_t*)&ctx.channel));
|
ASSERT(0 == uv_is_closing((uv_handle_t*)&ctx.channel));
|
||||||
|
|
||||||
r = uv_read2_start((uv_stream_t*)&ctx.channel, alloc_cb, read2_cb);
|
r = uv_read_start((uv_stream_t*)&ctx.channel, alloc_cb, read_cb);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
||||||
|
|||||||
@ -30,7 +30,7 @@ static uv_tcp_t tcp_server;
|
|||||||
static uv_tcp_t tcp_connection;
|
static uv_tcp_t tcp_connection;
|
||||||
|
|
||||||
static int exit_cb_called;
|
static int exit_cb_called;
|
||||||
static int read2_cb_called;
|
static int read_cb_called;
|
||||||
static int tcp_write_cb_called;
|
static int tcp_write_cb_called;
|
||||||
static int tcp_read_cb_called;
|
static int tcp_read_cb_called;
|
||||||
static int on_pipe_read_called;
|
static int on_pipe_read_called;
|
||||||
@ -138,13 +138,16 @@ static void make_many_connections(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void on_read(uv_pipe_t* pipe,
|
static void on_read(uv_stream_t* handle,
|
||||||
ssize_t nread,
|
ssize_t nread,
|
||||||
const uv_buf_t* buf,
|
const uv_buf_t* buf) {
|
||||||
uv_handle_type pending) {
|
|
||||||
int r;
|
int r;
|
||||||
|
uv_pipe_t* pipe;
|
||||||
|
uv_handle_type pending;
|
||||||
uv_buf_t outbuf;
|
uv_buf_t outbuf;
|
||||||
|
|
||||||
|
pipe = (uv_pipe_t*) handle;
|
||||||
|
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
/* Everything OK, but nothing read. */
|
/* Everything OK, but nothing read. */
|
||||||
free(buf->base);
|
free(buf->base);
|
||||||
@ -163,9 +166,11 @@ static void on_read(uv_pipe_t* pipe,
|
|||||||
|
|
||||||
fprintf(stderr, "got %d bytes\n", (int)nread);
|
fprintf(stderr, "got %d bytes\n", (int)nread);
|
||||||
|
|
||||||
|
pending = uv_pipe_pending_type(pipe);
|
||||||
if (!tcp_server_listening) {
|
if (!tcp_server_listening) {
|
||||||
|
ASSERT(1 == uv_pipe_pending_count(pipe));
|
||||||
ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
|
ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
|
||||||
read2_cb_called++;
|
read_cb_called++;
|
||||||
|
|
||||||
/* Accept the pending TCP server, and start listening on it. */
|
/* Accept the pending TCP server, and start listening on it. */
|
||||||
ASSERT(pending == UV_TCP);
|
ASSERT(pending == UV_TCP);
|
||||||
@ -191,6 +196,7 @@ static void on_read(uv_pipe_t* pipe,
|
|||||||
make_many_connections();
|
make_many_connections();
|
||||||
} else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
|
} else if (memcmp("accepted_connection\n", buf->base, nread) == 0) {
|
||||||
/* Remote server has accepted a connection. Close the channel. */
|
/* Remote server has accepted a connection. Close the channel. */
|
||||||
|
ASSERT(0 == uv_pipe_pending_count(pipe));
|
||||||
ASSERT(pending == UV_UNKNOWN_HANDLE);
|
ASSERT(pending == UV_UNKNOWN_HANDLE);
|
||||||
remote_conn_accepted = 1;
|
remote_conn_accepted = 1;
|
||||||
uv_close((uv_handle_t*)&channel, NULL);
|
uv_close((uv_handle_t*)&channel, NULL);
|
||||||
@ -267,13 +273,15 @@ static void on_tcp_read(uv_stream_t* tcp, ssize_t nread, const uv_buf_t* buf) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void on_read_connection(uv_pipe_t* pipe,
|
static void on_read_connection(uv_stream_t* handle,
|
||||||
ssize_t nread,
|
ssize_t nread,
|
||||||
const uv_buf_t* buf,
|
const uv_buf_t* buf) {
|
||||||
uv_handle_type pending) {
|
|
||||||
int r;
|
int r;
|
||||||
uv_buf_t outbuf;
|
uv_buf_t outbuf;
|
||||||
|
uv_pipe_t* pipe;
|
||||||
|
uv_handle_type pending;
|
||||||
|
|
||||||
|
pipe = (uv_pipe_t*) handle;
|
||||||
if (nread == 0) {
|
if (nread == 0) {
|
||||||
/* Everything OK, but nothing read. */
|
/* Everything OK, but nothing read. */
|
||||||
free(buf->base);
|
free(buf->base);
|
||||||
@ -292,15 +300,18 @@ static void on_read_connection(uv_pipe_t* pipe,
|
|||||||
|
|
||||||
fprintf(stderr, "got %d bytes\n", (int)nread);
|
fprintf(stderr, "got %d bytes\n", (int)nread);
|
||||||
|
|
||||||
|
ASSERT(1 == uv_pipe_pending_count(pipe));
|
||||||
|
pending = uv_pipe_pending_type(pipe);
|
||||||
|
|
||||||
ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
|
ASSERT(nread > 0 && buf->base && pending != UV_UNKNOWN_HANDLE);
|
||||||
read2_cb_called++;
|
read_cb_called++;
|
||||||
|
|
||||||
/* Accept the pending TCP connection */
|
/* Accept the pending TCP connection */
|
||||||
ASSERT(pending == UV_TCP);
|
ASSERT(pending == UV_TCP);
|
||||||
r = uv_tcp_init(uv_default_loop(), &tcp_connection);
|
r = uv_tcp_init(uv_default_loop(), &tcp_connection);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
r = uv_accept((uv_stream_t*)pipe, (uv_stream_t*)&tcp_connection);
|
r = uv_accept(handle, (uv_stream_t*)&tcp_connection);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
|
|
||||||
/* Make sure that the expected data is correctly multiplexed. */
|
/* Make sure that the expected data is correctly multiplexed. */
|
||||||
@ -319,12 +330,12 @@ static void on_read_connection(uv_pipe_t* pipe,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int run_ipc_test(const char* helper, uv_read2_cb read_cb) {
|
static int run_ipc_test(const char* helper, uv_read_cb read_cb) {
|
||||||
uv_process_t process;
|
uv_process_t process;
|
||||||
int r;
|
int r;
|
||||||
|
|
||||||
spawn_helper(&channel, &process, helper);
|
spawn_helper(&channel, &process, helper);
|
||||||
uv_read2_start((uv_stream_t*)&channel, on_alloc, read_cb);
|
uv_read_start((uv_stream_t*)&channel, on_alloc, read_cb);
|
||||||
|
|
||||||
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
r = uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
||||||
ASSERT(r == 0);
|
ASSERT(r == 0);
|
||||||
@ -338,7 +349,7 @@ TEST_IMPL(ipc_listen_before_write) {
|
|||||||
int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
|
int r = run_ipc_test("ipc_helper_listen_before_write", on_read);
|
||||||
ASSERT(local_conn_accepted == 1);
|
ASSERT(local_conn_accepted == 1);
|
||||||
ASSERT(remote_conn_accepted == 1);
|
ASSERT(remote_conn_accepted == 1);
|
||||||
ASSERT(read2_cb_called == 1);
|
ASSERT(read_cb_called == 1);
|
||||||
ASSERT(exit_cb_called == 1);
|
ASSERT(exit_cb_called == 1);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
@ -348,7 +359,7 @@ TEST_IMPL(ipc_listen_after_write) {
|
|||||||
int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
|
int r = run_ipc_test("ipc_helper_listen_after_write", on_read);
|
||||||
ASSERT(local_conn_accepted == 1);
|
ASSERT(local_conn_accepted == 1);
|
||||||
ASSERT(remote_conn_accepted == 1);
|
ASSERT(remote_conn_accepted == 1);
|
||||||
ASSERT(read2_cb_called == 1);
|
ASSERT(read_cb_called == 1);
|
||||||
ASSERT(exit_cb_called == 1);
|
ASSERT(exit_cb_called == 1);
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
@ -356,7 +367,7 @@ TEST_IMPL(ipc_listen_after_write) {
|
|||||||
|
|
||||||
TEST_IMPL(ipc_tcp_connection) {
|
TEST_IMPL(ipc_tcp_connection) {
|
||||||
int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
|
int r = run_ipc_test("ipc_helper_tcp_connection", on_read_connection);
|
||||||
ASSERT(read2_cb_called == 1);
|
ASSERT(read_cb_called == 1);
|
||||||
ASSERT(tcp_write_cb_called == 1);
|
ASSERT(tcp_write_cb_called == 1);
|
||||||
ASSERT(tcp_read_cb_called == 1);
|
ASSERT(tcp_read_cb_called == 1);
|
||||||
ASSERT(exit_cb_called == 1);
|
ASSERT(exit_cb_called == 1);
|
||||||
|
|||||||
@ -96,6 +96,7 @@ TEST_DECLARE (pipe_connect_bad_name)
|
|||||||
TEST_DECLARE (pipe_connect_to_file)
|
TEST_DECLARE (pipe_connect_to_file)
|
||||||
TEST_DECLARE (pipe_getsockname)
|
TEST_DECLARE (pipe_getsockname)
|
||||||
TEST_DECLARE (pipe_getsockname_abstract)
|
TEST_DECLARE (pipe_getsockname_abstract)
|
||||||
|
TEST_DECLARE (pipe_sendmsg)
|
||||||
TEST_DECLARE (pipe_server_close)
|
TEST_DECLARE (pipe_server_close)
|
||||||
TEST_DECLARE (connection_fail)
|
TEST_DECLARE (connection_fail)
|
||||||
TEST_DECLARE (connection_fail_doesnt_auto_close)
|
TEST_DECLARE (connection_fail_doesnt_auto_close)
|
||||||
@ -362,6 +363,7 @@ TASK_LIST_START
|
|||||||
TEST_ENTRY (pipe_listen_without_bind)
|
TEST_ENTRY (pipe_listen_without_bind)
|
||||||
TEST_ENTRY (pipe_getsockname)
|
TEST_ENTRY (pipe_getsockname)
|
||||||
TEST_ENTRY (pipe_getsockname_abstract)
|
TEST_ENTRY (pipe_getsockname_abstract)
|
||||||
|
TEST_ENTRY (pipe_sendmsg)
|
||||||
|
|
||||||
TEST_ENTRY (connection_fail)
|
TEST_ENTRY (connection_fail)
|
||||||
TEST_ENTRY (connection_fail_doesnt_auto_close)
|
TEST_ENTRY (connection_fail_doesnt_auto_close)
|
||||||
|
|||||||
169
test/test-pipe-sendmsg.c
Normal file
169
test/test-pipe-sendmsg.c
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
|
||||||
|
*
|
||||||
|
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
* of this software and associated documentation files (the "Software"), to
|
||||||
|
* deal in the Software without restriction, including without limitation the
|
||||||
|
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||||
|
* sell copies of the Software, and to permit persons to whom the Software is
|
||||||
|
* furnished to do so, subject to the following conditions:
|
||||||
|
*
|
||||||
|
* The above copyright notice and this permission notice shall be included in
|
||||||
|
* all copies or substantial portions of the Software.
|
||||||
|
*
|
||||||
|
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||||
|
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||||
|
* IN THE SOFTWARE.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "uv.h"
|
||||||
|
#include "task.h"
|
||||||
|
|
||||||
|
|
||||||
|
#ifndef _WIN32
|
||||||
|
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <errno.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
|
||||||
|
/* NOTE: size should be divisible by 2 */
|
||||||
|
static uv_pipe_t incoming[4];
|
||||||
|
static unsigned int incoming_count;
|
||||||
|
static unsigned int close_called;
|
||||||
|
|
||||||
|
|
||||||
|
static void set_nonblocking(uv_os_sock_t sock) {
|
||||||
|
int r;
|
||||||
|
#ifdef _WIN32
|
||||||
|
unsigned long on = 1;
|
||||||
|
r = ioctlsocket(sock, FIONBIO, &on);
|
||||||
|
ASSERT(r == 0);
|
||||||
|
#else
|
||||||
|
int flags = fcntl(sock, F_GETFL, 0);
|
||||||
|
ASSERT(flags >= 0);
|
||||||
|
r = fcntl(sock, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
ASSERT(r >= 0);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
static void close_cb(uv_handle_t* handle) {
|
||||||
|
close_called++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void alloc_cb(uv_handle_t* handle, size_t size, uv_buf_t* buf) {
|
||||||
|
static char base[1];
|
||||||
|
|
||||||
|
buf->base = base;
|
||||||
|
buf->len = sizeof(base);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void read_cb(uv_stream_t* handle,
|
||||||
|
ssize_t nread,
|
||||||
|
const uv_buf_t* buf) {
|
||||||
|
uv_pipe_t* p;
|
||||||
|
uv_pipe_t* inc;
|
||||||
|
uv_handle_type pending;
|
||||||
|
unsigned int i;
|
||||||
|
|
||||||
|
p = (uv_pipe_t*) handle;
|
||||||
|
ASSERT(nread >= 0);
|
||||||
|
|
||||||
|
while (uv_pipe_pending_count(p) != 0) {
|
||||||
|
pending = uv_pipe_pending_type(p);
|
||||||
|
ASSERT(pending == UV_NAMED_PIPE);
|
||||||
|
|
||||||
|
ASSERT(incoming_count < ARRAY_SIZE(incoming));
|
||||||
|
inc = &incoming[incoming_count++];
|
||||||
|
ASSERT(0 == uv_pipe_init(p->loop, inc, 0));
|
||||||
|
ASSERT(0 == uv_accept(handle, (uv_stream_t*) inc));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (incoming_count != ARRAY_SIZE(incoming))
|
||||||
|
return;
|
||||||
|
|
||||||
|
ASSERT(0 == uv_read_stop((uv_stream_t*) p));
|
||||||
|
uv_close((uv_handle_t*) p, close_cb);
|
||||||
|
for (i = 0; i < ARRAY_SIZE(incoming); i++)
|
||||||
|
uv_close((uv_handle_t*) &incoming[i], close_cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST_IMPL(pipe_sendmsg) {
|
||||||
|
uv_pipe_t p;
|
||||||
|
int r;
|
||||||
|
int fds[2];
|
||||||
|
int send_fds[ARRAY_SIZE(incoming)];
|
||||||
|
struct msghdr msg;
|
||||||
|
char scratch[64];
|
||||||
|
struct cmsghdr *cmsg;
|
||||||
|
unsigned int i;
|
||||||
|
uv_buf_t buf;
|
||||||
|
|
||||||
|
ASSERT(0 == socketpair(AF_UNIX, SOCK_STREAM, 0, fds));
|
||||||
|
for (i = 0; i < ARRAY_SIZE(send_fds); i += 2)
|
||||||
|
ASSERT(0 == socketpair(AF_UNIX, SOCK_STREAM, 0, send_fds + i));
|
||||||
|
ASSERT(i == ARRAY_SIZE(send_fds));
|
||||||
|
ASSERT(0 == uv_pipe_init(uv_default_loop(), &p, 1));
|
||||||
|
ASSERT(0 == uv_pipe_open(&p, fds[1]));
|
||||||
|
|
||||||
|
buf = uv_buf_init("X", 1);
|
||||||
|
memset(&msg, 0, sizeof(msg));
|
||||||
|
msg.msg_iov = (struct iovec*) &buf;
|
||||||
|
msg.msg_iovlen = 1;
|
||||||
|
msg.msg_flags = 0;
|
||||||
|
|
||||||
|
msg.msg_control = (void*) scratch;
|
||||||
|
msg.msg_controllen = CMSG_LEN(sizeof(send_fds));
|
||||||
|
ASSERT(sizeof(scratch) >= msg.msg_controllen);
|
||||||
|
|
||||||
|
cmsg = CMSG_FIRSTHDR(&msg);
|
||||||
|
cmsg->cmsg_level = SOL_SOCKET;
|
||||||
|
cmsg->cmsg_type = SCM_RIGHTS;
|
||||||
|
cmsg->cmsg_len = msg.msg_controllen;
|
||||||
|
|
||||||
|
/* silence aliasing warning */
|
||||||
|
{
|
||||||
|
void* pv = CMSG_DATA(cmsg);
|
||||||
|
int* pi = pv;
|
||||||
|
for (i = 0; i < ARRAY_SIZE(send_fds); i++)
|
||||||
|
pi[i] = send_fds[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
set_nonblocking(fds[1]);
|
||||||
|
ASSERT(0 == uv_read_start((uv_stream_t*) &p, alloc_cb, read_cb));
|
||||||
|
|
||||||
|
do
|
||||||
|
r = sendmsg(fds[0], &msg, 0);
|
||||||
|
while (r == -1 && errno == EINTR);
|
||||||
|
ASSERT(r == 1);
|
||||||
|
|
||||||
|
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
|
||||||
|
ASSERT(ARRAY_SIZE(incoming) == incoming_count);
|
||||||
|
ASSERT(ARRAY_SIZE(incoming) + 1 == close_called);
|
||||||
|
close(fds[0]);
|
||||||
|
|
||||||
|
MAKE_VALGRIND_HAPPY();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#else /* !_WIN32 */
|
||||||
|
|
||||||
|
TEST_IMPL(pipe_sendmsg) {
|
||||||
|
MAKE_VALGRIND_HAPPY();
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif /* _WIN32 */
|
||||||
1
uv.gyp
1
uv.gyp
@ -338,6 +338,7 @@
|
|||||||
'test/test-pipe-bind-error.c',
|
'test/test-pipe-bind-error.c',
|
||||||
'test/test-pipe-connect-error.c',
|
'test/test-pipe-connect-error.c',
|
||||||
'test/test-pipe-getsockname.c',
|
'test/test-pipe-getsockname.c',
|
||||||
|
'test/test-pipe-sendmsg.c',
|
||||||
'test/test-pipe-server-close.c',
|
'test/test-pipe-server-close.c',
|
||||||
'test/test-platform-output.c',
|
'test/test-platform-output.c',
|
||||||
'test/test-poll.c',
|
'test/test-poll.c',
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user