diff --git a/iocp-links.html b/iocp-links.html index cacc17cd..49e97a6b 100644 --- a/iocp-links.html +++ b/iocp-links.html @@ -30,6 +30,8 @@

Asynchronous I/O in Windows for UNIX Programmers

+

Ryan Dahl ry@tinyclouds.org +

This document assumes you are familiar with how non-blocking socket I/O is done in UNIX. @@ -40,7 +42,7 @@ high-concurrency servers, they've simply chosen a different paradigm for this called overlapped I/O. The mechanism in Windows by which multiple sockets are polled -for compltion is called +for completion is called I/O completion ports. More or less equivalent to kqueue (Macintosh, @@ -58,9 +60,9 @@ For example, instead of waiting for a socket to become writable and then writing to it, as you do in UNIX operating systems, you rather WSASend() a buffer and wait for it to have been sent. -The result is that non-blocking write(2) read(2) +The result is that non-blocking write(2) and read(2) are non-portable to Windows. This tends to throw the poor sap assigned with -the job of porting your app to Windows into complusive nervous twitches. +the job of porting your app to Windows into compulsive nervous twitches.

Almost every socket operation that you're familiar with has an @@ -100,15 +102,64 @@ overlapped counterpart (see table). WSARecv() + - socket or pipe + socket - connect(2) +

connect(2)
+ Non-blocking connect() has difficult semantics in + UNIX. The proper way to connect to a remote host is this: call + connect(2) which will usually return EINPROGRESS. + Poll on the file descriptor for writability. Then use +
int error;
+socklen_t len = sizeof(int);
+getsockopt(fd, SOL_SOCKET, SO_ERROR, &error, &len);
+ The error should be zero if the connection succeeded. + (Documented in connect(2) under EINPROGRESS + on the Linux man page.) ConnectEx() + + + pipe + +
connect(2)
+ + + ConnectNamedPipe() + + Be sure to set PIPE_NOWAIT in CreateNamedPipe() + + + + + + socket + +
accept(2)
+ + + AcceptEx() + + + + + pipe + +
accept(2)
+ + + ConnectNamedPipe() + + + + file diff --git a/ngx_queue.h b/ngx_queue.h new file mode 100644 index 00000000..9a1763d7 --- /dev/null +++ b/ngx_queue.h @@ -0,0 +1,111 @@ + +/* + * Copyright (C) Igor Sysoev + */ + + +#include +#include + + +#ifndef _NGX_QUEUE_H_INCLUDED_ +#define _NGX_QUEUE_H_INCLUDED_ + + +typedef struct ngx_queue_s ngx_queue_t; + +struct ngx_queue_s { + ngx_queue_t *prev; + ngx_queue_t *next; +}; + + +#define ngx_queue_init(q) \ + (q)->prev = q; \ + (q)->next = q + + +#define ngx_queue_empty(h) \ + (h == (h)->prev) + + +#define ngx_queue_insert_head(h, x) \ + (x)->next = (h)->next; \ + (x)->next->prev = x; \ + (x)->prev = h; \ + (h)->next = x + + +#define ngx_queue_insert_after ngx_queue_insert_head + + +#define ngx_queue_insert_tail(h, x) \ + (x)->prev = (h)->prev; \ + (x)->prev->next = x; \ + (x)->next = h; \ + (h)->prev = x + + +#define ngx_queue_head(h) \ + (h)->next + + +#define ngx_queue_last(h) \ + (h)->prev + + +#define ngx_queue_sentinel(h) \ + (h) + + +#define ngx_queue_next(q) \ + (q)->next + + +#define ngx_queue_prev(q) \ + (q)->prev + + +#if (NGX_DEBUG) + +#define ngx_queue_remove(x) \ + (x)->next->prev = (x)->prev; \ + (x)->prev->next = (x)->next; \ + (x)->prev = NULL; \ + (x)->next = NULL + +#else + +#define ngx_queue_remove(x) \ + (x)->next->prev = (x)->prev; \ + (x)->prev->next = (x)->next + +#endif + + +#define ngx_queue_split(h, q, n) \ + (n)->prev = (h)->prev; \ + (n)->prev->next = n; \ + (n)->next = q; \ + (h)->prev = (q)->prev; \ + (h)->prev->next = h; \ + (q)->prev = n; + + +#define ngx_queue_add(h, n) \ + (h)->prev->next = (n)->next; \ + (n)->next->prev = (h)->prev; \ + (h)->prev = (n)->prev; \ + (h)->prev->next = h; + + +#define ngx_queue_data(q, type, link) \ + (type *) ((u_char *) q - offsetof(type, link)) + + +ngx_queue_t *ngx_queue_middle(ngx_queue_t *queue); +void ngx_queue_sort(ngx_queue_t *queue, + ngx_int_t (*cmp)(const ngx_queue_t *, const ngx_queue_t *)); + + +#endif /* _NGX_QUEUE_H_INCLUDED_ */ diff --git 
a/ol.h b/ol.h index 7ff403ff..31597a20 100644 --- a/ol.h +++ b/ol.h @@ -8,78 +8,102 @@ typedef ol_read_cb void(*)(ol_buf *bufs, int bufcnt); typedef ol_close_cb void(*)(int read, int write); typedef ol_connect_cb void(*)(); -typedef ol_connect_cb void(*)(); - - -typedef enum { - OL_NAMED_PIPE, - OL_TCP, - OL_TCP6 -} ol_socket_type; - - -typedef struct { - size_t len; - char* buf; -} ol_buf; - - -typedef struct { - size_t len; - void* name; -} ol_addr; /** - * Creates a new socket of given type. If bind_addr is NULL a random - * port will be bound in the case of OL_TCP and OL_TCP6. In the case - * of NAMED_PIPE, bind_addr specifies a string describing the location - * to bind to. + * Do not make assumptions about the order of the elements in this sturct. + * Always use offsetof because the order is platform dependent. Has a char* + * buf and size_t len. That's all you need to know. */ -ol_socket* ol_socket_create(ol_socket_type type, ol_buf* bind_addr, - ol_read_cb cb, ol_close_cb cb); +struct ol_buf; -int ol_socket_connect(ol_socket* socket, ol_addr addr, + +/** + * Creates a tcp h. If bind_addr is NULL a random + * port will be bound. + */ +ol_handle* ol_tcp_new(int v4, ol_read_cb read_cb, ol_close_cb close_cb); + + +ol_handle* ol_file_new(char *filename, int read, ol_read_cb cb, + ol_close_cb cb); + + +/** + * In the case of servers, give a filename. In the case of clients + * leave filename NULL. + */ +ol_handle* ol_named_pipe_new(char *filename, ol_read_cb cb, + ol_close_cb cb); + + +ol_handle* ol_tty_new(ol_tty_read_cb cb, ol_close_cb cb); + + +/** + * Only works with named pipes and TCP sockets. + */ +int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len len, ol_buf* buf, size_t* bytes_sent, ol_connect_cb ol); -int ol_socket_pause(ol_socket* socket); - - -int ol_socket_resume(ol_socket* socket); - - /** - * Get local address. addr is filled. + * Depth of write buffer in bytes. 
*/ -int ol_socket_address(ol_socket* socket, ol_addr* addr); +size_t ol_buffer_size(ol_handle* h); + + +int ol_pause(ol_handle* h); + + +int ol_resume(ol_handle* h); /** - * Returns file descriptor. There may be only limited numbers of file - * descriptors allowed by the operating system. On Windows this limit is - * 2048 (see _setmaxstdio[http://msdn.microsoft.com/en-us/library/6e3b887c.aspx]) + * Returns file descriptor associated with the handle. There may be only + * limited numbers of file descriptors allowed by the operating system. On + * Windows this limit is 2048 (see + * _setmaxstdio[http://msdn.microsoft.com/en-us/library/6e3b887c.aspx]) */ -int ol_socket_get_fd(ol_socket* socket); +int ol_get_fd(ol_handle* h); /** - * Send data to socket. User responsible for bufs until callback is made. - * Multiple ol_socket_write() calls may be issued before the previous ones + * Send data to h. User responsible for bufs until callback is made. + * Multiple ol_handle_write() calls may be issued before the previous ones * complete - data will sent in the correct order. */ -int ol_socket_write(ol_socket* socket, ol_buf* bufs, int bufcnt, +int ol_write(ol_handle* h, ol_buf* bufs, int bufcnt, size_t* bytes_sent, ol_write_cb cb); -int ol_socket_listen(ol_socket* server, int backlog, ol_accept_cb cb); +/** + * Note: works on both named pipes and TCP handles. + */ +int ol_listen(ol_handle* h, int backlog, ol_accept_cb cb); -int ol_socket_shutdown_write(ol_socket* socket); +/** + * Writes EOF or sends a FIN packet. + */ +int ol_end(ol_handle* h); + + +int ol_close(ol_handle* h); + + +/** + * Releases memory associated with handle. You MUST call this after + * ol_close_cb() is made with both 0 arguments. 
+ */ +int ol_free(ol_handle* h); + + +ol_loop* ol_loop_new(); +ol_loop* ol_associate(ol_handle* handle); +void ol_run(); -int ol_socket_close(ol_socket* socket); -int ol_socket_free(ol_socket* socket); diff --git a/ol_unix_ev.c b/ol_unix_ev.c new file mode 100644 index 00000000..9ddf6fe1 --- /dev/null +++ b/ol_unix_ev.c @@ -0,0 +1,90 @@ + + + +ol_loop* ol_loop_new() +{ + ol_loop* loop = malloc(sizeof(ol_loop)); + if (!loop) { + return NULL; + } + + loop.evloop = ev_loop_new(0); + if (!loop.evloop) { + return NULL; + } + + + return loop; +} + + +ol_loop* ol_associate(ol_handle* handle) +{ +} + +void ol_run(ol_loop *loop) { + ev_run(loop, 0); +} + + +ol_handle* ol_tcp_new(int v4, ol_read_cb read_cb, ol_close_cb close_cb) { + ol_handle *handle = malloc(sizeof(ol_handle)); + if (!handle) { + return NULL; + } + + handle->read_cb = read_cb; + handle->close_cb = close_cb; + + int domain = v4 ? AF_INET : AF_INET6; + handle->fd = socket(domain, SOCK_STREAM, 0); + if (fd == -1) { + free(handle); + got_error("socket", errno); + return NULL; + } + + return handle; +} + + +int try_connect(ol_handle* h) { + int r = connect(h->fd, h->connect_addr, h->connect_addrlen); + + if (r != 0) { + if (errno == EINPROGRESS) { + /* Wait for fd to become writable */ + h->connecting = 1; + ev_io_init(&h->write_watcher, handle_tcp_io, h->fd, EV_WRITE); + ev_io_start(h->loop, &h->write_watcher); + } + return got_error("connect", errno); + } + + return 0; +} + + +int ol_connect(ol_handle* h, sockaddr* addr, sockaddr_len addrlen, + ol_buf* buf, size_t* bytes_sent, ol_connect_cb cb) { + if (h->connecting) { + return got_error("connect", EALREADY); + } + + h->connecting = 1; + h->connect_addr = addr; + h->connect_addrlen = addrlen; + + if (buf) { + ol_write(h, buf, 1, bytes_sent, cb); + h->connect_cb = cb; + } + + if (0 == try_connect(h)) { + if ( + } + + + + return 0; +} diff --git a/ol_unix_ev.h b/ol_unix_ev.h new file mode 100644 index 00000000..130a1ac3 --- /dev/null +++ b/ol_unix_ev.h @@ -0,0 
+1,22 @@ + + +/** + * Note can be cast to io_vec. + */ +typedef struct _ol_buf { + char* buf; + size_t len; + ngx_queue_s write_queue; +} ol_buf; + + + +typedef struct _ol_handle { + int fd; + + ol_read_cb read_cb; + ol_close_cb close_cb; + + ngx_queue_s write_queue; + ngx_queue_s all_handles; +} ol_handle; diff --git a/ol_win.h b/ol_win.h new file mode 100644 index 00000000..4da5cbb3 --- /dev/null +++ b/ol_win.h @@ -0,0 +1,12 @@ + + +/** + * Note can be cast to + * WSABUF[http://msdn.microsoft.com/en-us/library/ms741542(v=vs.85).aspx] + */ +typedef struct _ol_buf { + u_long len; + char* buf; + _ol_buf* next; + _ol_buf* prev; +} ol_buf;