From dee44d886b48dc2e782ffe26c050509d19c2702e Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 29 Mar 2011 19:53:29 -0700 Subject: [PATCH] unix: simple write/read for now --- ol-unix.c | 86 ++++++++++++++++++++++++++++++++++++------------------- ol.h | 9 ++++++ 2 files changed, 65 insertions(+), 30 deletions(-) diff --git a/ol-unix.c b/ol-unix.c index 5f7c4d1e..6c4a1daa 100644 --- a/ol-unix.c +++ b/ol-unix.c @@ -14,6 +14,7 @@ void ol_tcp_io(EV_P_ ev_io* watcher, int revents); void ol_tcp_connect(ol_handle* handle, ol_req* req); +ol_handle* ol_tcp_open(ol_handle* parent, int fd); int ol_close_error(ol_handle* handle, ol_err err); @@ -60,6 +61,8 @@ ol_handle* ol_handle_new(ol_close_cb close_cb, void* data) { handle->_.fd = -1; + ngx_queue_init(&handle->_.read_reqs); + ev_init(&handle->_.read_watcher, ol_tcp_io); ev_init(&handle->_.write_watcher, ol_tcp_io); @@ -71,10 +74,12 @@ int ol_tcp_lazy_open(ol_handle* handle, int domain) { assert(handle->_.fd < 0); /* Lazily allocate a file descriptor for this handle */ - handle->_.fd = socket(domain, SOCK_STREAM, 0); - int yes = 1; - setsockopt(handle->_.fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); - fcntl(handle->_.fd, F_SETFL, O_NONBLOCK); + int fd = socket(domain, SOCK_STREAM, 0); + + /* Set non-blocking, etc */ + ol_tcp_init_fd(fd); + + handle->_.fd = fd; return 0; } @@ -110,9 +115,27 @@ int ol_bind(ol_handle* handle, struct sockaddr* addr) { } +int ol_tcp_init_fd(int fd) { + int r; + int yes = 1; + r = fcntl(fd, F_SETFL, O_NONBLOCK); + assert(r == 0); + r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); + assert(r == 0); + return 0; +} + + ol_handle* ol_tcp_open(ol_handle* parent, int fd) { ol_handle* h = ol_handle_new(NULL, NULL); + if (!h) { + return NULL; + } h->_.fd = fd; + + /* Set non-blocking, etc */ + ol_tcp_init_fd(fd); + return h; } @@ -136,8 +159,11 @@ void ol_server_io(EV_P_ ev_io* watcher, int revents) { } else { ol_close_error(handle, ol_err_new(handle, errno)); } + } else { - if (handle->accept_cb) { + if (!handle->accept_cb) { + close(fd); + } else { ol_handle* new_client = ol_tcp_open(handle, fd); if (!new_client) { ol_close_error(handle, ol_err_last(handle)); @@ -145,8 +171,6 @@ void ol_server_io(EV_P_ ev_io* watcher, int revents) { } handle->accept_cb(handle, new_client); - } else { - close(fd); } } } @@ -309,9 +333,15 @@ int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) { } int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) { - /* stub */ - assert(0); - return 0; + ssize_t r; + + r = writev(handle->_.fd, (struct iovec*)bufs, bufcnt); + + if (r < 0) { + return ol_err_new(handle, r); + } else { + return 0; + } } @@ -349,35 +379,31 @@ int ol_read(ol_handle* handle, ol_req *req_in, ol_buf* bufs, int bufcnt) { /* Attempt to read immediately */ ssize_t nread = readv(handle->_.fd, (struct iovec*) bufs, bufcnt); + ol_read_cb cb = req_in->cb; + if (nread < 0) { + if (errno == EAGAIN) { + ev_io_start(EV_DEFAULT_ &handle->_.read_watcher); + return 0; + } else { + ol_err err = ol_err_new(handle, errno); - } - - ssize_t buftotal; - int i; - for (i = 0; i < bufcnt; i++) { - buftotal += bufs[i].len; - if (buftotal == nread) { - /* We read everything */ - ol_read_cb cb = req_in->cb; if (cb) { - cb(req_in, nread, 0); + cb(req_in, nread, err); } + + return err; } - if (buftotal > nread) break; - } - - - ol_req *req = ol_req_maybe_alloc(handle, req_in); - if (!req) { - return ol_err_new(handle, ENOMEM); + } else { + if (cb) { + cb(req_in, nread, 0); + } + return 0; } } - - /* ngx_queue_insert_tail(); */ - + assert(0 && "Unreachable"); return 0; } diff --git a/ol.h b/ol.h index 287fde03..cb8e3da8 100644 --- a/ol.h +++ b/ol.h @@ -68,6 +68,15 @@ struct ol_req_s { }; +/** + * Most functions return boolean: 0 for success and -1 for failure. + * On error the user should then call ol_last_error() to determine + * the error code. + */ +ol_err ol_last_error(); +const char* ol_err_str(ol_err err); + + void ol_init(); int ol_run();