diff --git a/ngx-queue.h b/ngx-queue.h index 9e422d6a..f8576d67 100644 --- a/ngx-queue.h +++ b/ngx-queue.h @@ -96,7 +96,7 @@ struct ngx_queue_s { #define ngx_queue_data(q, type, link) \ - (type *) ((u_char *) q - offsetof(type, link)) + (type *) ((unsigned char *) q - offsetof(type, link)) #endif /* _NGX_QUEUE_H_INCLUDED_ */ diff --git a/ol-unix.c b/ol-unix.c index 93f3d179..79023919 100644 --- a/ol-unix.c +++ b/ol-unix.c @@ -1,5 +1,7 @@ #include "ol.h" +#include /* printf */ + #include #include #include @@ -14,7 +16,7 @@ void ol_tcp_io(EV_P_ ev_io* watcher, int revents); void ol_tcp_connect(ol_handle* handle, ol_req* req); -ol_handle* ol_tcp_open(ol_handle* parent, int fd); +int ol_tcp_open(ol_handle*, int fd); int ol_close_error(ol_handle* handle, ol_err err); @@ -56,38 +58,38 @@ int ol_run() { ol_handle* ol_tcp_handle_new(ol_close_cb close_cb, void* data) { ol_handle *handle = calloc(sizeof(ol_handle), 1); + if (!handle) { + ol_err_new(NULL, ENOMEM); + return NULL; + } + handle->close_cb = close_cb; handle->data = data; - handle->_.fd = -1; + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + ol_err_new(handle, errno); + free(handle); + return NULL; + } - ngx_queue_init(&handle->_.read_reqs); - - ev_init(&handle->_.read_watcher, ol_tcp_io); - ev_init(&handle->_.write_watcher, ol_tcp_io); + if (ol_tcp_open(handle, fd)) { + close(fd); + free(handle); + return NULL; + } return handle; } -int ol_tcp_lazy_open(ol_handle* handle, int domain) { - assert(handle->_.fd < 0); - - /* Lazily allocate a file descriptor for this handle */ - int fd = socket(domain, SOCK_STREAM, 0); - - /* Set non-blocking, etc */ - ol_tcp_init_fd(fd); - - handle->_.fd = fd; - - return 0; -} - int ol_bind(ol_handle* handle, struct sockaddr* addr) { int addrsize; int domain; + int r; + + assert(handle->_.fd >= 0); if (addr->sa_family == AF_INET) { addrsize = sizeof(struct sockaddr_in); @@ -100,15 +102,6 @@ int ol_bind(ol_handle* handle, struct sockaddr* addr) { return -1; } - int r = 0; - - if (handle->_.fd < 0) { - r = ol_tcp_lazy_open(handle, domain); - if (r) { - return ol_err_new(handle, r); - } - } - r = bind(handle->_.fd, addr, addrsize); return ol_err_new(handle, r); @@ -126,17 +119,21 @@ int ol_tcp_init_fd(int fd) { } -ol_handle* ol_tcp_open(ol_handle* parent, int fd) { - ol_handle* h = ol_tcp_handle_new(NULL, NULL); - if (!h) { - return NULL; - } - h->_.fd = fd; - +int ol_tcp_open(ol_handle* handle, int fd) { /* Set non-blocking, etc */ ol_tcp_init_fd(fd); - return h; + handle->_.fd = fd; + + ngx_queue_init(&handle->_.read_reqs); + + ev_io_init(&handle->_.read_watcher, ol_tcp_io, fd, EV_READ); + ev_io_init(&handle->_.write_watcher, ol_tcp_io, fd, EV_WRITE); + + handle->_.read_watcher.data = handle; + handle->_.write_watcher.data = handle; + + return 0; } @@ -164,13 +161,17 @@ void ol_server_io(EV_P_ ev_io* watcher, int revents) { if (!handle->accept_cb) { close(fd); } else { - ol_handle* new_client = ol_tcp_open(handle, fd); + ol_handle* new_client = ol_tcp_handle_new(NULL, NULL); if (!new_client) { - ol_close_error(handle, ol_err_last(handle)); - return; + /* Ignore error for now */ + } else { + if (ol_tcp_open(new_client, fd)) { + /* Ignore error for now */ + } else { + ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); + handle->accept_cb(handle, new_client); + } } - - handle->accept_cb(handle, new_client); } } } @@ -178,10 +179,7 @@ void ol_server_io(EV_P_ ev_io* watcher, int revents) { int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) { - if (handle->_.fd < 0) { - /* Lazily allocate a file descriptor for this handle */ - handle->_.fd = socket(AF_INET, SOCK_STREAM, 0); - } + assert(handle->_.fd >= 0); int r = listen(handle->_.fd, backlog); if (r < 0) { @@ -210,15 +208,103 @@ int ol_close_error(ol_handle* handle, ol_err err) { } +ol_req* ol_read_reqs_head(ol_handle* handle) { + ngx_queue_t* q = ngx_queue_head(&(handle->_.read_reqs)); + if (!q) { + return NULL; + } + + ol_req_private* p = ngx_queue_data(q, ol_req_private, read_reqs); + assert(p); + int off = offsetof(ol_req, _); + ol_req* req = (ol_req*) ((char*)p - off); + + return req; +} + + +int ol_read_reqs_empty(ol_handle* handle) { + return ngx_queue_empty(&(handle->_.read_reqs)); +} + + +void ol__read(ol_handle* handle) { + assert(handle->_.fd >= 0); + + /* Get the request at the head of the read_reqs queue. */ + ol_req* req = ol_read_reqs_head(handle); + if (!req) { + ev_io_stop(EV_DEFAULT_ &(handle->_.read_watcher)); + return; + } + + /* Cast to iovec. We had to have our own ol_buf instead of iovec + * because Windows's WSABUF is not an iovec. + */ + struct iovec* iov = (struct iovec*) req->_.read_bufs; + int iovcnt = req->_.read_bufcnt; + + assert(iov); + assert(iovcnt > 0); + + /* Now do the actual read. */ + + ssize_t nread = readv(handle->_.fd, iov, iovcnt); + + ol_read_cb cb = req->cb; + + if (nread < 0) { + if (errno == EAGAIN) { + /* Just wait for the next one. */ + assert(ev_is_active(&(handle->_.read_watcher))); + } else { + ol_err err = ol_err_new(handle, errno); + if (cb) { + cb(req, 0, err); + } + ol_close_error(handle, errno); + } + } else { + /* Successful read */ + + /* First pop the req off handle->_.read_reqs */ + ngx_queue_remove(&(req->_.read_reqs)); + + /* Must free req if local. Also must free req->_.read_bufs. */ + free(req->_.read_bufs); + req->_.read_bufs = NULL; + if (req->_.local) { + free(req); + } + + /* NOTE: call callback AFTER freeing the request data. */ + if (cb) { + cb(req, nread, 0); + } + + if (ol_read_reqs_empty(handle)) { + ev_io_stop(EV_DEFAULT_ &(handle->_.read_watcher)); + } + } +} + + void ol_tcp_io(EV_P_ ev_io* watcher, int revents) { ol_handle* handle = watcher->data; + assert(handle->_.fd >= 0); + if (handle->_.connect_req) { ol_tcp_connect(handle, handle->_.connect_req); } else { - } + if (revents & EV_READ) { + ol__read(handle); + } - assert(handle->_.fd >= 0); + if (revents & EV_WRITE) { + assert(0 && "Queued writes are not supported"); + } + } } @@ -275,19 +361,20 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req) { ol_req* ol_req_maybe_alloc(ol_handle* handle, ol_req* in_req) { if (in_req) { + ngx_queue_init(&(in_req->_.read_reqs)); in_req->handle = handle; in_req->_.local = 0; return in_req; } else { ol_req *req = calloc(sizeof(ol_req), 1); req->handle = handle; + ngx_queue_init(&(req->_.read_reqs)); req->_.local = 1; return req; } } - int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) { if (handle->_.connect_req) { return ol_err_new(handle, EALREADY); @@ -332,7 +419,9 @@ int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) { return ol_err_new(handle, r); } + int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) { + assert(handle->_.fd >= 0); ssize_t r; r = writev(handle->_.fd, (struct iovec*)bufs, bufcnt); @@ -340,6 +429,10 @@ int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) { if (r < 0) { return ol_err_new(handle, r); } else { + if (req && req->cb) { + ol_write_cb cb = req->cb; + cb(req, 0); + } return 0; } } @@ -354,57 +447,68 @@ int ol_write2(ol_handle* handle, const char* msg) { } -void ol_req_append(ol_handle* handle, ol_req *req) { - ngx_queue_insert_tail(&handle->_.read_reqs, &req->_.read_reqs); -} - - int ol_read(ol_handle* handle, ol_req *req_in, ol_buf* bufs, int bufcnt) { + ssize_t nread = -1; + errno = EAGAIN; + ol_read_cb cb = req_in->cb; + assert(handle->_.fd >= 0); - if (!ngx_queue_empty(&handle->_.read_reqs)) { - /* There are already pending read_reqs. We must get in line. */ - assert(ev_is_active(&handle->_.read_watcher)); - - ol_req* req = ol_req_maybe_alloc(handle, req_in); - if (!req) { - return ol_err_new(handle, ENOMEM); - } - - ol_req_append(handle, req); - - return ol_err_new(handle, EINPROGRESS); - - } else { - /* Attempt to read immediately */ + if (ngx_queue_empty(&handle->_.read_reqs)) { + /* Attempt to read immediately. */ ssize_t nread = readv(handle->_.fd, (struct iovec*) bufs, bufcnt); - - ol_read_cb cb = req_in->cb; - - if (nread < 0) { - if (errno == EAGAIN) { - ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); - return 0; - } else { - ol_err err = ol_err_new(handle, errno); - - if (cb) { - cb(req_in, nread, err); - } - - return err; - } - - } else { - if (cb) { - cb(req_in, nread, 0); - } - return 0; - } } - assert(0 && "Unreachable"); - return 0; + if (nread < 0 && errno != EAGAIN) { + /* Real error. */ + ol_err err = ol_err_new(handle, errno); + + if (cb) { + cb(req_in, nread, err); + } + + return err; + } + + if (nread >= 0) { + /* Successful read. */ + if (cb) { + cb(req_in, nread, 0); + } + return 0; + } + + /* Either we never read anything, or we got EAGAIN. */ + assert(!ngx_queue_empty(&handle->_.read_reqs) || + (nread < 0 && errno == EAGAIN)); + + /* Two possible states: + * - EAGAIN, meaning the socket is not wriable currently. We must wait for + * it to become readable with the handle->_.read_watcher. + * - The read_reqs queue already has reads. Meaning: the user has issued + * many ol_reads calls some of which are still waiting for the socket to + * become readable. + * In the meantime we append the request to handle->_.read_reqs + */ + ol_req* req = ol_req_maybe_alloc(handle, req_in); + if (!req) { + return ol_err_new(handle, ENOMEM); + } + + /* Copy the bufs data over into our ol_req struct. This is so the user can + * free the ol_buf array. The actual data inside the ol_bufs is however + * owned by the user and cannot be deallocated until the read completes. + */ + req->_.read_bufs = malloc(sizeof(ol_buf) * bufcnt); + memcpy(req->_.read_bufs, bufs, bufcnt * sizeof(ol_buf)); + req->_.read_bufcnt = bufcnt; + + /* Append the request to read_reqs. */ + ngx_queue_insert_tail(&(handle->_.read_reqs), &(req->_.read_reqs)); + + ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); + + return ol_err_new(handle, EINPROGRESS); } diff --git a/ol-unix.h b/ol-unix.h index 1bcf7127..0f646462 100644 --- a/ol-unix.h +++ b/ol-unix.h @@ -25,6 +25,8 @@ typedef struct { int local; ol_req_cb connect_cb; ngx_queue_t read_reqs; + ol_buf* read_bufs; + int read_bufcnt; } ol_req_private;