diff --git a/Makefile b/Makefile index fb4a84ff..861ac91f 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,11 @@ test/echo-server: test/echo-server.c ol.a - $(CC) -o test/echo-server test/echo-server.c ol.a -lm + $(CC) -g -o test/echo-server test/echo-server.c ol.a -lm ol.a: ol-unix.o ev/ev.o $(AR) rcs ol.a ol-unix.o ev/ev.o ol-unix.o: ol-unix.c ol.h ol-unix.h - $(CC) -c ol-unix.c -o ol-unix.o -lm + $(CC) -g -c ol-unix.c -o ol-unix.o -lm ev/ev.o: ev/config.h ev/ev.c $(MAKE) -C ev diff --git a/ol-unix.c b/ol-unix.c index 2b7b769a..c70ffc9d 100644 --- a/ol-unix.c +++ b/ol-unix.c @@ -11,10 +11,15 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req); int ol_close_error(ol_handle* handle, ol_err err); -static int ol_err_new(int e) { +static ol_err ol_err_new(ol_handle* handle, int e) { + handle->_.err = e; return e; } +ol_err ol_err_last(ol_handle *handle) { + return handle->_.err; +} + struct sockaddr_in ol_ip4_addr(char *ip, int port) { struct sockaddr_in addr; @@ -32,8 +37,13 @@ int ol_close(ol_handle* handle) { } +void ol_init() { + ev_default_loop(0); +} + + int ol_run() { - ev_run(0); + ev_run(EV_DEFAULT_ 0); } @@ -42,6 +52,8 @@ ol_handle* ol_handle_new(ol_close_cb close_cb, void* data) { handle->close_cb = close_cb; handle->data = data; + handle->_.fd = -1; + ev_init(&handle->_.read_watcher, ol_tcp_io); ev_init(&handle->_.write_watcher, ol_tcp_io); @@ -49,33 +61,114 @@ ol_handle* ol_handle_new(ol_close_cb close_cb, void* data) { } +int ol_tcp_lazy_open(ol_handle* handle, int domain) { + assert(handle->_.fd < 0); + + /* Lazily allocate a file descriptor for this handle */ + handle->_.fd = socket(domain, SOCK_STREAM, 0); + int yes = 1; + setsockopt(handle->_.fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + fcntl(handle->_.fd, F_SETFL, O_NONBLOCK); + + return 0; +} + + int ol_bind(ol_handle* handle, struct sockaddr* addr) { int addrsize; + int domain; if (addr->sa_family == AF_INET) { addrsize = sizeof(struct sockaddr_in); + domain = AF_INET; } else if (addr->sa_family == AF_INET6) { addrsize = sizeof(struct sockaddr_in6); + domain = AF_INET6; } else { assert(0); return -1; } - int r = bind(handle->_.fd, addr, addrsize); + int r = 0; - return ol_err_new(r); + if (handle->_.fd < 0) { + r = ol_tcp_lazy_open(handle, domain); + if (r) { + return ol_err_new(handle, r); + } + } + + r = bind(handle->_.fd, addr, addrsize); + + return ol_err_new(handle, r); +} + + +ol_handle* ol_tcp_open(ol_handle* parent, int fd) { + ol_handle* h = ol_handle_new(NULL, NULL); + h->_.fd = fd; + return h; +} + + +void ol_server_io(EV_P_ ev_io* watcher, int revents) { + ol_handle* handle = watcher->data; + + assert(revents == EV_READ); + + while (1) { + struct sockaddr addr; + socklen_t addrlen; + int fd = accept(handle->_.fd, &addr, &addrlen); + + if (fd < 0) { + if (errno == EAGAIN) { + return; // No problem. + } else if (errno == EMFILE) { + // TODO special trick. unlock reserved socket, accept, close. + return; + } else { + ol_close_error(handle, ol_err_new(handle, errno)); + } + } else { + if (handle->accept_cb) { + ol_handle* new_client = ol_tcp_open(handle, fd); + if (!new_client) { + ol_close_error(handle, ol_err_last(handle)); + return; + } + + handle->accept_cb(handle, new_client); + } else { + close(fd); + } + } + } } int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) { + if (handle->_.fd < 0) { + /* Lazily allocate a file descriptor for this handle */ + handle->_.fd = socket(AF_INET, SOCK_STREAM, 0); + } + int r = listen(handle->_.fd, backlog); + if (r < 0) { + return ol_err_new(handle, errno); + } + handle->accept_cb = cb; - return ol_err_new(r); + ev_io_init(&handle->_.read_watcher, ol_server_io, handle->_.fd, EV_READ); + ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); + handle->_.read_watcher.data = handle; + + return 0; } int ol_close_error(ol_handle* handle, ol_err err) { - ev_io_stop(&handle->_.read_watcher); + ev_io_stop(EV_DEFAULT_ &handle->_.read_watcher); close(handle->_.fd); handle->_.fd = -1; @@ -121,9 +214,9 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req) { ol_connect_cb connect_cb = req->connect_cb; if (connect_cb) { if (req->_.local) { - connect_cb(NULL, ol_err_new(0)); + connect_cb(NULL, ol_err_new(handle, 0)); } else { - connect_cb(req, ol_err_new(0)); + connect_cb(req, ol_err_new(handle, 0)); } } @@ -139,11 +232,13 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req) { return; } else { + ol_err err = ol_err_new(handle, error); + if (req->_.connect_cb) { - req->_.connect_cb(req, ol_err_new(error)); + req->_.connect_cb(req, err); } - ol_close_error(handle, ol_err_new(error)); + ol_close_error(handle, err); } } @@ -165,16 +260,16 @@ ol_req* ol_req_maybe_alloc(ol_handle* handle, ol_req* in_req) { int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) { if (handle->_.connect_req) { - return ol_err_new(EALREADY); + return ol_err_new(handle, EALREADY); } if (handle->type != OL_TCP) { - return ol_err_new(ENOTSOCK); + return ol_err_new(handle, ENOTSOCK); } ol_req *req = ol_req_maybe_alloc(handle, req_in); if (!req) { - return ol_err_new(ENOMEM); + return ol_err_new(handle, ENOMEM); } handle->_.connect_req = req; @@ -195,16 +290,16 @@ int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) { /* socket(2) failed */ if (handle->_.fd < 0) { - return ol_err_new(errno); + return ol_err_new(handle, errno); } int r = connect(handle->_.fd, addr, addrsize); ev_io_init(&handle->_.read_watcher, ol_tcp_io, handle->_.fd, EV_READ); ev_io_init(&handle->_.write_watcher, ol_tcp_io, handle->_.fd, EV_WRITE); - ev_io_start(&handle->_.read_watcher); + ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); - return ol_err_new(r); + return ol_err_new(handle, r); } int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) { @@ -223,9 +318,60 @@ int ol_write2(ol_handle* handle, const char* msg) { } -int ol_read(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) { - // stub - assert(0); +void ol_req_append(ol_handle* handle, ol_req *req) { + ngx_queue_insert_tail(&handle->read_reqs, &req->read_queue); +} + + +int ol_read(ol_handle* handle, ol_req *req_in, ol_buf* bufs, int bufcnt) { + assert(handle->_.fd >= 0); + + if (!ngx_queue_empty(&handle->read_reqs)) { + /* There are already pending read_reqs. We must get in line. */ + assert(ev_is_active(&handle->read_watcher)); + + ol_req *req = ol_req_maybe_alloc(handle, req_in); + if (!req) { + return ol_err_new(handle, ENOMEM); + } + + ol_req_append(handle, req); + + return ol_err_new(handle, EINPROGRESS); + + } else { + /* Attempt to read immediately */ + ssize_t nread = readv(handle->_.fd, (struct iovec) bufs, bufcnt); + + if (nread < 0) { + + } + + ssize_t buftotal; + for (int i = 0; i < bufcnt; i++) { + buftotal += bufs.len; + if (buftotal == nread) { + /* We read everything */ + if (req_in.read_cb) { + req_in.read_cb(handle, req_in, nread, 0); + } + } + + if (buftotal > nread) break; + } + + + ol_req *req = ol_req_maybe_alloc(handle, req_in); + if (!req) { + return ol_err_new(handle, ENOMEM); + } + + // blah + } + + + ngx_queue_insert_tail(); + return 0; } diff --git a/ol-unix.h b/ol-unix.h index 8750b0ae..39bad08a 100644 --- a/ol-unix.h +++ b/ol-unix.h @@ -3,7 +3,6 @@ #include "ngx-queue.h" -#define EV_MULTIPLICITY 0 #include "ev/ev.h" #include @@ -42,6 +41,8 @@ typedef struct { typedef struct { int fd; + ol_err err; + ol_read_cb read_cb; ol_close_cb close_cb; diff --git a/ol.h b/ol.h index b7e32db7..53748b80 100644 --- a/ol.h +++ b/ol.h @@ -73,6 +73,7 @@ struct ol_req_s { }; +void ol_init(); int ol_run(); ol_handle* ol_handle_new(ol_close_cb close_cb, void* data); diff --git a/test/echo-server.c b/test/echo-server.c index 96227a44..9adf70ff 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -80,11 +80,23 @@ void on_accept(ol_handle* server, ol_handle* new_client) { int main(int argc, char** argv) { + ol_init(); + ol_handle* server = ol_handle_new(on_close, NULL); struct sockaddr_in addr = ol_ip4_addr("0.0.0.0", 8000); - ol_bind(server, (struct sockaddr*) &addr); - ol_listen(server, 128, on_accept); + + int r = ol_bind(server, (struct sockaddr*) &addr); + if (r) { + fprintf(stderr, "Bind error\n"); + return 1; + } + + r = ol_listen(server, 128, on_accept); + if (r) { + fprintf(stderr, "Listen error\n"); + return 1; + } ol_run();