diff --git a/Makefile b/Makefile index 71a66725..bf6a493c 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -all: test/echo-demo test/test-ping-pong +all: oio.a test/echo-demo: test/echo-demo.c test/echo.o oio.a $(CC) -ansi -g -o test/echo-demo test/echo-demo.c test/echo.o oio.a -lm diff --git a/oio-unix.c b/oio-unix.c index 65875dd1..80ce3a97 100644 --- a/oio-unix.c +++ b/oio-unix.c @@ -32,12 +32,12 @@ int oio_close_error(oio_handle* handle, oio_err err); static oio_err oio_err_new(oio_handle* handle, int e) { - handle->_.err = e; + handle->err = e; return e; } oio_err oio_err_last(oio_handle *handle) { - return handle->_.err; + return handle->err; } @@ -101,7 +101,7 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) { int domain; int r; - assert(handle->_.fd >= 0); + assert(handle->fd >= 0); if (addr->sa_family == AF_INET) { addrsize = sizeof(struct sockaddr_in); @@ -114,7 +114,7 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) { return -1; } - r = bind(handle->_.fd, addr, addrsize); + r = bind(handle->fd, addr, addrsize); return oio_err_new(handle, r); } @@ -135,15 +135,15 @@ int oio_tcp_open(oio_handle* handle, int fd) { /* Set non-blocking, etc */ oio_tcp_init_fd(fd); - handle->_.fd = fd; + handle->fd = fd; - ngx_queue_init(&handle->_.read_reqs); + ngx_queue_init(&handle->read_reqs); - ev_io_init(&handle->_.read_watcher, oio_tcp_io, fd, EV_READ); - ev_io_init(&handle->_.write_watcher, oio_tcp_io, fd, EV_WRITE); + ev_io_init(&handle->read_watcher, oio_tcp_io, fd, EV_READ); + ev_io_init(&handle->write_watcher, oio_tcp_io, fd, EV_WRITE); - handle->_.read_watcher.data = handle; - handle->_.write_watcher.data = handle; + handle->read_watcher.data = handle; + handle->write_watcher.data = handle; return 0; } @@ -157,7 +157,7 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { while (1) { struct sockaddr addr; socklen_t addrlen; - int fd = accept(handle->_.fd, &addr, &addrlen); + int fd = accept(handle->fd, &addr, &addrlen); if (fd < 0) { if (errno == EAGAIN) { @@ -180,8 +180,8 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { if (oio_tcp_open(new_client, fd)) { /* Ignore error for now */ } else { - ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); - handle->accept_cb(handle, new_client); + ev_io_start(EV_DEFAULT_ &handle->read_watcher); + handle->accept_cb(handle); } } } @@ -191,26 +191,26 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { int oio_listen(oio_handle* handle, int backlog, oio_accept_cb cb) { - assert(handle->_.fd >= 0); + assert(handle->fd >= 0); - int r = listen(handle->_.fd, backlog); + int r = listen(handle->fd, backlog); if (r < 0) { return oio_err_new(handle, errno); } handle->accept_cb = cb; - ev_io_init(&handle->_.read_watcher, oio_server_io, handle->_.fd, EV_READ); - ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); - handle->_.read_watcher.data = handle; + ev_io_init(&handle->read_watcher, oio_server_io, handle->fd, EV_READ); + ev_io_start(EV_DEFAULT_ &handle->read_watcher); + handle->read_watcher.data = handle; return 0; } int oio_close_error(oio_handle* handle, oio_err err) { - ev_io_stop(EV_DEFAULT_ &handle->_.read_watcher); - close(handle->_.fd); - handle->_.fd = -1; + ev_io_stop(EV_DEFAULT_ &handle->read_watcher); + close(handle->fd); + handle->fd = -1; if (handle->close_cb) { handle->close_cb(handle, err); @@ -221,54 +221,52 @@ int oio_close_error(oio_handle* handle, oio_err err) { oio_req* oio_read_reqs_head(oio_handle* handle) { - ngx_queue_t* q = ngx_queue_head(&(handle->_.read_reqs)); + ngx_queue_t* q = ngx_queue_head(&(handle->read_reqs)); if (!q) { return NULL; } - oio_req_private* p = ngx_queue_data(q, oio_req_private, read_reqs); - assert(p); - int off = offsetof(oio_req, _); - oio_req* req = (oio_req*) ((char*)p - off); + oio_req* req = ngx_queue_data(q, struct oio_req_s, read_reqs); + assert(req); return req; } int oio_read_reqs_empty(oio_handle* handle) { - return ngx_queue_empty(&(handle->_.read_reqs)); + return ngx_queue_empty(&(handle->read_reqs)); } void oio__read(oio_handle* handle) { - assert(handle->_.fd >= 0); + assert(handle->fd >= 0); /* Get the request at the head of the read_reqs queue. */ oio_req* req = oio_read_reqs_head(handle); if (!req) { - ev_io_stop(EV_DEFAULT_ &(handle->_.read_watcher)); + ev_io_stop(EV_DEFAULT_ &(handle->read_watcher)); return; } /* Cast to iovec. We had to have our own oio_buf instead of iovec * because Windows's WSABUF is not an iovec. */ - struct iovec* iov = (struct iovec*) req->_.read_bufs; - int iovcnt = req->_.read_bufcnt; + struct iovec* iov = (struct iovec*) req->read_bufs; + int iovcnt = req->read_bufcnt; assert(iov); assert(iovcnt > 0); /* Now do the actual read. */ - ssize_t nread = readv(handle->_.fd, iov, iovcnt); + ssize_t nread = readv(handle->fd, iov, iovcnt); oio_read_cb cb = req->cb; if (nread < 0) { if (errno == EAGAIN) { /* Just wait for the next one. */ - assert(ev_is_active(&(handle->_.read_watcher))); + assert(ev_is_active(&(handle->read_watcher))); } else { oio_err err = oio_err_new(handle, errno); if (cb) { @@ -279,13 +277,13 @@ void oio__read(oio_handle* handle) { } else { /* Successful read */ - /* First pop the req off handle->_.read_reqs */ - ngx_queue_remove(&(req->_.read_reqs)); + /* First pop the req off handle->read_reqs */ + ngx_queue_remove(&(req->read_reqs)); - /* Must free req if local. Also must free req->_.read_bufs. */ - free(req->_.read_bufs); - req->_.read_bufs = NULL; - if (req->_.local) { + /* Must free req if local. Also must free req->read_bufs. */ + free(req->read_bufs); + req->read_bufs = NULL; + if (req->local) { free(req); } @@ -295,7 +293,7 @@ void oio__read(oio_handle* handle) { } if (oio_read_reqs_empty(handle)) { - ev_io_stop(EV_DEFAULT_ &(handle->_.read_watcher)); + ev_io_stop(EV_DEFAULT_ &(handle->read_watcher)); } } } @@ -304,10 +302,10 @@ void oio__read(oio_handle* handle) { void oio_tcp_io(EV_P_ ev_io* watcher, int revents) { oio_handle* handle = watcher->data; - assert(handle->_.fd >= 0); + assert(handle->fd >= 0); - if (handle->_.connect_req) { - oio_tcp_connect(handle, handle->_.connect_req); + if (handle->connect_req) { + oio_tcp_connect(handle, handle->connect_req); } else { if (revents & EV_READ) { oio__read(handle); @@ -326,21 +324,21 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) { * getsockopt. */ void oio_tcp_connect(oio_handle* handle, oio_req* req) { - assert(handle->_.fd >= 0); + assert(handle->fd >= 0); assert(req); int error; int errorsize = sizeof(int); - getsockopt(handle->_.fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); + getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); if (!error) { - ev_io_init(&handle->_.write_watcher, oio_tcp_io, handle->_.fd, EV_WRITE); - ev_set_cb(&handle->_.read_watcher, oio_tcp_io); + ev_io_init(&handle->write_watcher, oio_tcp_io, handle->fd, EV_WRITE); + ev_set_cb(&handle->read_watcher, oio_tcp_io); /* Successful connection */ oio_connect_cb connect_cb = req->cb; if (connect_cb) { - if (req->_.local) { + if (req->local) { connect_cb(NULL, oio_err_new(handle, 0)); } else { connect_cb(req, oio_err_new(handle, 0)); @@ -348,7 +346,7 @@ void oio_tcp_connect(oio_handle* handle, oio_req* req) { } /* Free up connect_req if we own it. */ - if (req->_.local) { + if (req->local) { free(req); } @@ -361,8 +359,8 @@ void oio_tcp_connect(oio_handle* handle, oio_req* req) { } else { oio_err err = oio_err_new(handle, error); - if (req->_.connect_cb) { - req->_.connect_cb(req, err); + if (req->connect_cb) { + req->connect_cb(req, err); } oio_close_error(handle, err); @@ -372,23 +370,25 @@ void oio_tcp_connect(oio_handle* handle, oio_req* req) { oio_req* oio_req_maybe_alloc(oio_handle* handle, oio_req* in_req) { if (in_req) { - ngx_queue_init(&(in_req->_.read_reqs)); + ngx_queue_init(&(in_req->read_reqs)); in_req->handle = handle; - in_req->_.local = 0; + in_req->local = 0; return in_req; } else { oio_req *req = malloc(sizeof(oio_req)); - oio_req_init(req, NULL); + oio_req_init(req, NULL, NULL); req->handle = handle; - ngx_queue_init(&(req->_.read_reqs)); - req->_.local = 1; + ngx_queue_init(&(req->read_reqs)); + req->local = 1; return req; } } -int oio_connect(oio_handle* handle, oio_req *req_in, struct sockaddr* addr) { - if (handle->_.connect_req) { +int oio_connect(oio_req *req_in, struct sockaddr* addr) { + oio_handle* handle = req_in->handle; + + if (handle->connect_req) { return oio_err_new(handle, EALREADY); } @@ -401,42 +401,43 @@ int oio_connect(oio_handle* handle, oio_req *req_in, struct sockaddr* addr) { return oio_err_new(handle, ENOMEM); } - handle->_.connect_req = req; + handle->connect_req = req; int addrsize; if (addr->sa_family == AF_INET) { addrsize = sizeof(struct sockaddr_in); - handle->_.fd = socket(AF_INET, SOCK_STREAM, 0); + handle->fd = socket(AF_INET, SOCK_STREAM, 0); } else if (addr->sa_family == AF_INET6) { addrsize = sizeof(struct sockaddr_in6); - handle->_.fd = socket(AF_INET6, SOCK_STREAM, 0); + handle->fd = socket(AF_INET6, SOCK_STREAM, 0); } else { assert(0); return -1; } /* socket(2) failed */ - if (handle->_.fd < 0) { + if (handle->fd < 0) { return oio_err_new(handle, errno); } - int r = connect(handle->_.fd, addr, addrsize); + int r = connect(handle->fd, addr, addrsize); - ev_io_init(&handle->_.read_watcher, oio_tcp_io, handle->_.fd, EV_READ); - ev_io_init(&handle->_.write_watcher, oio_tcp_io, handle->_.fd, EV_WRITE); - ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); + ev_io_init(&(handle->read_watcher), oio_tcp_io, handle->fd, EV_READ); + ev_io_init(&(handle->write_watcher), oio_tcp_io, handle->fd, EV_WRITE); + ev_io_start(EV_DEFAULT_ &(handle->read_watcher)); return oio_err_new(handle, r); } -int oio_write(oio_handle* handle, oio_req *req, oio_buf* bufs, int bufcnt) { - assert(handle->_.fd >= 0); +int oio_write(oio_req *req, oio_buf* bufs, int bufcnt) { + oio_handle* handle = req->handle; + assert(handle->fd >= 0); ssize_t r; - r = writev(handle->_.fd, (struct iovec*)bufs, bufcnt); + r = writev(handle->fd, (struct iovec*)bufs, bufcnt); if (r < 0) { return oio_err_new(handle, r); @@ -450,25 +451,26 @@ int oio_write(oio_handle* handle, oio_req *req, oio_buf* bufs, int bufcnt) { } -int oio_write2(oio_handle* handle, const char* msg) { +int oio_write2(oio_req* req, const char* msg) { size_t len = strnlen(msg, 1024 * 1024); oio_buf b; b.base = (char*)msg; b.len = len; - return oio_write(handle, NULL, &b, 1); + return oio_write(req, &b, 1); } -int oio_read(oio_handle* handle, oio_req *req_in, oio_buf* bufs, int bufcnt) { +int oio_read(oio_req *req_in, oio_buf* bufs, int bufcnt) { + oio_handle* handle = req_in->handle; ssize_t nread = -1; errno = EAGAIN; oio_read_cb cb = req_in->cb; - assert(handle->_.fd >= 0); + assert(handle->fd >= 0); - if (ngx_queue_empty(&handle->_.read_reqs)) { + if (ngx_queue_empty(&handle->read_reqs)) { /* Attempt to read immediately. */ - ssize_t nread = readv(handle->_.fd, (struct iovec*) bufs, bufcnt); + ssize_t nread = readv(handle->fd, (struct iovec*) bufs, bufcnt); } if (nread < 0 && errno != EAGAIN) { @@ -491,16 +493,16 @@ int oio_read(oio_handle* handle, oio_req *req_in, oio_buf* bufs, int bufcnt) { } /* Either we never read anything, or we got EAGAIN. */ - assert(!ngx_queue_empty(&handle->_.read_reqs) || + assert(!ngx_queue_empty(&handle->read_reqs) || (nread < 0 && errno == EAGAIN)); /* Two possible states: * - EAGAIN, meaning the socket is not wriable currently. We must wait for - * it to become readable with the handle->_.read_watcher. + * it to become readable with the handle->read_watcher. * - The read_reqs queue already has reads. Meaning: the user has issued * many oio_reads calls some of which are still waiting for the socket to * become readable. - * In the meantime we append the request to handle->_.read_reqs + * In the meantime we append the request to handle->read_reqs */ oio_req* req = oio_req_maybe_alloc(handle, req_in); if (!req) { @@ -511,14 +513,14 @@ int oio_read(oio_handle* handle, oio_req *req_in, oio_buf* bufs, int bufcnt) { * free the oio_buf array. The actual data inside the oio_bufs is however * owned by the user and cannot be deallocated until the read completes. */ - req->_.read_bufs = malloc(sizeof(oio_buf) * bufcnt); - memcpy(req->_.read_bufs, bufs, bufcnt * sizeof(oio_buf)); - req->_.read_bufcnt = bufcnt; + req->read_bufs = malloc(sizeof(oio_buf) * bufcnt); + memcpy(req->read_bufs, bufs, bufcnt * sizeof(oio_buf)); + req->read_bufcnt = bufcnt; /* Append the request to read_reqs. */ - ngx_queue_insert_tail(&(handle->_.read_reqs), &(req->_.read_reqs)); + ngx_queue_insert_tail(&(handle->read_reqs), &(req->read_reqs)); - ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); + ev_io_start(EV_DEFAULT_ &handle->read_watcher); return oio_err_new(handle, EINPROGRESS); } @@ -531,8 +533,9 @@ void oio_free(oio_handle* handle) { } -void oio_req_init(oio_req *req, void *cb) { +void oio_req_init(oio_req* req, oio_handle* handle, void* cb) { req->type = OIO_UNKNOWN_REQ; req->cb = cb; - ngx_queue_init(&(req->_.read_reqs)); + req->handle = handle; + ngx_queue_init(&(req->read_reqs)); } diff --git a/oio-unix.h b/oio-unix.h index 49d1b875..251d2614 100644 --- a/oio-unix.h +++ b/oio-unix.h @@ -20,33 +20,24 @@ typedef struct { } oio_buf; - -typedef struct { - int local; - oio_connect_cb connect_cb; - ngx_queue_t read_reqs; - oio_buf* read_bufs; +#define oio_req_private_fields \ + int local; \ + oio_connect_cb connect_cb; \ + ngx_queue_t read_reqs; \ + oio_buf* read_bufs; \ int read_bufcnt; -} oio_req_private; -typedef struct { - int fd; - - oio_err err; - - oio_read_cb read_cb; - oio_close_cb close_cb; - - oio_req *connect_req; - - ev_io read_watcher; - ev_io write_watcher; - - ngx_queue_t write_queue; +#define oio_handle_private_fields \ + int fd; \ + oio_err err; \ + oio_read_cb read_cb; \ + oio_accept_cb accept_cb; \ + oio_req *connect_req; \ + ev_io read_watcher; \ + ev_io write_watcher; \ + ngx_queue_t write_queue; \ ngx_queue_t read_reqs; -} oio_handle_private; - #endif /* OIO_UNIX_H */ diff --git a/oio.h b/oio.h index 3ee5b2d4..bf7bff0b 100644 --- a/oio.h +++ b/oio.h @@ -54,10 +54,10 @@ struct oio_handle_s { oio_close_cb close_cb; void* data; /* private */ - struct oio_handle_private_s; + oio_handle_private_fields }; -typedef struct oio_req_s { +struct oio_req_s { /* read-only */ oio_req_type type; /* public */ @@ -65,8 +65,8 @@ typedef struct oio_req_s { void* cb; void* data; /* private */ - struct oio_req_private_s; -} oio_req; + oio_req_private_fields +}; /**