stream: fix uv__stream_osx_select

Relying on `readable`/`writable` when polling will lead to the select
thread spinning in the loop and calling `uv_async_send()`, because
stream may never become not `readable`/`writable`.

We should rely on `uv__io_active()` instead and interruprt select thread
every time it changes.
This commit is contained in:
Fedor Indutny 2013-12-21 01:16:17 +04:00
parent 7d23c1d7b9
commit 08cafd091b

View File

@ -168,9 +168,9 @@ static void uv__stream_osx_select(void* arg) {
FD_ZERO(&sread);
FD_ZERO(&swrite);
if (uv_is_readable(stream))
if (uv__io_active(&stream->io_watcher, UV__POLLIN))
FD_SET(fd, &sread);
if (uv_is_writable(stream))
if (uv__io_active(&stream->io_watcher, UV__POLLOUT))
FD_SET(fd, &swrite);
FD_SET(s->int_fd, &sread);
@ -230,11 +230,14 @@ static void uv__stream_osx_select(void* arg) {
static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
#if defined(__APPLE__)
/* Notify select() thread about state change */
uv__stream_select_t* s;
int r;
s = stream->select;
if (s == NULL)
return;
/* Interrupt select() loop
* NOTE: fake_fd and int_fd are socketpair(), thus writing to one will
@ -245,6 +248,9 @@ static void uv__stream_osx_interrupt_select(uv_stream_t* stream) {
while (r == -1 && errno == EINTR);
assert(r == 1);
#else /* !defined(__APPLE__) */
/* No-op on any other platform */
#endif /* !defined(__APPLE__) */
}
@ -622,6 +628,7 @@ static void uv__drain(uv_stream_t* stream) {
assert(QUEUE_EMPTY(&stream->write_queue));
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
uv__stream_osx_interrupt_select(stream);
/* Shutdown? */
if ((stream->flags & UV_STREAM_SHUTTING) &&
@ -802,6 +809,7 @@ start:
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
if (!uv__io_active(&stream->io_watcher, UV__POLLIN))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
return;
} else if (stream->flags & UV_STREAM_BLOCKING) {
/* If this is a blocking stream, try again. */
@ -863,6 +871,9 @@ start:
/* We're not done. */
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
/* Notify select() thread about state change */
uv__stream_osx_interrupt_select(stream);
}
@ -947,6 +958,7 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
uv__handle_stop(stream);
uv__stream_osx_interrupt_select(stream);
uv__stream_read_cb(stream, UV_EOF, buf, UV_UNKNOWN_HANDLE);
}
@ -1013,6 +1025,7 @@ static void uv__read(uv_stream_t* stream) {
/* Wait for the next one. */
if (stream->flags & UV_STREAM_READING) {
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
uv__stream_osx_interrupt_select(stream);
}
uv__stream_read_cb(stream, 0, &buf, UV_UNKNOWN_HANDLE);
} else {
@ -1105,6 +1118,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
stream->flags |= UV_STREAM_SHUTTING;
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
uv__stream_osx_interrupt_select(stream);
return 0;
}
@ -1281,6 +1295,7 @@ int uv_write2(uv_write_t* req,
*/
assert(!(stream->flags & UV_STREAM_BLOCKING));
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
return 0;
@ -1341,8 +1356,10 @@ int uv_try_write(uv_stream_t* stream,
req.bufs = NULL;
/* Do not poll for writable, if we wasn't before calling this */
if (!has_pollout)
if (!has_pollout) {
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLOUT);
uv__stream_osx_interrupt_select(stream);
}
return (int) written;
}
@ -1363,12 +1380,6 @@ static int uv__read_start_common(uv_stream_t* stream,
*/
stream->flags |= UV_STREAM_READING;
#if defined(__APPLE__)
/* Notify select() thread about state change */
if (stream->select != NULL)
uv__stream_osx_interrupt_select(stream);
#endif /* defined(__APPLE__) */
/* TODO: try to do the read inline? */
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
* not start the IO watcher.
@ -1382,6 +1393,7 @@ static int uv__read_start_common(uv_stream_t* stream,
uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN);
uv__handle_start(stream);
uv__stream_osx_interrupt_select(stream);
return 0;
}
@ -1414,12 +1426,7 @@ int uv_read_stop(uv_stream_t* stream) {
uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN);
if (!uv__io_active(&stream->io_watcher, UV__POLLOUT))
uv__handle_stop(stream);
#if defined(__APPLE__)
/* Notify select() thread about state change */
if (stream->select != NULL)
uv__stream_osx_interrupt_select(stream);
#endif /* defined(__APPLE__) */
uv__stream_osx_interrupt_select(stream);
stream->read_cb = NULL;
stream->read2_cb = NULL;