From a2c24c67d602ce4762a6750327be23e5268ed518 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 15 Apr 2011 01:11:13 -0700 Subject: [PATCH] unix: echo-server works --- Makefile | 7 ++- TODO | 2 + oio-unix.c | 142 ++++++++++++++++++++++------------------------------- oio-unix.h | 1 + 4 files changed, 67 insertions(+), 85 deletions(-) create mode 100644 TODO diff --git a/Makefile b/Makefile index 15fe3ef8..8036b919 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -all: oio.a test/test-runner +all: oio.a TESTS=test/echo-server.c \ test/test-pass-always.c \ @@ -31,7 +31,10 @@ ev/config.h: cd ev && ./configure -.PHONY: clean distclean +.PHONY: clean distclean test + +test: test/test-runner + test/test-runner clean: $(RM) -f *.o *.a test/test-runner diff --git a/TODO b/TODO new file mode 100644 index 00000000..5b482307 --- /dev/null +++ b/TODO @@ -0,0 +1,2 @@ +"test-runner ping_pong" should run just the ping pong test with the echo +server process. diff --git a/oio-unix.c b/oio-unix.c index 2135e194..d2b00548 100644 --- a/oio-unix.c +++ b/oio-unix.c @@ -67,40 +67,30 @@ int oio_run() { } -int oio_tcp_handle_init(oio_handle *handle, oio_close_cb close_cb, void* data) { - return -1; -} - -/* Remove me */ -oio_handle* oio_tcp_handle_new(oio_close_cb close_cb, void* data) { - oio_handle *handle = calloc(sizeof(oio_handle), 1); - if (!handle) { - oio_err_new(NULL, ENOMEM); - return NULL; - } - +int oio_tcp_handle_init(oio_handle *handle, oio_close_cb close_cb, + void* data) { handle->type = OIO_TCP; handle->close_cb = close_cb; handle->data = data; + handle->accepted_fd = -1; + + ngx_queue_init(&handle->read_reqs); int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { oio_err_new(handle, errno); - free(handle); - return NULL; + return -1; } if (oio_tcp_open(handle, fd)) { close(fd); - free(handle); - return NULL; + return -2; } - return handle; + return 0; } - int oio_bind(oio_handle* handle, struct sockaddr* addr) { int addrsize; int domain; @@ -159,14 +149,22 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { assert(revents == EV_READ); + if (handle->accepted_fd >= 0) { + ev_io_stop(EV_DEFAULT_ &handle->read_watcher); + return; + } + while (1) { struct sockaddr addr; socklen_t addrlen; + + assert(handle->accepted_fd < 0); int fd = accept(handle->fd, &addr, &addrlen); if (fd < 0) { if (errno == EAGAIN) { - return; /* No problem. */ + /* No problem. */ + return; } else if (errno == EMFILE) { /* TODO special trick. unlock reserved socket, accept, close. */ return; @@ -178,16 +176,12 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { if (!handle->accept_cb) { close(fd); } else { - oio_handle* new_client = oio_tcp_handle_new(NULL, NULL); - if (!new_client) { - /* Ignore error for now */ - } else { - if (oio_tcp_open(new_client, fd)) { - /* Ignore error for now */ - } else { - ev_io_start(EV_DEFAULT_ &handle->read_watcher); - handle->accept_cb(handle); - } + handle->accepted_fd = fd; + handle->accept_cb(handle); + if (handle->accepted_fd >= 0) { + /* The user hasn't yet accepted called oio_tcp_handle_accept() */ + ev_io_stop(EV_DEFAULT_ &handle->read_watcher); + return; } } } @@ -195,6 +189,25 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { } +int oio_tcp_handle_accept(oio_handle* server, oio_handle* client, + oio_close_cb close_cb, void* data) { + if (server->accepted_fd < 0) { + return -1; + } + + if (oio_tcp_open(client, server->accepted_fd)) { + /* Ignore error for now */ + server->accepted_fd = -1; + close(server->accepted_fd); + return -1; + } else { + server->accepted_fd = -1; + ev_io_start(EV_DEFAULT_ &server->read_watcher); + return 0; + } +} + + int oio_listen(oio_handle* handle, int backlog, oio_accept_cb cb) { assert(handle->fd >= 0); @@ -212,12 +225,6 @@ int oio_listen(oio_handle* handle, int backlog, oio_accept_cb cb) { } -int oio_tcp_handle_accept(oio_handle* server, oio_handle* client, - oio_close_cb close_cb, void* data) { - ; -} - - int oio_close_error(oio_handle* handle, oio_err err) { ev_io_stop(EV_DEFAULT_ &handle->read_watcher); close(handle->fd); @@ -291,12 +298,8 @@ void oio__read(oio_handle* handle) { /* First pop the req off handle->read_reqs */ ngx_queue_remove(&(req->read_reqs)); - /* Must free req if local. Also must free req->read_bufs. */ free(req->read_bufs); req->read_bufs = NULL; - if (req->local) { - free(req); - } /* NOTE: call callback AFTER freeing the request data. */ if (cb) { @@ -349,16 +352,7 @@ void oio_tcp_connect(oio_handle* handle, oio_req* req) { /* Successful connection */ oio_connect_cb connect_cb = req->cb; if (connect_cb) { - if (req->local) { - connect_cb(NULL, oio_err_new(handle, 0)); - } else { - connect_cb(req, oio_err_new(handle, 0)); - } - } - - /* Free up connect_req if we own it. */ - if (req->local) { - free(req); + connect_cb(req, oio_err_new(handle, 0)); } req = NULL; @@ -379,25 +373,11 @@ void oio_tcp_connect(oio_handle* handle, oio_req* req) { } -oio_req* oio_req_maybe_alloc(oio_handle* handle, oio_req* in_req) { - if (in_req) { - ngx_queue_init(&(in_req->read_reqs)); - in_req->handle = handle; - in_req->local = 0; - return in_req; - } else { - oio_req *req = malloc(sizeof(oio_req)); - oio_req_init(req, NULL, NULL); - req->handle = handle; - ngx_queue_init(&(req->read_reqs)); - req->local = 1; - return req; - } -} +int oio_connect(oio_req *req, struct sockaddr* addr) { + oio_handle* handle = req->handle; - -int oio_connect(oio_req *req_in, struct sockaddr* addr) { - oio_handle* handle = req_in->handle; + req->type = OIO_CONNECT; + ngx_queue_init(&(req->read_reqs)); if (handle->connect_req) { return oio_err_new(handle, EALREADY); @@ -407,14 +387,8 @@ int oio_connect(oio_req *req_in, struct sockaddr* addr) { return oio_err_new(handle, ENOTSOCK); } - oio_req *req = oio_req_maybe_alloc(handle, req_in); - if (!req) { - return oio_err_new(handle, ENOMEM); - } - handle->connect_req = req; - int addrsize; if (addr->sa_family == AF_INET) { @@ -448,6 +422,9 @@ int oio_write(oio_req *req, oio_buf* bufs, int bufcnt) { assert(handle->fd >= 0); ssize_t r; + ngx_queue_init(&(req->read_reqs)); + req->type = OIO_WRITE; + r = writev(handle->fd, (struct iovec*)bufs, bufcnt); if (r < 0) { @@ -471,11 +448,11 @@ int oio_write2(oio_req* req, const char* msg) { } -int oio_read(oio_req *req_in, oio_buf* bufs, int bufcnt) { - oio_handle* handle = req_in->handle; +int oio_read(oio_req *req, oio_buf* bufs, int bufcnt) { + oio_handle* handle = req->handle; ssize_t nread = -1; errno = EAGAIN; - oio_read_cb cb = req_in->cb; + oio_read_cb cb = req->cb; assert(handle->fd >= 0); @@ -484,12 +461,15 @@ int oio_read(oio_req *req_in, oio_buf* bufs, int bufcnt) { ssize_t nread = readv(handle->fd, (struct iovec*) bufs, bufcnt); } + req->type = OIO_READ; + ngx_queue_init(&(req->read_reqs)); + if (nread < 0 && errno != EAGAIN) { /* Real error. */ oio_err err = oio_err_new(handle, errno); if (cb) { - cb(req_in, nread); + cb(req, nread); } return err; @@ -498,7 +478,7 @@ int oio_read(oio_req *req_in, oio_buf* bufs, int bufcnt) { if (nread >= 0) { /* Successful read. */ if (cb) { - cb(req_in, nread); + cb(req, nread); } return 0; } @@ -515,10 +495,6 @@ int oio_read(oio_req *req_in, oio_buf* bufs, int bufcnt) { * become readable. * In the meantime we append the request to handle->read_reqs */ - oio_req* req = oio_req_maybe_alloc(handle, req_in); - if (!req) { - return oio_err_new(handle, ENOMEM); - } /* Copy the bufs data over into our oio_req struct. This is so the user can * free the oio_buf array. The actual data inside the oio_bufs is however @@ -533,7 +509,7 @@ int oio_read(oio_req *req_in, oio_buf* bufs, int bufcnt) { ev_io_start(EV_DEFAULT_ &handle->read_watcher); - return oio_err_new(handle, EINPROGRESS); + return 0; } diff --git a/oio-unix.h b/oio-unix.h index 251d2614..f84dccd0 100644 --- a/oio-unix.h +++ b/oio-unix.h @@ -33,6 +33,7 @@ typedef struct { oio_err err; \ oio_read_cb read_cb; \ oio_accept_cb accept_cb; \ + int accepted_fd; \ oio_req *connect_req; \ ev_io read_watcher; \ ev_io write_watcher; \