echo-server on unix works
This commit is contained in:
parent
08ae03ec86
commit
7ce2cb815d
@ -96,7 +96,7 @@ struct ngx_queue_s {
|
||||
|
||||
|
||||
#define ngx_queue_data(q, type, link) \
|
||||
(type *) ((u_char *) q - offsetof(type, link))
|
||||
(type *) ((unsigned char *) q - offsetof(type, link))
|
||||
|
||||
|
||||
#endif /* _NGX_QUEUE_H_INCLUDED_ */
|
||||
|
||||
292
ol-unix.c
292
ol-unix.c
@ -1,5 +1,7 @@
|
||||
#include "ol.h"
|
||||
|
||||
#include <stdio.h> /* printf */
|
||||
|
||||
#include <stdlib.h>
|
||||
#include <errno.h>
|
||||
#include <assert.h>
|
||||
@ -14,7 +16,7 @@
|
||||
|
||||
void ol_tcp_io(EV_P_ ev_io* watcher, int revents);
|
||||
void ol_tcp_connect(ol_handle* handle, ol_req* req);
|
||||
ol_handle* ol_tcp_open(ol_handle* parent, int fd);
|
||||
int ol_tcp_open(ol_handle*, int fd);
|
||||
int ol_close_error(ol_handle* handle, ol_err err);
|
||||
|
||||
|
||||
@ -56,38 +58,38 @@ int ol_run() {
|
||||
|
||||
ol_handle* ol_tcp_handle_new(ol_close_cb close_cb, void* data) {
|
||||
ol_handle *handle = calloc(sizeof(ol_handle), 1);
|
||||
if (!handle) {
|
||||
ol_err_new(NULL, ENOMEM);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
handle->close_cb = close_cb;
|
||||
handle->data = data;
|
||||
|
||||
handle->_.fd = -1;
|
||||
int fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
if (fd < 0) {
|
||||
ol_err_new(handle, errno);
|
||||
free(handle);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ngx_queue_init(&handle->_.read_reqs);
|
||||
|
||||
ev_init(&handle->_.read_watcher, ol_tcp_io);
|
||||
ev_init(&handle->_.write_watcher, ol_tcp_io);
|
||||
if (ol_tcp_open(handle, fd)) {
|
||||
close(fd);
|
||||
free(handle);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
|
||||
int ol_tcp_lazy_open(ol_handle* handle, int domain) {
|
||||
assert(handle->_.fd < 0);
|
||||
|
||||
/* Lazily allocate a file descriptor for this handle */
|
||||
int fd = socket(domain, SOCK_STREAM, 0);
|
||||
|
||||
/* Set non-blocking, etc */
|
||||
ol_tcp_init_fd(fd);
|
||||
|
||||
handle->_.fd = fd;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int ol_bind(ol_handle* handle, struct sockaddr* addr) {
|
||||
int addrsize;
|
||||
int domain;
|
||||
int r;
|
||||
|
||||
assert(handle->_.fd >= 0);
|
||||
|
||||
if (addr->sa_family == AF_INET) {
|
||||
addrsize = sizeof(struct sockaddr_in);
|
||||
@ -100,15 +102,6 @@ int ol_bind(ol_handle* handle, struct sockaddr* addr) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int r = 0;
|
||||
|
||||
if (handle->_.fd < 0) {
|
||||
r = ol_tcp_lazy_open(handle, domain);
|
||||
if (r) {
|
||||
return ol_err_new(handle, r);
|
||||
}
|
||||
}
|
||||
|
||||
r = bind(handle->_.fd, addr, addrsize);
|
||||
|
||||
return ol_err_new(handle, r);
|
||||
@ -126,17 +119,21 @@ int ol_tcp_init_fd(int fd) {
|
||||
}
|
||||
|
||||
|
||||
ol_handle* ol_tcp_open(ol_handle* parent, int fd) {
|
||||
ol_handle* h = ol_tcp_handle_new(NULL, NULL);
|
||||
if (!h) {
|
||||
return NULL;
|
||||
}
|
||||
h->_.fd = fd;
|
||||
|
||||
int ol_tcp_open(ol_handle* handle, int fd) {
|
||||
/* Set non-blocking, etc */
|
||||
ol_tcp_init_fd(fd);
|
||||
|
||||
return h;
|
||||
handle->_.fd = fd;
|
||||
|
||||
ngx_queue_init(&handle->_.read_reqs);
|
||||
|
||||
ev_io_init(&handle->_.read_watcher, ol_tcp_io, fd, EV_READ);
|
||||
ev_io_init(&handle->_.write_watcher, ol_tcp_io, fd, EV_WRITE);
|
||||
|
||||
handle->_.read_watcher.data = handle;
|
||||
handle->_.write_watcher.data = handle;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@ -164,13 +161,17 @@ void ol_server_io(EV_P_ ev_io* watcher, int revents) {
|
||||
if (!handle->accept_cb) {
|
||||
close(fd);
|
||||
} else {
|
||||
ol_handle* new_client = ol_tcp_open(handle, fd);
|
||||
ol_handle* new_client = ol_tcp_handle_new(NULL, NULL);
|
||||
if (!new_client) {
|
||||
ol_close_error(handle, ol_err_last(handle));
|
||||
return;
|
||||
/* Ignore error for now */
|
||||
} else {
|
||||
if (ol_tcp_open(new_client, fd)) {
|
||||
/* Ignore error for now */
|
||||
} else {
|
||||
ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
|
||||
handle->accept_cb(handle, new_client);
|
||||
}
|
||||
}
|
||||
|
||||
handle->accept_cb(handle, new_client);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -178,10 +179,7 @@ void ol_server_io(EV_P_ ev_io* watcher, int revents) {
|
||||
|
||||
|
||||
int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) {
|
||||
if (handle->_.fd < 0) {
|
||||
/* Lazily allocate a file descriptor for this handle */
|
||||
handle->_.fd = socket(AF_INET, SOCK_STREAM, 0);
|
||||
}
|
||||
assert(handle->_.fd >= 0);
|
||||
|
||||
int r = listen(handle->_.fd, backlog);
|
||||
if (r < 0) {
|
||||
@ -210,15 +208,103 @@ int ol_close_error(ol_handle* handle, ol_err err) {
|
||||
}
|
||||
|
||||
|
||||
ol_req* ol_read_reqs_head(ol_handle* handle) {
|
||||
ngx_queue_t* q = ngx_queue_head(&(handle->_.read_reqs));
|
||||
if (!q) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ol_req_private* p = ngx_queue_data(q, ol_req_private, read_reqs);
|
||||
assert(p);
|
||||
int off = offsetof(ol_req, _);
|
||||
ol_req* req = (ol_req*) ((char*)p - off);
|
||||
|
||||
return req;
|
||||
}
|
||||
|
||||
|
||||
int ol_read_reqs_empty(ol_handle* handle) {
|
||||
return ngx_queue_empty(&(handle->_.read_reqs));
|
||||
}
|
||||
|
||||
|
||||
void ol__read(ol_handle* handle) {
|
||||
assert(handle->_.fd >= 0);
|
||||
|
||||
/* Get the request at the head of the read_reqs queue. */
|
||||
ol_req* req = ol_read_reqs_head(handle);
|
||||
if (!req) {
|
||||
ev_io_stop(EV_DEFAULT_ &(handle->_.read_watcher));
|
||||
return;
|
||||
}
|
||||
|
||||
/* Cast to iovec. We had to have our own ol_buf instead of iovec
|
||||
* because Windows's WSABUF is not an iovec.
|
||||
*/
|
||||
struct iovec* iov = (struct iovec*) req->_.read_bufs;
|
||||
int iovcnt = req->_.read_bufcnt;
|
||||
|
||||
assert(iov);
|
||||
assert(iovcnt > 0);
|
||||
|
||||
/* Now do the actual read. */
|
||||
|
||||
ssize_t nread = readv(handle->_.fd, iov, iovcnt);
|
||||
|
||||
ol_read_cb cb = req->cb;
|
||||
|
||||
if (nread < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
/* Just wait for the next one. */
|
||||
assert(ev_is_active(&(handle->_.read_watcher)));
|
||||
} else {
|
||||
ol_err err = ol_err_new(handle, errno);
|
||||
if (cb) {
|
||||
cb(req, 0, err);
|
||||
}
|
||||
ol_close_error(handle, errno);
|
||||
}
|
||||
} else {
|
||||
/* Successful read */
|
||||
|
||||
/* First pop the req off handle->_.read_reqs */
|
||||
ngx_queue_remove(&(req->_.read_reqs));
|
||||
|
||||
/* Must free req if local. Also must free req->_.read_bufs. */
|
||||
free(req->_.read_bufs);
|
||||
req->_.read_bufs = NULL;
|
||||
if (req->_.local) {
|
||||
free(req);
|
||||
}
|
||||
|
||||
/* NOTE: call callback AFTER freeing the request data. */
|
||||
if (cb) {
|
||||
cb(req, nread, 0);
|
||||
}
|
||||
|
||||
if (ol_read_reqs_empty(handle)) {
|
||||
ev_io_stop(EV_DEFAULT_ &(handle->_.read_watcher));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void ol_tcp_io(EV_P_ ev_io* watcher, int revents) {
|
||||
ol_handle* handle = watcher->data;
|
||||
|
||||
assert(handle->_.fd >= 0);
|
||||
|
||||
if (handle->_.connect_req) {
|
||||
ol_tcp_connect(handle, handle->_.connect_req);
|
||||
} else {
|
||||
}
|
||||
if (revents & EV_READ) {
|
||||
ol__read(handle);
|
||||
}
|
||||
|
||||
assert(handle->_.fd >= 0);
|
||||
if (revents & EV_WRITE) {
|
||||
assert(0 && "Queued writes are not supported");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -275,19 +361,20 @@ void ol_tcp_connect(ol_handle* handle, ol_req* req) {
|
||||
|
||||
ol_req* ol_req_maybe_alloc(ol_handle* handle, ol_req* in_req) {
|
||||
if (in_req) {
|
||||
ngx_queue_init(&(in_req->_.read_reqs));
|
||||
in_req->handle = handle;
|
||||
in_req->_.local = 0;
|
||||
return in_req;
|
||||
} else {
|
||||
ol_req *req = calloc(sizeof(ol_req), 1);
|
||||
req->handle = handle;
|
||||
ngx_queue_init(&(req->_.read_reqs));
|
||||
req->_.local = 1;
|
||||
return req;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) {
|
||||
if (handle->_.connect_req) {
|
||||
return ol_err_new(handle, EALREADY);
|
||||
@ -332,7 +419,9 @@ int ol_connect(ol_handle* handle, ol_req *req_in, struct sockaddr* addr) {
|
||||
return ol_err_new(handle, r);
|
||||
}
|
||||
|
||||
|
||||
int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) {
|
||||
assert(handle->_.fd >= 0);
|
||||
ssize_t r;
|
||||
|
||||
r = writev(handle->_.fd, (struct iovec*)bufs, bufcnt);
|
||||
@ -340,6 +429,10 @@ int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) {
|
||||
if (r < 0) {
|
||||
return ol_err_new(handle, r);
|
||||
} else {
|
||||
if (req && req->cb) {
|
||||
ol_write_cb cb = req->cb;
|
||||
cb(req, 0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@ -354,57 +447,68 @@ int ol_write2(ol_handle* handle, const char* msg) {
|
||||
}
|
||||
|
||||
|
||||
void ol_req_append(ol_handle* handle, ol_req *req) {
|
||||
ngx_queue_insert_tail(&handle->_.read_reqs, &req->_.read_reqs);
|
||||
}
|
||||
|
||||
|
||||
int ol_read(ol_handle* handle, ol_req *req_in, ol_buf* bufs, int bufcnt) {
|
||||
ssize_t nread = -1;
|
||||
errno = EAGAIN;
|
||||
ol_read_cb cb = req_in->cb;
|
||||
|
||||
assert(handle->_.fd >= 0);
|
||||
|
||||
if (!ngx_queue_empty(&handle->_.read_reqs)) {
|
||||
/* There are already pending read_reqs. We must get in line. */
|
||||
assert(ev_is_active(&handle->_.read_watcher));
|
||||
|
||||
ol_req* req = ol_req_maybe_alloc(handle, req_in);
|
||||
if (!req) {
|
||||
return ol_err_new(handle, ENOMEM);
|
||||
}
|
||||
|
||||
ol_req_append(handle, req);
|
||||
|
||||
return ol_err_new(handle, EINPROGRESS);
|
||||
|
||||
} else {
|
||||
/* Attempt to read immediately */
|
||||
if (ngx_queue_empty(&handle->_.read_reqs)) {
|
||||
/* Attempt to read immediately. */
|
||||
ssize_t nread = readv(handle->_.fd, (struct iovec*) bufs, bufcnt);
|
||||
|
||||
ol_read_cb cb = req_in->cb;
|
||||
|
||||
if (nread < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
|
||||
return 0;
|
||||
} else {
|
||||
ol_err err = ol_err_new(handle, errno);
|
||||
|
||||
if (cb) {
|
||||
cb(req_in, nread, err);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
} else {
|
||||
if (cb) {
|
||||
cb(req_in, nread, 0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
assert(0 && "Unreachable");
|
||||
return 0;
|
||||
if (nread < 0 && errno != EAGAIN) {
|
||||
/* Real error. */
|
||||
ol_err err = ol_err_new(handle, errno);
|
||||
|
||||
if (cb) {
|
||||
cb(req_in, nread, err);
|
||||
}
|
||||
|
||||
return err;
|
||||
}
|
||||
|
||||
if (nread >= 0) {
|
||||
/* Successful read. */
|
||||
if (cb) {
|
||||
cb(req_in, nread, 0);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Either we never read anything, or we got EAGAIN. */
|
||||
assert(!ngx_queue_empty(&handle->_.read_reqs) ||
|
||||
(nread < 0 && errno == EAGAIN));
|
||||
|
||||
/* Two possible states:
|
||||
* - EAGAIN, meaning the socket is not wriable currently. We must wait for
|
||||
* it to become readable with the handle->_.read_watcher.
|
||||
* - The read_reqs queue already has reads. Meaning: the user has issued
|
||||
* many ol_reads calls some of which are still waiting for the socket to
|
||||
* become readable.
|
||||
* In the meantime we append the request to handle->_.read_reqs
|
||||
*/
|
||||
ol_req* req = ol_req_maybe_alloc(handle, req_in);
|
||||
if (!req) {
|
||||
return ol_err_new(handle, ENOMEM);
|
||||
}
|
||||
|
||||
/* Copy the bufs data over into our ol_req struct. This is so the user can
|
||||
* free the ol_buf array. The actual data inside the ol_bufs is however
|
||||
* owned by the user and cannot be deallocated until the read completes.
|
||||
*/
|
||||
req->_.read_bufs = malloc(sizeof(ol_buf) * bufcnt);
|
||||
memcpy(req->_.read_bufs, bufs, bufcnt * sizeof(ol_buf));
|
||||
req->_.read_bufcnt = bufcnt;
|
||||
|
||||
/* Append the request to read_reqs. */
|
||||
ngx_queue_insert_tail(&(handle->_.read_reqs), &(req->_.read_reqs));
|
||||
|
||||
ev_io_start(EV_DEFAULT_ &handle->_.read_watcher);
|
||||
|
||||
return ol_err_new(handle, EINPROGRESS);
|
||||
}
|
||||
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user