This commit is contained in:
Ryan Dahl 2011-03-24 01:50:23 -07:00
parent 116bd967d6
commit bdda3252bc
4 changed files with 152 additions and 27 deletions

View File

@ -4,10 +4,6 @@
*/
#include <ngx_config.h>
#include <ngx_core.h>
#ifndef _NGX_QUEUE_H_INCLUDED_
#define _NGX_QUEUE_H_INCLUDED_
@ -103,9 +99,4 @@ struct ngx_queue_s {
(type *) ((u_char *) q - offsetof(type, link))
ngx_queue_t *ngx_queue_middle(ngx_queue_t *queue);
void ngx_queue_sort(ngx_queue_t *queue,
ngx_int_t (*cmp)(const ngx_queue_t *, const ngx_queue_t *));
#endif /* _NGX_QUEUE_H_INCLUDED_ */

31
ol.h
View File

@ -13,7 +13,7 @@
*/
typedef enum {
OL_SUCCESS = 0,
OL_EAGAIN = -1,
OL_EPENDING = -1, /* Windows users think WSA_IO_PENDING */
OL_EPIPE = -2,
OL_EMEM = -3,
} ol_errno;
@ -36,10 +36,10 @@ typedef enum {
} ol_handle_type;
typedef ol_read_cb void(*)(ol_handle* h, ol_buf *bufs, int bufcnt);
typedef ol_close_cb void(*)(ol_handle* h, int read, int write, ol_errno err);
typedef ol_connect_cb void(*)(ol_handle* h);
typedef ol_accept_cb void(*)(ol_handle* h, ol_handle *peer);
typedef void(*)(ol_handle* h, ol_buf *bufs, int bufcnt) ol_read_cb;
typedef void(*)(ol_handle* h, int read, int write, ol_errno err) ol_close_cb;
typedef void(*)(ol_handle* h) ol_connect_cb;
typedef void(*)(ol_handle* h, ol_handle *peer) ol_accept_cb;
/**
@ -52,29 +52,29 @@ ol_handle* ol_tcp_new(int v4, ol_read_cb read_cb, ol_close_cb close_cb);
* Creates a new file handle. The 'read' parameter is boolean indicating if
* the file should be read from or created.
*/
ol_handle* ol_file_new(char *filename, int read, ol_read_cb cb,
ol_close_cb cb);
ol_handle* ol_file_new(char *filename, int read, ol_read_cb read_cb,
ol_close_cb close_cb);
/**
* In the case of servers, give a filename. In the case of clients
* leave filename NULL.
*/
ol_handle* ol_named_pipe_new(char *filename, ol_read_cb cb,
ol_close_cb cb);
ol_handle* ol_named_pipe_new(char *filename, ol_read_cb read_cb,
ol_close_cb close_cb);
/**
* Allocates a new tty handle.
*/
ol_handle* ol_tty_new(ol_tty_read_cb cb, ol_close_cb cb);
ol_handle* ol_tty_new(ol_tty_read_cb read_cb, ol_close_cb close_cb);
/**
* Only works with named pipes and TCP sockets.
*/
int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len len,
ol_buf* buf, size_t* bytes_sent, ol_connect_cb cb);
ol_buf* buf, ol_connect_cb connect_cb);
/**
@ -111,12 +111,15 @@ ol_handle_type ol_get_type(ol_handle* h);
/**
* Send data to h. User responsible for bufs until callback is made.
* Send data to handle. User responsible for bufs until callback is made.
* Multiple ol_handle_write() calls may be issued before the previous ones
* complete - data will sent in the correct order.
*
* Returns zero on succuessful write and bytes_sent is filled with the
* number of bytes successfully written. If an asyncrhonous write was
* successfully initiated then OL_EAGAIN is returned.
*/
int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt,
size_t* bytes_sent, ol_write_cb cb);
int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, ol_write_cb cb);
/**

128
ol_unix.c
View File

@ -79,6 +79,47 @@ static void tcp_check_connect_status(ol_handle* h) {
}
ol_bucket* ol_handle_first_bucket(ol_handle* h) {
ngx_queue_t* element = ngx_queue_head(h);
if (!element) {
return NULL;
}
return ngx_queue_data(element, ol_bucket, write_queue);
}
static void tcp_flush(oi_loop* loop, ol_handle* h) {
ol_bucket* bucket = ol_handle_first_bucket(h);
for (; bucket; bucket = ol_handle_first_bucket(h)) {
io_vec* vec = (io_vec*) bucket->bufs[bucket->current_index];
int remaining_bufcnt = bucket->bufcnt - bucket->current_index;
ssize_t written = writev(h->fd, vec, remaining_bufcnt);
if (written < 0) {
if (errno == EAGAIN) {
ev_io_start(loop, &h->write_watcher);
} else {
got_error("writev", errno);
}
} else {
/* See how much was written, increase current_index, and update bufs. */
oi_buf current = bucket->bufs[bucket->current_index];
while (bucket->current_index < bucket->bufcnt) {
if (current.len >= written) {
current = bucket->bufs[++bucket->current_index];
} else {
bucket->bufs[bucket->current_index].buf += written;
break;
}
}
}
}
}
static void tcp_connected(ol_handle* h) {
assert(h->connecting);
if (h->connect_cb) {
@ -86,11 +127,21 @@ static void tcp_connected(ol_handle* h) {
}
h->connecting = 0;
h->connect_cb = NULL;
ev_io_init(&h->read_watcher, tcp_io, h->fd, EV_READ);
ev_io_start(h->loop, &h->read_watcher);
if (ngx_queue_empty(&h->write_queue)) {
ev_io_stop(h->loop, &h->write_watcher);
} else {
/* Now that we're connected let's try to flush. */
tcp_flush(h);
}
}
int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen,
ol_buf* buf, size_t* bytes_sent, ol_connect_cb cb) {
ol_buf* buf, ol_connect_cb connect_cb) {
if (h->connecting) {
return got_error("connect", EALREADY);
}
@ -101,9 +152,10 @@ int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen,
if (buf) {
/* We're allowed to ol_write before the socket becomes connected. */
ol_write(h, buf, 1, bytes_sent, cb);
ol_write(h, buf, 1, connect_cb);
} else {
h->connect_cb = cb;
/* Nothing to write. Don't call the callback until we're connected. */
h->connect_cb = connect_cb;
}
int r = connect(h->fd, h->connect_addr, h->connect_addrlen);
@ -115,6 +167,7 @@ int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen,
ev_io_init(&h->write_watcher, tcp_io, h->fd, EV_WRITE);
ev_io_start(h->loop, &h->write_watcher);
}
return got_error("connect", errno);
}
@ -127,3 +180,72 @@ int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen,
int ol_get_fd(ol_handle* h) {
return h->fd;
}
ol_bucket* bucket_new(oi_handle* h, oi_buf* bufs, int bufcnt, ol_write_cb cb) {
ol_bucket* bucket = malloc(sizeof(ol_bucket));
if (!bucket) {
got_error("malloc", OL_EMEM);
return NULL;
}
bucket->bufs = bufs;
bucket->bufcnt = bufcnt;
bucket->write_cb = write_cb;
bucket->handle = handle;
ngx_queue_init(&bucket->write_queue);
return bucket;
}
int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, ol_write_cb cb) {
if (!h->connecting && !h->writable) {
return got_error("write", OL_EPIPE);
}
if (!h->writable) {
/* This happens when writing to a socket which is not connected yet. */
bucket* b = bucket_new(h, buf, bufcnt, cb);
bucket_append(h, b);
return got_error("write", OL_EPENDING);
}
ssize_t written;
/* If the write queue is empty, attempt to write now. */
if (ngx_queue_empty(&h->write_queue)) {
/* The queue is empty. Attempt the writev immediately. */
written = writev(h->fd, (io_vec*)bufs, bufcnt);
if (written >= 0) {
size_t seen = 0;
/* Figure out what's left to be written */
for (int i = 0; i < bufcnt; i++) {
seen += bufs[i].len;
if (seen == written) {
/* We wrote the entire thing. */
return 0;
}
if (seen > written) {
break;
}
}
assert(seen > written);
/* We've made a partial write of the bufs. bufs[i] is the first buf
* that wasn't totally flushed. We must now add
* bufs[i], bufs[i + 1], ..., bufs[bufcnt - 1]
* to the write queue.
*/
}
}
}

View File

@ -4,10 +4,19 @@
typedef struct {
char* buf;
size_t len;
ngx_queue_s write_queue;
} ol_buf;
typedef struct {
ol_buf* bufs;
int bufcnt;
int current_index;
size_t written;
ol_write_cb write_cb;
ol_handle* handle;
ngx_queue_s write_queue;
} ol_bucket;
typedef struct {
int fd;