Update unix backend to read_start/read_stop API

This commit is contained in:
Ryan Dahl 2011-05-02 18:15:17 -07:00
parent d553776f5d
commit d4289f47fc
2 changed files with 80 additions and 160 deletions

View File

@ -35,9 +35,10 @@
static oio_err last_err;
static oio_alloc_cb alloc_cb;
void oio_tcp_io(EV_P_ ev_io* watcher, int revents);
void oio__tcp_io(EV_P_ ev_io* watcher, int revents);
void oio__next(EV_P_ ev_idle* watcher, int revents);
static void oio_tcp_connect(oio_handle* handle);
int oio_tcp_open(oio_handle*, int fd);
@ -47,7 +48,8 @@ void oio_finish_close(oio_handle* handle);
/* flags */
enum {
OIO_CLOSING = 0x00000001,
OIO_CLOSED = 0x00000002
OIO_CLOSED = 0x00000002,
OIO_READING = 0x00000004,
};
@ -80,6 +82,7 @@ static oio_err_code oio_translate_sys_error(int sys_errno) {
switch (sys_errno) {
case 0: return OIO_OK;
case EACCES: return OIO_EACCESS;
case EAGAIN: return OIO_EAGAIN;
case ECONNRESET: return OIO_ECONNRESET;
case EFAULT: return OIO_EFAULT;
case EMFILE: return OIO_EMFILE;
@ -92,6 +95,15 @@ static oio_err_code oio_translate_sys_error(int sys_errno) {
}
static oio_err oio_err_new_artificial(oio_handle* handle, int code) {
oio_err err;
err.sys_errno_ = 0;
err.code = code;
last_err = err;
return err;
}
static oio_err oio_err_new(oio_handle* handle, int sys_error) {
oio_err err;
err.sys_errno_ = sys_error;
@ -126,7 +138,9 @@ int oio_close(oio_handle* handle) {
}
void oio_init() {
void oio_init(oio_alloc_cb cb) {
assert(cb);
alloc_cb = cb;
ev_default_loop(0);
}
@ -154,10 +168,10 @@ int oio_tcp_init(oio_handle* handle, oio_close_cb close_cb,
ev_init(&handle->next_watcher, oio__next);
handle->next_watcher.data = handle;
ev_init(&handle->read_watcher, oio_tcp_io);
ev_init(&handle->read_watcher, oio__tcp_io);
handle->read_watcher.data = handle;
ev_init(&handle->write_watcher, oio_tcp_io);
ev_init(&handle->write_watcher, oio__tcp_io);
handle->write_watcher.data = handle;
assert(ngx_queue_empty(&handle->write_queue));
@ -238,6 +252,8 @@ int oio_tcp_open(oio_handle* handle, int fd) {
assert(handle->next_watcher.data == handle);
assert(handle->write_watcher.data == handle);
assert(handle->read_watcher.data == handle);
assert(handle->read_watcher.cb == oio__tcp_io);
assert(handle->write_watcher.cb == oio__tcp_io);
return 0;
}
@ -516,68 +532,52 @@ void oio__write(oio_handle* handle) {
void oio__read(oio_handle* handle) {
int errorno;
assert(handle->fd >= 0);
/* TODO: should probably while(1) here until EAGAIN */
/* Get the request at the head of the read_reqs queue. */
oio_req* req = oio_read_reqs_head(handle);
if (!req) {
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
return;
}
/* Cast to iovec. We had to have our own oio_buf instead of iovec
* because Windows's WSABUF is not an iovec.
/* XXX: Maybe instead of having OIO_READING we just test if
* handle->read_cb is NULL or not?
*/
struct iovec* iov = (struct iovec*) req->read_bufs;
int iovcnt = req->read_bufcnt;
while (handle->read_cb && oio_flag_is_set(handle, OIO_READING)) {
assert(alloc_cb);
oio_buf buf = alloc_cb(handle, 64 * 1024);
assert(iov);
assert(iovcnt > 0);
assert(buf.len > 0);
assert(buf.base);
/* Now do the actual read. */
struct iovec* iov = (struct iovec*) &buf;
ssize_t nread = readv(handle->fd, iov, iovcnt);
errorno = errno;
ssize_t nread = readv(handle->fd, iov, 1);
oio_read_cb cb = req->cb;
if (nread < 0) {
if (errorno == EAGAIN) {
/* Just wait for the next one. */
assert(ev_is_active(&handle->read_watcher));
ev_io_start(EV_DEFAULT_ &handle->read_watcher);
} else {
oio_err err = oio_err_new(handle, errorno);
if (cb) {
cb(req, 0, -1);
if (nread < 0) {
/* Error */
if (errno == EAGAIN) {
/* Wait for the next one. */
if (oio_flag_is_set(handle, OIO_READING)) {
ev_io_start(EV_DEFAULT_UC_ &handle->read_watcher);
}
oio_err_new(handle, EAGAIN);
handle->read_cb(handle, 0, buf);
return;
} else {
oio_err_new(handle, errno);
oio_close(handle);
handle->read_cb(handle, -1, buf);
assert(!ev_is_active(&handle->read_watcher));
return;
}
oio_close(handle);
}
} else {
/* Successful read */
/* First pop the req off handle->read_reqs */
ngx_queue_remove(&req->read_reqs);
free(req->read_bufs); /* FIXME: we should not be allocing for each read */
req->read_bufs = NULL;
/* NOTE: call callback AFTER freeing the request data. */
if (cb) {
cb(req, nread, 0);
}
if (oio_read_reqs_empty(handle)) {
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
} else if (nread == 0) {
/* EOF */
oio_err_new_artificial(handle, OIO_EOF);
ev_io_stop(EV_DEFAULT_UC_ &handle->read_watcher);
handle->read_cb(handle, -1, buf);
return;
} else {
/* Successful read */
handle->read_cb(handle, nread, buf);
}
}
}
void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
void oio__tcp_io(EV_P_ ev_io* watcher, int revents) {
oio_handle* handle = watcher->data;
assert(watcher == &handle->read_watcher ||
watcher == &handle->write_watcher);
@ -708,42 +708,6 @@ static size_t oio__buf_count(oio_buf bufs[], int bufcnt) {
int oio_write(oio_req* req, oio_buf* bufs, int bufcnt) {
oio_handle* handle = req->handle;
assert(handle->fd >= 0);
ssize_t written;
int errorno;
size_t total = oio__buf_count(bufs, bufcnt);
if (ngx_queue_empty(&req->read_reqs)) {
/* First try to do the write inline. */
written = writev(handle->fd, (struct iovec*)bufs, bufcnt);
errorno = errno;
if (written < 0) {
if (errorno == EAGAIN) {
oio_err_new(handle, errorno);
return -1;
}
} else {
if (written == total) {
/* Successful write. We're done. */
assert(ngx_queue_empty(&handle->write_queue));
assert(handle->write_queue_size == 0);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
if (req && req->cb) {
oio_write_cb cb = req->cb;
cb(req, 0);
}
return 0;
}
}
}
/* Either we got EAGAIN or there is already data in the userspace write
* queue.
*/
ngx_queue_init(&req->read_reqs);
req->type = OIO_WRITE;
@ -758,12 +722,13 @@ int oio_write(oio_req* req, oio_buf* bufs, int bufcnt) {
req->read_bufcnt = bufcnt;
req->write_index = 0;
handle->write_queue_size += total;
handle->write_queue_size += oio__buf_count(bufs, bufcnt);
/* Append the request to write_queue. */
ngx_queue_insert_tail(&handle->write_queue, &req->read_reqs);
assert(!ngx_queue_empty(&handle->write_queue));
assert(handle->write_watcher.cb == oio__tcp_io);
assert(handle->write_watcher.data == handle);
assert(handle->write_watcher.fd == handle->fd);
@ -808,79 +773,33 @@ int oio_timeout(oio_req* req, int64_t timeout) {
}
int oio_read(oio_req* req, oio_buf* bufs, int bufcnt) {
oio_handle* handle = req->handle;
ssize_t nread = -1;
int errorno = EAGAIN;
oio_read_cb cb = req->cb;
int oio_read_start(oio_handle* handle, oio_read_cb cb) {
/* The OIO_READING flag is irrelevant of the state of the handle - it just
* expresses the desired state of the user.
*/
oio_flag_set(handle, OIO_READING);
/* TODO: try to do the read inline? */
/* TODO: keep track of handle state. If we've gotten a EOF then we should
* not start the IO watcher.
*/
assert(handle->fd >= 0);
handle->read_cb = cb;
if (ngx_queue_empty(&handle->read_reqs)) {
/* Attempt to read immediately. */
nread = readv(handle->fd, (struct iovec*) bufs, bufcnt);
errorno = errno;
}
/* The request should have been just initialized. Therefore the
* ngx_queue_t for read_reqs should be empty.
*/
assert(ngx_queue_empty(&req->read_reqs));
assert(req->type == OIO_UNKNOWN_REQ);
req->type = OIO_READ;
if (nread < 0 && errorno != EAGAIN) {
/* Real error. */
oio_err err = oio_err_new(handle, errorno);
if (cb) {
cb(req, nread, -1);
}
return -1;
}
if (nread >= 0) {
/* Successful read. */
if (cb) {
cb(req, nread, 0);
}
return 0;
}
/* Either we never read anything, or we got EAGAIN. */
assert(!ngx_queue_empty(&handle->read_reqs) ||
(nread < 0 && errorno == EAGAIN));
/* Two possible states:
* - EAGAIN, meaning the socket is not wriable currently. We must wait for
* it to become readable with the handle->read_watcher.
* - The read_reqs queue already has reads. Meaning: the user has issued
* many oio_reads calls some of which are still waiting for the socket to
* become readable.
* In the meantime we append the request to handle->read_reqs
*/
/* Copy the bufs data over into our oio_req struct. This is so the user can
* free the oio_buf array. The actual data inside the oio_bufs is however
* owned by the user and cannot be deallocated until the read completes.
*
* TODO: instead of mallocing here - just have a fixed number of oio_bufs
* included in the oio_req object.
*/
req->read_bufs = malloc(sizeof(oio_buf) * bufcnt);
memcpy(req->read_bufs, bufs, bufcnt * sizeof(oio_buf));
req->read_bufcnt = bufcnt;
/* Append the request to read_reqs. */
ngx_queue_insert_tail(&handle->read_reqs, &req->read_reqs);
assert(!ngx_queue_empty(&handle->read_reqs));
/* These should have been set by oio_tcp_init. */
assert(handle->read_watcher.data == handle);
assert(handle->read_watcher.fd == handle->fd);
assert(handle->read_watcher.cb == oio__tcp_io);
ev_io_start(EV_DEFAULT_ &handle->read_watcher);
ev_io_start(EV_DEFAULT_UC_ &handle->read_watcher);
return 0;
}
int oio_read_stop(oio_handle* handle) {
oio_flag_unset(handle, OIO_READING);
ev_io_stop(EV_DEFAULT_UC_ &handle->read_watcher);
handle->read_cb = NULL;
return 0;
}

1
oio.h
View File

@ -63,6 +63,7 @@ typedef enum {
OIO_OK = 0,
OIO_EOF,
OIO_EACCESS,
OIO_EAGAIN,
OIO_EADDRINUSE,
OIO_EADDRNOTAVAIL,
OIO_EAFNOSUPPORT,