From 0d85eb252bb9c068fe2c5a46aa18273574aee2f6 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Thu, 7 Apr 2011 04:51:59 +0200 Subject: [PATCH] API changes - unix broken --- ol-win.c | 325 +++++++++++++++++++------------------ ol-win.h | 19 ++- ol.h | 92 +++++------ test/echo-server.c | 68 ++++---- test/test-callback-stack.c | 11 +- test/test-ping-pong.c | 129 ++++++++------- test/test.h | 1 + 7 files changed, 336 insertions(+), 309 deletions(-) diff --git a/ol-win.c b/ol-win.c index a45c95a1..95492047 100644 --- a/ol-win.c +++ b/ol-win.c @@ -82,14 +82,6 @@ /* The request is currently queued. */ #define OL_REQ_PENDING 0x01 -/* When STRAY is set, that means that the handle owning the ol_req */ -/* struct was destroyed while the old_req was queued to an iocp */ -#define OL_REQ_STRAY 0x02 - -/* When INTERNAL is set that means that the ol_req struct was */ -/* allocated by libol, so libol also needs to free it again */ -#define OL_REQ_INTERNAL 0x04 - /* * Pointers to winsock extension functions that have to be retrieved dynamically */ @@ -212,39 +204,41 @@ void ol_init() { } -void ol_req_init(ol_req *req, void *cb) { - req->_.flags = 0; +void ol_req_init(ol_req* req, ol_handle* handle, void *cb) { + req->type = OL_UNKNOWN_REQ; + req->flags = 0; + req->handle = handle; req->cb = cb; } ol_req* ol_overlapped_to_req(OVERLAPPED* overlapped) { - return CONTAINING_RECORD(overlapped, ol_req, _.overlapped); + return CONTAINING_RECORD(overlapped, ol_req, overlapped); } -int ol_set_socket_options(ol_handle *handle) { +int ol_set_socket_options(SOCKET socket) { DWORD yes = 1; /* Set the SO_REUSEADDR option on the socket */ /* If it fails, soit. */ - setsockopt(handle->_.socket, + setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof(int)); /* Make the socket non-inheritable */ - if (!SetHandleInformation(handle->_.handle, HANDLE_FLAG_INHERIT, 0)) { + if (!SetHandleInformation((HANDLE)socket, HANDLE_FLAG_INHERIT, 0)) { ol_errno_ = GetLastError(); return -1; } /* Associate it with the I/O completion port. */ /* Use ol_handle pointer as completion key. */ - if (CreateIoCompletionPort(handle->_.handle, + if (CreateIoCompletionPort((HANDLE)socket, ol_iocp_, - (ULONG_PTR)handle, + (ULONG_PTR)socket, 0) == NULL) { ol_errno_ = GetLastError(); return -1; @@ -254,63 +248,82 @@ int ol_set_socket_options(ol_handle *handle) { } -ol_handle* ol_tcp_handle_new(ol_close_cb close_cb, void* data) { - ol_handle* handle; - - handle = (ol_handle*)malloc(sizeof(ol_handle)); +int ol_tcp_handle_init(ol_handle *handle, ol_close_cb close_cb, void* data) { handle->close_cb = close_cb; handle->data = data; handle->type = OL_TCP; - handle->_.flags = 0; - handle->_.reqs_pending = 0; - handle->_.error = 0; + handle->flags = 0; + handle->reqs_pending = 0; + handle->error = 0; + handle->accept_data = NULL; - handle->_.socket = socket(AF_INET, SOCK_STREAM, 0); - if (handle->_.socket == INVALID_SOCKET) { + handle->socket = socket(AF_INET, SOCK_STREAM, 0); + if (handle->socket == INVALID_SOCKET) { ol_errno_ = WSAGetLastError(); - free(handle); - return NULL; + return -1; } - if (ol_set_socket_options(handle) != 0) { - closesocket(handle->_.socket); - free(handle); - return NULL; + if (ol_set_socket_options(handle->socket) != 0) { + closesocket(handle->socket); + return -1; } ol_refs_++; - return handle; + return 0; +} + + +int ol_tcp_handle_accept(ol_handle* server, ol_handle* client, ol_close_cb close_cb, void* data) { + if (!server->accept_data || + server->accept_data->socket == INVALID_SOCKET) { + ol_errno_ = WSAENOTCONN; + return -1; + } + + client->close_cb = close_cb; + client->data = data; + client->type = OL_TCP; + client->socket = server->accept_data->socket; + client->flags = 0; + client->reqs_pending = 0; + client->error = 0; + client->accept_data = NULL; + + server->accept_data->socket = INVALID_SOCKET; + ol_refs_++; + + return 0; } int ol_close_error(ol_handle* handle, ol_err e) { ol_req *req; - if (handle->_.flags & OL_HANDLE_CLOSING) + if (handle->flags & OL_HANDLE_CLOSING) return 0; - handle->_.error = e; + handle->error = e; switch (handle->type) { case OL_TCP: - closesocket(handle->_.socket); - if (handle->_.reqs_pending == 0) { + closesocket(handle->socket); + if (handle->reqs_pending == 0) { /* If there are no operations queued for this socket, queue one */ /* manually, so ol_poll will call close_cb. */ req = (ol_req*)malloc(sizeof(*req)); req->handle = handle; req->type = OL_CLOSE; - req->_.flags = 0; - if (!PostQueuedCompletionStatus(ol_iocp_, 0, (ULONG_PTR)handle, &req->_.overlapped)) + req->flags = 0; + if (!PostQueuedCompletionStatus(ol_iocp_, 0, (ULONG_PTR)handle, &req->overlapped)) ol_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); - req->_.flags |= OL_REQ_PENDING; - handle->_.reqs_pending++; + req->flags |= OL_REQ_PENDING; + handle->reqs_pending++; } /* After all packets to come out, ol_poll will call close_cb. */ - handle->_.flags |= OL_HANDLE_CLOSING; + handle->flags |= OL_HANDLE_CLOSING; return 0; default: @@ -326,12 +339,6 @@ int ol_close(ol_handle* handle) { } -void ol_free(ol_handle* handle) { - free(handle); - ol_refs_--; -} - - struct sockaddr_in ol_ip4_addr(char *ip, int port) { struct sockaddr_in addr; @@ -355,7 +362,7 @@ int ol_bind(ol_handle* handle, struct sockaddr* addr) { return -1; } - if (bind(handle->_.socket, addr, addrsize) == SOCKET_ERROR) { + if (bind(handle->socket, addr, addrsize) == SOCKET_ERROR) { ol_errno_ = WSAGetLastError(); return -1; } @@ -364,77 +371,93 @@ int ol_bind(ol_handle* handle, struct sockaddr* addr) { } -void ol_queue_accept(ol_handle *handle, ol_req *req) { - ol_handle* peer; - void *buffer; +void ol_queue_accept(ol_handle *handle) { + ol_accept_data* data; BOOL success; DWORD bytes; - peer = ol_tcp_handle_new(NULL, NULL); - if (peer == NULL) { - /* destroy ourselves */ + data = handle->accept_data; + assert(data != NULL); + + data->socket = socket(AF_INET, SOCK_STREAM, 0); + if (data->socket == INVALID_SOCKET) { + ol_close_error(handle, WSAGetLastError()); + return; + } + + if (ol_set_socket_options(data->socket) != 0) { + closesocket(data->socket); ol_close_error(handle, ol_errno_); return; } - /* AcceptEx specifies that the buffer must be big enough to at least hold */ - /* two socket addresses plus 32 bytes. */ - buffer = malloc(sizeof(struct sockaddr_storage) * 2 + 32); - /* Prepare the ol_req and OVERLAPPED structures. */ - assert(!(req->_.flags & OL_REQ_PENDING)); - req->_.flags |= OL_REQ_PENDING; - req->data = (void*)peer; - memset(&req->_.overlapped, 0, sizeof(req->_.overlapped)); + assert(!(data->req.flags & OL_REQ_PENDING)); + data->req.flags |= OL_REQ_PENDING; + memset(&data->req.overlapped, 0, sizeof(data->req.overlapped)); - success = pAcceptEx(handle->_.socket, - peer->_.socket, - buffer, + success = pAcceptEx(handle->socket, + data->socket, + (void*)&data->buffer, 0, sizeof(struct sockaddr_storage), sizeof(struct sockaddr_storage), &bytes, - &req->_.overlapped); + &data->req.overlapped); if (!success && WSAGetLastError() != ERROR_IO_PENDING) { ol_errno_ = WSAGetLastError(); /* destroy the preallocated client handle */ - ol_close(peer); - ol_free(peer); + closesocket(data->socket); /* destroy ourselves */ ol_close_error(handle, ol_errno_); return; } - handle->_.reqs_pending++; - req->_.flags |= OL_REQ_PENDING; + handle->reqs_pending++; + data->req.flags |= OL_REQ_PENDING; } int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb) { - ol_req* req; + ol_accept_data *data; - if (listen(handle->_.socket, backlog) == SOCKET_ERROR) + if (handle->accept_data != NULL) { + /* Already listening. */ + ol_errno_ = WSAEALREADY; return -1; + } - handle->accept_cb = cb; - req = (ol_req*)malloc(sizeof(*req)); - req->type = OL_ACCEPT; - req->handle = handle; - req->_.flags = OL_REQ_INTERNAL; + data = (ol_accept_data*)malloc(sizeof(*data)); + if (!data) { + ol_errno_ = WSAENOBUFS; + return -1; + } + data->socket = INVALID_SOCKET; + ol_req_init(&data->req, handle, (void*)cb); + data->req.type = OL_ACCEPT; - ol_queue_accept(handle, req); + if (listen(handle->socket, backlog) == SOCKET_ERROR) { + ol_errno_ = WSAGetLastError(); + free(data); + return -1; + } + + handle->accept_data = data; + + ol_queue_accept(handle); return 0; } -int ol_connect(ol_handle* handle, ol_req *req, struct sockaddr* addr) { +int ol_connect(ol_req* req, struct sockaddr* addr) { int addrsize; BOOL success; DWORD bytes; + ol_handle* handle = req->handle; - assert(!(req->_.flags & OL_REQ_PENDING)); + assert(!(req->flags & OL_REQ_PENDING)); if (addr->sa_family == AF_INET) { addrsize = sizeof(struct sockaddr_in); @@ -445,101 +468,96 @@ int ol_connect(ol_handle* handle, ol_req *req, struct sockaddr* addr) { return -1; } - memset(&req->_.overlapped, 0, sizeof(req->_.overlapped)); - req->handle = handle; + memset(&req->overlapped, 0, sizeof(req->overlapped)); req->type = OL_CONNECT; - success = pConnectEx(handle->_.socket, + success = pConnectEx(handle->socket, addr, addrsize, NULL, 0, &bytes, - &req->_.overlapped); + &req->overlapped); if (!success && WSAGetLastError() != ERROR_IO_PENDING) { ol_errno_ = WSAGetLastError(); return -1; } - req->_.flags |= OL_REQ_PENDING; - handle->_.reqs_pending++; + req->flags |= OL_REQ_PENDING; + handle->reqs_pending++; return 0; } -int ol_write(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) { +int ol_write(ol_req *req, ol_buf* bufs, int bufcnt) { int result; DWORD bytes; + ol_handle* handle = req->handle; - assert(!(req->_.flags & OL_REQ_PENDING)); + assert(!(req->flags & OL_REQ_PENDING)); - memset(&req->_.overlapped, 0, sizeof(req->_.overlapped)); - req->handle = handle; + memset(&req->overlapped, 0, sizeof(req->overlapped)); req->type = OL_WRITE; - result = WSASend(handle->_.socket, + result = WSASend(handle->socket, (WSABUF*)bufs, bufcnt, &bytes, 0, - &req->_.overlapped, + &req->overlapped, NULL); if (result != 0 && WSAGetLastError() != ERROR_IO_PENDING) { ol_errno_ = WSAGetLastError(); return -1; } - req->_.flags |= OL_REQ_PENDING; - handle->_.reqs_pending++; + req->flags |= OL_REQ_PENDING; + handle->reqs_pending++; return 0; } -int ol_read(ol_handle* handle, ol_req *req, ol_buf* bufs, int bufcnt) { +int ol_read(ol_req *req, ol_buf* bufs, int bufcnt) { int result; DWORD bytes, flags; + ol_handle* handle = req->handle; - assert(!(req->_.flags & OL_REQ_PENDING)); + assert(!(req->flags & OL_REQ_PENDING)); - memset(&req->_.overlapped, 0, sizeof(req->_.overlapped)); - req->handle = handle; + memset(&req->overlapped, 0, sizeof(req->overlapped)); req->type = OL_READ; flags = 0; - result = WSARecv(handle->_.socket, + result = WSARecv(handle->socket, (WSABUF*)bufs, bufcnt, &bytes, &flags, - &req->_.overlapped, + &req->overlapped, NULL); if (result != 0 && WSAGetLastError() != ERROR_IO_PENDING) { ol_errno_ = WSAGetLastError(); return -1; } - req->_.flags |= OL_REQ_PENDING; - handle->_.reqs_pending++; + req->flags |= OL_REQ_PENDING; + handle->reqs_pending++; return 0; } -int ol_write2(ol_handle* handle, const char* msg) { - ol_req *req; +int ol_write2(ol_req *req, const char* msg) { ol_buf buf; - - req = (ol_req*)malloc(sizeof(*req)); - req->_.flags = OL_REQ_INTERNAL; - req->cb = NULL; + ol_handle* handle = req->handle; buf.base = (char*)msg; buf.len = strlen(msg); - return ol_write(handle, req, &buf, 1); + return ol_write(req, &buf, 1); } @@ -555,8 +573,7 @@ void ol_poll() { OVERLAPPED* overlapped; ol_req* req; ol_handle* handle; - ol_handle *peer; - int free_req; + ol_accept_data *data; success = GetQueuedCompletionStatus(ol_iocp_, &bytes, @@ -571,89 +588,81 @@ void ol_poll() { handle = req->handle; /* Mark the request non-pending */ - req->_.flags &= ~OL_REQ_PENDING; - handle->_.reqs_pending--; - - /* Cache this value, because when the req is not internal the callback */ - /* might free the req structure, so we cannot look at the flags field */ - /* after the callback has been called. */ - free_req = req->_.flags & OL_REQ_INTERNAL; + req->flags &= ~OL_REQ_PENDING; + handle->reqs_pending--; /* If the related socket got closed in the meantime, disregard this */ - /* result. If it is an internal request, free it. If this is the last */ - /* request pending, close the handle's close callback. */ - if (handle->_.flags & OL_HANDLE_CLOSING) { - if (req->type == OL_ACCEPT) { - peer = (ol_handle*)req->data; - ol_close(peer); - ol_free(peer); - } - if (free_req) { - free(req); - } - if (handle->_.reqs_pending == 0) { - handle->_.flags |= OL_HANDLE_CLOSED; - if (handle->close_cb) - handle->close_cb(handle, handle->_.error); + /* result. If this is the last request pending, call the handle's close callback. */ + if (handle->flags & OL_HANDLE_CLOSING) { + if (handle->reqs_pending == 0) { + handle->flags |= OL_HANDLE_CLOSED; + if (handle->accept_data) { + if (handle->accept_data) { + if (handle->accept_data->socket) { + closesocket(handle->accept_data->socket); + } + free(handle->accept_data); + handle->accept_data = NULL; + } + } + if (handle->close_cb) { + handle->close_cb(handle, handle->error); + } + ol_refs_--; } return; } switch (req->type) { case OL_WRITE: - success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE); + success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); if (!success) { ol_close_error(handle, GetLastError()); } else if (req->cb) { ((ol_write_cb)req->cb)(req); } - if (free_req) { - free(req); - } return; case OL_READ: - handle = (ol_handle*)key; - success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE); + success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); if (!success) { - ((ol_close_cb)req->cb)(handle, GetLastError()); + ol_close_error(handle, GetLastError()); } else if (req->cb) { ((ol_read_cb)req->cb)(req, bytes); } - if (free_req) { - free(req); - } break; case OL_ACCEPT: - peer = (ol_handle*)req->data; - handle = (ol_handle*)key; - success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE); - if (success && handle->accept_cb) { - handle->accept_cb(handle, peer); - } else { - /* Ignore failed accept if the listen socket is still healthy */ - ol_close(peer); - ol_free(peer); + data = handle->accept_data; + assert(data != NULL); + assert(data->socket != INVALID_SOCKET); + + success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); + if (success && req->cb) { + ((ol_accept_cb)req->cb)(handle); + } + + /* accept_cb should call ol_accept_handle which sets data->socket */ + /* to INVALID_SOCKET. */ + /* Just ignore failed accept if the listen socket is still healthy. */ + if (data->socket != INVALID_SOCKET) { + closesocket(handle->socket); + data->socket = INVALID_SOCKET; } /* Queue another accept */ - ol_queue_accept(handle, req); + ol_queue_accept(handle); return; case OL_CONNECT: if (req->cb) { - handle = (ol_handle*)key; - success = GetOverlappedResult(handle->_.handle, overlapped, &bytes, FALSE); + success = GetOverlappedResult(handle->handle, overlapped, &bytes, FALSE); if (success) { ((ol_connect_cb)req->cb)(req, 0); } else { ((ol_connect_cb)req->cb)(req, GetLastError()); } } - if (free_req) { - free(req); - } return; case OL_CLOSE: diff --git a/ol-win.h b/ol-win.h index 138516f5..03000b74 100644 --- a/ol-win.h +++ b/ol-win.h @@ -14,18 +14,29 @@ typedef struct _ol_buf { char* base; } ol_buf; - -typedef struct { +struct ol_req_s { + struct ol_req_shared_s; OVERLAPPED overlapped; int flags; -} ol_req_private; +}; typedef struct { + ol_req req; + SOCKET socket; + + /* AcceptEx specifies that the buffer must be big enough to at least hold */ + /* two socket addresses plus 32 bytes. */ + char buffer[sizeof(struct sockaddr_storage) * 2 + 32]; +} ol_accept_data; + +struct ol_handle_s { + struct ol_handle_shared_s; union { SOCKET socket; HANDLE handle; }; + ol_accept_data *accept_data; unsigned int flags; unsigned int reqs_pending; ol_err error; -} ol_handle_private; \ No newline at end of file +}; diff --git a/ol.h b/ol.h index cc959f6a..73535509 100644 --- a/ol.h +++ b/ol.h @@ -4,7 +4,6 @@ #include /* size_t */ - typedef int ol_err; /* FIXME */ typedef struct ol_req_s ol_req; @@ -12,19 +11,12 @@ typedef struct ol_handle_s ol_handle; typedef void (*ol_read_cb)(ol_req* req, size_t nread); typedef void (*ol_write_cb)(ol_req* req); -typedef void (*ol_accept_cb)(ol_handle* server, ol_handle* new_client); +typedef void (*ol_accept_cb)(ol_handle* handle); typedef void (*ol_close_cb)(ol_handle* handle, ol_err e); typedef void (*ol_connect_cb)(ol_req* req, ol_err e); typedef void (*ol_shutdown_cb)(ol_req* req); -#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) -# include "ol-unix.h" -#else -# include "ol-win.h" -#endif - - typedef enum { OL_UNKNOWN_HANDLE = 0, OL_TCP, @@ -33,19 +25,6 @@ typedef enum { OL_FILE, } ol_handle_type; - -struct ol_handle_s { - /* read-only */ - ol_handle_type type; - /* private */ - ol_handle_private _; - /* public */ - ol_accept_cb accept_cb; - ol_close_cb close_cb; - void* data; -}; - - typedef enum { OL_UNKNOWN_REQ = 0, OL_CONNECT, @@ -57,18 +36,31 @@ typedef enum { } ol_req_type; -struct ol_req_s { +struct ol_handle_shared_s { + /* read-only */ + ol_handle_type type; + /* public */ + ol_close_cb close_cb; + void* data; +}; + +struct ol_req_shared_s { /* read-only */ ol_req_type type; - ol_handle* handle; - /* private */ - ol_req_private _; /* public */ + ol_handle* handle; void* cb; - void *data; + void* data; }; +#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) +# include "ol-unix.h" +#else +# include "ol-win.h" +#endif + + /** * Most functions return boolean: 0 for success and -1 for failure. * On error the user should then call ol_last_error() to determine @@ -81,36 +73,36 @@ const char* ol_err_str(ol_err err); void ol_init(); int ol_run(); -void ol_req_init(ol_req* req, void* cb); +void ol_req_init(ol_req* req, ol_handle* handle, void* cb); -ol_handle* ol_tcp_handle_new(ol_close_cb close_cb, void* data); -/* TODO: - * ol_named_pipe_handle_new - * ol_file_handle_new - * ol_tty_handle_new +/* + * TODO: + * - ol_(pipe|pipe_tty)_handle_init + * - ol_bind_pipe(char *name) + * - ol_continuous_read(ol_handle *handle, ol_continuous_read_cb *cb) + * - A way to list cancelled ol_reqs after before/on ol_close_cb */ -/* TCP server methods. */ -int ol_bind(ol_handle* handle, struct sockaddr* addr); -int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb); - /* TCP socket methods. */ -int ol_connect(ol_handle* handle, ol_req* req, struct sockaddr* addr); -int ol_read(ol_handle* handle, ol_req* req, ol_buf* bufs, int bufcnt); -int ol_write(ol_handle* handle, ol_req* req, ol_buf* bufs, int bufcnt); -int ol_write2(ol_handle* handle, const char* msg); -int ol_shutdown(ol_handle* handle, ol_req* req); +/* Handle and callback bust be set by calling ol_req_init. */ +int ol_tcp_handle_init(ol_handle *handle, ol_close_cb close_cb, void* data); +int ol_bind(ol_handle* handle, struct sockaddr* addr); +int ol_connect(ol_req* req, struct sockaddr* addr); +int ol_shutdown(ol_req* req); -/* Request handle to be closed. close_cb will be made */ -/* synchronously during this call. */ +/* TCP server methods. */ +int ol_listen(ol_handle* handle, int backlog, ol_accept_cb cb); +int ol_tcp_handle_accept(ol_handle* server, ol_handle* client, ol_close_cb close_cb, void* data); + +/* Generic handle methods */ +int ol_read(ol_req* req, ol_buf* bufs, int bufcnt); +int ol_write(ol_req* req, ol_buf* bufs, int bufcnt); +int ol_write2(ol_req *req, const char* msg); + +/* Request handle to be closed. close_cb will be called */ +/* asynchronously after this call. */ int ol_close(ol_handle* handle); -/* Must be called for all handles after close_cb. Handles that arrive - * via the accept_cb must use ol_free(). - */ -void ol_free(ol_handle* handle); - - /* Utility */ struct sockaddr_in ol_ip4_addr(char* ip, int port); diff --git a/test/echo-server.c b/test/echo-server.c index 6e72ad82..f2fcaa02 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -3,50 +3,55 @@ #include #include - #define BUFSIZE 1024 - typedef struct { - ol_handle* handle; + ol_handle handle; ol_req req; ol_buf buf; char read_buffer[BUFSIZE]; } peer_t; +ol_handle server; void after_write(ol_req* req); void after_read(ol_req* req, size_t nread); void try_read(peer_t* peer); void on_close(ol_handle* peer, ol_err err); -void on_accept(ol_handle* server, ol_handle* new_client); - - -ol_handle *server = NULL; +void on_accept(ol_handle* handle); void after_write(ol_req* req) { - peer_t *peer = (peer_t*) req->data; + peer_t* peer = (peer_t*) req->data; try_read(peer); } void after_read(ol_req* req, size_t nread) { + peer_t* peer; + int r; + if (nread == 0) { ol_close(req->handle); } else { - peer_t *peer = (peer_t*) req->data; + peer = (peer_t*) req->data; peer->buf.len = nread; - peer->req.cb = after_write; - ol_write(peer->handle, &peer->req, &peer->buf, 1); + ol_req_init(&peer->req, &peer->handle, after_write); + peer->req.data = peer; + r = ol_write(&peer->req, &peer->buf, 1); + assert(!r); } } void try_read(peer_t* peer) { + int r; + peer->buf.len = BUFSIZE; - peer->req.cb = after_read; - ol_read(peer->handle, &peer->req, &peer->buf, 1); + ol_req_init(&peer->req, &peer->handle, after_read); + peer->req.data = peer; + r = ol_read(&peer->req, &peer->buf, 1); + assert(!r); } @@ -54,36 +59,28 @@ void on_close(ol_handle* peer, ol_err err) { if (err) { fprintf(stdout, "Socket error\n"); } - - ol_free(peer); } -void on_accept(ol_handle* server, ol_handle* new_client) { - peer_t* p; +void on_accept(ol_handle* server) { + peer_t* p = (peer_t*)malloc(sizeof(peer_t)); + int r; - new_client->close_cb = on_close; + r = ol_tcp_handle_accept(server, &p->handle, on_close, (void*)p); + assert(!r); - p = (peer_t*)malloc(sizeof(peer_t)); - p->handle = new_client; - p->buf.base = p->read_buffer; - p->buf.len = BUFSIZE; - p->req.data = p; - ol_req_init(&p->req, NULL); + p->buf.base = (char*)&p->read_buffer; try_read(p); } void on_server_close(ol_handle* handle, ol_err err) { - assert(handle == server); + assert(handle == &server); if (err) { fprintf(stdout, "Socket error\n"); } - - ol_free(server); - server = NULL; } @@ -91,17 +88,21 @@ int echo_start(int port) { struct sockaddr_in addr = ol_ip4_addr("0.0.0.0", port); int r; - assert(server == NULL); - server = ol_tcp_handle_new(&on_server_close, NULL); + r = ol_tcp_handle_init(&server, on_server_close, NULL); + if (r) { + /* TODO: Error codes */ + fprintf(stderr, "Socket creation error\n"); + return 1; + } - r = ol_bind(server, (struct sockaddr*) &addr); + r = ol_bind(&server, (struct sockaddr*) &addr); if (r) { /* TODO: Error codes */ fprintf(stderr, "Bind error\n"); return 1; } - r = ol_listen(server, 128, on_accept); + r = ol_listen(&server, 128, on_accept); if (r) { /* TODO: Error codes */ fprintf(stderr, "Listen error\n"); @@ -113,8 +114,7 @@ int echo_start(int port) { int echo_stop() { - assert(server != NULL); - return ol_close(server); + return ol_close(&server); } diff --git a/test/test-callback-stack.c b/test/test-callback-stack.c index 9e89b96c..1ab1e607 100644 --- a/test/test-callback-stack.c +++ b/test/test-callback-stack.c @@ -10,18 +10,21 @@ void close_cb(ol_handle *handle, ol_err e) { assert("ol_close error" && e == 0); assert("ol_close_cb not called from a fresh stack" && nested == 0); close_cb_called++; - ol_free(handle); } TEST_IMPL(close_cb_stack) { - ol_handle *handle; + ol_handle handle; + int r; ol_init(); - handle = ol_tcp_handle_new(&close_cb, NULL); + + r = ol_tcp_handle_init(&handle, &close_cb, NULL); + assert(!r); nested++; - ol_close(handle); + r = ol_close(&handle); + assert(!r); nested--; ol_run(); diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index 9f25d555..097db875 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -5,121 +5,132 @@ #include static int completed_pingers = 0; -static ol_req connect_req; -#define NUM_PINGS 50 +#define NUM_PINGS 1000 /* 64 bytes is enough for a pinger */ -#define BUFSIZE 64 +#define BUFSIZE 10240 + +static char PING[] = "PING\n"; -static char* PING = "PING\n"; typedef struct { int pongs; int state; - ol_handle* handle; - ol_req req; + ol_handle handle; + ol_req connect_req; + ol_req read_req; ol_buf buf; char read_buffer[BUFSIZE]; -} pinger; +} pinger_t; + +void pinger_try_read(pinger_t* pinger); -void pinger_try_read(pinger* pinger); void pinger_on_close(ol_handle* handle, ol_err err) { - pinger* p; + pinger_t* pinger = (pinger_t*)handle->data; assert(!err); - p = (pinger*)handle->data; - assert(NUM_PINGS == p->pongs); - free(p); - ol_free(handle); + assert(NUM_PINGS == pinger->pongs); + + free(pinger); + completed_pingers++; } + +void pinger_after_write(ol_req *req) { + free(req); +} + + +void pinger_write_ping(pinger_t* pinger) { + ol_req *req; + int r; + + req = (ol_req*)malloc(sizeof(*req)); + ol_req_init(req, &pinger->handle, pinger_after_write); + r = ol_write2(req, (char*)&PING); + assert(!r); +} + void pinger_after_read(ol_req* req, size_t nread) { unsigned int i; - int r; - pinger* p; + pinger_t* pinger; + + pinger = (pinger_t*)req->handle->data; if (nread == 0) { - ol_close(req->handle); + ol_close(&pinger->handle); return; } - p = (pinger*)req->data; - /* Now we count the pings */ for (i = 0; i < nread; i++) { - assert(p->buf.base[i] == PING[p->state]); - /* 5 = strlen(PING) */ - p->state = (p->state + 1) % 5; - if (p->state == 0) { - p->pongs++; - if (p->pongs < NUM_PINGS) { - r = ol_write2(p->handle, PING); - assert(!r); + assert(pinger->buf.base[i] == PING[pinger->state]); + pinger->state = (pinger->state + 1) % (sizeof(PING) - 1); + if (pinger->state == 0) { + pinger->pongs++; + if (pinger->pongs < NUM_PINGS) { + pinger_write_ping(pinger); } else { - ol_close(p->handle); + ol_close(&pinger->handle); return; } } } - pinger_try_read(p); + pinger_try_read(pinger); } -void pinger_try_read(pinger* pinger) { - pinger->buf.len = BUFSIZE; - pinger->req.cb = pinger_after_read; - ol_read(pinger->handle, &pinger->req, &pinger->buf, 1); +void pinger_try_read(pinger_t* pinger) { + ol_read(&pinger->read_req, &pinger->buf, 1); } void pinger_on_connect(ol_req *req, ol_err err) { - ol_handle *handle = req->handle; - pinger *p; - int r; + pinger_t *pinger = (pinger_t*)req->handle->data; if (err) { - /* error */ assert(0); } - p = calloc(sizeof(pinger), 1); - p->handle = handle; - p->buf.base = p->read_buffer; - p->buf.len = BUFSIZE; - p->req.data = p; - - handle->data = p; - - pinger_try_read(p); - - r = ol_write2(handle, PING); - if (r < 0) { - /* error */ - assert(0); - } + pinger_try_read(pinger); + pinger_write_ping(pinger); } -int pinger_connect(int port) { - /* Try to connec to the server and do NUM_PINGS ping-pongs. */ - ol_handle* handle = ol_tcp_handle_new(pinger_on_close, NULL); +int pinger_new(int port) { struct sockaddr_in client_addr = ol_ip4_addr("0.0.0.0", 0); - struct sockaddr_in server_addr = ol_ip4_addr("127.0.0.1", TEST_PORT); + struct sockaddr_in server_addr = ol_ip4_addr("145.94.50.9", TEST_PORT); + pinger_t *pinger; - ol_bind(handle, (struct sockaddr*)&client_addr); - ol_req_init(&connect_req, &pinger_on_connect); - return ol_connect(handle, &connect_req, (struct sockaddr*)&server_addr); + pinger = (pinger_t*)malloc(sizeof(*pinger)); + pinger->state = 0; + pinger->pongs = 0; + pinger->buf.len = sizeof(pinger->read_buffer); + pinger->buf.base = (char*)&pinger->read_buffer; + + /* Try to connec to the server and do NUM_PINGS ping-pongs. */ + if (ol_tcp_handle_init(&pinger->handle, pinger_on_close, (void*)pinger)) { + return -1; + } + + /* We are never doing multiple reads/connects at a time anyway. */ + /* so these handles can be pre-initialized. */ + ol_req_init(&pinger->connect_req, &pinger->handle, pinger_on_connect); + ol_req_init(&pinger->read_req, &pinger->handle, pinger_after_read); + + ol_bind(&pinger->handle, (struct sockaddr*)&client_addr); + return ol_connect(&pinger->connect_req, (struct sockaddr*)&server_addr); } TEST_IMPL(ping_pong) { ol_init(); - if (pinger_connect(8000)) { + if (pinger_new(8000)) { return 2; } diff --git a/test/test.h b/test/test.h index f65b0bb7..dc855ee9 100644 --- a/test/test.h +++ b/test/test.h @@ -8,5 +8,6 @@ int run_##name() #define TEST_PORT 8123 +#define TEST_PORT_2 8124 #endif /* TEST_H_ */ \ No newline at end of file