pipe: allow queueing pending handles

If multiple handles arrive to the IPC pipe at the same time (happens on
some platforms), libuv will queue them internally, and call `read2_cb`
multiple times with a null-buffer and proper `handle_type`.
This commit is contained in:
Fedor Indutny 2013-12-22 14:45:00 +04:00
parent f166d6d705
commit 08aeaf6193
3 changed files with 91 additions and 15 deletions

View File

@ -229,6 +229,7 @@ typedef struct {
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
int* queued_fds; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \
#define UV_TCP_PRIVATE_FIELDS /* empty */

View File

@ -376,6 +376,9 @@ typedef void (*uv_read_cb)(uv_stream_t* stream,
* Just like the uv_read_cb except that if the pending parameter is true
* then you can use uv_accept() to pull the new handle into the process.
* If no handle is pending then pending will be UV_UNKNOWN_HANDLE.
*
* NOTE: The buffer may be a null buffer if multiple fds were accepted and
* read2_cb is called for pending ones.
*/
typedef void (*uv_read2_cb)(uv_pipe_t* pipe,
ssize_t nread,

View File

@ -119,6 +119,7 @@ void uv__stream_init(uv_loop_t* loop,
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
@ -559,6 +560,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
if (server->accepted_fd == -1)
return -EAGAIN;
err = 0;
switch (client->type) {
case UV_NAMED_PIPE:
case UV_TCP:
@ -568,8 +570,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
if (err) {
/* TODO handle error */
uv__close(server->accepted_fd);
server->accepted_fd = -1;
return err;
goto done;
}
break;
@ -577,8 +578,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
if (err) {
uv__close(server->accepted_fd);
server->accepted_fd = -1;
return err;
goto done;
}
break;
@ -586,9 +586,31 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
assert(0);
}
uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
server->accepted_fd = -1;
return 0;
done:
/* Process queued fds */
if (server->queued_fds != NULL) {
/* Read first */
server->accepted_fd = server->queued_fds[2];
/* All read, free */
if (--server->queued_fds[0] == 0) {
free(server->queued_fds);
server->queued_fds = NULL;
} else {
/* Shift rest */
memmove(server->queued_fds + 2,
server->queued_fds + 3,
server->queued_fds[0]);
}
/* Invoke read_cb one more time */
uv__io_feed(server->loop, &server->io_watcher);
} else {
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
}
return err;
}
@ -951,6 +973,41 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
}
static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
int queue_offset;
int queue_len;
if (stream->queued_fds == NULL) {
queue_offset = 0;
queue_len = 8;
stream->queued_fds = malloc((queue_len + 2) * sizeof(*stream->queued_fds));
if (stream->queued_fds == NULL)
return UV_ENOMEM;
stream->queued_fds[1] = queue_len;
} else {
queue_offset = stream->queued_fds[0];
queue_len = stream->queued_fds[1];
/* Grow */
if (queue_offset == queue_len) {
queue_len += 8;
stream->queued_fds = realloc(stream->queued_fds,
(queue_len + 2) *
sizeof(*stream->queued_fds));
if (stream->queued_fds == NULL)
return UV_ENOMEM;
stream->queued_fds[1] = queue_len;
}
}
/* Put fd in a queue */
stream->queued_fds[0] = queue_offset;
stream->queued_fds[2 + queue_offset++] = fd;
return 0;
}
static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
@ -958,6 +1015,16 @@ static void uv__read(uv_stream_t* stream) {
struct cmsghdr* cmsg;
char cmsg_space[64];
int count;
int err;
/* Has queued fds */
if (stream->accepted_fd != -1) {
static uv_buf_t buf = { NULL, 0 };
stream->read2_cb((uv_pipe_t*) stream,
0,
&buf,
uv__handle_type(stream->accepted_fd));
}
stream->flags &= ~UV_STREAM_READ_PARTIAL;
@ -1046,17 +1113,20 @@ static void uv__read(uv_stream_t* stream) {
cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_type == SCM_RIGHTS) {
if (stream->accepted_fd != -1) {
fprintf(stderr, "(libuv) ignoring extra FD received\n");
}
/* silence aliasing warning */
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
/* Already has accepted fd, queue now */
if (stream->accepted_fd != -1) {
err = uv__stream_queue_fd(stream, *pi);
if (err != 0) {
uv__stream_read_cb(stream, err, NULL, UV_UNKNOWN_HANDLE);
return;
}
} else {
stream->accepted_fd = *pi;
}
} else {
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
cmsg->cmsg_type);
@ -1493,6 +1563,8 @@ void uv__stream_close(uv_stream_t* handle) {
handle->accepted_fd = -1;
}
free(handle->queued_fds);
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
}