diff --git a/src/unix/stream.c b/src/unix/stream.c index 2e5bde01..3cdeaa8c 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -62,6 +62,7 @@ void uv__stream_init(uv_loop_t* loop, stream->accepted_fd = -1; stream->fd = -1; stream->delayed_error = 0; + stream->blocking = 0; ngx_queue_init(&stream->write_queue); ngx_queue_init(&stream->write_completed_queue); stream->write_queue_size = 0; @@ -340,9 +341,9 @@ static void uv__write(uv_stream_t* stream) { int iovcnt; ssize_t n; - assert(stream->fd >= 0); +start: - /* TODO: should probably while(1) here until EAGAIN */ + assert(stream->fd >= 0); /* Get the request at the head of the queue. */ req = uv_write_queue_head(stream); @@ -353,14 +354,16 @@ static void uv__write(uv_stream_t* stream) { assert(req->handle == stream); - /* Cast to iovec. We had to have our own uv_buf_t instead of iovec + /* + * Cast to iovec. We had to have our own uv_buf_t instead of iovec * because Windows's WSABUF is not an iovec. */ assert(sizeof(uv_buf_t) == sizeof(struct iovec)); iov = (struct iovec*) &(req->bufs[req->write_index]); iovcnt = req->bufcnt - req->write_index; - /* Now do the actual writev. Note that we've been updating the pointers + /* + * Now do the actual writev. Note that we've been updating the pointers * inside the iov each time we write. So there is no need to offset it. */ @@ -409,6 +412,9 @@ static void uv__write(uv_stream_t* stream) { stream->write_queue_size -= uv__write_req_size(req); uv__write_req_finish(req); return; + } else if (stream->blocking) { + /* If this is a blocking stream, try again. */ + goto start; } } else { /* Successful write */ @@ -426,8 +432,17 @@ static void uv__write(uv_stream_t* stream) { stream->write_queue_size -= n; n = 0; - /* There is more to write. Break and ensure the watcher is pending. */ - break; + /* There is more to write. */ + if (stream->blocking) { + /* + * If we're blocking then we should not be enabling the write + * watcher - instead we need to try again. + */ + goto start; + } else { + /* Break loop and ensure the watcher is pending. */ + break; + } } else { /* Finished writing the buf at index req->write_index. */ @@ -453,6 +468,9 @@ static void uv__write(uv_stream_t* stream) { /* Either we've counted n down to zero or we've got EAGAIN. */ assert(n == 0 || n == -1); + /* Only non-blocking streams should use the write_watcher. */ + assert(!stream->blocking); + /* We're not done. */ ev_io_start(stream->loop->ev, &stream->write_watcher); } @@ -862,6 +880,13 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, if (empty_queue) { uv__write(stream); } else { + /* + * blocking streams should never have anything in the queue. + * if this assert fires then somehow the blocking stream isn't being + * sufficently flushed in uv__write. + */ + assert(!stream->blocking); + ev_io_start(stream->loop->ev, &stream->write_watcher); } diff --git a/src/unix/tty.c b/src/unix/tty.c index 32ac2c71..de77f5c4 100644 --- a/src/unix/tty.c +++ b/src/unix/tty.c @@ -41,7 +41,6 @@ int uv_tty_init(uv_loop_t* loop, uv_tty_t* tty, int fd, int readable) { uv__stream_open((uv_stream_t*)tty, fd, UV_READABLE); } else { /* Note: writable tty we set to blocking mode. */ - uv__nonblock(fd, 0); uv__stream_open((uv_stream_t*)tty, fd, UV_WRITABLE); tty->blocking = 1; }