diff --git a/include/uv.h b/include/uv.h index f51aea60..29a07ffe 100644 --- a/include/uv.h +++ b/include/uv.h @@ -349,6 +349,8 @@ typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size); * Trying to read from the stream again is undefined. * * The callee is responsible for freeing the buffer, libuv does not reuse it. + * The buffer may be a null buffer (where buf.base=NULL and buf.len=0) on EOF + * or error. */ typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); diff --git a/src/unix/internal.h b/src/unix/internal.h index 2f4d526a..abc774e0 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -99,17 +99,19 @@ /* handle flags */ enum { - UV_CLOSING = 0x01, /* uv_close() called but not finished. */ - UV_CLOSED = 0x02, /* close(2) finished. */ - UV_STREAM_READING = 0x04, /* uv_read_start() called. */ - UV_STREAM_SHUTTING = 0x08, /* uv_shutdown() called but not complete. */ - UV_STREAM_SHUT = 0x10, /* Write side closed. */ - UV_STREAM_READABLE = 0x20, /* The stream is readable */ - UV_STREAM_WRITABLE = 0x40, /* The stream is writable */ - UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */ - UV_TCP_NODELAY = 0x100, /* Disable Nagle. */ - UV_TCP_KEEPALIVE = 0x200, /* Turn on keep-alive. */ - UV_TCP_SINGLE_ACCEPT = 0x400 /* Only accept() when idle. */ + UV_CLOSING = 0x01, /* uv_close() called but not finished. */ + UV_CLOSED = 0x02, /* close(2) finished. */ + UV_STREAM_READING = 0x04, /* uv_read_start() called. */ + UV_STREAM_SHUTTING = 0x08, /* uv_shutdown() called but not complete. */ + UV_STREAM_SHUT = 0x10, /* Write side closed. */ + UV_STREAM_READABLE = 0x20, /* The stream is readable */ + UV_STREAM_WRITABLE = 0x40, /* The stream is writable */ + UV_STREAM_BLOCKING = 0x80, /* Synchronous writes. */ + UV_STREAM_READ_PARTIAL = 0x100, /* read(2) read less than requested. */ + UV_STREAM_READ_EOF = 0x200, /* read(2) read EOF. */ + UV_TCP_NODELAY = 0x400, /* Disable Nagle. */ + UV_TCP_KEEPALIVE = 0x800, /* Turn on keep-alive. */ + UV_TCP_SINGLE_ACCEPT = 0x1000 /* Only accept() when idle. */ }; /* core */ diff --git a/src/unix/stream.c b/src/unix/stream.c index 051241fb..bf0ba68a 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -930,6 +930,27 @@ static uv_handle_type uv__handle_type(int fd) { } +static void uv__stream_read_cb(uv_stream_t* stream, + int status, + uv_buf_t buf, + uv_handle_type type) { + if (stream->read_cb != NULL) + stream->read_cb(stream, status, buf); + else + stream->read2_cb((uv_pipe_t*) stream, status, buf, type); +} + + +static void uv__stream_eof(uv_stream_t* stream, uv_buf_t buf) { + stream->flags |= UV_STREAM_READ_EOF; + uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); + if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) + uv__handle_stop(stream); + uv__set_artificial_error(stream->loop, UV_EOF); + uv__stream_read_cb(stream, -1, buf, UV_UNKNOWN_HANDLE); +} + + static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; @@ -938,6 +959,8 @@ static void uv__read(uv_stream_t* stream) { char cmsg_space[64]; int count; + stream->flags &= ~UV_STREAM_READ_PARTIAL; + /* Prevent loop starvation when the data comes in as fast as (or faster than) * we can read it. XXX Need to rearm fd if we switch to edge-triggered I/O. */ @@ -979,15 +1002,6 @@ static void uv__read(uv_stream_t* stream) { while (nread < 0 && errno == EINTR); } -#define INVOKE_READ_CB(stream, status, buf, type) \ - do { \ - if ((stream)->read_cb != NULL) \ - (stream)->read_cb((stream), (status), (buf)); \ - else \ - (stream)->read2_cb((uv_pipe_t*) (stream), (status), (buf), (type)); \ - } \ - while (0) - if (nread < 0) { /* Error */ if (errno == EAGAIN || errno == EWOULDBLOCK) { @@ -996,22 +1010,17 @@ static void uv__read(uv_stream_t* stream) { uv__io_start(stream->loop, &stream->io_watcher, UV__POLLIN); } uv__set_sys_error(stream->loop, EAGAIN); - INVOKE_READ_CB(stream, 0, buf, UV_UNKNOWN_HANDLE); + uv__stream_read_cb(stream, 0, buf, UV_UNKNOWN_HANDLE); } else { /* Error. User should call uv_close(). */ uv__set_sys_error(stream->loop, errno); - INVOKE_READ_CB(stream, -1, buf, UV_UNKNOWN_HANDLE); + uv__stream_read_cb(stream, -1, buf, UV_UNKNOWN_HANDLE); assert(!uv__io_active(&stream->io_watcher, UV__POLLIN) && "stream->read_cb(status=-1) did not call uv_close()"); } return; } else if (nread == 0) { - /* EOF */ - uv__io_stop(stream->loop, &stream->io_watcher, UV__POLLIN); - if (!uv__io_active(&stream->io_watcher, UV__POLLOUT)) - uv__handle_stop(stream); - uv__set_artificial_error(stream->loop, UV_EOF); - INVOKE_READ_CB(stream, -1, buf, UV_UNKNOWN_HANDLE); + uv__stream_eof(stream, buf); return; } else { /* Successful read */ @@ -1062,6 +1071,7 @@ static void uv__read(uv_stream_t* stream) { /* Return if we didn't fill the buffer, there is no more data to read. */ if (nread < buflen) { + stream->flags |= UV_STREAM_READ_PARTIAL; return; } } @@ -1110,17 +1120,33 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { return; } - if (events & (UV__POLLIN | UV__POLLERR | UV__POLLHUP)) { - assert(uv__stream_fd(stream) >= 0); + assert(uv__stream_fd(stream) >= 0); + /* Ignore POLLHUP here. Even it it's set, there may still be data to read. */ + if (events & (UV__POLLIN | UV__POLLERR)) uv__read(stream); - if (uv__stream_fd(stream) == -1) - return; /* read_cb closed stream. */ + if (uv__stream_fd(stream) == -1) + return; /* read_cb closed stream. */ + + /* Short-circuit iff POLLHUP is set, the user is still interested in read + * events and uv__read() reported a partial read but not EOF. If the EOF + * flag is set, uv__read() called read_cb with err=UV_EOF and we don't + * have to do anything. If the partial read flag is not set, we can't + * report the EOF yet because there is still data to read. + */ + if ((events & UV__POLLHUP) && + (stream->flags & UV_STREAM_READING) && + (stream->flags & UV_STREAM_READ_PARTIAL) && + !(stream->flags & UV_STREAM_READ_EOF)) { + uv_buf_t buf = { NULL, 0 }; + uv__stream_eof(stream, buf); } + if (uv__stream_fd(stream) == -1) + return; /* read_cb closed stream. */ + if (events & UV__POLLOUT) { - assert(uv__stream_fd(stream) >= 0); uv__write(stream); uv__write_callbacks(stream); }