diff --git a/TODO b/TODO deleted file mode 100644 index 108b4db5..00000000 --- a/TODO +++ /dev/null @@ -1,9 +0,0 @@ -- "test-runner ping_pong" should run just the ping pong test with the echo -server process. - -- FATAL should take a string - -- ASSERT and FATAL should require semicolons. - -- Should oio_req need to be initialized before each use? Currently in Linux - this is necessary. diff --git a/oio-unix.c b/oio-unix.c index 2b146a9b..08cf7c5a 100644 --- a/oio-unix.c +++ b/oio-unix.c @@ -26,6 +26,7 @@ size_t strnlen (register const char* s, size_t maxlen) { void oio_tcp_io(EV_P_ ev_io* watcher, int revents); +void oio__next(EV_P_ ev_idle* watcher, int revents); void oio_tcp_connect(oio_handle* handle); int oio_tcp_open(oio_handle*, int fd); void oio_finish_close(oio_handle* handle); @@ -78,12 +79,12 @@ struct sockaddr_in oio_ip4_addr(char *ip, int port) { int oio_close(oio_handle* handle) { oio_flag_set(handle, OIO_CLOSING); - if (!ev_is_active(&handle->write_watcher)) { - ev_io_start(EV_DEFAULT_ &handle->write_watcher); - } + ev_io_stop(EV_DEFAULT_ &handle->write_watcher); + ev_io_stop(EV_DEFAULT_ &handle->read_watcher); - ev_feed_fd_event(EV_DEFAULT_ handle->fd, EV_WRITE); - assert(ev_is_pending(&handle->write_watcher)); + ev_idle_start(EV_DEFAULT_ &handle->next_watcher); + ev_feed_event(EV_DEFAULT_ &handle->next_watcher, EV_IDLE); + assert(ev_is_pending(&handle->next_watcher)); return 0; } @@ -105,13 +106,21 @@ int oio_tcp_handle_init(oio_handle *handle, oio_close_cb close_cb, handle->close_cb = close_cb; handle->data = data; handle->flags = 0; - handle->connect_req = NULL; handle->accepted_fd = -1; handle->fd = -1; ngx_queue_init(&handle->read_reqs); + ev_init(&handle->next_watcher, oio__next); + handle->next_watcher.data = handle; + + ev_init(&handle->read_watcher, oio_tcp_io); + handle->read_watcher.data = handle; + + ev_init(&handle->write_watcher, oio_tcp_io); + handle->write_watcher.data = handle; + return 0; } @@ -154,37 +163,41 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) { int oio_tcp_open(oio_handle* handle, int fd) { - /* Set non-blocking, etc */ + assert(fd >= 0); + handle->fd = fd; + + /* Set non-blocking. */ int yes = 1; int r = fcntl(fd, F_SETFL, O_NONBLOCK); assert(r == 0); + + /* Reuse the port address. */ r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); assert(r == 0); - handle->fd = fd; - + /* Initialize the queue structure for oio_read() requests. */ ngx_queue_init(&handle->read_reqs); - ev_io_init(&handle->read_watcher, oio_tcp_io, fd, EV_READ); - ev_io_init(&handle->write_watcher, oio_tcp_io, fd, EV_WRITE); + /* Associate the fd with each ev_io watcher. */ + ev_io_set(&handle->read_watcher, fd, EV_READ); + ev_io_set(&handle->write_watcher, fd, EV_WRITE); - handle->read_watcher.data = handle; - handle->write_watcher.data = handle; + /* These should have been set up by oio_tcp_handle_init. */ + assert(handle->next_watcher.data == handle); + assert(handle->write_watcher.data == handle); + assert(handle->read_watcher.data == handle); return 0; } -void oio_server_io(EV_P_ ev_io* watcher, int revents) { +void oio__server_io(EV_P_ ev_io* watcher, int revents) { oio_handle* handle = watcher->data; assert(watcher == &handle->read_watcher || watcher == &handle->write_watcher); assert(revents == EV_READ); - if (oio_flag_is_set(handle, OIO_CLOSING)) { - oio_finish_close(handle); - return; - } + assert(!oio_flag_is_set(handle, OIO_CLOSING)); if (handle->accepted_fd >= 0) { ev_io_stop(EV_DEFAULT_ &handle->read_watcher); @@ -251,20 +264,25 @@ int oio_listen(oio_handle* handle, int backlog, oio_accept_cb cb) { } handle->accept_cb = cb; - ev_io_init(&handle->read_watcher, oio_server_io, handle->fd, EV_READ); + + /* Start listening for connections. */ + ev_io_set(&handle->read_watcher, handle->fd, EV_READ); + ev_set_cb(&handle->read_watcher, oio__server_io); ev_io_start(EV_DEFAULT_ &handle->read_watcher); - handle->read_watcher.data = handle; return 0; } void oio_finish_close(oio_handle* handle) { + assert(oio_flag_is_set(handle, OIO_CLOSING)); assert(!oio_flag_is_set(handle, OIO_CLOSED)); oio_flag_set(handle, OIO_CLOSED); ev_io_stop(EV_DEFAULT_ &handle->read_watcher); ev_io_stop(EV_DEFAULT_ &handle->write_watcher); + ev_idle_stop(EV_DEFAULT_ &handle->next_watcher); + close(handle->fd); handle->fd = -1; @@ -298,6 +316,19 @@ int oio_read_reqs_empty(oio_handle* handle) { } +void oio__next(EV_P_ ev_idle* watcher, int revents) { + oio_handle* handle = watcher->data; + assert(watcher == &handle->next_watcher); + assert(revents == EV_IDLE); + + /* For now this function is only to handle the closing event, but we might + * put more stuff here later. + */ + assert(oio_flag_is_set(handle, OIO_CLOSING)); + oio_finish_close(handle); +} + + void oio__read(oio_handle* handle) { int errorno; assert(handle->fd >= 0); @@ -368,10 +399,7 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) { assert(handle->fd >= 0); - if (oio_flag_is_set(handle, OIO_CLOSING)) { - oio_finish_close(handle); - return; - } + assert(!oio_flag_is_set(handle, OIO_CLOSING)); if (handle->connect_req) { oio_tcp_connect(handle); diff --git a/oio-unix.h b/oio-unix.h index 10af900a..2506127b 100644 --- a/oio-unix.h +++ b/oio-unix.h @@ -35,6 +35,7 @@ typedef struct { oio_req *connect_req; \ ev_io read_watcher; \ ev_io write_watcher; \ + ev_idle next_watcher; \ ngx_queue_t write_queue; \ ngx_queue_t read_reqs; diff --git a/oio.h b/oio.h index cf76085d..9f1e56bf 100644 --- a/oio.h +++ b/oio.h @@ -60,6 +60,7 @@ struct oio_handle_s { oio_handle_private_fields }; + struct oio_req_s { /* read-only */ oio_req_type type; diff --git a/test/echo-server.c b/test/echo-server.c index 3f2b2d83..b269e129 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -64,8 +64,12 @@ void on_close(oio_handle* peer, oio_err err) { void on_accept(oio_handle* server) { peer_t* p = (peer_t*)calloc(sizeof(peer_t), 1); - if (oio_tcp_handle_accept(server, &p->handle, on_close, (void*)p)) + int r = oio_tcp_handle_init(&p->handle, on_close, (void*)p); + ASSERT(!r) + + if (oio_tcp_handle_accept(server, &p->handle, on_close, (void*)p)) { FATAL(oio_tcp_handle_accept failed) + } p->buf.base = (char*)&p->read_buffer;