diff --git a/oio-unix.c b/oio-unix.c index 42be55e6..a3cb0e06 100644 --- a/oio-unix.c +++ b/oio-unix.c @@ -35,9 +35,10 @@ static oio_err last_err; +static oio_alloc_cb alloc_cb; -void oio_tcp_io(EV_P_ ev_io* watcher, int revents); +void oio__tcp_io(EV_P_ ev_io* watcher, int revents); void oio__next(EV_P_ ev_idle* watcher, int revents); static void oio_tcp_connect(oio_handle* handle); int oio_tcp_open(oio_handle*, int fd); @@ -47,7 +48,8 @@ void oio_finish_close(oio_handle* handle); /* flags */ enum { OIO_CLOSING = 0x00000001, - OIO_CLOSED = 0x00000002 + OIO_CLOSED = 0x00000002, + OIO_READING = 0x00000004, }; @@ -80,6 +82,7 @@ static oio_err_code oio_translate_sys_error(int sys_errno) { switch (sys_errno) { case 0: return OIO_OK; case EACCES: return OIO_EACCESS; + case EAGAIN: return OIO_EAGAIN; case ECONNRESET: return OIO_ECONNRESET; case EFAULT: return OIO_EFAULT; case EMFILE: return OIO_EMFILE; @@ -92,6 +95,15 @@ static oio_err_code oio_translate_sys_error(int sys_errno) { } +static oio_err oio_err_new_artificial(oio_handle* handle, int code) { + oio_err err; + err.sys_errno_ = 0; + err.code = code; + last_err = err; + return err; +} + + static oio_err oio_err_new(oio_handle* handle, int sys_error) { oio_err err; err.sys_errno_ = sys_error; @@ -126,7 +138,9 @@ int oio_close(oio_handle* handle) { } -void oio_init() { +void oio_init(oio_alloc_cb cb) { + assert(cb); + alloc_cb = cb; ev_default_loop(0); } @@ -154,10 +168,10 @@ int oio_tcp_init(oio_handle* handle, oio_close_cb close_cb, ev_init(&handle->next_watcher, oio__next); handle->next_watcher.data = handle; - ev_init(&handle->read_watcher, oio_tcp_io); + ev_init(&handle->read_watcher, oio__tcp_io); handle->read_watcher.data = handle; - ev_init(&handle->write_watcher, oio_tcp_io); + ev_init(&handle->write_watcher, oio__tcp_io); handle->write_watcher.data = handle; assert(ngx_queue_empty(&handle->write_queue)); @@ -238,6 +252,8 @@ int oio_tcp_open(oio_handle* handle, int fd) { assert(handle->next_watcher.data == handle); assert(handle->write_watcher.data == handle); assert(handle->read_watcher.data == handle); + assert(handle->read_watcher.cb == oio__tcp_io); + assert(handle->write_watcher.cb == oio__tcp_io); return 0; } @@ -516,68 +532,52 @@ void oio__write(oio_handle* handle) { void oio__read(oio_handle* handle) { - int errorno; - assert(handle->fd >= 0); - - /* TODO: should probably while(1) here until EAGAIN */ - - /* Get the request at the head of the read_reqs queue. */ - oio_req* req = oio_read_reqs_head(handle); - if (!req) { - ev_io_stop(EV_DEFAULT_ &handle->read_watcher); - return; - } - - /* Cast to iovec. We had to have our own oio_buf instead of iovec - * because Windows's WSABUF is not an iovec. + /* XXX: Maybe instead of having OIO_READING we just test if + * handle->read_cb is NULL or not? */ - struct iovec* iov = (struct iovec*) req->read_bufs; - int iovcnt = req->read_bufcnt; + while (handle->read_cb && oio_flag_is_set(handle, OIO_READING)) { + assert(alloc_cb); + oio_buf buf = alloc_cb(handle, 64 * 1024); - assert(iov); - assert(iovcnt > 0); + assert(buf.len > 0); + assert(buf.base); - /* Now do the actual read. */ + struct iovec* iov = (struct iovec*) &buf; - ssize_t nread = readv(handle->fd, iov, iovcnt); - errorno = errno; + ssize_t nread = readv(handle->fd, iov, 1); - oio_read_cb cb = req->cb; - - if (nread < 0) { - if (errorno == EAGAIN) { - /* Just wait for the next one. */ - assert(ev_is_active(&handle->read_watcher)); - ev_io_start(EV_DEFAULT_ &handle->read_watcher); - } else { - oio_err err = oio_err_new(handle, errorno); - if (cb) { - cb(req, 0, -1); + if (nread < 0) { + /* Error */ + if (errno == EAGAIN) { + /* Wait for the next one. */ + if (oio_flag_is_set(handle, OIO_READING)) { + ev_io_start(EV_DEFAULT_UC_ &handle->read_watcher); + } + oio_err_new(handle, EAGAIN); + handle->read_cb(handle, 0, buf); + return; + } else { + oio_err_new(handle, errno); + oio_close(handle); + handle->read_cb(handle, -1, buf); + assert(!ev_is_active(&handle->read_watcher)); + return; } - oio_close(handle); - } - } else { - /* Successful read */ - - /* First pop the req off handle->read_reqs */ - ngx_queue_remove(&req->read_reqs); - - free(req->read_bufs); /* FIXME: we should not be allocing for each read */ - req->read_bufs = NULL; - - /* NOTE: call callback AFTER freeing the request data. */ - if (cb) { - cb(req, nread, 0); - } - - if (oio_read_reqs_empty(handle)) { - ev_io_stop(EV_DEFAULT_ &handle->read_watcher); + } else if (nread == 0) { + /* EOF */ + oio_err_new_artificial(handle, OIO_EOF); + ev_io_stop(EV_DEFAULT_UC_ &handle->read_watcher); + handle->read_cb(handle, -1, buf); + return; + } else { + /* Successful read */ + handle->read_cb(handle, nread, buf); } } } -void oio_tcp_io(EV_P_ ev_io* watcher, int revents) { +void oio__tcp_io(EV_P_ ev_io* watcher, int revents) { oio_handle* handle = watcher->data; assert(watcher == &handle->read_watcher || watcher == &handle->write_watcher); @@ -708,42 +708,6 @@ static size_t oio__buf_count(oio_buf bufs[], int bufcnt) { int oio_write(oio_req* req, oio_buf* bufs, int bufcnt) { oio_handle* handle = req->handle; assert(handle->fd >= 0); - ssize_t written; - int errorno; - - size_t total = oio__buf_count(bufs, bufcnt); - - if (ngx_queue_empty(&req->read_reqs)) { - /* First try to do the write inline. */ - - written = writev(handle->fd, (struct iovec*)bufs, bufcnt); - errorno = errno; - - if (written < 0) { - if (errorno == EAGAIN) { - oio_err_new(handle, errorno); - return -1; - } - } else { - if (written == total) { - /* Successful write. We're done. */ - assert(ngx_queue_empty(&handle->write_queue)); - assert(handle->write_queue_size == 0); - ev_io_stop(EV_DEFAULT_ &handle->write_watcher); - - if (req && req->cb) { - oio_write_cb cb = req->cb; - cb(req, 0); - } - - return 0; - } - } - } - - /* Either we got EAGAIN or there is already data in the userspace write - * queue. - */ ngx_queue_init(&req->read_reqs); req->type = OIO_WRITE; @@ -758,12 +722,13 @@ int oio_write(oio_req* req, oio_buf* bufs, int bufcnt) { req->read_bufcnt = bufcnt; req->write_index = 0; - handle->write_queue_size += total; + handle->write_queue_size += oio__buf_count(bufs, bufcnt); /* Append the request to write_queue. */ ngx_queue_insert_tail(&handle->write_queue, &req->read_reqs); assert(!ngx_queue_empty(&handle->write_queue)); + assert(handle->write_watcher.cb == oio__tcp_io); assert(handle->write_watcher.data == handle); assert(handle->write_watcher.fd == handle->fd); @@ -808,79 +773,33 @@ int oio_timeout(oio_req* req, int64_t timeout) { } -int oio_read(oio_req* req, oio_buf* bufs, int bufcnt) { - oio_handle* handle = req->handle; - ssize_t nread = -1; - int errorno = EAGAIN; - oio_read_cb cb = req->cb; +int oio_read_start(oio_handle* handle, oio_read_cb cb) { + /* The OIO_READING flag is irrelevant of the state of the handle - it just + * expresses the desired state of the user. + */ + oio_flag_set(handle, OIO_READING); + /* TODO: try to do the read inline? */ + /* TODO: keep track of handle state. If we've gotten a EOF then we should + * not start the IO watcher. + */ assert(handle->fd >= 0); + handle->read_cb = cb; - if (ngx_queue_empty(&handle->read_reqs)) { - /* Attempt to read immediately. */ - nread = readv(handle->fd, (struct iovec*) bufs, bufcnt); - errorno = errno; - } - - /* The request should have been just initialized. Therefore the - * ngx_queue_t for read_reqs should be empty. - */ - assert(ngx_queue_empty(&req->read_reqs)); - assert(req->type == OIO_UNKNOWN_REQ); - req->type = OIO_READ; - - if (nread < 0 && errorno != EAGAIN) { - /* Real error. */ - oio_err err = oio_err_new(handle, errorno); - - if (cb) { - cb(req, nread, -1); - } - - return -1; - } - - if (nread >= 0) { - /* Successful read. */ - if (cb) { - cb(req, nread, 0); - } - return 0; - } - - /* Either we never read anything, or we got EAGAIN. */ - assert(!ngx_queue_empty(&handle->read_reqs) || - (nread < 0 && errorno == EAGAIN)); - - /* Two possible states: - * - EAGAIN, meaning the socket is not wriable currently. We must wait for - * it to become readable with the handle->read_watcher. - * - The read_reqs queue already has reads. Meaning: the user has issued - * many oio_reads calls some of which are still waiting for the socket to - * become readable. - * In the meantime we append the request to handle->read_reqs - */ - - /* Copy the bufs data over into our oio_req struct. This is so the user can - * free the oio_buf array. The actual data inside the oio_bufs is however - * owned by the user and cannot be deallocated until the read completes. - * - * TODO: instead of mallocing here - just have a fixed number of oio_bufs - * included in the oio_req object. - */ - req->read_bufs = malloc(sizeof(oio_buf) * bufcnt); - memcpy(req->read_bufs, bufs, bufcnt * sizeof(oio_buf)); - req->read_bufcnt = bufcnt; - - /* Append the request to read_reqs. */ - ngx_queue_insert_tail(&handle->read_reqs, &req->read_reqs); - - assert(!ngx_queue_empty(&handle->read_reqs)); + /* These should have been set by oio_tcp_init. */ assert(handle->read_watcher.data == handle); - assert(handle->read_watcher.fd == handle->fd); + assert(handle->read_watcher.cb == oio__tcp_io); - ev_io_start(EV_DEFAULT_ &handle->read_watcher); + ev_io_start(EV_DEFAULT_UC_ &handle->read_watcher); + return 0; +} + +int oio_read_stop(oio_handle* handle) { + oio_flag_unset(handle, OIO_READING); + + ev_io_stop(EV_DEFAULT_UC_ &handle->read_watcher); + handle->read_cb = NULL; return 0; } diff --git a/oio.h b/oio.h index e630786c..a13766f6 100644 --- a/oio.h +++ b/oio.h @@ -63,6 +63,7 @@ typedef enum { OIO_OK = 0, OIO_EOF, OIO_EACCESS, + OIO_EAGAIN, OIO_EADDRINUSE, OIO_EADDRNOTAVAIL, OIO_EAFNOSUPPORT,