UNIX: Loop on blocking streams

Also removes a superfluous syscall during uv_tty_init for writable TTY
streams.
This commit is contained in:
Ryan Dahl 2011-11-02 16:46:19 -07:00
parent 74b49e821b
commit e1bee05ecd
2 changed files with 31 additions and 7 deletions

View File

@ -62,6 +62,7 @@ void uv__stream_init(uv_loop_t* loop,
stream->accepted_fd = -1;
stream->fd = -1;
stream->delayed_error = 0;
stream->blocking = 0;
ngx_queue_init(&stream->write_queue);
ngx_queue_init(&stream->write_completed_queue);
stream->write_queue_size = 0;
@ -340,9 +341,9 @@ static void uv__write(uv_stream_t* stream) {
int iovcnt;
ssize_t n;
assert(stream->fd >= 0);
start:
/* TODO: should probably while(1) here until EAGAIN */
assert(stream->fd >= 0);
/* Get the request at the head of the queue. */
req = uv_write_queue_head(stream);
@ -353,14 +354,16 @@ static void uv__write(uv_stream_t* stream) {
assert(req->handle == stream);
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
/*
* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
*/
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
iov = (struct iovec*) &(req->bufs[req->write_index]);
iovcnt = req->bufcnt - req->write_index;
/* Now do the actual writev. Note that we've been updating the pointers
/*
* Now do the actual writev. Note that we've been updating the pointers
* inside the iov each time we write. So there is no need to offset it.
*/
@ -409,6 +412,9 @@ static void uv__write(uv_stream_t* stream) {
stream->write_queue_size -= uv__write_req_size(req);
uv__write_req_finish(req);
return;
} else if (stream->blocking) {
/* If this is a blocking stream, try again. */
goto start;
}
} else {
/* Successful write */
@ -426,8 +432,17 @@ static void uv__write(uv_stream_t* stream) {
stream->write_queue_size -= n;
n = 0;
/* There is more to write. Break and ensure the watcher is pending. */
break;
/* There is more to write. */
if (stream->blocking) {
/*
* If we're blocking then we should not be enabling the write
* watcher - instead we need to try again.
*/
goto start;
} else {
/* Break loop and ensure the watcher is pending. */
break;
}
} else {
/* Finished writing the buf at index req->write_index. */
@ -453,6 +468,9 @@ static void uv__write(uv_stream_t* stream) {
/* Either we've counted n down to zero or we've got EAGAIN. */
assert(n == 0 || n == -1);
/* Only non-blocking streams should use the write_watcher. */
assert(!stream->blocking);
/* We're not done. */
ev_io_start(stream->loop->ev, &stream->write_watcher);
}
@ -862,6 +880,13 @@ int uv_write2(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
if (empty_queue) {
uv__write(stream);
} else {
/*
* blocking streams should never have anything in the queue.
* if this assert fires then somehow the blocking stream isn't being
* sufficently flushed in uv__write.
*/
assert(!stream->blocking);
ev_io_start(stream->loop->ev, &stream->write_watcher);
}

View File

@ -41,7 +41,6 @@ int uv_tty_init(uv_loop_t* loop, uv_tty_t* tty, int fd, int readable) {
uv__stream_open((uv_stream_t*)tty, fd, UV_READABLE);
} else {
/* Note: writable tty we set to blocking mode. */
uv__nonblock(fd, 0);
uv__stream_open((uv_stream_t*)tty, fd, UV_WRITABLE);
tty->blocking = 1;
}