New start

This commit is contained in:
Ryan Dahl 2011-03-28 01:55:29 -07:00
parent dc2d8bc313
commit cb17a5e37a
8 changed files with 959 additions and 458 deletions

View File

@ -16,6 +16,24 @@
dt { margin-top: 1em; }
dd { margin-bottom: 1em; }
table {
margin: 1em 0;
padding: 0;
}
table th {
text-align: left;
padding: 0.5em;
white-space: nowrap;
}
table td {
padding: 0.5em;
text-align: left;
white-space: nowrap;
}
</style>
<title>Asynchronous I/O in Windows for Unix Programmers</title>
</head>
@ -67,14 +85,15 @@ the data and then wait for it to have been sent.
<p> Unix non-blocking I/O is not beautiful. A major feature of Unix is the
unified treatment of all things as files (or more precisely as file
descriptors); TCP sockets work with <code>write(2)</code>,
<code>read(2)</code>, and <code>close(2)</code> just as they do on regular
files. Well, kind of. Synchronous operations work similarly on different
descriptors). <code>write(2)</code>, <code>read(2)</code>, and
<code>close(2)</code> work with TCP sockets just as they do on regular
files. Well&mdash;kind of. Synchronous operations work similarly on different
types of file descriptors but once demands on performance drive you to world of
<code>O_NONBLOCK</code>, various types of file descriptors can act quite
<code>O_NONBLOCK</code> various types of file descriptors can act quite
different for even the most basic operations. In particular,
regular file system files do <i>not</i> support non-blocking operations (and not a
single man page mentions it).
regular file system files do <i>not</i> support non-blocking operations.
(Disturbingly no man page mentions this rather important fact.)
For example, one cannot poll on a regular file FD for readability expecting
it to indicate when it is safe to do a non-blocking read.
@ -103,17 +122,154 @@ Common practice for accessing the disk asynchronously is still done using custom
userland thread pools&mdash;not POSIX AIO.
<p>Windows IOCP does support both sockets and regular file I/O which
greatly simplifies the handling of disks.
greatly simplifies the handling of disks. Although the function names are
not exactly the same in Windows for sockets and regular files, the
they act similar.
<table>
<tr>
<th></th>
<th>Regular File Read</th>
<th>Regular File Write</th>
<th>Socket Read</th>
<th>Socket Write</th>
</tr>
<tr>
<th>Windows</th>
<td><code>ReadFile()</code></td>
<td><code>WriteFile()</code></td>
<td><code>WSARecv()</code></td>
<td><code>WSASend()</code></td>
</tr>
<tr>
<th>POSIX</th>
<td><code>read()</code></td>
<td><code>write()</code></td>
<td><code>read()</code> or <code>recv()</code></td>
<td><code>write()</code> or <code>send()</code></td>
</tr>
</table>
<p><code>ReadFile()</code> and <code>WSARecv()</code> operate on regular
files and sockets, respectively. They both are for sending data. Both take a
<a href="http://msdn.microsoft.com/en-us/library/ms741665(v=VS.85).aspx"><code>OVERLAPPED</code></a>
argument.
<pre>
typedef void* HANDLE;
typedef HANDLE SOCKET;
BOOL ReadFile(HANDLE file,
void* buffer,
DWORD numberOfBytesToRead,
DWORD* numberOfBytesRead,
OVERLAPPED* overlapped);
int WSARecv(SOCKET s,
WSABUF* buffers,
DWORD bufferCount,
DWORD* numberOfBytesRecvd,
DWORD* flags,
OVERLAPPED* overlapped,
OVERLAPPED_COMPLETION_ROUTINE completionRoutine);
</pre>
For now ignore the <code>completionRoutine</code> parameter in
<code>WSARecv</code>.
<p>
Both functions have the possibility of executing the read synchronously
or asynchronously. A synchronous operation is indicated by
returning 0 and <a
href="http://msdn.microsoft.com/en-us/library/ms741580(v=VS.85).aspx">WSAGetLastError()</code></a>
returning <code>WSA_IO_PENDING</code>.
<p>
When either function operates asynchronously the
the user-supplied <a
href="http://msdn.microsoft.com/en-us/library/ms741665(v=VS.85).aspx"><code>OVERLAPPED*</code></a>
is a handle to the incomplete operation.
<pre>
typedef struct {
unsigned long* Internal;
unsigned long* InternalHigh;
union {
struct {
WORD Offset;
WORD OffsetHigh;
};
void* Pointer;
};
HANDLE hEvent;
} OVERLAPPED;
</pre>
To poll on the completion of one of these functions,
use an IOCP, <code>overlapped->hEvent</code>, and
<a
href="http://msdn.microsoft.com/en-us/library/aa364986(v=vs.85).aspx"><code>GetQueuedCompletionStatus()</code></a>.
<h3>Simple TCP Connection Example</h3>
<p>To demonstrate the use of <code>GetQueuedCompletionStatus()</code> an
example of connecting to <code>localhost</code> at port 8000 is presented.
<pre>
char* buffer[200];
WSABUF b = { buffer, 200 };
size_t bytes_recvd;
int r, total_events;
OVERLAPPED overlapped;
HANDLE port;
port = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
if (!port) {
goto error;
}
r = WSARecv(socket, &b, 1, &bytes_recvd, NULL, &overlapped, NULL);
CreateIoCompletionPort(port, &overlapped.hEvent,
if (r == 0) {
if (WSAGetLastError() == WSA_IO_PENDING) {
/* Asynchronous */
GetQueuedCompletionStatus()
if (r == WAIT_TIMEOUT) {
printf("Timeout\n");
} else {
}
} else {
/* Error */
printf("Error %d\n", WSAGetLastError());
}
} else {
/* Synchronous */
printf("read %ld bytes from socket\n", bytes_recvd);
}
</pre>
<h3>Previous Work</h3>
<p> Writing code that can take advantage of the best worlds on across Unix operating
systems and Windows is very difficult, requiring one to understand intricate
APIs and undocumented details from many different operating systems. There
are several projects which have made attempts to provide an abstraction
layer but in the author's opinion, none are satisfactory.
<ul>
<li>Marc Lehmann's
layer but in the author's opinion, none are completely satisfactory.
<p><b>Marc Lehmann's
<a href="http://software.schmorp.de/pkg/libev.html">libev</a> and
<a href="http://software.schmorp.de/pkg/libeio.html">libeio</a>.
<a href="http://software.schmorp.de/pkg/libeio.html">libeio</a>.</b>
libev is the perfect minimal abstraction of the Unix I/O multiplexers. It
includes several helpful tools like <code>ev_async</code>, which is for
asynchronous notification, but the main piece is the <code>ev_io</code>,
@ -130,7 +286,7 @@ layer but in the author's opinion, none are satisfactory.
<code>select()</code>&mdash;which is limited to 64 file descriptors per
thread.
<li><a href="http://monkey.org/~provos/libevent/">libevent</a>.
<p><b><a href="http://monkey.org/~provos/libevent/">libevent</a>.</b>
Somewhat bulkier than libev with code for RPC, DNS, and HTTP included.
Does not support file I/O.
libev was
@ -143,15 +299,16 @@ layer but in the author's opinion, none are satisfactory.
href="http://www.mail-archive.com/libevent-users@monkey.org/msg01730.html">anecdotal
evidence</a> suggests that it is still not working correctly.
<li><a
<p><b><a
href="http://www.boost.org/doc/libs/1_43_0/doc/html/boost_asio.html">Boost
ASIO</a>
</ul>
ASIO</a>.</b> It basically does what you want on Windows and Unix for
sockets. That is, epoll on Linux, kqueue on Macintosh, IOCP on Windows.
It does not support file I/O. In the author's opinion is it too large
for a not extremely difficult problem (~300 files, ~12000 semicolons).
<h2>File Types</h2>
<p>
Almost every socket operation that you're familiar with has an
overlapped counter-part. The following section tries to pair Windows
@ -311,7 +468,7 @@ Examples:
</ul>
<h3>On Disk Files</h3>
<h3>Regular Files</h3>
<p>
In Unix file system files are not able to use non-blocking I/O. There are
@ -392,7 +549,7 @@ is also blocking but this is probably acceptable.
<h2 id="foot2">Links</h2>
<h2>Assorted Links</h2>
<p>
tips
<ul>
@ -405,7 +562,7 @@ tips
IOCP:
<ul>
<li><a href="http://msdn.microsoft.com/en-us/library/ms686358(v=vs.85).aspx">Synchronization and Overlapped Input and Output</a>
<li><a href="http://msdn.microsoft.com/en-us/library/ms741665(v=VS.85).aspx"><code>WSAOVERLAPPED</code> Structure</a>
<li><a href="http://msdn.microsoft.com/en-us/library/ms741665(v=VS.85).aspx"><code>OVERLAPPED</code> Structure</a>
<ul>
<li><a href="http://msdn.microsoft.com/en-us/library/ms683209(v=VS.85).aspx"><code>GetOverlappedResult()</code></a>
<li><a href="http://msdn.microsoft.com/en-us/library/ms683244(v=VS.85).aspx"><code>HasOverlappedIoCompleted()</code></a>
@ -445,6 +602,7 @@ Pipes:
<li><a href="http://msdn.microsoft.com/en-us/library/aa365146(v=VS.85).aspx"><code>ConnectNamedPipe</code></a>
</ul>
<code>WaitForMultipleObjectsEx</code> is pronounced "wait for multiple object sex".
Also useful:
<a

187
ol-old.h Normal file
View File

@ -0,0 +1,187 @@
/**
* Overlapped I/O for every operating system.
*/
#ifdef __POSIX__
# include "ol-unix.h"
#else
# include "ol-win.h"
#endif
typedef struct {
int code;
const char* msg;
} ol_err;
/**
* Error codes are not cross-platform, so we have our own.
*/
typedef enum {
OL_SUCCESS = 0,
OL_EPENDING = -1,
OL_EPIPE = -2,
OL_EMEM = -3
} ol_err;
inline const char* ol_err_string(int errorno) {
switch (errorno) {
case OL_SUCCESS:
case OL_EPENDING:
return "";
case OL_EPIPE:
return "EPIPE: Write to non-writable handle";
case OL_EMEM:
return "EMEM: Out of memory!";
default:
assert(0);
return "Unknown error code. Bug.";
}
}
/**
* Do not make assumptions about the order of the elements in this sturct.
* Always use offsetof because the order is platform dependent. Has a char*
* buf and size_t len. That's all you need to know.
*/
struct ol_buf;
typedef enum {
OL_TCP,
OL_TCP6,
OL_NAMED_PIPE,
OL_FILE,
OL_TTY
} ol_handle_type;
typedef void(*)(ol_handle* h, ol_buf *bufs, int bufcnt) ol_read_cb;
typedef void(*)(ol_handle* h) ol_connect_cb;
typedef void(*)(ol_handle* h, ol_handle *peer) ol_accept_cb;
typedef void(*)(ol_handle* h) ol_write_cb;
typedef enum {
OL_READ,
OL_WRITE,
OL_CONNECT,
OL_ACCEPT,
OL_DESTROY
} ol_req_type;
typedef struct {
ol_req_type type;
ol_req_private _;
/* following are rw */
union {
ol_write_cb write_cb;
ol_connect_cb connect_cb;
};
void* data; /* rw */
} ol_req;
ol_handle* ol_handle_new();
ol_handle* ol_open_file(ol_handle* h, ol_req* req, char *filename);
ol_handle* ol_open_named_pipe(ol_handle* h, ol_req* req, char *filename);
ol_handle* ol_open_tty(ol_handle* h, ol_req* req);
struct sockaddr oi_ip4_addr(char*, int port);
/**
* Depth of write buffer in bytes.
*/
size_t ol_buffer_size(ol_handle* h);
/**
* Returns file descriptor associated with the handle. There may be only
* limited numbers of file descriptors allowed by the operating system. On
* Windows this limit is 2048 (see
* _setmaxstdio[http://msdn.microsoft.com/en-us/library/6e3b887c.aspx])
*/
int ol_get_fd(ol_handle* h);
/**
* Returns the type of the handle.
*/
ol_handle_type ol_get_type(ol_handle* h);
/**
* Only works with named pipes and TCP sockets.
*/
int ol_connect(ol_handle* h, ol_req* req, sockaddr* addr, ol_buf* initial_buf);
int ol_accept(ol_handle* h, ol_req* req);
/**
* Send data to handle. User responsible for bufs until callback is made.
* Multiple ol_handle_write() calls may be issued before the previous ones
* complete - data will sent in the correct order.
*
* Returns zero on succuessful write and bytes_sent is filled with the
* number of bytes successfully written. If an asyncrhonous write was
* successfully initiated then OL_EAGAIN is returned.
*/
int ol_write(ol_handle* h, ol_req* req, ol_buf* bufs, int bufcnt);
int ol_write2(ol_handle* h, ol_req* req, const char *string);
int ol_read(ol_handle* h, ol_req* req, ol_buf* bufs, int bufcnt);
/**
* Works on both named pipes and TCP handles. Synchronous.
*/
int ol_listen(ol_handle* h, int backlog);
/**
* See http://msdn.microsoft.com/en-us/library/ms737757(v=VS.85).aspx
*/
int ol_disconnect(ol_handle* h, ol_req* req);
/**
* Immediately closes the handle. If there is data in the send buffer
* it will not be sent.
*/
int ol_close(ol_handle* h);
/**
* Releases memory associated with handle. You MUST call this after
* is made with both 0 arguments.
*/
int ol_free(ol_handle* h);
ol_loop* ol_loop_new();
void ol_associate(ol_loop* loop, ol_handle* handle);
void ol_loop_free(ol_loop* loop);
void ol_run(ol_loop* loop);

300
ol-unix-old.c Normal file
View File

@ -0,0 +1,300 @@
#include "ol.h"
ol_loop* ol_loop_new() {
ol_loop* loop = calloc(sizeof(ol_loop), 1);
if (!loop) {
return NULL;
}
loop.evloop = ev_loop_new(0);
if (!loop.evloop) {
return NULL;
}
return loop;
}
void ol_associate(ol_loop* loop, ol_handle* handle) {
assert(!handle->loop);
handle->loop = loop;
}
void ol_run(ol_loop *loop) {
ev_run(loop, 0);
}
ol_handle* ol_tcp_new(int v4, sockaddr* addr, sockaddr_len len,
ol_read_cb read_cb, ol_close_cb close_cb) {
ol_handle *handle = calloc(sizeof(ol_handle), 1);
if (!handle) {
return NULL;
}
handle->read_cb = read_cb;
handle->close_cb = close_cb;
handle->type = v4 ? OL_TCP : OL_TCP6;
int domain = v4 ? AF_INET : AF_INET6;
handle->fd = socket(domain, SOCK_STREAM, 0);
if (handle->fd == -1) {
free(handle);
got_error("socket", err);
return NULL;
}
/* Lose the pesky "Address already in use" error message */
int yes = 1;
int r = setsockopt(handle->fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
if (r == -1) {
close(handle->fd);
free(handle);
unhandled_error("setsockopt", r);
return NULL;
}
/* We auto-bind the specified address */
if (addr) {
int r = bind(handle->fd, addr, v4 ? sizeof(sockaddr_in) :
sizeof(sockaddr_in6));
if (r < 0) {
got_error("bind", errno);
close(handle->fd);
free(handle);
return NULL;
}
}
return handle;
}
static void tcp_io(EV_P_ ev_io *w, int revents) {
ol_handle* h = (ol_handle*)w->data;
if (h->connecting) {
tcp_check_connect_status(h);
} else {
}
}
static void tcp_check_connect_status(ol_handle* h) {
assert(h->connecting);
int error;
socklen_t len = sizeof(int);
getsockopt(h->fd, SOL_SOCKET, SO_ERROR, &error, &len);
if (error == 0) {
tcp_connected(h);
} else if (err != EINPROGRESS) {
close(h->fd);
got_error("connect", err);
}
/* EINPROGRESS - unlikely. What to do? */
}
ol_bucket* ol_handle_first_bucket(ol_handle* h) {
ngx_queue_t* element = ngx_queue_head(h);
if (!element) {
return NULL;
}
return ngx_queue_data(element, ol_bucket, write_queue);
}
static void tcp_flush(oi_loop* loop, ol_handle* h) {
ol_bucket* bucket = ol_handle_first_bucket(h);
for (; bucket; bucket = ol_handle_first_bucket(h)) {
io_vec* vec = (io_vec*) bucket->bufs[bucket->current_index];
int remaining_bufcnt = bucket->bufcnt - bucket->current_index;
ssize_t written = writev(h->fd, vec, remaining_bufcnt);
if (written < 0) {
if (err == EAGAIN) {
ev_io_start(loop, &h->write_watcher);
} else {
got_error("writev", err);
}
} else {
/* See how much was written, increase current_index, and update bufs. */
oi_buf current = bucket->bufs[bucket->current_index];
while (bucket->current_index < bucket->bufcnt) {
if (current.len >= written) {
current = bucket->bufs[++bucket->current_index];
} else {
bucket->bufs[bucket->current_index].buf += written;
break;
}
}
}
}
}
static void tcp_connected(ol_handle* h) {
assert(h->connecting);
if (h->connect_cb) {
h->connect_cb(h);
}
h->connecting = 0;
h->connect_cb = NULL;
ev_io_init(&h->read_watcher, tcp_io, h->fd, EV_READ);
ev_io_start(h->loop, &h->read_watcher);
if (ngx_queue_empty(&h->write_queue)) {
ev_io_stop(h->loop, &h->write_watcher);
} else {
/* Now that we're connected let's try to flush. */
tcp_flush(h);
}
}
int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen,
ol_buf* buf, ol_connect_cb connect_cb) {
if (h->connecting) {
return got_error("connect", EALREADY);
}
h->connecting = 1;
h->connect_addr = addr;
h->connect_addrlen = addrlen;
if (buf) {
/* We're allowed to ol_write before the socket becomes connected. */
ol_write(h, buf, 1, connect_cb);
} else {
/* Nothing to write. Don't call the callback until we're connected. */
h->connect_cb = connect_cb;
}
int r = connect(h->fd, h->connect_addr, h->connect_addrlen);
if (r != 0) {
if (err == EINPROGRESS) {
/* Wait for fd to become writable. */
h->connecting = 1;
ev_io_init(&h->write_watcher, tcp_io, h->fd, EV_WRITE);
ev_io_start(h->loop, &h->write_watcher);
}
return got_error("connect", err);
}
/* Connected */
tcp_connected(h);
return 0;
}
int ol_get_fd(ol_handle* h) {
return h->fd;
}
ol_bucket* bucket_new(oi_handle* h, oi_buf* bufs, int bufcnt, ol_write_cb cb) {
ol_bucket* bucket = malloc(sizeof(ol_bucket));
if (!bucket) {
got_error("malloc", OL_EMEM);
return NULL;
}
bucket->bufs = bufs;
bucket->bufcnt = bufcnt;
bucket->write_cb = write_cb;
bucket->handle = handle;
ngx_queue_init(&bucket->write_queue);
return bucket;
}
int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, ol_write_cb cb) {
if (!h->connecting && !h->writable) {
return got_error("write", OL_EPIPE);
}
if (!h->writable) {
/* This happens when writing to a socket which is not connected yet. */
bucket* b = bucket_new(h, buf, bufcnt, cb);
bucket_append(h, b);
return got_error("write", OL_EPENDING);
}
ssize_t written;
/* If the write queue is empty, attempt to write now. */
if (ngx_queue_empty(&h->write_queue)) {
/* The queue is empty. Attempt the writev immediately. */
written = writev(h->fd, (io_vec*)bufs, bufcnt);
if (written >= 0) {
size_t seen = 0;
/* Figure out what's left to be written */
for (int i = 0; i < bufcnt; i++) {
seen += bufs[i].len;
if (seen == written) {
/* We wrote the entire thing. */
return 0;
}
if (seen > written) {
break;
}
}
assert(seen > written);
/* We've made a partial write of the bufs. bufs[i] is the first buf
* that wasn't totally flushed. We must now add
* bufs[i], bufs[i + 1], ..., bufs[bufcnt - 1]
* to the write queue.
*/
}
}
}
int ol_write2(ol_handle* h, char *base, size_t len) {
ol_buf buf;
buf.base = base;
buf.len = len;
return ol_write(h, &buf, 1, NULL);
}
int ol_write3(ol_handle* h, const char *string) {
/* You're doing it wrong if strlen(string) > 1mb. */
return ol_write2(h, string, strnlen(string, 1024 * 1024));
}
struct sockaddr oi_ip4_addr(char *ip, int port) {
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip);
return addr;
}

397
ol-unix.c
View File

@ -1,300 +1,169 @@
#include "ol.h"
ol_loop* ol_loop_new() {
ol_loop* loop = calloc(sizeof(ol_loop), 1);
if (!loop) {
return NULL;
}
loop.evloop = ev_loop_new(0);
if (!loop.evloop) {
return NULL;
}
return loop;
}
#include <sys/types.h>
#include <sys/socket.h>
void ol_associate(ol_loop* loop, ol_handle* handle) {
assert(!handle->loop);
handle->loop = loop;
}
void ol_run(ol_loop *loop) {
ev_run(loop, 0);
}
ol_handle* ol_tcp_new(int v4, sockaddr* addr, sockaddr_len len,
ol_read_cb read_cb, ol_close_cb close_cb) {
ol_handle *handle = calloc(sizeof(ol_handle), 1);
if (!handle) {
return NULL;
}
handle->read_cb = read_cb;
handle->close_cb = close_cb;
handle->type = v4 ? OL_TCP : OL_TCP6;
int domain = v4 ? AF_INET : AF_INET6;
handle->fd = socket(domain, SOCK_STREAM, 0);
if (handle->fd == -1) {
free(handle);
got_error("socket", err);
return NULL;
}
/* Lose the pesky "Address already in use" error message */
int yes = 1;
int r = setsockopt(handle->fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
if (r == -1) {
close(handle->fd);
free(handle);
unhandled_error("setsockopt", r);
return NULL;
}
/* We auto-bind the specified address */
if (addr) {
int r = bind(handle->fd, addr, v4 ? sizeof(sockaddr_in) :
sizeof(sockaddr_in6));
if (r < 0) {
got_error("bind", errno);
close(handle->fd);
free(handle);
return NULL;
}
}
return handle;
}
static void tcp_io(EV_P_ ev_io *w, int revents) {
ol_handle* h = (ol_handle*)w->data;
if (h->connecting) {
tcp_check_connect_status(h);
static int got_error(int e) {
if (e == 0) {
return e;
} else {
}
}
static void tcp_check_connect_status(ol_handle* h) {
assert(h->connecting);
ol_handle* ol_handle_new(ol_close_cb close_cb, void* data) {
ol_handle *handle = calloc(sizeof(ol_handle), 1);
handle->close_cb = close_cb;
handle->data = data;
ev_init(&handle->read_watcher, ol_tcp_io);
ev_init(&handle->write_watcher, ol_tcp_io);
retuen handle;
}
int ol_bind(ol_handle* handle, sockaddr* addr) {
int addrsize;
if (addr->sa_family == AF_INET) {
addrsize = sizeof(sockaddr_in);
} else if (addr->sa_family == AF_INET6) {
addrsize = sizeof(sockaddr_in6);
} else {
assert(0);
return -1
}
int r = bind(handle->_.fd, addr, addrsize);
return ol_err_new(r);
}
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) {
int r = listen(handle->_.fd, backlog);
handle->accept_cb = cb;
return ol_err_new(r);
}
void ol_close_error(ol_handle* handle, ol_err err) {
ev_io_stop(&handle->read_watcher);
close(handle->fd);
handle->fd = -1;
if (handle->close_cb) {
handle->close_cb(handle, ol_err_new(error));
}
}
void ol_tcp_io(EV_P_ ev_io* watcher, int revents) {
ev_handle* handle = watcher->data;
if (handle->connect_req) {
ol_tcp_connect(handle, handle->connect_req);
} else {
}
assert(handle->_.fd >= 0);
}
/**
* We get called here from directly following a call to connect(2).
* In order to determine if we've errored out or succeeded must call
* getsockopt.
*/
void ol_tcp_connect(ev_handle* handle, ev_req* req) {
assert(handle->_.fd >= 0);
assert(req);
assert(req->type == OL_CONNECT);
int error;
socklen_t len = sizeof(int);
getsockopt(h->fd, SOL_SOCKET, SO_ERROR, &error, &len);
getsockopt(handle->_.fd, SOL_SOCKET, SO_ERROR, &error, sizeof(int));
if (error == 0) {
tcp_connected(h);
} else if (err != EINPROGRESS) {
close(h->fd);
got_error("connect", err);
}
if (!error) {
ev_io_init(&handle->write_watcher, tcp_io, handle->_.fd, EV_WRITE);
ev_set_cb(&handle->read_watcher, tcp_io);
/* EINPROGRESS - unlikely. What to do? */
}
ol_bucket* ol_handle_first_bucket(ol_handle* h) {
ngx_queue_t* element = ngx_queue_head(h);
if (!element) {
return NULL;
}
return ngx_queue_data(element, ol_bucket, write_queue);
}
static void tcp_flush(oi_loop* loop, ol_handle* h) {
ol_bucket* bucket = ol_handle_first_bucket(h);
for (; bucket; bucket = ol_handle_first_bucket(h)) {
io_vec* vec = (io_vec*) bucket->bufs[bucket->current_index];
int remaining_bufcnt = bucket->bufcnt - bucket->current_index;
ssize_t written = writev(h->fd, vec, remaining_bufcnt);
if (written < 0) {
if (err == EAGAIN) {
ev_io_start(loop, &h->write_watcher);
/* Successful connection */
ol_connect_cb connect_cb = req->connect_cb;
if (connect_cb) {
if (req->_.local) {
connect_cb(NULL, ol_err_new(0));
} else {
got_error("writev", err);
}
} else {
/* See how much was written, increase current_index, and update bufs. */
oi_buf current = bucket->bufs[bucket->current_index];
while (bucket->current_index < bucket->bufcnt) {
if (current.len >= written) {
current = bucket->bufs[++bucket->current_index];
} else {
bucket->bufs[bucket->current_index].buf += written;
break;
}
connect_cb(req, ol_err_new(0));
}
}
}
}
/* Free up connect_req if we own it. */
if (req->_.local) {
free(req);
}
static void tcp_connected(ol_handle* h) {
assert(h->connecting);
if (h->connect_cb) {
h->connect_cb(h);
}
h->connecting = 0;
h->connect_cb = NULL;
req = NULL;
ev_io_init(&h->read_watcher, tcp_io, h->fd, EV_READ);
ev_io_start(h->loop, &h->read_watcher);
} else if (error == EINPROGRESS) {
/* Still connecting. */
return;
if (ngx_queue_empty(&h->write_queue)) {
ev_io_stop(h->loop, &h->write_watcher);
} else {
/* Now that we're connected let's try to flush. */
tcp_flush(h);
if (req->connect_cb) {
req->connect_cb(req, ol_err_new(error));
}
ol_close_error(handle, ol_err_new(error));
}
}
int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen,
ol_buf* buf, ol_connect_cb connect_cb) {
if (h->connecting) {
return got_error("connect", EALREADY);
int ol_connect(ol_handle* handle, ol_req *req_in, sockaddr* addr) {
if (handle->_.connect_req) {
return ol_err_new(EALREADY);
}
h->connecting = 1;
h->connect_addr = addr;
h->connect_addrlen = addrlen;
if (handle->type != OL_TCP) {
return ol_err_new(ENOTSOCK);
}
if (buf) {
/* We're allowed to ol_write before the socket becomes connected. */
ol_write(h, buf, 1, connect_cb);
ol_req *req = ol_req_maybe_alloc(handle, req_in);
if (!req) {
return ol_err_new(ENOMEM);
}
handle->_.connect_req = req;
int addrsize;
if (addr->sa_family == AF_INET) {
addrsize = sizeof(sockaddr_in);
handle->_.fd = socket(AF_INET, SOCK_STREAM, 0);
} else if (addr->sa_family == AF_INET6) {
addrsize = sizeof(sockaddr_in6);
handle->_.fd = socket(AF_INET6, SOCK_STREAM, 0);
} else {
/* Nothing to write. Don't call the callback until we're connected. */
h->connect_cb = connect_cb;
assert(0);
return -1
}
int r = connect(h->fd, h->connect_addr, h->connect_addrlen);
if (r != 0) {
if (err == EINPROGRESS) {
/* Wait for fd to become writable. */
h->connecting = 1;
ev_io_init(&h->write_watcher, tcp_io, h->fd, EV_WRITE);
ev_io_start(h->loop, &h->write_watcher);
}
return got_error("connect", err);
/* socket(2) failed */
if (handle->_.fd < 0) {
return ol_err_new(errno);
}
/* Connected */
tcp_connected(h);
return 0;
int r = connect(handle->_.fd, addr, addrsize);
ev_io_init(&handle->read_watcher, ol_tcp_connect, handle->_.fd, EV_READ);
ev_io_init(&handle->write_watcher, ol_tcp_io, handle->_.fd, EV_WRITE);
ev_io_start(&handle->read_watcher);
return ol_err_new(r);
}
int ol_get_fd(ol_handle* h) {
return h->fd;
}
ol_bucket* bucket_new(oi_handle* h, oi_buf* bufs, int bufcnt, ol_write_cb cb) {
ol_bucket* bucket = malloc(sizeof(ol_bucket));
if (!bucket) {
got_error("malloc", OL_EMEM);
return NULL;
}
bucket->bufs = bufs;
bucket->bufcnt = bufcnt;
bucket->write_cb = write_cb;
bucket->handle = handle;
ngx_queue_init(&bucket->write_queue);
return bucket;
}
int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, ol_write_cb cb) {
if (!h->connecting && !h->writable) {
return got_error("write", OL_EPIPE);
}
if (!h->writable) {
/* This happens when writing to a socket which is not connected yet. */
bucket* b = bucket_new(h, buf, bufcnt, cb);
bucket_append(h, b);
return got_error("write", OL_EPENDING);
}
ssize_t written;
/* If the write queue is empty, attempt to write now. */
if (ngx_queue_empty(&h->write_queue)) {
/* The queue is empty. Attempt the writev immediately. */
written = writev(h->fd, (io_vec*)bufs, bufcnt);
if (written >= 0) {
size_t seen = 0;
/* Figure out what's left to be written */
for (int i = 0; i < bufcnt; i++) {
seen += bufs[i].len;
if (seen == written) {
/* We wrote the entire thing. */
return 0;
}
if (seen > written) {
break;
}
}
assert(seen > written);
/* We've made a partial write of the bufs. bufs[i] is the first buf
* that wasn't totally flushed. We must now add
* bufs[i], bufs[i + 1], ..., bufs[bufcnt - 1]
* to the write queue.
*/
}
}
}
int ol_write2(ol_handle* h, char *base, size_t len) {
ol_buf buf;
buf.base = base;
buf.len = len;
return ol_write(h, &buf, 1, NULL);
}
int ol_write3(ol_handle* h, const char *string) {
/* You're doing it wrong if strlen(string) > 1mb. */
return ol_write2(h, string, strnlen(string, 1024 * 1024));
}
struct sockaddr oi_ip4_addr(char *ip, int port) {
sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip);
return addr;
}

View File

@ -18,9 +18,13 @@ typedef struct {
} ol_bucket;
typedef struct {
} ol_req_private;
typedef struct {
int fd;
ol_handle_type type;
ol_read_cb read_cb;
ol_close_cb close_cb;
@ -31,4 +35,4 @@ typedef struct {
ngx_queue_s write_queue;
ngx_queue_s all_handles;
} ol_handle;
} ol_handle_private;

View File

@ -10,3 +10,9 @@ typedef struct _ol_buf {
_ol_buf* next;
_ol_buf* prev;
} ol_buf;
typedef struct {
OVERLAPPED overlapped;
ngx_queue_t queue;
} ol_req_private;

230
ol.h
View File

@ -1,192 +1,78 @@
/**
* Overlapped I/O for every operating system.
*/
typedef int ol_err; // FIXME
#ifdef __POSIX__
# include "ol-unix.h"
#else
# include "ol-win.h"
#endif
typedef struct {
int code;
const char* msg;
} ol_err;
/**
* Error codes are not cross-platform, so we have our own.
*/
typedef enum {
OL_SUCCESS = 0,
OL_EPENDING = -1,
OL_EPIPE = -2,
OL_EMEM = -3
} ol_err;
inline const char* ol_err_string(int errorno) {
switch (errorno) {
case OL_SUCCESS:
case OL_EPENDING:
return "";
case OL_EPIPE:
return "EPIPE: Write to non-writable handle";
case OL_EMEM:
return "EMEM: Out of memory!";
default:
assert(0);
return "Unknown error code. Bug.";
}
}
/**
* Do not make assumptions about the order of the elements in this sturct.
* Always use offsetof because the order is platform dependent. Has a char*
* buf and size_t len. That's all you need to know.
*/
struct ol_buf;
typedef void (*)(ol_req* req, ol_err e) ol_req_cb;
typedef void (*)(ol_req* req, size_t nread, ol_err e) ol_read_cb;
typedef void (*)(ol_req* req, ol_err e) ol_write_cb;
typedef void (*)(ol_handle* server, ol_handle* new_client) ol_accept_cb;
typedef void (*)(ol_handle* handle, ol_err e) ol_close_cb;
typedef enum {
OL_TCP,
OL_TCP6,
OL_UNKNOWN_HANDLE = 0,
OL_TCP_CONNECTION,
OL_TCP_SERVER,
OL_NAMED_PIPE,
OL_TTY,
OL_FILE,
OL_TTY
} ol_handle_type;
typedef void(*)(ol_handle* h, ol_buf *bufs, int bufcnt) ol_read_cb;
typedef void(*)(ol_handle* h, int read, int write, ol_err err) ol_close_cb;
typedef void(*)(ol_handle* h) ol_connect_cb;
typedef void(*)(ol_handle* h, ol_handle *peer) ol_accept_cb;
typedef void(*)(ol_handle* h) ol_write_cb;
typesef struct {
// read-only
ol_handle_type type;
// private
ol_handle_private _;
// public
ol_accept_cb accept_cb;
ol_close_cb close_cb;
void* data;
} ol_handle;
/**
* Creates a tcp handle used for both client and servers.
*/
ol_handle* ol_tcp_new(int v4, sockaddr* addr,
ol_read_cb read_cb, ol_close_cb close_cb);
typedef enum {
OL_UNKNOWN_REQ = 0,
OL_CONNECT,
OL_READ,
OL_WRITE
OL_SHUTDOWN
} ol_req_type;
/**
* Creates a new file handle. The 'read' parameter is boolean indicating if
* the file should be read from or created.
*/
ol_handle* ol_file_new(char *filename, int read, ol_read_cb read_cb,
ol_close_cb close_cb);
typedef struct {
// read-only
ol_req_type type;
ol_handle *handle;
// private
ol_req_private _;
// public
union {
ol_read_cb read_cb;
ol_write_cb write_cb;
ol_req_cb connect_cb;
ol_req_cb shutdown_cb;
};
void *data;
} ol_req;
/**
* In the case of servers, give a filename. In the case of clients
* leave filename NULL.
*/
ol_handle* ol_named_pipe_new(char *filename, ol_read_cb read_cb,
ol_close_cb close_cb);
int ol_run();
ol_handle* ol_handle_new(ol_close_cb close_cb, void* data);
/**
* Allocates a new tty handle.
*/
ol_handle* ol_tty_new(ol_tty_read_cb read_cb, ol_close_cb close_cb);
/**
* Only works with named pipes and TCP sockets.
*/
int ol_connect(ol_handle* h, sockaddr* addr, ol_buf* initial_buf,
ol_connect_cb connect_cb);
struct sockaddr oi_ip4_addr(char*, int port);
/**
* Depth of write buffer in bytes.
*/
size_t ol_buffer_size(ol_handle* h);
int ol_read_stop(ol_handle* h);
int ol_read_start(ol_handle* h);
/**
* Returns file descriptor associated with the handle. There may be only
* limited numbers of file descriptors allowed by the operating system. On
* Windows this limit is 2048 (see
* _setmaxstdio[http://msdn.microsoft.com/en-us/library/6e3b887c.aspx])
*/
int ol_get_fd(ol_handle* h);
/**
* Returns the type of the handle.
*/
ol_handle_type ol_get_type(ol_handle* h);
/**
* Send data to handle. User responsible for bufs until callback is made.
* Multiple ol_handle_write() calls may be issued before the previous ones
* complete - data will sent in the correct order.
*
* Returns zero on succuessful write and bytes_sent is filled with the
* number of bytes successfully written. If an asyncrhonous write was
* successfully initiated then OL_EAGAIN is returned.
*/
int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, ol_write_cb cb);
int ol_write2(ol_handle* h, char *base, size_t len);
int ol_write3(ol_handle* h, const char *string);
/**
* Works on both named pipes and TCP handles.
*/
int ol_listen(ol_loop* loop, ol_handle* h, int backlog, ol_accept_cb cb);
/**
* Writes EOF or sends a FIN packet.
* Further calls to ol_write() result in OI_EPIPE error. When the send
* buffer is drained and the other side also terminates their writes, the
* handle is finally closed and ol_close_cb() made. There is no need to call
* ol_close() after this.
*/
int ol_graceful_close(ol_handle* h);
/**
* Immediately closes the handle. If there is data in the send buffer
* it will not be sent.
*/
int ol_close(ol_handle* h);
/**
* Releases memory associated with handle. You MUST call this after
* ol_close_cb() is made with both 0 arguments.
*/
int ol_free(ol_handle* h);
ol_loop* ol_loop_new();
void ol_associate(ol_loop* loop, ol_handle* handle);
void ol_loop_free(ol_loop* loop);
void ol_run(ol_loop* loop);
// TCP server methods.
int ol_bind(ol_handle* handle, sockaddr* addr);
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb);
// TCP socket methods.
int ol_connect(ol_handle* handle, ol_req *req, sockaddr* addr);
int ol_read(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt);
int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt);
int ol_shutdown(ol_handle* handle, ol_req *req);
// Request handle to be closed. close_cb will be made
// synchronously during this call.
int ol_close(ol_handle* handle);
// Must be called for all handles after close_cb. Handles that arrive
// via the accept_cb must use ol_free().
int ol_free(ol_handle* handle);

91
test/echo-server.c Normal file
View File

@ -0,0 +1,91 @@
#include "../ol.h"
#include <stdio.h>
static const size_t BUFSIZE = 1024;
typedef struct {
ol_handle *handle;
ol_req req;
char read_buffer[BUFSIZE];
ol_buf buf;
} peer_t;
void after_write(ol_req* req, ol_err err);
void after_read(ol_req* req, size_t nread, ol_err err);
void try_read(peer_t* peer);
void on_close(ol_handle* peer, ol_err err);
void on_accept(ol_handle* server, ol_handle* new_client);
void after_write(ol_req* req, ol_err err) {
if (!err) {
peer_t *peer = (peer_t*) req->data;
try_read(peer);
}
}
void after_read(ol_req* req, size_t nread, ol_err err) {
if (!err) {
if (nread == 0) {
ol_close(req->handle);
} else {
peer_t *peer = (peer_t*) req->data;
peer->buf.len = nread;
peer->req.write_cb = after_write;
ol_write(peer->handle, &peer->req, &peer->buf, 1);
}
}
}
void try_read(peer_t* peer) {
peer->buf.len = BUFSIZE;
peer->req.read_cb = after_read;
ol_read(peer->handle, &peer->req, &peer->buf, 1);
}
void on_close(ol_handle* peer, ol_err err) {
if (err) {
fprintf(stdout, "Socket error: %s\n", ol_errno_string(err));
}
ol_free(peer);
}
static void on_accept(ol_handle* server, ol_handle* new_client) {
new_client->close_cb = on_close;
peer_t p = malloc(sizeof(peer_t));
p->handle = new_client;
p->buf.base = p->read_buffer;
p->buf.len = BUFSIZE;
p->req.data = p;
try_read(peer);
r = ol_write2(new_client, "Hello\n");
if (r < 0) {
// error
assert(0);
}
}
int main(int argc, char** argv) {
ol_handle* server = ol_handle_new(NULL);
struct sockaddr addr = oi_ip4_addr("0.0.0.0", 8000);
ol_bind(server, &addr);
ol_listen(server, 128, on_accept);
ol_run();
return 0;
}