unix: split out stream into its own file
This commit is contained in:
parent
38ce213b6a
commit
6fd340b8ca
@ -37,6 +37,7 @@ OBJS += src/unix/error.o
|
||||
OBJS += src/unix/process.o
|
||||
OBJS += src/unix/tcp.o
|
||||
OBJS += src/unix/pipe.o
|
||||
OBJS += src/unix/stream.o
|
||||
|
||||
ifeq (SunOS,$(uname_S))
|
||||
EV_CONFIG=config_sunos.h
|
||||
|
||||
767
src/unix/core.c
767
src/unix/core.c
@ -82,9 +82,6 @@ static uv_loop_t* default_loop_ptr;
|
||||
void uv__next(EV_P_ ev_idle* watcher, int revents);
|
||||
static void uv__finish_close(uv_handle_t* handle);
|
||||
|
||||
static uv_write_t* uv__write(uv_stream_t* stream);
|
||||
static void uv__read(uv_stream_t* stream);
|
||||
static void uv__stream_connect(uv_stream_t*);
|
||||
|
||||
|
||||
#ifndef __GNUC__
|
||||
@ -92,9 +89,6 @@ static void uv__stream_connect(uv_stream_t*);
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
void uv_init() {
|
||||
default_loop_ptr = &default_loop_struct;
|
||||
#if defined(__MAC_OS_X_VERSION_MIN_REQUIRED) && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1060
|
||||
@ -208,8 +202,6 @@ uv_loop_t* uv_default_loop() {
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
int uv_run(uv_loop_t* loop) {
|
||||
ev_run(loop->ev, 0);
|
||||
return 0;
|
||||
@ -232,132 +224,6 @@ void uv__handle_init(uv_loop_t* loop, uv_handle_t* handle,
|
||||
}
|
||||
|
||||
|
||||
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
||||
socklen_t yes;
|
||||
|
||||
assert(fd >= 0);
|
||||
stream->fd = fd;
|
||||
|
||||
((uv_handle_t*)stream)->flags |= flags;
|
||||
|
||||
/* Reuse the port address if applicable. */
|
||||
yes = 1;
|
||||
if (stream->type == UV_TCP
|
||||
&& setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
|
||||
uv_err_new(stream->loop, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Associate the fd with each ev_io watcher. */
|
||||
ev_io_set(&stream->read_watcher, fd, EV_READ);
|
||||
ev_io_set(&stream->write_watcher, fd, EV_WRITE);
|
||||
|
||||
/* These should have been set up by uv_tcp_init or uv_pipe_init. */
|
||||
assert(stream->read_watcher.cb == uv__stream_io);
|
||||
assert(stream->write_watcher.cb == uv__stream_io);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__server_io(EV_P_ ev_io* watcher, int revents) {
|
||||
int fd;
|
||||
struct sockaddr_storage addr;
|
||||
uv_stream_t* stream = watcher->data;
|
||||
|
||||
assert(watcher == &stream->read_watcher ||
|
||||
watcher == &stream->write_watcher);
|
||||
assert(revents == EV_READ);
|
||||
|
||||
assert(!(stream->flags & UV_CLOSING));
|
||||
|
||||
if (stream->accepted_fd >= 0) {
|
||||
ev_io_stop(EV_A, &stream->read_watcher);
|
||||
return;
|
||||
}
|
||||
|
||||
/* connection_cb can close the server socket while we're
|
||||
* in the loop so check it on each iteration.
|
||||
*/
|
||||
while (stream->fd != -1) {
|
||||
assert(stream->accepted_fd < 0);
|
||||
fd = uv__accept(stream->fd, (struct sockaddr*)&addr, sizeof addr);
|
||||
|
||||
if (fd < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
/* No problem. */
|
||||
return;
|
||||
} else if (errno == EMFILE) {
|
||||
/* TODO special trick. unlock reserved socket, accept, close. */
|
||||
return;
|
||||
} else {
|
||||
uv_err_new(stream->loop, errno);
|
||||
stream->connection_cb((uv_stream_t*)stream, -1);
|
||||
}
|
||||
} else {
|
||||
stream->accepted_fd = fd;
|
||||
stream->connection_cb((uv_stream_t*)stream, 0);
|
||||
if (stream->accepted_fd >= 0) {
|
||||
/* The user hasn't yet accepted called uv_accept() */
|
||||
ev_io_stop(stream->loop->ev, &stream->read_watcher);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
|
||||
uv_stream_t* streamServer;
|
||||
uv_stream_t* streamClient;
|
||||
int saved_errno;
|
||||
int status;
|
||||
|
||||
/* TODO document this */
|
||||
assert(server->loop == client->loop);
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
streamServer = (uv_stream_t*)server;
|
||||
streamClient = (uv_stream_t*)client;
|
||||
|
||||
if (streamServer->accepted_fd < 0) {
|
||||
uv_err_new(server->loop, EAGAIN);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (uv__stream_open(streamClient, streamServer->accepted_fd,
|
||||
UV_READABLE | UV_WRITABLE)) {
|
||||
/* TODO handle error */
|
||||
streamServer->accepted_fd = -1;
|
||||
uv__close(streamServer->accepted_fd);
|
||||
goto out;
|
||||
}
|
||||
|
||||
ev_io_start(streamServer->loop->ev, &streamServer->read_watcher);
|
||||
streamServer->accepted_fd = -1;
|
||||
status = 0;
|
||||
|
||||
out:
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
|
||||
switch (stream->type) {
|
||||
case UV_TCP:
|
||||
return uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
|
||||
case UV_NAMED_PIPE:
|
||||
return uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
|
||||
default:
|
||||
assert(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void uv__finish_close(uv_handle_t* handle) {
|
||||
uv_loop_t* loop = handle->loop;
|
||||
|
||||
@ -418,26 +284,6 @@ void uv__finish_close(uv_handle_t* handle) {
|
||||
}
|
||||
|
||||
|
||||
uv_write_t* uv_write_queue_head(uv_stream_t* stream) {
|
||||
ngx_queue_t* q;
|
||||
uv_write_t* req;
|
||||
|
||||
if (ngx_queue_empty(&stream->write_queue)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
q = ngx_queue_head(&stream->write_queue);
|
||||
if (!q) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
req = ngx_queue_data(q, struct uv_write_s, queue);
|
||||
assert(req);
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
|
||||
void uv__next(EV_P_ ev_idle* watcher, int revents) {
|
||||
uv_handle_t* handle = watcher->data;
|
||||
assert(watcher == &handle->next_watcher);
|
||||
@ -451,481 +297,6 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) {
|
||||
}
|
||||
|
||||
|
||||
static void uv__drain(uv_stream_t* stream) {
|
||||
uv_shutdown_t* req;
|
||||
|
||||
assert(!uv_write_queue_head(stream));
|
||||
assert(stream->write_queue_size == 0);
|
||||
|
||||
ev_io_stop(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
/* Shutdown? */
|
||||
if ((stream->flags & UV_SHUTTING) &&
|
||||
!(stream->flags & UV_CLOSING) &&
|
||||
!(stream->flags & UV_SHUT)) {
|
||||
assert(stream->shutdown_req);
|
||||
|
||||
req = stream->shutdown_req;
|
||||
|
||||
if (shutdown(stream->fd, SHUT_WR)) {
|
||||
/* Error. Report it. User should call uv_close(). */
|
||||
uv_err_new(stream->loop, errno);
|
||||
if (req->cb) {
|
||||
req->cb(req, -1);
|
||||
}
|
||||
} else {
|
||||
uv_err_new(stream->loop, 0);
|
||||
((uv_handle_t*) stream)->flags |= UV_SHUT;
|
||||
if (req->cb) {
|
||||
req->cb(req, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* On success returns NULL. On error returns a pointer to the write request
|
||||
* which had the error.
|
||||
*/
|
||||
static uv_write_t* uv__write(uv_stream_t* stream) {
|
||||
uv_write_t* req;
|
||||
struct iovec* iov;
|
||||
int iovcnt;
|
||||
ssize_t n;
|
||||
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
/* TODO: should probably while(1) here until EAGAIN */
|
||||
|
||||
/* Get the request at the head of the queue. */
|
||||
req = uv_write_queue_head(stream);
|
||||
if (!req) {
|
||||
assert(stream->write_queue_size == 0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
assert(req->handle == stream);
|
||||
|
||||
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
|
||||
* because Windows's WSABUF is not an iovec.
|
||||
*/
|
||||
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
|
||||
iov = (struct iovec*) &(req->bufs[req->write_index]);
|
||||
iovcnt = req->bufcnt - req->write_index;
|
||||
|
||||
/* Now do the actual writev. Note that we've been updating the pointers
|
||||
* inside the iov each time we write. So there is no need to offset it.
|
||||
*/
|
||||
|
||||
do {
|
||||
if (iovcnt == 1) {
|
||||
n = write(stream->fd, iov[0].iov_base, iov[0].iov_len);
|
||||
} else {
|
||||
n = writev(stream->fd, iov, iovcnt);
|
||||
}
|
||||
}
|
||||
while (n == -1 && errno == EINTR);
|
||||
|
||||
if (n < 0) {
|
||||
if (errno != EAGAIN) {
|
||||
/* Error */
|
||||
uv_err_new(stream->loop, errno);
|
||||
return req;
|
||||
}
|
||||
} else {
|
||||
/* Successful write */
|
||||
|
||||
/* Update the counters. */
|
||||
while (n >= 0) {
|
||||
uv_buf_t* buf = &(req->bufs[req->write_index]);
|
||||
size_t len = buf->len;
|
||||
|
||||
assert(req->write_index < req->bufcnt);
|
||||
|
||||
if ((size_t)n < len) {
|
||||
buf->base += n;
|
||||
buf->len -= n;
|
||||
stream->write_queue_size -= n;
|
||||
n = 0;
|
||||
|
||||
/* There is more to write. Break and ensure the watcher is pending. */
|
||||
break;
|
||||
|
||||
} else {
|
||||
/* Finished writing the buf at index req->write_index. */
|
||||
req->write_index++;
|
||||
|
||||
assert((size_t)n >= len);
|
||||
n -= len;
|
||||
|
||||
assert(stream->write_queue_size >= len);
|
||||
stream->write_queue_size -= len;
|
||||
|
||||
if (req->write_index == req->bufcnt) {
|
||||
/* Then we're done! */
|
||||
assert(n == 0);
|
||||
|
||||
/* Pop the req off tcp->write_queue. */
|
||||
ngx_queue_remove(&req->queue);
|
||||
if (req->bufs != req->bufsml) {
|
||||
free(req->bufs);
|
||||
}
|
||||
req->bufs = NULL;
|
||||
|
||||
/* Add it to the write_completed_queue where it will have its
|
||||
* callback called in the near future.
|
||||
* TODO: start trying to write the next request.
|
||||
*/
|
||||
ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
|
||||
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Either we've counted n down to zero or we've got EAGAIN. */
|
||||
assert(n == 0 || n == -1);
|
||||
|
||||
/* We're not done. */
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static void uv__write_callbacks(uv_stream_t* stream) {
|
||||
int callbacks_made = 0;
|
||||
ngx_queue_t* q;
|
||||
uv_write_t* req;
|
||||
|
||||
while (!ngx_queue_empty(&stream->write_completed_queue)) {
|
||||
/* Pop a req off write_completed_queue. */
|
||||
q = ngx_queue_head(&stream->write_completed_queue);
|
||||
assert(q);
|
||||
req = ngx_queue_data(q, struct uv_write_s, queue);
|
||||
ngx_queue_remove(q);
|
||||
|
||||
/* NOTE: call callback AFTER freeing the request data. */
|
||||
if (req->cb) {
|
||||
req->cb(req, 0);
|
||||
}
|
||||
|
||||
callbacks_made++;
|
||||
}
|
||||
|
||||
assert(ngx_queue_empty(&stream->write_completed_queue));
|
||||
|
||||
/* Write queue drained. */
|
||||
if (!uv_write_queue_head(stream)) {
|
||||
uv__drain(stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__read(uv_stream_t* stream) {
|
||||
uv_buf_t buf;
|
||||
ssize_t nread;
|
||||
struct ev_loop* ev = stream->loop->ev;
|
||||
|
||||
/* XXX: Maybe instead of having UV_READING we just test if
|
||||
* tcp->read_cb is NULL or not?
|
||||
*/
|
||||
while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) {
|
||||
assert(stream->alloc_cb);
|
||||
buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
|
||||
|
||||
assert(buf.len > 0);
|
||||
assert(buf.base);
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
do {
|
||||
nread = read(stream->fd, buf.base, buf.len);
|
||||
}
|
||||
while (nread < 0 && errno == EINTR);
|
||||
|
||||
if (nread < 0) {
|
||||
/* Error */
|
||||
if (errno == EAGAIN) {
|
||||
/* Wait for the next one. */
|
||||
if (stream->flags & UV_READING) {
|
||||
ev_io_start(ev, &stream->read_watcher);
|
||||
}
|
||||
uv_err_new(stream->loop, EAGAIN);
|
||||
stream->read_cb(stream, 0, buf);
|
||||
return;
|
||||
} else {
|
||||
/* Error. User should call uv_close(). */
|
||||
uv_err_new(stream->loop, errno);
|
||||
stream->read_cb(stream, -1, buf);
|
||||
assert(!ev_is_active(&stream->read_watcher));
|
||||
return;
|
||||
}
|
||||
} else if (nread == 0) {
|
||||
/* EOF */
|
||||
uv_err_new_artificial(stream->loop, UV_EOF);
|
||||
ev_io_stop(ev, &stream->read_watcher);
|
||||
stream->read_cb(stream, -1, buf);
|
||||
return;
|
||||
} else {
|
||||
/* Successful read */
|
||||
stream->read_cb(stream, nread, buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
|
||||
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
|
||||
"uv_shutdown (unix) only supports uv_handle_t right now");
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
if (!(stream->flags & UV_WRITABLE) ||
|
||||
stream->flags & UV_SHUT ||
|
||||
stream->flags & UV_CLOSED ||
|
||||
stream->flags & UV_CLOSING) {
|
||||
uv_err_new(stream->loop, EINVAL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Initialize request */
|
||||
uv__req_init((uv_req_t*)req);
|
||||
req->handle = stream;
|
||||
req->cb = cb;
|
||||
|
||||
stream->shutdown_req = req;
|
||||
req->type = UV_SHUTDOWN;
|
||||
|
||||
((uv_handle_t*)stream)->flags |= UV_SHUTTING;
|
||||
|
||||
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
|
||||
uv_stream_t* stream = watcher->data;
|
||||
|
||||
assert(stream->type == UV_TCP ||
|
||||
stream->type == UV_NAMED_PIPE);
|
||||
assert(watcher == &stream->read_watcher ||
|
||||
watcher == &stream->write_watcher);
|
||||
assert(!(stream->flags & UV_CLOSING));
|
||||
|
||||
if (stream->connect_req) {
|
||||
uv__stream_connect(stream);
|
||||
} else {
|
||||
assert(revents & (EV_READ | EV_WRITE));
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
if (revents & EV_READ) {
|
||||
uv__read((uv_stream_t*)stream);
|
||||
}
|
||||
|
||||
if (revents & EV_WRITE) {
|
||||
uv_write_t* req = uv__write(stream);
|
||||
if (req) {
|
||||
/* Error. Notify the user. */
|
||||
if (req->cb) {
|
||||
req->cb(req, -1);
|
||||
}
|
||||
} else {
|
||||
uv__write_callbacks(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* We get called here from directly following a call to connect(2).
|
||||
* In order to determine if we've errored out or succeeded must call
|
||||
* getsockopt.
|
||||
*/
|
||||
static void uv__stream_connect(uv_stream_t* stream) {
|
||||
int error;
|
||||
uv_connect_t* req = stream->connect_req;
|
||||
socklen_t errorsize = sizeof(int);
|
||||
|
||||
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
|
||||
assert(req);
|
||||
|
||||
if (stream->delayed_error) {
|
||||
/* To smooth over the differences between unixes errors that
|
||||
* were reported synchronously on the first connect can be delayed
|
||||
* until the next tick--which is now.
|
||||
*/
|
||||
error = stream->delayed_error;
|
||||
stream->delayed_error = 0;
|
||||
} else {
|
||||
/* Normal situation: we need to get the socket error from the kernel. */
|
||||
assert(stream->fd >= 0);
|
||||
getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
|
||||
}
|
||||
|
||||
if (!error) {
|
||||
ev_io_start(stream->loop->ev, &stream->read_watcher);
|
||||
|
||||
/* Successful connection */
|
||||
stream->connect_req = NULL;
|
||||
if (req->cb) {
|
||||
req->cb(req, 0);
|
||||
}
|
||||
|
||||
} else if (error == EINPROGRESS) {
|
||||
/* Still connecting. */
|
||||
return;
|
||||
} else {
|
||||
/* Error */
|
||||
uv_err_new(stream->loop, error);
|
||||
|
||||
stream->connect_req = NULL;
|
||||
if (req->cb) {
|
||||
req->cb(req, -1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static int uv__connect(uv_connect_t* req,
|
||||
uv_stream_t* stream,
|
||||
struct sockaddr* addr,
|
||||
socklen_t addrlen,
|
||||
uv_connect_cb cb) {
|
||||
|
||||
int sockfd;
|
||||
int r;
|
||||
|
||||
if (stream->fd <= 0) {
|
||||
if ((sockfd = uv__socket(addr->sa_family, SOCK_STREAM, 0)) == -1) {
|
||||
uv_err_new(stream->loop, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (uv__stream_open(stream, sockfd, UV_READABLE | UV_WRITABLE)) {
|
||||
uv__close(sockfd);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
|
||||
uv__req_init((uv_req_t*)req);
|
||||
req->cb = cb;
|
||||
req->handle = stream;
|
||||
req->type = UV_CONNECT;
|
||||
ngx_queue_init(&req->queue);
|
||||
|
||||
if (stream->connect_req) {
|
||||
uv_err_new(stream->loop, EALREADY);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (stream->type != UV_TCP) {
|
||||
uv_err_new(stream->loop, ENOTSOCK);
|
||||
return -1;
|
||||
}
|
||||
|
||||
stream->connect_req = req;
|
||||
|
||||
do {
|
||||
r = connect(stream->fd, addr, addrlen);
|
||||
}
|
||||
while (r == -1 && errno == EINTR);
|
||||
|
||||
stream->delayed_error = 0;
|
||||
|
||||
if (r != 0 && errno != EINPROGRESS) {
|
||||
switch (errno) {
|
||||
/* If we get a ECONNREFUSED wait until the next tick to report the
|
||||
* error. Solaris wants to report immediately--other unixes want to
|
||||
* wait.
|
||||
*/
|
||||
case ECONNREFUSED:
|
||||
stream->delayed_error = errno;
|
||||
break;
|
||||
|
||||
default:
|
||||
uv_err_new(stream->loop, errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
assert(stream->write_watcher.data == stream);
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
if (stream->delayed_error) {
|
||||
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_connect(uv_connect_t* req,
|
||||
uv_tcp_t* handle,
|
||||
struct sockaddr_in address,
|
||||
uv_connect_cb cb) {
|
||||
int saved_errno;
|
||||
int status;
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
if (handle->type != UV_TCP) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (address.sin_family != AF_INET) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
status = uv__connect(req,
|
||||
(uv_stream_t*)handle,
|
||||
(struct sockaddr*)&address,
|
||||
sizeof address,
|
||||
cb);
|
||||
|
||||
out:
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_connect6(uv_connect_t* req,
|
||||
uv_tcp_t* handle,
|
||||
struct sockaddr_in6 address,
|
||||
uv_connect_cb cb) {
|
||||
int saved_errno;
|
||||
int status;
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
if (handle->type != UV_TCP) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (address.sin6_family != AF_INET6) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
status = uv__connect(req,
|
||||
(uv_stream_t*)handle,
|
||||
(struct sockaddr*)&address,
|
||||
sizeof address,
|
||||
cb);
|
||||
|
||||
out:
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
int uv_getsockname(uv_handle_t* handle, struct sockaddr* name, int* namelen) {
|
||||
socklen_t socklen;
|
||||
int saved_errno;
|
||||
@ -947,101 +318,6 @@ int uv_getsockname(uv_handle_t* handle, struct sockaddr* name, int* namelen) {
|
||||
}
|
||||
|
||||
|
||||
static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
|
||||
size_t total = 0;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < bufcnt; i++) {
|
||||
total += bufs[i].len;
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
|
||||
/* The buffers to be written must remain valid until the callback is called.
|
||||
* This is not required for the uv_buf_t array.
|
||||
*/
|
||||
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
|
||||
uv_write_cb cb) {
|
||||
uv_stream_t* stream;
|
||||
int empty_queue;
|
||||
|
||||
stream = (uv_stream_t*)handle;
|
||||
|
||||
/* Initialize the req */
|
||||
uv__req_init((uv_req_t*) req);
|
||||
req->cb = cb;
|
||||
req->handle = handle;
|
||||
ngx_queue_init(&req->queue);
|
||||
|
||||
assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
|
||||
&& "uv_write (unix) does not yet support other types of streams");
|
||||
|
||||
empty_queue = (stream->write_queue_size == 0);
|
||||
|
||||
if (stream->fd < 0) {
|
||||
uv_err_new(stream->loop, EBADF);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ngx_queue_init(&req->queue);
|
||||
req->type = UV_WRITE;
|
||||
|
||||
|
||||
if (bufcnt < UV_REQ_BUFSML_SIZE) {
|
||||
req->bufs = req->bufsml;
|
||||
}
|
||||
else {
|
||||
req->bufs = malloc(sizeof(uv_buf_t) * bufcnt);
|
||||
}
|
||||
|
||||
memcpy(req->bufs, bufs, bufcnt * sizeof(uv_buf_t));
|
||||
req->bufcnt = bufcnt;
|
||||
|
||||
/*
|
||||
* fprintf(stderr, "cnt: %d bufs: %p bufsml: %p\n", bufcnt, req->bufs, req->bufsml);
|
||||
*/
|
||||
|
||||
req->write_index = 0;
|
||||
stream->write_queue_size += uv__buf_count(bufs, bufcnt);
|
||||
|
||||
/* Append the request to write_queue. */
|
||||
ngx_queue_insert_tail(&stream->write_queue, &req->queue);
|
||||
|
||||
assert(!ngx_queue_empty(&stream->write_queue));
|
||||
assert(stream->write_watcher.cb == uv__stream_io);
|
||||
assert(stream->write_watcher.data == stream);
|
||||
assert(stream->write_watcher.fd == stream->fd);
|
||||
|
||||
/* If the queue was empty when this function began, we should attempt to
|
||||
* do the write immediately. Otherwise start the write_watcher and wait
|
||||
* for the fd to become writable.
|
||||
*/
|
||||
if (empty_queue) {
|
||||
if (uv__write(stream)) {
|
||||
/* Error. uv_last_error has been set. */
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* If the queue is now empty - we've flushed the request already. That
|
||||
* means we need to make the callback. The callback can only be done on a
|
||||
* fresh stack so we feed the event loop in order to service it.
|
||||
*/
|
||||
if (ngx_queue_empty(&stream->write_queue)) {
|
||||
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
|
||||
} else {
|
||||
/* Otherwise there is data to write - so we should wait for the file
|
||||
* descriptor to become writable.
|
||||
*/
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv_ref(uv_loop_t* loop) {
|
||||
ev_ref(loop->ev);
|
||||
}
|
||||
@ -1062,49 +338,6 @@ int64_t uv_now(uv_loop_t* loop) {
|
||||
}
|
||||
|
||||
|
||||
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
|
||||
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
|
||||
|
||||
if (stream->flags & UV_CLOSING) {
|
||||
uv_err_new(stream->loop, EINVAL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* The UV_READING flag is irrelevant of the state of the tcp - it just
|
||||
* expresses the desired state of the user.
|
||||
*/
|
||||
((uv_handle_t*)stream)->flags |= UV_READING;
|
||||
|
||||
/* TODO: try to do the read inline? */
|
||||
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
|
||||
* not start the IO watcher.
|
||||
*/
|
||||
assert(stream->fd >= 0);
|
||||
assert(alloc_cb);
|
||||
|
||||
stream->read_cb = read_cb;
|
||||
stream->alloc_cb = alloc_cb;
|
||||
|
||||
/* These should have been set by uv_tcp_init. */
|
||||
assert(stream->read_watcher.cb == uv__stream_io);
|
||||
|
||||
ev_io_start(stream->loop->ev, &stream->read_watcher);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_read_stop(uv_stream_t* stream) {
|
||||
uv_tcp_t* tcp = (uv_tcp_t*)stream;
|
||||
|
||||
((uv_handle_t*)tcp)->flags &= ~UV_READING;
|
||||
|
||||
ev_io_stop(tcp->loop->ev, &tcp->read_watcher);
|
||||
tcp->read_cb = NULL;
|
||||
tcp->alloc_cb = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__req_init(uv_req_t* req) {
|
||||
/* loop->counters.req_init++; */
|
||||
req->type = UV_UNKNOWN_REQ;
|
||||
|
||||
@ -57,6 +57,8 @@ int uv__stream_open(uv_stream_t*, int fd, int flags);
|
||||
void uv__stream_io(EV_P_ ev_io* watcher, int revents);
|
||||
void uv__server_io(EV_P_ ev_io* watcher, int revents);
|
||||
int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len);
|
||||
int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
|
||||
socklen_t addrlen, uv_connect_cb cb);
|
||||
|
||||
/* tcp */
|
||||
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb);
|
||||
|
||||
727
src/unix/stream.c
Normal file
727
src/unix/stream.c
Normal file
@ -0,0 +1,727 @@
|
||||
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to
|
||||
* deal in the Software without restriction, including without limitation the
|
||||
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
* sell copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
* IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "uv.h"
|
||||
#include "internal.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <errno.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/uio.h>
|
||||
|
||||
|
||||
static void uv__stream_connect(uv_stream_t*);
|
||||
static uv_write_t* uv__write(uv_stream_t* stream);
|
||||
static void uv__read(uv_stream_t* stream);
|
||||
|
||||
|
||||
static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
|
||||
size_t total = 0;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < bufcnt; i++) {
|
||||
total += bufs[i].len;
|
||||
}
|
||||
|
||||
return total;
|
||||
}
|
||||
|
||||
|
||||
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
|
||||
socklen_t yes;
|
||||
|
||||
assert(fd >= 0);
|
||||
stream->fd = fd;
|
||||
|
||||
((uv_handle_t*)stream)->flags |= flags;
|
||||
|
||||
/* Reuse the port address if applicable. */
|
||||
yes = 1;
|
||||
if (stream->type == UV_TCP
|
||||
&& setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1) {
|
||||
uv_err_new(stream->loop, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Associate the fd with each ev_io watcher. */
|
||||
ev_io_set(&stream->read_watcher, fd, EV_READ);
|
||||
ev_io_set(&stream->write_watcher, fd, EV_WRITE);
|
||||
|
||||
/* These should have been set up by uv_tcp_init or uv_pipe_init. */
|
||||
assert(stream->read_watcher.cb == uv__stream_io);
|
||||
assert(stream->write_watcher.cb == uv__stream_io);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__server_io(EV_P_ ev_io* watcher, int revents) {
|
||||
int fd;
|
||||
struct sockaddr_storage addr;
|
||||
uv_stream_t* stream = watcher->data;
|
||||
|
||||
assert(watcher == &stream->read_watcher ||
|
||||
watcher == &stream->write_watcher);
|
||||
assert(revents == EV_READ);
|
||||
|
||||
assert(!(stream->flags & UV_CLOSING));
|
||||
|
||||
if (stream->accepted_fd >= 0) {
|
||||
ev_io_stop(EV_A, &stream->read_watcher);
|
||||
return;
|
||||
}
|
||||
|
||||
/* connection_cb can close the server socket while we're
|
||||
* in the loop so check it on each iteration.
|
||||
*/
|
||||
while (stream->fd != -1) {
|
||||
assert(stream->accepted_fd < 0);
|
||||
fd = uv__accept(stream->fd, (struct sockaddr*)&addr, sizeof addr);
|
||||
|
||||
if (fd < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
/* No problem. */
|
||||
return;
|
||||
} else if (errno == EMFILE) {
|
||||
/* TODO special trick. unlock reserved socket, accept, close. */
|
||||
return;
|
||||
} else {
|
||||
uv_err_new(stream->loop, errno);
|
||||
stream->connection_cb((uv_stream_t*)stream, -1);
|
||||
}
|
||||
} else {
|
||||
stream->accepted_fd = fd;
|
||||
stream->connection_cb((uv_stream_t*)stream, 0);
|
||||
if (stream->accepted_fd >= 0) {
|
||||
/* The user hasn't yet accepted called uv_accept() */
|
||||
ev_io_stop(stream->loop->ev, &stream->read_watcher);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int uv_accept(uv_stream_t* server, uv_stream_t* client) {
|
||||
uv_stream_t* streamServer;
|
||||
uv_stream_t* streamClient;
|
||||
int saved_errno;
|
||||
int status;
|
||||
|
||||
/* TODO document this */
|
||||
assert(server->loop == client->loop);
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
streamServer = (uv_stream_t*)server;
|
||||
streamClient = (uv_stream_t*)client;
|
||||
|
||||
if (streamServer->accepted_fd < 0) {
|
||||
uv_err_new(server->loop, EAGAIN);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (uv__stream_open(streamClient, streamServer->accepted_fd,
|
||||
UV_READABLE | UV_WRITABLE)) {
|
||||
/* TODO handle error */
|
||||
streamServer->accepted_fd = -1;
|
||||
uv__close(streamServer->accepted_fd);
|
||||
goto out;
|
||||
}
|
||||
|
||||
ev_io_start(streamServer->loop->ev, &streamServer->read_watcher);
|
||||
streamServer->accepted_fd = -1;
|
||||
status = 0;
|
||||
|
||||
out:
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
|
||||
switch (stream->type) {
|
||||
case UV_TCP:
|
||||
return uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
|
||||
case UV_NAMED_PIPE:
|
||||
return uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
|
||||
default:
|
||||
assert(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
uv_write_t* uv_write_queue_head(uv_stream_t* stream) {
|
||||
ngx_queue_t* q;
|
||||
uv_write_t* req;
|
||||
|
||||
if (ngx_queue_empty(&stream->write_queue)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
q = ngx_queue_head(&stream->write_queue);
|
||||
if (!q) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
req = ngx_queue_data(q, struct uv_write_s, queue);
|
||||
assert(req);
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
|
||||
static void uv__drain(uv_stream_t* stream) {
|
||||
uv_shutdown_t* req;
|
||||
|
||||
assert(!uv_write_queue_head(stream));
|
||||
assert(stream->write_queue_size == 0);
|
||||
|
||||
ev_io_stop(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
/* Shutdown? */
|
||||
if ((stream->flags & UV_SHUTTING) &&
|
||||
!(stream->flags & UV_CLOSING) &&
|
||||
!(stream->flags & UV_SHUT)) {
|
||||
assert(stream->shutdown_req);
|
||||
|
||||
req = stream->shutdown_req;
|
||||
|
||||
if (shutdown(stream->fd, SHUT_WR)) {
|
||||
/* Error. Report it. User should call uv_close(). */
|
||||
uv_err_new(stream->loop, errno);
|
||||
if (req->cb) {
|
||||
req->cb(req, -1);
|
||||
}
|
||||
} else {
|
||||
uv_err_new(stream->loop, 0);
|
||||
((uv_handle_t*) stream)->flags |= UV_SHUT;
|
||||
if (req->cb) {
|
||||
req->cb(req, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* On success returns NULL. On error returns a pointer to the write request
|
||||
* which had the error.
|
||||
*/
|
||||
static uv_write_t* uv__write(uv_stream_t* stream) {
|
||||
uv_write_t* req;
|
||||
struct iovec* iov;
|
||||
int iovcnt;
|
||||
ssize_t n;
|
||||
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
/* TODO: should probably while(1) here until EAGAIN */
|
||||
|
||||
/* Get the request at the head of the queue. */
|
||||
req = uv_write_queue_head(stream);
|
||||
if (!req) {
|
||||
assert(stream->write_queue_size == 0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
assert(req->handle == stream);
|
||||
|
||||
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
|
||||
* because Windows's WSABUF is not an iovec.
|
||||
*/
|
||||
assert(sizeof(uv_buf_t) == sizeof(struct iovec));
|
||||
iov = (struct iovec*) &(req->bufs[req->write_index]);
|
||||
iovcnt = req->bufcnt - req->write_index;
|
||||
|
||||
/* Now do the actual writev. Note that we've been updating the pointers
|
||||
* inside the iov each time we write. So there is no need to offset it.
|
||||
*/
|
||||
|
||||
do {
|
||||
if (iovcnt == 1) {
|
||||
n = write(stream->fd, iov[0].iov_base, iov[0].iov_len);
|
||||
} else {
|
||||
n = writev(stream->fd, iov, iovcnt);
|
||||
}
|
||||
}
|
||||
while (n == -1 && errno == EINTR);
|
||||
|
||||
if (n < 0) {
|
||||
if (errno != EAGAIN) {
|
||||
/* Error */
|
||||
uv_err_new(stream->loop, errno);
|
||||
return req;
|
||||
}
|
||||
} else {
|
||||
/* Successful write */
|
||||
|
||||
/* Update the counters. */
|
||||
while (n >= 0) {
|
||||
uv_buf_t* buf = &(req->bufs[req->write_index]);
|
||||
size_t len = buf->len;
|
||||
|
||||
assert(req->write_index < req->bufcnt);
|
||||
|
||||
if ((size_t)n < len) {
|
||||
buf->base += n;
|
||||
buf->len -= n;
|
||||
stream->write_queue_size -= n;
|
||||
n = 0;
|
||||
|
||||
/* There is more to write. Break and ensure the watcher is pending. */
|
||||
break;
|
||||
|
||||
} else {
|
||||
/* Finished writing the buf at index req->write_index. */
|
||||
req->write_index++;
|
||||
|
||||
assert((size_t)n >= len);
|
||||
n -= len;
|
||||
|
||||
assert(stream->write_queue_size >= len);
|
||||
stream->write_queue_size -= len;
|
||||
|
||||
if (req->write_index == req->bufcnt) {
|
||||
/* Then we're done! */
|
||||
assert(n == 0);
|
||||
|
||||
/* Pop the req off tcp->write_queue. */
|
||||
ngx_queue_remove(&req->queue);
|
||||
if (req->bufs != req->bufsml) {
|
||||
free(req->bufs);
|
||||
}
|
||||
req->bufs = NULL;
|
||||
|
||||
/* Add it to the write_completed_queue where it will have its
|
||||
* callback called in the near future.
|
||||
* TODO: start trying to write the next request.
|
||||
*/
|
||||
ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
|
||||
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Either we've counted n down to zero or we've got EAGAIN. */
|
||||
assert(n == 0 || n == -1);
|
||||
|
||||
/* We're not done. */
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
static void uv__write_callbacks(uv_stream_t* stream) {
|
||||
int callbacks_made = 0;
|
||||
ngx_queue_t* q;
|
||||
uv_write_t* req;
|
||||
|
||||
while (!ngx_queue_empty(&stream->write_completed_queue)) {
|
||||
/* Pop a req off write_completed_queue. */
|
||||
q = ngx_queue_head(&stream->write_completed_queue);
|
||||
assert(q);
|
||||
req = ngx_queue_data(q, struct uv_write_s, queue);
|
||||
ngx_queue_remove(q);
|
||||
|
||||
/* NOTE: call callback AFTER freeing the request data. */
|
||||
if (req->cb) {
|
||||
req->cb(req, 0);
|
||||
}
|
||||
|
||||
callbacks_made++;
|
||||
}
|
||||
|
||||
assert(ngx_queue_empty(&stream->write_completed_queue));
|
||||
|
||||
/* Write queue drained. */
|
||||
if (!uv_write_queue_head(stream)) {
|
||||
uv__drain(stream);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static void uv__read(uv_stream_t* stream) {
|
||||
uv_buf_t buf;
|
||||
ssize_t nread;
|
||||
struct ev_loop* ev = stream->loop->ev;
|
||||
|
||||
/* XXX: Maybe instead of having UV_READING we just test if
|
||||
* tcp->read_cb is NULL or not?
|
||||
*/
|
||||
while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) {
|
||||
assert(stream->alloc_cb);
|
||||
buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
|
||||
|
||||
assert(buf.len > 0);
|
||||
assert(buf.base);
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
do {
|
||||
nread = read(stream->fd, buf.base, buf.len);
|
||||
}
|
||||
while (nread < 0 && errno == EINTR);
|
||||
|
||||
if (nread < 0) {
|
||||
/* Error */
|
||||
if (errno == EAGAIN) {
|
||||
/* Wait for the next one. */
|
||||
if (stream->flags & UV_READING) {
|
||||
ev_io_start(ev, &stream->read_watcher);
|
||||
}
|
||||
uv_err_new(stream->loop, EAGAIN);
|
||||
stream->read_cb(stream, 0, buf);
|
||||
return;
|
||||
} else {
|
||||
/* Error. User should call uv_close(). */
|
||||
uv_err_new(stream->loop, errno);
|
||||
stream->read_cb(stream, -1, buf);
|
||||
assert(!ev_is_active(&stream->read_watcher));
|
||||
return;
|
||||
}
|
||||
} else if (nread == 0) {
|
||||
/* EOF */
|
||||
uv_err_new_artificial(stream->loop, UV_EOF);
|
||||
ev_io_stop(ev, &stream->read_watcher);
|
||||
stream->read_cb(stream, -1, buf);
|
||||
return;
|
||||
} else {
|
||||
/* Successful read */
|
||||
stream->read_cb(stream, nread, buf);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
|
||||
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
|
||||
"uv_shutdown (unix) only supports uv_handle_t right now");
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
if (!(stream->flags & UV_WRITABLE) ||
|
||||
stream->flags & UV_SHUT ||
|
||||
stream->flags & UV_CLOSED ||
|
||||
stream->flags & UV_CLOSING) {
|
||||
uv_err_new(stream->loop, EINVAL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Initialize request */
|
||||
uv__req_init((uv_req_t*)req);
|
||||
req->handle = stream;
|
||||
req->cb = cb;
|
||||
|
||||
stream->shutdown_req = req;
|
||||
req->type = UV_SHUTDOWN;
|
||||
|
||||
((uv_handle_t*)stream)->flags |= UV_SHUTTING;
|
||||
|
||||
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
void uv__stream_io(EV_P_ ev_io* watcher, int revents) {
|
||||
uv_stream_t* stream = watcher->data;
|
||||
|
||||
assert(stream->type == UV_TCP ||
|
||||
stream->type == UV_NAMED_PIPE);
|
||||
assert(watcher == &stream->read_watcher ||
|
||||
watcher == &stream->write_watcher);
|
||||
assert(!(stream->flags & UV_CLOSING));
|
||||
|
||||
if (stream->connect_req) {
|
||||
uv__stream_connect(stream);
|
||||
} else {
|
||||
assert(revents & (EV_READ | EV_WRITE));
|
||||
assert(stream->fd >= 0);
|
||||
|
||||
if (revents & EV_READ) {
|
||||
uv__read((uv_stream_t*)stream);
|
||||
}
|
||||
|
||||
if (revents & EV_WRITE) {
|
||||
uv_write_t* req = uv__write(stream);
|
||||
if (req) {
|
||||
/* Error. Notify the user. */
|
||||
if (req->cb) {
|
||||
req->cb(req, -1);
|
||||
}
|
||||
} else {
|
||||
uv__write_callbacks(stream);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* We get called here from directly following a call to connect(2).
|
||||
* In order to determine if we've errored out or succeeded must call
|
||||
* getsockopt.
|
||||
*/
|
||||
static void uv__stream_connect(uv_stream_t* stream) {
|
||||
int error;
|
||||
uv_connect_t* req = stream->connect_req;
|
||||
socklen_t errorsize = sizeof(int);
|
||||
|
||||
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
|
||||
assert(req);
|
||||
|
||||
if (stream->delayed_error) {
|
||||
/* To smooth over the differences between unixes errors that
|
||||
* were reported synchronously on the first connect can be delayed
|
||||
* until the next tick--which is now.
|
||||
*/
|
||||
error = stream->delayed_error;
|
||||
stream->delayed_error = 0;
|
||||
} else {
|
||||
/* Normal situation: we need to get the socket error from the kernel. */
|
||||
assert(stream->fd >= 0);
|
||||
getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
|
||||
}
|
||||
|
||||
if (!error) {
|
||||
ev_io_start(stream->loop->ev, &stream->read_watcher);
|
||||
|
||||
/* Successful connection */
|
||||
stream->connect_req = NULL;
|
||||
if (req->cb) {
|
||||
req->cb(req, 0);
|
||||
}
|
||||
|
||||
} else if (error == EINPROGRESS) {
|
||||
/* Still connecting. */
|
||||
return;
|
||||
} else {
|
||||
/* Error */
|
||||
uv_err_new(stream->loop, error);
|
||||
|
||||
stream->connect_req = NULL;
|
||||
if (req->cb) {
|
||||
req->cb(req, -1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int uv__connect(uv_connect_t* req, uv_stream_t* stream, struct sockaddr* addr,
|
||||
socklen_t addrlen, uv_connect_cb cb) {
|
||||
int sockfd;
|
||||
int r;
|
||||
|
||||
if (stream->fd <= 0) {
|
||||
if ((sockfd = uv__socket(addr->sa_family, SOCK_STREAM, 0)) == -1) {
|
||||
uv_err_new(stream->loop, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (uv__stream_open(stream, sockfd, UV_READABLE | UV_WRITABLE)) {
|
||||
uv__close(sockfd);
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
|
||||
uv__req_init((uv_req_t*)req);
|
||||
req->cb = cb;
|
||||
req->handle = stream;
|
||||
req->type = UV_CONNECT;
|
||||
ngx_queue_init(&req->queue);
|
||||
|
||||
if (stream->connect_req) {
|
||||
uv_err_new(stream->loop, EALREADY);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (stream->type != UV_TCP) {
|
||||
uv_err_new(stream->loop, ENOTSOCK);
|
||||
return -1;
|
||||
}
|
||||
|
||||
stream->connect_req = req;
|
||||
|
||||
do {
|
||||
r = connect(stream->fd, addr, addrlen);
|
||||
}
|
||||
while (r == -1 && errno == EINTR);
|
||||
|
||||
stream->delayed_error = 0;
|
||||
|
||||
if (r != 0 && errno != EINPROGRESS) {
|
||||
switch (errno) {
|
||||
/* If we get a ECONNREFUSED wait until the next tick to report the
|
||||
* error. Solaris wants to report immediately--other unixes want to
|
||||
* wait.
|
||||
*/
|
||||
case ECONNREFUSED:
|
||||
stream->delayed_error = errno;
|
||||
break;
|
||||
|
||||
default:
|
||||
uv_err_new(stream->loop, errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
assert(stream->write_watcher.data == stream);
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
|
||||
if (stream->delayed_error) {
|
||||
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/* The buffers to be written must remain valid until the callback is called.
|
||||
* This is not required for the uv_buf_t array.
|
||||
*/
|
||||
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
|
||||
uv_write_cb cb) {
|
||||
uv_stream_t* stream;
|
||||
int empty_queue;
|
||||
|
||||
stream = (uv_stream_t*)handle;
|
||||
|
||||
/* Initialize the req */
|
||||
uv__req_init((uv_req_t*) req);
|
||||
req->cb = cb;
|
||||
req->handle = handle;
|
||||
ngx_queue_init(&req->queue);
|
||||
|
||||
assert((handle->type == UV_TCP || handle->type == UV_NAMED_PIPE)
|
||||
&& "uv_write (unix) does not yet support other types of streams");
|
||||
|
||||
empty_queue = (stream->write_queue_size == 0);
|
||||
|
||||
if (stream->fd < 0) {
|
||||
uv_err_new(stream->loop, EBADF);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ngx_queue_init(&req->queue);
|
||||
req->type = UV_WRITE;
|
||||
|
||||
|
||||
if (bufcnt < UV_REQ_BUFSML_SIZE) {
|
||||
req->bufs = req->bufsml;
|
||||
}
|
||||
else {
|
||||
req->bufs = malloc(sizeof(uv_buf_t) * bufcnt);
|
||||
}
|
||||
|
||||
memcpy(req->bufs, bufs, bufcnt * sizeof(uv_buf_t));
|
||||
req->bufcnt = bufcnt;
|
||||
|
||||
/*
|
||||
* fprintf(stderr, "cnt: %d bufs: %p bufsml: %p\n", bufcnt, req->bufs, req->bufsml);
|
||||
*/
|
||||
|
||||
req->write_index = 0;
|
||||
stream->write_queue_size += uv__buf_count(bufs, bufcnt);
|
||||
|
||||
/* Append the request to write_queue. */
|
||||
ngx_queue_insert_tail(&stream->write_queue, &req->queue);
|
||||
|
||||
assert(!ngx_queue_empty(&stream->write_queue));
|
||||
assert(stream->write_watcher.cb == uv__stream_io);
|
||||
assert(stream->write_watcher.data == stream);
|
||||
assert(stream->write_watcher.fd == stream->fd);
|
||||
|
||||
/* If the queue was empty when this function began, we should attempt to
|
||||
* do the write immediately. Otherwise start the write_watcher and wait
|
||||
* for the fd to become writable.
|
||||
*/
|
||||
if (empty_queue) {
|
||||
if (uv__write(stream)) {
|
||||
/* Error. uv_last_error has been set. */
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
/* If the queue is now empty - we've flushed the request already. That
|
||||
* means we need to make the callback. The callback can only be done on a
|
||||
* fresh stack so we feed the event loop in order to service it.
|
||||
*/
|
||||
if (ngx_queue_empty(&stream->write_queue)) {
|
||||
ev_feed_event(stream->loop->ev, &stream->write_watcher, EV_WRITE);
|
||||
} else {
|
||||
/* Otherwise there is data to write - so we should wait for the file
|
||||
* descriptor to become writable.
|
||||
*/
|
||||
ev_io_start(stream->loop->ev, &stream->write_watcher);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
|
||||
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE);
|
||||
|
||||
if (stream->flags & UV_CLOSING) {
|
||||
uv_err_new(stream->loop, EINVAL);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* The UV_READING flag is irrelevant of the state of the tcp - it just
|
||||
* expresses the desired state of the user.
|
||||
*/
|
||||
((uv_handle_t*)stream)->flags |= UV_READING;
|
||||
|
||||
/* TODO: try to do the read inline? */
|
||||
/* TODO: keep track of tcp state. If we've gotten a EOF then we should
|
||||
* not start the IO watcher.
|
||||
*/
|
||||
assert(stream->fd >= 0);
|
||||
assert(alloc_cb);
|
||||
|
||||
stream->read_cb = read_cb;
|
||||
stream->alloc_cb = alloc_cb;
|
||||
|
||||
/* These should have been set by uv_tcp_init. */
|
||||
assert(stream->read_watcher.cb == uv__stream_io);
|
||||
|
||||
ev_io_start(stream->loop->ev, &stream->read_watcher);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_read_stop(uv_stream_t* stream) {
|
||||
uv_tcp_t* tcp = (uv_tcp_t*)stream;
|
||||
|
||||
((uv_handle_t*)tcp)->flags &= ~UV_READING;
|
||||
|
||||
ev_io_stop(tcp->loop->ev, &tcp->read_watcher);
|
||||
tcp->read_cb = NULL;
|
||||
tcp->alloc_cb = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@ -160,3 +160,67 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_connect(uv_connect_t* req,
|
||||
uv_tcp_t* handle,
|
||||
struct sockaddr_in address,
|
||||
uv_connect_cb cb) {
|
||||
int saved_errno;
|
||||
int status;
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
if (handle->type != UV_TCP) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (address.sin_family != AF_INET) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
status = uv__connect(req,
|
||||
(uv_stream_t*)handle,
|
||||
(struct sockaddr*)&address,
|
||||
sizeof address,
|
||||
cb);
|
||||
|
||||
out:
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
|
||||
int uv_tcp_connect6(uv_connect_t* req,
|
||||
uv_tcp_t* handle,
|
||||
struct sockaddr_in6 address,
|
||||
uv_connect_cb cb) {
|
||||
int saved_errno;
|
||||
int status;
|
||||
|
||||
saved_errno = errno;
|
||||
status = -1;
|
||||
|
||||
if (handle->type != UV_TCP) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
if (address.sin6_family != AF_INET6) {
|
||||
uv_err_new(handle->loop, EINVAL);
|
||||
goto out;
|
||||
}
|
||||
|
||||
status = uv__connect(req,
|
||||
(uv_stream_t*)handle,
|
||||
(struct sockaddr*)&address,
|
||||
sizeof address,
|
||||
cb);
|
||||
|
||||
out:
|
||||
errno = saved_errno;
|
||||
return status;
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user