diff --git a/ngx_queue.h b/ngx_queue.h index 9a1763d7..9e422d6a 100644 --- a/ngx_queue.h +++ b/ngx_queue.h @@ -4,10 +4,6 @@ */ -#include -#include - - #ifndef _NGX_QUEUE_H_INCLUDED_ #define _NGX_QUEUE_H_INCLUDED_ @@ -103,9 +99,4 @@ struct ngx_queue_s { (type *) ((u_char *) q - offsetof(type, link)) -ngx_queue_t *ngx_queue_middle(ngx_queue_t *queue); -void ngx_queue_sort(ngx_queue_t *queue, - ngx_int_t (*cmp)(const ngx_queue_t *, const ngx_queue_t *)); - - #endif /* _NGX_QUEUE_H_INCLUDED_ */ diff --git a/ol.h b/ol.h index fd595e86..3170890a 100644 --- a/ol.h +++ b/ol.h @@ -13,7 +13,7 @@ */ typedef enum { OL_SUCCESS = 0, - OL_EAGAIN = -1, + OL_EPENDING = -1, /* Windows users think WSA_IO_PENDING */ OL_EPIPE = -2, OL_EMEM = -3, } ol_errno; @@ -36,10 +36,10 @@ typedef enum { } ol_handle_type; -typedef ol_read_cb void(*)(ol_handle* h, ol_buf *bufs, int bufcnt); -typedef ol_close_cb void(*)(ol_handle* h, int read, int write, ol_errno err); -typedef ol_connect_cb void(*)(ol_handle* h); -typedef ol_accept_cb void(*)(ol_handle* h, ol_handle *peer); +typedef void(*)(ol_handle* h, ol_buf *bufs, int bufcnt) ol_read_cb; +typedef void(*)(ol_handle* h, int read, int write, ol_errno err) ol_close_cb; +typedef void(*)(ol_handle* h) ol_connect_cb; +typedef void(*)(ol_handle* h, ol_handle *peer) ol_accept_cb; /** @@ -52,29 +52,29 @@ ol_handle* ol_tcp_new(int v4, ol_read_cb read_cb, ol_close_cb close_cb); * Creates a new file handle. The 'read' parameter is boolean indicating if * the file should be read from or created. */ -ol_handle* ol_file_new(char *filename, int read, ol_read_cb cb, - ol_close_cb cb); +ol_handle* ol_file_new(char *filename, int read, ol_read_cb read_cb, + ol_close_cb close_cb); /** * In the case of servers, give a filename. In the case of clients * leave filename NULL. */ -ol_handle* ol_named_pipe_new(char *filename, ol_read_cb cb, - ol_close_cb cb); +ol_handle* ol_named_pipe_new(char *filename, ol_read_cb read_cb, + ol_close_cb close_cb); /** * Allocates a new tty handle. */ -ol_handle* ol_tty_new(ol_tty_read_cb cb, ol_close_cb cb); +ol_handle* ol_tty_new(ol_tty_read_cb read_cb, ol_close_cb close_cb); /** * Only works with named pipes and TCP sockets. */ int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len len, - ol_buf* buf, size_t* bytes_sent, ol_connect_cb cb); + ol_buf* buf, ol_connect_cb connect_cb); /** @@ -111,12 +111,15 @@ ol_handle_type ol_get_type(ol_handle* h); /** - * Send data to h. User responsible for bufs until callback is made. + * Send data to handle. User responsible for bufs until callback is made. * Multiple ol_handle_write() calls may be issued before the previous ones * complete - data will sent in the correct order. + * + * Returns zero on succuessful write and bytes_sent is filled with the + * number of bytes successfully written. If an asyncrhonous write was + * successfully initiated then OL_EAGAIN is returned. */ -int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, - size_t* bytes_sent, ol_write_cb cb); +int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, ol_write_cb cb); /** diff --git a/ol_unix.c b/ol_unix.c index 5ad9511f..4dbc9118 100644 --- a/ol_unix.c +++ b/ol_unix.c @@ -79,6 +79,47 @@ static void tcp_check_connect_status(ol_handle* h) { } +ol_bucket* ol_handle_first_bucket(ol_handle* h) { + ngx_queue_t* element = ngx_queue_head(h); + if (!element) { + return NULL; + } + return ngx_queue_data(element, ol_bucket, write_queue); +} + + +static void tcp_flush(oi_loop* loop, ol_handle* h) { + ol_bucket* bucket = ol_handle_first_bucket(h); + + for (; bucket; bucket = ol_handle_first_bucket(h)) { + io_vec* vec = (io_vec*) bucket->bufs[bucket->current_index]; + int remaining_bufcnt = bucket->bufcnt - bucket->current_index; + ssize_t written = writev(h->fd, vec, remaining_bufcnt); + + if (written < 0) { + if (errno == EAGAIN) { + ev_io_start(loop, &h->write_watcher); + } else { + got_error("writev", errno); + } + + } else { + /* See how much was written, increase current_index, and update bufs. */ + oi_buf current = bucket->bufs[bucket->current_index]; + + while (bucket->current_index < bucket->bufcnt) { + if (current.len >= written) { + current = bucket->bufs[++bucket->current_index]; + } else { + bucket->bufs[bucket->current_index].buf += written; + break; + } + } + } + } +} + + static void tcp_connected(ol_handle* h) { assert(h->connecting); if (h->connect_cb) { @@ -86,11 +127,21 @@ static void tcp_connected(ol_handle* h) { } h->connecting = 0; h->connect_cb = NULL; + + ev_io_init(&h->read_watcher, tcp_io, h->fd, EV_READ); + ev_io_start(h->loop, &h->read_watcher); + + if (ngx_queue_empty(&h->write_queue)) { + ev_io_stop(h->loop, &h->write_watcher); + } else { + /* Now that we're connected let's try to flush. */ + tcp_flush(h); + } } int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen, - ol_buf* buf, size_t* bytes_sent, ol_connect_cb cb) { + ol_buf* buf, ol_connect_cb connect_cb) { if (h->connecting) { return got_error("connect", EALREADY); } @@ -101,9 +152,10 @@ int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen, if (buf) { /* We're allowed to ol_write before the socket becomes connected. */ - ol_write(h, buf, 1, bytes_sent, cb); + ol_write(h, buf, 1, connect_cb); } else { - h->connect_cb = cb; + /* Nothing to write. Don't call the callback until we're connected. */ + h->connect_cb = connect_cb; } int r = connect(h->fd, h->connect_addr, h->connect_addrlen); @@ -115,6 +167,7 @@ int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen, ev_io_init(&h->write_watcher, tcp_io, h->fd, EV_WRITE); ev_io_start(h->loop, &h->write_watcher); } + return got_error("connect", errno); } @@ -127,3 +180,72 @@ int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen, int ol_get_fd(ol_handle* h) { return h->fd; } + + +ol_bucket* bucket_new(oi_handle* h, oi_buf* bufs, int bufcnt, ol_write_cb cb) { + ol_bucket* bucket = malloc(sizeof(ol_bucket)); + if (!bucket) { + got_error("malloc", OL_EMEM); + return NULL; + } + + bucket->bufs = bufs; + bucket->bufcnt = bufcnt; + bucket->write_cb = write_cb; + bucket->handle = handle; + ngx_queue_init(&bucket->write_queue); + + return bucket; +} + + +int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, ol_write_cb cb) { + if (!h->connecting && !h->writable) { + return got_error("write", OL_EPIPE); + } + + if (!h->writable) { + /* This happens when writing to a socket which is not connected yet. */ + bucket* b = bucket_new(h, buf, bufcnt, cb); + bucket_append(h, b); + return got_error("write", OL_EPENDING); + } + + ssize_t written; + + /* If the write queue is empty, attempt to write now. */ + if (ngx_queue_empty(&h->write_queue)) { + /* The queue is empty. Attempt the writev immediately. */ + written = writev(h->fd, (io_vec*)bufs, bufcnt); + + if (written >= 0) { + size_t seen = 0; + + /* Figure out what's left to be written */ + for (int i = 0; i < bufcnt; i++) { + seen += bufs[i].len; + + if (seen == written) { + /* We wrote the entire thing. */ + return 0; + } + + if (seen > written) { + break; + } + } + + assert(seen > written); + + /* We've made a partial write of the bufs. bufs[i] is the first buf + * that wasn't totally flushed. We must now add + * bufs[i], bufs[i + 1], ..., bufs[bufcnt - 1] + * to the write queue. + */ + } + + + } + + +} diff --git a/ol_unix.h b/ol_unix.h index 0c9e3917..7eb352c0 100644 --- a/ol_unix.h +++ b/ol_unix.h @@ -4,10 +4,19 @@ typedef struct { char* buf; size_t len; - ngx_queue_s write_queue; } ol_buf; +typedef struct { + ol_buf* bufs; + int bufcnt; + int current_index; + size_t written; + ol_write_cb write_cb; + ol_handle* handle; + ngx_queue_s write_queue; +} ol_bucket; + typedef struct { int fd;