From 08cafd091ba408cc339a9ea20a60260ac582f091 Mon Sep 17 00:00:00 2001 From: Fedor Indutny Date: Sat, 21 Dec 2013 01:16:17 +0400 Subject: [PATCH] stream: fix uv__stream_osx_select Relying on `readable`/`writable` when polling will lead to the select thread spinning in the loop and calling `uv_async_send()`, because stream may never become not `readable`/`writable`. We should rely on `uv__io_active()` instead and interruprt select thread every time it changes. --- src/unix/stream.c | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/src/unix/stream.c b/src/unix/stream.c index ee3ea75f..f4e9bf75 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -168,9 +168,9 @@ static void uv__stream_osx_select(void* arg) { FD_ZERO(&sread); FD_ZERO(&swrite); - if (uv_is_readable(stream)) + if (uv__io_active(&stream->io_watcher, UV__POLLIN)) FD_SET(fd, &sread); - if (uv_is_writable(stream)) + if (uv__io_active(&stream->io_watcher, UV__POLLOUT)) FD_SET(fd, &swrite); FD_SET(s->int_fd, &sread); @@ -230,11 +230,14 @@ static void uv__stream_osx_select(void* arg) { static void uv__stream_osx_interrupt_select(uv_stream_t* stream) { +#if defined(__APPLE__) /* Notify select() thread about state change */ uv__stream_select_t* s; int r; s = stream->select; + if (s == NULL) + return; /* Interrupt select() loop * NOTE: fake_fd and int_fd are socketpair(), thus writing to one will @@ -245,6 +248,9 @@ static void uv__stream_osx_interrupt_select(uv_stream_t* stream) { while (r == -1 && errno == EINTR); assert(r == 1); +#else /* !defined(__APPLE__) */ + /* No-op on any other platform */ +#endif /* !defined(__APPLE__) */ } @@ -622,6 +628,7 @@ static void uv__drain(uv_stream_t* stream) { assert(QUEUE_EMPTY(&stream->write_queue)); uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); + uv__stream_osx_interrupt_select(stream); /* Shutdown? */ if ((stream->flags & UV_STREAM_SHUTTING) && @@ -802,6 +809,7 @@ start: uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); if (!uv__io_active(&stream->io_watcher, UV__POLLIN)) uv__handle_stop(stream); + uv__stream_osx_interrupt_select(stream); return; } else if (stream->flags & UV_STREAM_BLOCKING) { /* If this is a blocking stream, try again. */ @@ -863,6 +871,9 @@ start: /* We're not done. */ uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT); + + /* Notify select() thread about state change */ + uv__stream_osx_interrupt_select(stream); } @@ -947,6 +958,7 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) { uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) uv__handle_stop(stream); + uv__stream_osx_interrupt_select(stream); uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE); } @@ -1013,6 +1025,7 @@ static void uv__read(uv_stream_t* stream) { /* Wait for the next one. */ if (stream->flags & UV_STREAM_READING) { uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN); + uv__stream_osx_interrupt_select(stream); } uv__stream_read_cb(stream, 0, &buf, UV_UNKNOWN_HANDLE); } else { @@ -1105,6 +1118,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { stream->flags |= UV_STREAM_SHUTTING; uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT); + uv__stream_osx_interrupt_select(stream); return 0; } @@ -1281,6 +1295,7 @@ int uv_write2(uv_write_t* req, */ assert(!(stream->flags & UV_STREAM_BLOCKING)); uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT); + uv__stream_osx_interrupt_select(stream); } return 0; @@ -1341,8 +1356,10 @@ int uv_try_write(uv_stream_t* stream, req.bufs = NULL; /* Do not poll for writable, if we wasn't before calling this */ - if (!has_pollout) + if (!has_pollout) { uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT); + uv__stream_osx_interrupt_select(stream); + } return (int) written; } @@ -1363,12 +1380,6 @@ static int uv__read_start_common(uv_stream_t* stream, */ stream->flags |= UV_STREAM_READING; -#if defined(__APPLE__) - /* Notify select() thread about state change */ - if (stream->select != NULL) - uv__stream_osx_interrupt_select(stream); -#endif /* defined(__APPLE__) */ - /* TODO: try to do the read inline? */ /* TODO: keep track of tcp state. If we've gotten a EOF then we should * not start the IO watcher. @@ -1382,6 +1393,7 @@ static int uv__read_start_common(uv_stream_t* stream, uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN); uv__handle_start(stream); + uv__stream_osx_interrupt_select(stream); return 0; } @@ -1414,12 +1426,7 @@ int uv_read_stop(uv_stream_t* stream) { uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) uv__handle_stop(stream); - -#if defined(__APPLE__) - /* Notify select() thread about state change */ - if (stream->select != NULL) - uv__stream_osx_interrupt_select(stream); -#endif /* defined(__APPLE__) */ + uv__stream_osx_interrupt_select(stream); stream->read_cb = NULL; stream->read2_cb = NULL;