From 95394439050bcbadf9180f79adaf36d0d659a671 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Tue, 26 Apr 2011 01:26:37 -0700 Subject: [PATCH] Partial implementation of buffer writes on unix. Hitting callstack overflow problem in echo-server. --- oio-unix.c | 245 ++++++++++++++++++++++++++++++++++++++++++++++------- oio-unix.h | 5 +- oio.h | 12 +-- 3 files changed, 225 insertions(+), 37 deletions(-) diff --git a/oio-unix.c b/oio-unix.c index 2dbe2346..aeeb2a20 100644 --- a/oio-unix.c +++ b/oio-unix.c @@ -40,7 +40,7 @@ static oio_err last_err; void oio_tcp_io(EV_P_ ev_io* watcher, int revents); void oio__next(EV_P_ ev_idle* watcher, int revents); -void oio_tcp_connect(oio_handle* handle); +static void oio_tcp_connect(oio_handle* handle); int oio_tcp_open(oio_handle*, int fd); void oio_finish_close(oio_handle* handle); @@ -81,6 +81,7 @@ static oio_err_code oio_translate_sys_error(int sys_errno) { switch (sys_errno) { case 0: return OIO_OK; case EACCES: return OIO_EACCESS; + case ECONNRESET: return OIO_ECONNRESET; case EFAULT: return OIO_EFAULT; case EMFILE: return OIO_EMFILE; case EINVAL: return OIO_EINVAL; @@ -101,11 +102,6 @@ static oio_err oio_err_new(oio_handle* handle, int sys_error) { } -oio_err oio_err_last(oio_handle* handle) { - return handle->err; -} - - struct sockaddr_in oio_ip4_addr(char* ip, int port) { struct sockaddr_in addr; @@ -153,6 +149,9 @@ int oio_tcp_init(oio_handle* handle, oio_close_cb close_cb, ngx_queue_init(&handle->read_reqs); + ngx_queue_init(&handle->write_queue); + handle->write_queue_size = 0; + ev_init(&handle->next_watcher, oio__next); handle->next_watcher.data = handle; @@ -162,6 +161,9 @@ int oio_tcp_init(oio_handle* handle, oio_close_cb close_cb, ev_init(&handle->write_watcher, oio_tcp_io); handle->write_watcher.data = handle; + assert(ngx_queue_empty(&handle->write_queue)); + assert(handle->write_queue_size == 0); + return 0; } @@ -270,7 +272,7 @@ void oio__server_io(EV_P_ ev_io* watcher, int revents) { /* TODO special trick. unlock reserved socket, accept, close. */ return; } else { - handle->err = oio_err_new(handle, errno); + oio_err_new(handle, errno); oio_close(handle); } @@ -355,7 +357,28 @@ void oio_finish_close(oio_handle* handle) { oio_req* oio_read_reqs_head(oio_handle* handle) { - ngx_queue_t* q = ngx_queue_head(&(handle->read_reqs)); + if (ngx_queue_empty(&handle->read_reqs)) { + return NULL; + } + + ngx_queue_t* q = ngx_queue_head(&handle->read_reqs); + if (!q) { + return NULL; + } + + oio_req* req = ngx_queue_data(q, struct oio_req_s, read_reqs); + assert(req); + + return req; +} + + +oio_req* oio_write_queue_head(oio_handle* handle) { + if (ngx_queue_empty(&handle->write_queue)) { + return NULL; + } + + ngx_queue_t* q = ngx_queue_head(&handle->write_queue); if (!q) { return NULL; } @@ -385,6 +408,114 @@ void oio__next(EV_P_ ev_idle* watcher, int revents) { } +void oio__write(oio_handle* handle) { + assert(handle->fd >= 0); + + /* TODO: should probably while(1) here until EAGAIN */ + + /* Get the request at the head of the read_reqs queue. */ + oio_req* req = oio_write_queue_head(handle); + if (!req) { + /* This probably shouldn't happen. Maybe assert(0) here. */ + ev_io_stop(EV_DEFAULT_ &handle->write_watcher); + return; + } + + assert(req->handle == handle); + + /* Cast to iovec. We had to have our own oio_buf instead of iovec + * because Windows's WSABUF is not an iovec. 
+   */
+  struct iovec* iov = (struct iovec*) (req->read_bufs +
+      req->write_index);
+  int iovcnt = req->read_bufcnt - req->write_index;
+
+  assert(iov);
+  assert(iovcnt > 0);
+
+  /* Now do the actual writev. Note that we've been updating the pointers
+   * inside the iov each time we write. So there is no need to offset it.
+   */
+
+  ssize_t n = writev(handle->fd, iov, iovcnt);
+
+  oio_write_cb cb = req->cb;
+
+  if (n < 0) {
+    if (errno != EAGAIN) {
+      oio_err err = oio_err_new(handle, errno);
+
+      /* XXX How do we handle the error? Need test coverage here. */
+
+      if (cb) {
+        cb(req, -1);
+      }
+      oio_close(handle);
+      return;
+    }
+  } else {
+    /* Successful write */
+
+    /* The loop updates the counters. */
+    while (n > 0) {
+      char* base = req->read_bufs[req->write_index].base;
+      size_t len = req->read_bufs[req->write_index].len;
+      assert(req->write_index < req->read_bufcnt);
+
+      if (n < len) {
+        req->read_bufs[req->write_index].base += n;
+        req->read_bufs[req->write_index].len -= n;
+        handle->write_queue_size -= n;
+        n = 0;
+        /* There is more to write. Break and ensure the watcher is pending. */
+        break;
+
+      } else {
+        /* Finished writing the buf at index req->write_index. */
+        req->write_index++;
+
+        assert(n >= len);
+        n -= len;
+
+        assert(handle->write_queue_size >= len);
+        handle->write_queue_size -= len;
+
+        if (req->write_index == req->read_bufcnt) {
+          /* Then we're done! */
+          assert(n == 0);
+
+          /* Pop the req off handle->write_queue. */
+          ngx_queue_remove(&req->read_reqs);
+          free(req->read_bufs); /* FIXME: we should not be allocing for each write */
+          req->read_bufs = NULL;
+
+          /* NOTE: call callback AFTER freeing the request data. */
+          if (cb) {
+            cb(req, 0);
+          }
+
+          if (ngx_queue_empty(&handle->write_queue)) {
+            assert(handle->write_queue_size == 0);
+            ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
+          } else {
+            assert(handle->write_queue_size > 0);
+          }
+
+          return;
+        }
+      }
+    }
+  }
+
+  /* Either we've counted n down to zero or we've got EAGAIN. */
+  assert(n == 0 || n == -1);
+
+  /* We're not done yet. */
+  assert(ev_is_active(&handle->write_watcher));
+  ev_io_start(EV_DEFAULT_ &handle->write_watcher);
+}
+
+
 void oio__read(oio_handle* handle) {
   int errorno;
   assert(handle->fd >= 0);
@@ -424,7 +555,6 @@ void oio__read(oio_handle* handle) {
       if (cb) {
         cb(req, 0, -1);
       }
-      handle->err = err;
       oio_close(handle);
     }
   } else {
@@ -465,8 +595,7 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
     }
 
     if (revents & EV_WRITE) {
-      /* ignore for now */
-      ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
+      oio__write(handle);
     }
   }
 }
@@ -477,7 +606,7 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
  * In order to determine if we've errored out or succeeded must call
  * getsockopt.
  */
-void oio_tcp_connect(oio_handle* handle) {
+static void oio_tcp_connect(oio_handle* handle) {
   assert(handle->fd >= 0);
 
   oio_req* req = handle->connect_req;
@@ -504,12 +633,13 @@ void oio_tcp_connect(oio_handle* handle) {
   } else {
     oio_err err = oio_err_new(handle, error);
 
+    handle->connect_req = NULL;
+
     oio_connect_cb connect_cb = req->cb;
     if (connect_cb) {
       connect_cb(req, -1);
     }
 
-    handle->err = err;
     oio_close(handle);
   }
 }
@@ -561,31 +691,86 @@ int oio_connect(oio_req* req, struct sockaddr* addr) {
 }
 
 
-/* The buffers to be written must remain valid until the callback is called. */
-/* This is not required for the oio_buf array. */
+static size_t oio__buf_count(oio_buf bufs[], int bufcnt) {
+  size_t total = 0;
+  int i;
+
+  for (i = 0; i < bufcnt; i++) {
+    total += bufs[i].len;
+  }
+
+  return total;
+}
+
+
+/* The buffers to be written must remain valid until the callback is called.
+ * This is not required for the oio_buf array.
+ */
 int oio_write(oio_req* req, oio_buf* bufs, int bufcnt) {
   oio_handle* handle = req->handle;
   assert(handle->fd >= 0);
-  ssize_t r;
+  ssize_t written;
   int errorno;
 
-  ngx_queue_init(&(req->read_reqs));
+  size_t total = oio__buf_count(bufs, bufcnt);
+
+  if (ngx_queue_empty(&handle->write_queue)) {
+    /* First try to do the write inline. */
+
+    written = writev(handle->fd, (struct iovec*)bufs, bufcnt);
+    errorno = errno;
+
+    if (written < 0) {
+      if (errorno != EAGAIN) {
+        oio_err_new(handle, errorno);
+        return -1;
+      }
+    } else {
+      if (written == total) {
+        /* Successful write. We're done. */
+        assert(ngx_queue_empty(&handle->write_queue));
+        assert(handle->write_queue_size == 0);
+        ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
+
+        if (req && req->cb) {
+          oio_write_cb cb = req->cb;
+          cb(req, 0);
+        }
+
+        return 0;
+      }
+    }
+  }
+
+  /* Either we got EAGAIN or there is already data in the userspace write
+   * queue.
+   */
+
+  ngx_queue_init(&req->read_reqs);
   req->type = OIO_WRITE;
 
-  r = writev(handle->fd, (struct iovec*)bufs, bufcnt);
-  errorno = errno;
+  /* TODO rename:
+   *   req->read_reqs to req->queue
+   *   req->read_bufs to req->bufs
+   *   req->read_bufcnt to req->bufcnt
+   */
+  req->read_bufs = malloc(sizeof(oio_buf) * bufcnt);
+  memcpy(req->read_bufs, bufs, bufcnt * sizeof(oio_buf));
+  req->read_bufcnt = bufcnt;
 
-  if (r < 0) {
-    assert(errorno != EAGAIN && "write queueing not yet supported");
-    oio_err_new(handle, errorno);
-    return -1;
-  } else {
-    if (req && req->cb) {
-      oio_write_cb cb = req->cb;
-      cb(req, 0);
-    }
-    return 0;
-  }
+  req->write_index = 0;
+  handle->write_queue_size += total;
+
+  /* Append the request to write_queue. */
+  ngx_queue_insert_tail(&handle->write_queue, &req->read_reqs);
+
+  assert(!ngx_queue_empty(&handle->write_queue));
+  assert(handle->write_watcher.data == handle);
+  assert(handle->write_watcher.fd == handle->fd);
+
+  ev_io_start(EV_DEFAULT_ &handle->write_watcher);
+
+  return 0;
 }
 
 
diff --git a/oio-unix.h b/oio-unix.h
index e556113c..3266bcc3 100644
--- a/oio-unix.h
+++ b/oio-unix.h
@@ -40,16 +40,16 @@ typedef struct {
 
 
 #define oio_req_private_fields \
+  int write_index; \
+  ev_timer timer; \
   ngx_queue_t read_reqs; \
   oio_buf* read_bufs; \
-  ev_timer timer; \
   int read_bufcnt;
 
 
 #define oio_handle_private_fields \
   int fd; \
   int flags; \
-  oio_err err; \
   oio_read_cb read_cb; \
   oio_accept_cb accept_cb; \
   int accepted_fd; \
@@ -58,6 +58,7 @@ typedef struct {
   ev_io write_watcher; \
   ev_idle next_watcher; \
   ngx_queue_t write_queue; \
+  size_t write_queue_size; \
   ngx_queue_t read_reqs;
 
 
diff --git a/oio.h b/oio.h
index 9c049ec5..6aa2d5ed 100644
--- a/oio.h
+++ b/oio.h
@@ -178,17 +178,19 @@ int oio_listen(oio_handle* handle, int backlog, oio_accept_cb cb);
 int oio_accept(oio_handle* server, oio_handle* client,
     oio_close_cb close_cb, void* data);
 
-/* Generic read/write methods. */
-/* The buffers to be written or read into must remain valid until the */
-/* callback is called. The oio_buf array does need not remain valid! */
+/* Generic read/write methods. The buffers to be written or read into must
+ * remain valid until the callback is called. The oio_buf array does not
+ * need to remain valid!
+ */ int oio_read(oio_req* req, oio_buf* bufs, int bufcnt); int oio_write(oio_req* req, oio_buf* bufs, int bufcnt); /* Timer methods */ int oio_timeout(oio_req* req, int64_t timeout); -/* Request handle to be closed. close_cb will be called */ -/* asynchronously after this call. */ +/* Request handle to be closed. close_cb will be called + * asynchronously after this call. + */ int oio_close(oio_handle* handle);
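
A minimal, self-contained sketch of the short-write bookkeeping that oio__write
implements above: call writev(), drop the buffers that were written completely,
and adjust the one that was written partially before waiting for the next
EV_WRITE event. The sketch_buf type, the sketch_flush() name, and its return
convention are illustrative assumptions, not part of the oio API; the
(struct iovec*) cast assumes the same base/len field layout that the patch
relies on when casting oio_buf arrays on unix.

#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/uio.h>

typedef struct {
  char* base;
  size_t len;
} sketch_buf;

/* Try to flush bufs[*index .. bufcnt-1] once. Returns 0 when every buffer has
 * been written, 1 when the caller should wait for writability and call again,
 * and -1 on a hard error (errno is left as writev() set it).
 */
static int sketch_flush(int fd, sketch_buf* bufs, int bufcnt, int* index) {
  ssize_t n = writev(fd, (struct iovec*) &bufs[*index], bufcnt - *index);

  if (n < 0) {
    return errno == EAGAIN ? 1 : -1;
  }

  while (n > 0) {
    assert(*index < bufcnt);

    if ((size_t) n < bufs[*index].len) {
      /* Partial write: keep this buffer at the head, skip the written part. */
      bufs[*index].base += n;
      bufs[*index].len -= (size_t) n;
      return 1;
    }

    /* This buffer is fully written; account for it and move to the next one. */
    n -= (ssize_t) bufs[*index].len;
    (*index)++;
  }

  return *index == bufcnt ? 0 : 1;
}

In the patch, oio__write plays roughly this role for the request at the head of
handle->write_queue, decrementing handle->write_queue_size as bytes drain and
re-running on each EV_WRITE event until the request's buffers are finished.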