diff --git a/include/uv-unix.h b/include/uv-unix.h index 45006092..22a4d855 100644 --- a/include/uv-unix.h +++ b/include/uv-unix.h @@ -229,6 +229,7 @@ typedef struct { uv_connection_cb connection_cb; \ int delayed_error; \ int accepted_fd; \ + int* queued_fds; \ UV_STREAM_PRIVATE_PLATFORM_FIELDS \ #define UV_TCP_PRIVATE_FIELDS /* empty */ diff --git a/include/uv.h b/include/uv.h index 505b024b..d5c4de0f 100644 --- a/include/uv.h +++ b/include/uv.h @@ -376,6 +376,9 @@ typedef void (*uv_read_cb)(uv_stream_t* stream, * Just like the uv_read_cb except that if the pending parameter is true * then you can use uv_accept() to pull the new handle into the process. * If no handle is pending then pending will be UV_UNKNOWN_HANDLE. + * + * NOTE: The buffer may be a null buffer if multiple fds were accepted and + * read2_cb is called for pending ones. */ typedef void (*uv_read2_cb)(uv_pipe_t* pipe, ssize_t nread, diff --git a/src/unix/stream.c b/src/unix/stream.c index ee3ea75f..ae931ee8 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -119,6 +119,7 @@ void uv__stream_init(uv_loop_t* loop, stream->connect_req = NULL; stream->shutdown_req = NULL; stream->accepted_fd = -1; + stream->queued_fds = NULL; stream->delayed_error = 0; QUEUE_INIT(&stream->write_queue); QUEUE_INIT(&stream->write_completed_queue); @@ -559,6 +560,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { if (server->accepted_fd == -1) return -EAGAIN; + err = 0; switch (client->type) { case UV_NAMED_PIPE: case UV_TCP: @@ -568,8 +570,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { if (err) { /* TODO handle error */ uv__close(server->accepted_fd); - server->accepted_fd = -1; - return err; + goto done; } break; @@ -577,8 +578,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { err = uv_udp_open((uv_udp_t*) client, server->accepted_fd); if (err) { uv__close(server->accepted_fd); - server->accepted_fd = -1; - return err; + goto done; } break; @@ -586,9 +586,31 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) { assert(0); } - uv__io_start(server->loop, &server->io_watcher, UV__POLLIN); - server->accepted_fd = -1; - return 0; +done: + /* Process queued fds */ + if (server->queued_fds != NULL) { + /* Read first */ + server->accepted_fd = server->queued_fds[2]; + + /* All read, free */ + if (--server->queued_fds[0] == 0) { + free(server->queued_fds); + server->queued_fds = NULL; + } else { + /* Shift rest */ + memmove(server->queued_fds + 2, + server->queued_fds + 3, + server->queued_fds[0]); + } + + /* Invoke read_cb one more time */ + uv__io_feed(server->loop, &server->io_watcher); + } else { + server->accepted_fd = -1; + if (err == 0) + uv__io_start(server->loop, &server->io_watcher, UV__POLLIN); + } + return err; } @@ -951,6 +973,41 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { } +static int uv__stream_queue_fd(uv_stream_t* stream, int fd) { + int queue_offset; + int queue_len; + + if (stream->queued_fds == NULL) { + queue_offset = 0; + queue_len = 8; + stream->queued_fds = malloc((queue_len + 2) * sizeof(*stream->queued_fds)); + if (stream->queued_fds == NULL) + return UV_ENOMEM; + stream->queued_fds[1] = queue_len; + } else { + queue_offset = stream->queued_fds[0]; + queue_len = stream->queued_fds[1]; + + /* Grow */ + if (queue_offset == queue_len) { + queue_len += 8; + stream->queued_fds = realloc(stream->queued_fds, + (queue_len + 2) * + sizeof(*stream->queued_fds)); + if (stream->queued_fds == NULL) + return UV_ENOMEM; + stream->queued_fds[1] = queue_len; + } + } + + /* Put fd in a queue */ + stream->queued_fds[0] = queue_offset; + stream->queued_fds[2 + queue_offset++] = fd; + + return 0; +} + + static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; @@ -958,6 +1015,16 @@ static void uv__read(uv_stream_t* stream) { struct cmsghdr* cmsg; char cmsg_space[64]; int count; + int err; + + /* Has queued fds */ + if (stream->accepted_fd != -1) { + static uv_buf_t buf = { NULL, 0 }; + stream->read2_cb((uv_pipe_t*) stream, + 0, + &buf, + uv__handle_type(stream->accepted_fd)); + } stream->flags &= ~UV_STREAM_READ_PARTIAL; @@ -1046,17 +1113,20 @@ static void uv__read(uv_stream_t* stream) { cmsg = CMSG_NXTHDR(&msg, cmsg)) { if (cmsg->cmsg_type == SCM_RIGHTS) { - if (stream->accepted_fd != -1) { - fprintf(stderr, "(libuv) ignoring extra FD received\n"); - } - /* silence aliasing warning */ - { - void* pv = CMSG_DATA(cmsg); - int* pi = pv; + void* pv = CMSG_DATA(cmsg); + int* pi = pv; + + /* Already has accepted fd, queue now */ + if (stream->accepted_fd != -1) { + err = uv__stream_queue_fd(stream, *pi); + if (err != 0) { + uv__stream_read_cb(stream, err, NULL, UV_UNKNOWN_HANDLE); + return; + } + } else { stream->accepted_fd = *pi; } - } else { fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", cmsg->cmsg_type); @@ -1493,6 +1563,8 @@ void uv__stream_close(uv_stream_t* handle) { handle->accepted_fd = -1; } + free(handle->queued_fds); + assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT)); }