Partial implementation of buffer writes on unix.

Hitting callstack overflow problem in echo-server.
Ryan Dahl 2011-04-26 01:26:37 -07:00
parent b4836377f9
commit 9539443905
3 changed files with 225 additions and 37 deletions


@@ -40,7 +40,7 @@ static oio_err last_err;
void oio_tcp_io(EV_P_ ev_io* watcher, int revents);
void oio__next(EV_P_ ev_idle* watcher, int revents);
void oio_tcp_connect(oio_handle* handle);
static void oio_tcp_connect(oio_handle* handle);
int oio_tcp_open(oio_handle*, int fd);
void oio_finish_close(oio_handle* handle);
@@ -81,6 +81,7 @@ static oio_err_code oio_translate_sys_error(int sys_errno) {
switch (sys_errno) {
case 0: return OIO_OK;
case EACCES: return OIO_EACCESS;
case ECONNRESET: return OIO_ECONNRESET;
case EFAULT: return OIO_EFAULT;
case EMFILE: return OIO_EMFILE;
case EINVAL: return OIO_EINVAL;
@@ -101,11 +102,6 @@ static oio_err oio_err_new(oio_handle* handle, int sys_error) {
}
oio_err oio_err_last(oio_handle* handle) {
return handle->err;
}
struct sockaddr_in oio_ip4_addr(char* ip, int port) {
struct sockaddr_in addr;
@@ -153,6 +149,9 @@ int oio_tcp_init(oio_handle* handle, oio_close_cb close_cb,
ngx_queue_init(&handle->read_reqs);
ngx_queue_init(&handle->write_queue);
handle->write_queue_size = 0;
ev_init(&handle->next_watcher, oio__next);
handle->next_watcher.data = handle;
@@ -162,6 +161,9 @@ int oio_tcp_init(oio_handle* handle, oio_close_cb close_cb,
ev_init(&handle->write_watcher, oio_tcp_io);
handle->write_watcher.data = handle;
assert(ngx_queue_empty(&handle->write_queue));
assert(handle->write_queue_size == 0);
return 0;
}
@@ -270,7 +272,7 @@ void oio__server_io(EV_P_ ev_io* watcher, int revents) {
/* TODO special trick. unlock reserved socket, accept, close. */
return;
} else {
handle->err = oio_err_new(handle, errno);
oio_err_new(handle, errno);
oio_close(handle);
}
@@ -355,7 +357,28 @@ void oio_finish_close(oio_handle* handle) {
oio_req* oio_read_reqs_head(oio_handle* handle) {
ngx_queue_t* q = ngx_queue_head(&(handle->read_reqs));
if (ngx_queue_empty(&handle->read_reqs)) {
return NULL;
}
ngx_queue_t* q = ngx_queue_head(&handle->read_reqs);
if (!q) {
return NULL;
}
oio_req* req = ngx_queue_data(q, struct oio_req_s, read_reqs);
assert(req);
return req;
}
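The ngx_queue_data() call above works because the queue node (read_reqs) is embedded inside the request, so the containing oio_req can be recovered from a node pointer with offsetof. A minimal standalone sketch of that intrusive-queue lookup; node_t, req_t and req_from_link are illustrative names, not oio API:

#include <stddef.h>

typedef struct node_s { struct node_s* prev; struct node_s* next; } node_t;

typedef struct req_s {
  int id;
  node_t link;  /* embedded queue node, like read_reqs in oio_req */
} req_t;

/* Recover the containing request from a pointer to its embedded node;
 * this mirrors what ngx_queue_data(q, struct oio_req_s, read_reqs) does. */
static req_t* req_from_link(node_t* q) {
  return (req_t*) ((char*) q - offsetof(req_t, link));
}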
oio_req* oio_write_queue_head(oio_handle* handle) {
if (ngx_queue_empty(&handle->write_queue)) {
return NULL;
}
ngx_queue_t* q = ngx_queue_head(&handle->write_queue);
if (!q) {
return NULL;
}
@@ -385,6 +408,114 @@ void oio__next(EV_P_ ev_idle* watcher, int revents) {
}
void oio__write(oio_handle* handle) {
assert(handle->fd >= 0);
/* TODO: should probably while(1) here until EAGAIN */
/* Get the request at the head of the write_queue. */
oio_req* req = oio_write_queue_head(handle);
if (!req) {
/* This probably shouldn't happen. Maybe assert(0) here. */
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
return;
}
assert(req->handle == handle);
/* Cast to iovec. We had to have our own oio_buf instead of iovec
* because Windows's WSABUF is not an iovec.
*/
struct iovec* iov = (struct iovec*) &(req->read_bufs[req->write_index]);
int iovcnt = req->read_bufcnt - req->write_index;
assert(iov);
assert(iovcnt > 0);
/* Now do the actual writev. Note that we've been updating the pointers
* inside the iov each time we write. So there is no need to offset it.
*/
ssize_t n = writev(handle->fd, iov, iovcnt);
oio_write_cb cb = req->cb;
if (n < 0) {
if (errno != EAGAIN) {
oio_err err = oio_err_new(handle, errno);
/* XXX How do we handle the error? Need test coverage here. */
if (cb) {
cb(req, -1);
}
oio_close(handle);
return;
}
} else {
/* Successful write */
/* The loop updates the counters. */
while (n > 0) {
char* base = req->read_bufs[req->write_index].base;
size_t len = req->read_bufs[req->write_index].len;
assert(req->write_index < req->read_bufcnt);
if (n < len) {
req->read_bufs[req->write_index].base += n;
req->read_bufs[req->write_index].len -= n;
handle->write_queue_size -= n;
n = 0;
/* There is more to write. Break and ensure the watcher is pending. */
break;
} else {
/* Finished writing the buf at index req->write_index. */
req->write_index++;
assert(n >= len);
n -= len;
assert(handle->write_queue_size >= len);
handle->write_queue_size -= len;
if (req->write_index == req->read_bufcnt) {
/* Then we're done! */
assert(n == 0);
/* Pop the req off handle->write_queue. */
ngx_queue_remove(&req->read_reqs);
free(req->read_bufs); /* FIXME: we should not be allocing for each read */
req->read_bufs = NULL;
/* NOTE: call callback AFTER freeing the request data. */
if (cb) {
cb(req, 0);
}
if (ngx_queue_empty(&handle->write_queue)) {
assert(handle->write_queue_size == 0);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
} else {
assert(handle->write_queue_size > 0);
}
return;
}
}
}
}
/* Either we've counted n down to zero or we've got EAGAIN. */
assert(n == 0 || n == -1);
/* We're not done yet. */
assert(ev_is_active(&handle->write_watcher));
ev_io_start(EV_DEFAULT_ &handle->write_watcher);
}
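The heart of the hunk above is the bookkeeping after a short writev(): the returned byte count is walked across the queued bufs, advancing write_index past fully written bufs and shrinking base/len in place for a partially written one, so the next writev() can start from the same pointers without any extra offset. A minimal standalone sketch of that counting step; buf_t and consume_written are illustrative names that only assume oio_buf's base/len layout:

#include <stddef.h>

typedef struct { char* base; size_t len; } buf_t;  /* mirrors oio_buf */

/* Walk n bytes reported by writev() across bufs[], starting at *index.
 * Fully written bufs advance the index; a partially written buf has its
 * base/len adjusted in place, as the loop in oio__write does.
 * Returns the number of bufs that still have data pending. */
static int consume_written(buf_t bufs[], int bufcnt, int* index, size_t n) {
  while (n > 0 && *index < bufcnt) {
    buf_t* b = &bufs[*index];
    if (n < b->len) {
      b->base += n;   /* partial: keep this buf, shifted forward */
      b->len -= n;
      n = 0;
    } else {
      n -= b->len;    /* this buf went out in full; move on */
      (*index)++;
    }
  }
  return bufcnt - *index;
}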
void oio__read(oio_handle* handle) {
int errorno;
assert(handle->fd >= 0);
@@ -424,7 +555,6 @@ void oio__read(oio_handle* handle) {
if (cb) {
cb(req, 0, -1);
}
handle->err = err;
oio_close(handle);
}
} else {
@@ -465,8 +595,7 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
}
if (revents & EV_WRITE) {
/* ignore for now */
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
oio__write(handle);
}
}
}
@@ -477,7 +606,7 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
* In order to determine if we've errored out or succeeded must call
* getsockopt.
*/
void oio_tcp_connect(oio_handle* handle) {
static void oio_tcp_connect(oio_handle* handle) {
assert(handle->fd >= 0);
oio_req* req = handle->connect_req;
@@ -504,12 +633,13 @@ void oio_tcp_connect(oio_handle* handle) {
} else {
oio_err err = oio_err_new(handle, error);
handle->connect_req = NULL;
oio_connect_cb connect_cb = req->cb;
if (connect_cb) {
connect_cb(req, -1);
}
handle->err = err;
oio_close(handle);
}
}
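The comment above notes that after a non-blocking connect() the outcome has to be read back with getsockopt(SO_ERROR) once the socket polls writable; the hunk only shows the branch that consumes the resulting error value. A minimal sketch of that check; pending_connect_error is an illustrative name:

#include <sys/socket.h>

/* Returns 0 if the in-progress connect succeeded, an errno-style value
 * if it failed, or -1 if getsockopt() itself failed. */
static int pending_connect_error(int fd) {
  int error = 0;
  socklen_t len = sizeof(error);
  if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len) < 0)
    return -1;
  return error;
}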
@@ -561,31 +691,86 @@ int oio_connect(oio_req* req, struct sockaddr* addr) {
}
/* The buffers to be written must remain valid until the callback is called. */
/* This is not required for the oio_buf array. */
static size_t oio__buf_count(oio_buf bufs[], int bufcnt) {
size_t total = 0;
int i;
for (i = 0; i < bufcnt; i++) {
total += bufs[i].len;
}
return total;
}
/* The buffers to be written must remain valid until the callback is called.
* This is not required for the oio_buf array.
*/
int oio_write(oio_req* req, oio_buf* bufs, int bufcnt) {
oio_handle* handle = req->handle;
assert(handle->fd >= 0);
ssize_t r;
ssize_t written;
int errorno;
ngx_queue_init(&(req->read_reqs));
size_t total = oio__buf_count(bufs, bufcnt);
if (ngx_queue_empty(&handle->write_queue)) {
/* First try to do the write inline. */
written = writev(handle->fd, (struct iovec*)bufs, bufcnt);
errorno = errno;
if (written < 0) {
if (errorno != EAGAIN) {
oio_err_new(handle, errorno);
return -1;
}
} else {
if (written == total) {
/* Successful write. We're done. */
assert(ngx_queue_empty(&handle->write_queue));
assert(handle->write_queue_size == 0);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
if (req && req->cb) {
oio_write_cb cb = req->cb;
cb(req, 0);
}
return 0;
}
}
}
/* Either we got EAGAIN or there is already data in the userspace write
* queue.
*/
ngx_queue_init(&req->read_reqs);
req->type = OIO_WRITE;
r = writev(handle->fd, (struct iovec*)bufs, bufcnt);
errorno = errno;
/* TODO rename:
* req->read_reqs to req->queue
* req->read_bufs to req->bufs
* req->read_bufcnt to req->bufcnt
*/
req->read_bufs = malloc(sizeof(oio_buf) * bufcnt);
memcpy(req->read_bufs, bufs, bufcnt * sizeof(oio_buf));
req->read_bufcnt = bufcnt;
if (r < 0) {
assert(errorno != EAGAIN && "write queueing not yet supported");
oio_err_new(handle, errorno);
return -1;
} else {
if (req && req->cb) {
oio_write_cb cb = req->cb;
cb(req, 0);
}
return 0;
}
req->write_index = 0;
handle->write_queue_size += total;
/* Append the request to write_queue. */
ngx_queue_insert_tail(&handle->write_queue, &req->read_reqs);
assert(!ngx_queue_empty(&handle->write_queue));
assert(handle->write_watcher.data == handle);
assert(handle->write_watcher.fd == handle->fd);
ev_io_start(EV_DEFAULT_ &handle->write_watcher);
return 0;
}


@@ -40,16 +40,16 @@ typedef struct {
#define oio_req_private_fields \
int write_index; \
ev_timer timer; \
ngx_queue_t read_reqs; \
oio_buf* read_bufs; \
ev_timer timer; \
int read_bufcnt;
#define oio_handle_private_fields \
int fd; \
int flags; \
oio_err err; \
oio_read_cb read_cb; \
oio_accept_cb accept_cb; \
int accepted_fd; \
@@ -58,6 +58,7 @@ typedef struct {
ev_io write_watcher; \
ev_idle next_watcher; \
ngx_queue_t write_queue; \
size_t write_queue_size; \
ngx_queue_t read_reqs;

oio.h

@@ -178,17 +178,19 @@ int oio_listen(oio_handle* handle, int backlog, oio_accept_cb cb);
int oio_accept(oio_handle* server, oio_handle* client,
oio_close_cb close_cb, void* data);
/* Generic read/write methods. */
/* The buffers to be written or read into must remain valid until the */
/* callback is called. The oio_buf array does need not remain valid! */
/* Generic read/write methods. The buffers to be written or read into must
 * remain valid until the callback is called. The oio_buf array itself does
 * not need to remain valid!
 */
int oio_read(oio_req* req, oio_buf* bufs, int bufcnt);
int oio_write(oio_req* req, oio_buf* bufs, int bufcnt);
/* Timer methods */
int oio_timeout(oio_req* req, int64_t timeout);
/* Request handle to be closed. close_cb will be called */
/* asynchronously after this call. */
/* Request handle to be closed. close_cb will be called
* asynchronously after this call.
*/
int oio_close(oio_handle* handle);
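A minimal usage sketch of the lifetime rule stated above: the oio_buf array can be a stack temporary, since oio_write() copies it when it has to queue the request, but the bytes the bufs point at must stay valid until the write callback runs. How req is initialized for its handle and given a callback is not shown; send_greeting and greeting are illustrative names:

static char greeting[] = "hello\n";  /* must outlive the request */

static void send_greeting(oio_req* req) {
  oio_buf bufs[1];                   /* stack array is fine: it gets copied */
  bufs[0].base = greeting;
  bufs[0].len = sizeof(greeting) - 1;

  if (oio_write(req, bufs, 1) < 0) {
    /* immediate failure; oio_err_last(req->handle) holds the reason */
  }
  /* bufs[] may go out of scope now; greeting (static here) must stay
   * valid until req's write callback has been called. */
}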