From 42cefd934e4275e3614d4900f2f5c0d35cdfbdf2 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Sat, 16 Apr 2011 14:04:45 -0700 Subject: [PATCH] linux: ping-pong test passes. --- Makefile | 14 ++- oio-unix.c | 193 +++++++++++++++++++++++------------------- test/echo-server.c | 12 +-- test/test-ping-pong.c | 32 ++++--- test/test-runner.c | 1 - 5 files changed, 135 insertions(+), 117 deletions(-) diff --git a/Makefile b/Makefile index e2c3e3a0..6c7e9b5b 100644 --- a/Makefile +++ b/Makefile @@ -1,3 +1,5 @@ +CFLAGS=-ansi -g -Wall +LINKFLAGS=-g -lm all: oio.a test/test-runner TESTS=test/echo-server.c \ @@ -8,22 +10,16 @@ TESTS=test/echo-server.c \ test/test-timeout.c test/test-runner: test/*.h test/test-runner.c test/test-runner-unix.c $(TESTS) oio.a - $(CC) -ansi -g -lm -o test/test-runner test/test-runner.c test/test-runner-unix.c $(TESTS) oio.a - -test/echo-demo: test/echo-demo.c test/echo.o oio.a - $(CC) -ansi -g -o test/echo-demo test/echo-demo.c test/echo.o oio.a -lm - -test/test-ping-pong: test/test-ping-pong.c test/echo.o oio.a - $(CC) -ansi -g -o test/test-ping-pong test/test-ping-pong.c test/echo.o oio.a -lm + $(CC) $(CFLAGS) $(LINKFLAGS) -o test/test-runner test/test-runner.c test/test-runner-unix.c $(TESTS) oio.a oio.a: oio-unix.o ev/ev.o $(AR) rcs oio.a oio-unix.o ev/ev.o oio-unix.o: oio-unix.c oio.h oio-unix.h - $(CC) -ansi -g -c oio-unix.c -o oio-unix.o + $(CC) $(CFLAGS) -c oio-unix.c -o oio-unix.o test/echo.o: test/echo.c test/echo.h - $(CC) -ansi -g -c test/echo.c -o test/echo.o + $(CC) $(CFLAGS) -c test/echo.c -o test/echo.o ev/ev.o: ev/config.h ev/ev.c $(MAKE) -C ev diff --git a/oio-unix.c b/oio-unix.c index d2549013..2b146a9b 100644 --- a/oio-unix.c +++ b/oio-unix.c @@ -26,7 +26,7 @@ size_t strnlen (register const char* s, size_t maxlen) { void oio_tcp_io(EV_P_ ev_io* watcher, int revents); -void oio_tcp_connect(oio_handle* handle, oio_req* req); +void oio_tcp_connect(oio_handle* handle); int oio_tcp_open(oio_handle*, int fd); void oio_finish_close(oio_handle* handle); @@ -49,7 +49,7 @@ void oio_flag_unset(oio_handle* handle, int flag) { int oio_flag_is_set(oio_handle* handle, int flag) { - return handle->flags & flag; + return (handle->flags & flag) != 0; } @@ -78,12 +78,12 @@ struct sockaddr_in oio_ip4_addr(char *ip, int port) { int oio_close(oio_handle* handle) { oio_flag_set(handle, OIO_CLOSING); - if (!ev_is_active(&handle->read_watcher)) { - ev_io_init(&handle->read_watcher, oio_tcp_io, handle->fd, EV_READ); - ev_io_start(EV_DEFAULT_ &handle->read_watcher); + if (!ev_is_active(&handle->write_watcher)) { + ev_io_start(EV_DEFAULT_ &handle->write_watcher); } - ev_feed_fd_event(EV_DEFAULT_ handle->fd, EV_READ | EV_WRITE); + ev_feed_fd_event(EV_DEFAULT_ handle->fd, EV_WRITE); + assert(ev_is_pending(&handle->write_watcher)); return 0; } @@ -104,22 +104,14 @@ int oio_tcp_handle_init(oio_handle *handle, oio_close_cb close_cb, handle->type = OIO_TCP; handle->close_cb = close_cb; handle->data = data; - handle->accepted_fd = -1; handle->flags = 0; + handle->connect_req = NULL; + handle->accepted_fd = -1; + handle->fd = -1; + ngx_queue_init(&handle->read_reqs); - int fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - oio_err_new(handle, errno); - return -1; - } - - if (oio_tcp_open(handle, fd)) { - close(fd); - return -2; - } - return 0; } @@ -129,6 +121,19 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) { int domain; int r; + if (handle->fd <= 0) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + oio_err_new(handle, errno); + return -1; + } + + if (oio_tcp_open(handle, fd)) { + close(fd); + return -2; + } + } + assert(handle->fd >= 0); if (addr->sa_family == AF_INET) { @@ -148,20 +153,13 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) { } -int oio_tcp_init_fd(int fd) { - int r; +int oio_tcp_open(oio_handle* handle, int fd) { + /* Set non-blocking, etc */ int yes = 1; - r = fcntl(fd, F_SETFL, O_NONBLOCK); + int r = fcntl(fd, F_SETFL, O_NONBLOCK); assert(r == 0); r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)); assert(r == 0); - return 0; -} - - -int oio_tcp_open(oio_handle* handle, int fd) { - /* Set non-blocking, etc */ - oio_tcp_init_fd(fd); handle->fd = fd; @@ -179,7 +177,8 @@ int oio_tcp_open(oio_handle* handle, int fd) { void oio_server_io(EV_P_ ev_io* watcher, int revents) { oio_handle* handle = watcher->data; - + assert(watcher == &handle->read_watcher || + watcher == &handle->write_watcher); assert(revents == EV_READ); if (oio_flag_is_set(handle, OIO_CLOSING)) { @@ -193,8 +192,8 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { } while (1) { - struct sockaddr addr; - socklen_t addrlen; + struct sockaddr addr = { 0 }; + socklen_t addrlen = 0; assert(handle->accepted_fd < 0); int fd = accept(handle->fd, &addr, &addrlen); @@ -212,16 +211,12 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) { } } else { - if (!handle->accept_cb) { - close(fd); - } else { - handle->accepted_fd = fd; - handle->accept_cb(handle); - if (handle->accepted_fd >= 0) { - /* The user hasn't yet accepted called oio_tcp_handle_accept() */ - ev_io_stop(EV_DEFAULT_ &handle->read_watcher); - return; - } + handle->accepted_fd = fd; + handle->accept_cb(handle); + if (handle->accepted_fd >= 0) { + /* The user hasn't yet accepted called oio_tcp_handle_accept() */ + ev_io_stop(EV_DEFAULT_ &handle->read_watcher); + return; } } } @@ -304,12 +299,15 @@ int oio_read_reqs_empty(oio_handle* handle) { void oio__read(oio_handle* handle) { + int errorno; assert(handle->fd >= 0); + /* TODO: should probably while(1) here until EAGAIN */ + /* Get the request at the head of the read_reqs queue. */ oio_req* req = oio_read_reqs_head(handle); if (!req) { - ev_io_stop(EV_DEFAULT_ &(handle->read_watcher)); + ev_io_stop(EV_DEFAULT_ &handle->read_watcher); return; } @@ -325,28 +323,30 @@ void oio__read(oio_handle* handle) { /* Now do the actual read. */ ssize_t nread = readv(handle->fd, iov, iovcnt); + errorno = errno; oio_read_cb cb = req->cb; if (nread < 0) { - if (errno == EAGAIN) { + if (errorno == EAGAIN) { /* Just wait for the next one. */ - assert(ev_is_active(&(handle->read_watcher))); + assert(ev_is_active(&handle->read_watcher)); + ev_io_start(EV_DEFAULT_ &handle->read_watcher); } else { - oio_err err = oio_err_new(handle, errno); + oio_err err = oio_err_new(handle, errorno); if (cb) { cb(req, 0); } - handle->err = errno; + handle->err = err; oio_close(handle); } } else { /* Successful read */ /* First pop the req off handle->read_reqs */ - ngx_queue_remove(&(req->read_reqs)); + ngx_queue_remove(&req->read_reqs); - free(req->read_bufs); + free(req->read_bufs); /* FIXME: we should not be allocing for each read */ req->read_bufs = NULL; /* NOTE: call callback AFTER freeing the request data. */ @@ -355,7 +355,7 @@ void oio__read(oio_handle* handle) { } if (oio_read_reqs_empty(handle)) { - ev_io_stop(EV_DEFAULT_ &(handle->read_watcher)); + ev_io_stop(EV_DEFAULT_ &handle->read_watcher); } } } @@ -363,6 +363,8 @@ void oio__read(oio_handle* handle) { void oio_tcp_io(EV_P_ ev_io* watcher, int revents) { oio_handle* handle = watcher->data; + assert(watcher == &handle->read_watcher || + watcher == &handle->write_watcher); assert(handle->fd >= 0); @@ -372,14 +374,15 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) { } if (handle->connect_req) { - oio_tcp_connect(handle, handle->connect_req); + oio_tcp_connect(handle); } else { if (revents & EV_READ) { oio__read(handle); } if (revents & EV_WRITE) { - assert(0 && "Queued writes are not supported"); + /* ignore for now */ + ev_io_stop(EV_DEFAULT_ &handle->write_watcher); } } } @@ -390,26 +393,26 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) { * In order to determine if we've errored out or succeeded must call * getsockopt. */ -void oio_tcp_connect(oio_handle* handle, oio_req* req) { +void oio_tcp_connect(oio_handle* handle) { assert(handle->fd >= 0); + + oio_req* req = handle->connect_req; assert(req); int error; - int errorsize = sizeof(int); + socklen_t errorsize = sizeof(int); getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize); if (!error) { - ev_io_init(&handle->write_watcher, oio_tcp_io, handle->fd, EV_WRITE); - ev_set_cb(&handle->read_watcher, oio_tcp_io); + ev_io_start(EV_DEFAULT_ &handle->read_watcher); /* Successful connection */ + handle->connect_req = NULL; oio_connect_cb connect_cb = req->cb; if (connect_cb) { - connect_cb(req, oio_err_new(handle, 0)); + connect_cb(req, 0); } - req = NULL; - } else if (error == EINPROGRESS) { /* Still connecting. */ return; @@ -430,8 +433,21 @@ void oio_tcp_connect(oio_handle* handle, oio_req* req) { int oio_connect(oio_req *req, struct sockaddr* addr) { oio_handle* handle = req->handle; + if (handle->fd <= 0) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + oio_err_new(handle, errno); + return -1; + } + + if (oio_tcp_open(handle, fd)) { + close(fd); + return -2; + } + } + req->type = OIO_CONNECT; - ngx_queue_init(&(req->read_reqs)); + ngx_queue_init(&req->read_reqs); if (handle->connect_req) { return oio_err_new(handle, EALREADY); @@ -443,31 +459,17 @@ int oio_connect(oio_req *req, struct sockaddr* addr) { handle->connect_req = req; - int addrsize; - - if (addr->sa_family == AF_INET) { - addrsize = sizeof(struct sockaddr_in); - handle->fd = socket(AF_INET, SOCK_STREAM, 0); - } else if (addr->sa_family == AF_INET6) { - addrsize = sizeof(struct sockaddr_in6); - handle->fd = socket(AF_INET6, SOCK_STREAM, 0); - } else { - assert(0); - return -1; - } - - /* socket(2) failed */ - if (handle->fd < 0) { - return oio_err_new(handle, errno); - } + int addrsize = sizeof(struct sockaddr_in); int r = connect(handle->fd, addr, addrsize); + if (r != 0 && errno != EINPROGRESS) { + return oio_err_new(handle, r); + } - ev_io_init(&(handle->read_watcher), oio_tcp_io, handle->fd, EV_READ); - ev_io_init(&(handle->write_watcher), oio_tcp_io, handle->fd, EV_WRITE); - ev_io_start(EV_DEFAULT_ &(handle->read_watcher)); + assert(handle->write_watcher.data == handle); + ev_io_start(EV_DEFAULT_ &handle->write_watcher); - return oio_err_new(handle, r); + return 0; } @@ -475,13 +477,16 @@ int oio_write(oio_req *req, oio_buf* bufs, int bufcnt) { oio_handle* handle = req->handle; assert(handle->fd >= 0); ssize_t r; + int errorno; ngx_queue_init(&(req->read_reqs)); req->type = OIO_WRITE; r = writev(handle->fd, (struct iovec*)bufs, bufcnt); + errorno = errno; if (r < 0) { + assert(errorno != EAGAIN && "write queueing not yet supported"); return oio_err_new(handle, r); } else { if (req && req->cb) { @@ -510,22 +515,27 @@ int oio_timeout(oio_req *req, int64_t timeout) { int oio_read(oio_req *req, oio_buf* bufs, int bufcnt) { oio_handle* handle = req->handle; ssize_t nread = -1; - errno = EAGAIN; + int errorno = EAGAIN; oio_read_cb cb = req->cb; assert(handle->fd >= 0); if (ngx_queue_empty(&handle->read_reqs)) { /* Attempt to read immediately. */ - ssize_t nread = readv(handle->fd, (struct iovec*) bufs, bufcnt); + nread = readv(handle->fd, (struct iovec*) bufs, bufcnt); + errorno = errno; } + /* The request should have been just initialized. Therefore the + * ngx_queue_t for read_reqs should be empty. + */ + assert(ngx_queue_empty(&req->read_reqs)); + assert(req->type == OIO_UNKNOWN_REQ); req->type = OIO_READ; - ngx_queue_init(&(req->read_reqs)); - if (nread < 0 && errno != EAGAIN) { + if (nread < 0 && errorno != EAGAIN) { /* Real error. */ - oio_err err = oio_err_new(handle, errno); + oio_err err = oio_err_new(handle, errorno); if (cb) { cb(req, nread); @@ -544,7 +554,7 @@ int oio_read(oio_req *req, oio_buf* bufs, int bufcnt) { /* Either we never read anything, or we got EAGAIN. */ assert(!ngx_queue_empty(&handle->read_reqs) || - (nread < 0 && errno == EAGAIN)); + (nread < 0 && errorno == EAGAIN)); /* Two possible states: * - EAGAIN, meaning the socket is not wriable currently. We must wait for @@ -558,13 +568,20 @@ int oio_read(oio_req *req, oio_buf* bufs, int bufcnt) { /* Copy the bufs data over into our oio_req struct. This is so the user can * free the oio_buf array. The actual data inside the oio_bufs is however * owned by the user and cannot be deallocated until the read completes. + * + * TODO: instead of mallocing here - just have a fixed number of oio_bufs + * included in the oio_req object. */ req->read_bufs = malloc(sizeof(oio_buf) * bufcnt); memcpy(req->read_bufs, bufs, bufcnt * sizeof(oio_buf)); req->read_bufcnt = bufcnt; /* Append the request to read_reqs. */ - ngx_queue_insert_tail(&(handle->read_reqs), &(req->read_reqs)); + ngx_queue_insert_tail(&handle->read_reqs, &req->read_reqs); + + assert(!ngx_queue_empty(&handle->read_reqs)); + assert(handle->read_watcher.data == handle); + assert(handle->read_watcher.fd == handle->fd); ev_io_start(EV_DEFAULT_ &handle->read_watcher); @@ -583,5 +600,5 @@ void oio_req_init(oio_req* req, oio_handle* handle, void* cb) { req->type = OIO_UNKNOWN_REQ; req->cb = cb; req->handle = handle; - ngx_queue_init(&(req->read_reqs)); + ngx_queue_init(&req->read_reqs); } diff --git a/test/echo-server.c b/test/echo-server.c index 531000d5..3f2b2d83 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -10,6 +10,7 @@ typedef struct { oio_req req; oio_buf buf; char read_buffer[BUFSIZE]; + int msg; } peer_t; oio_handle server; @@ -28,17 +29,17 @@ void after_write(oio_req* req) { void after_read(oio_req* req, size_t nread) { - peer_t* peer; + peer_t* peer = req->data; if (nread == 0) { oio_close(req->handle); } else { - peer = (peer_t*) req->data; peer->buf.len = nread; oio_req_init(&peer->req, &peer->handle, after_write); peer->req.data = peer; - if (oio_write(&peer->req, &peer->buf, 1)) + if (oio_write(&peer->req, &peer->buf, 1)) { FATAL(oio_write failed) + } } } @@ -47,8 +48,9 @@ void try_read(peer_t* peer) { peer->buf.len = BUFSIZE; oio_req_init(&peer->req, &peer->handle, after_read); peer->req.data = peer; - if (oio_read(&peer->req, &peer->buf, 1)) + if (oio_read(&peer->req, &peer->buf, 1)) { FATAL(oio_read failed) + } } @@ -60,7 +62,7 @@ void on_close(oio_handle* peer, oio_err err) { void on_accept(oio_handle* server) { - peer_t* p = (peer_t*)malloc(sizeof(peer_t)); + peer_t* p = (peer_t*)calloc(sizeof(peer_t), 1); if (oio_tcp_handle_accept(server, &p->handle, on_close, (void*)p)) FATAL(oio_tcp_handle_accept failed) diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index c8d08a7e..f87af56a 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -43,23 +43,28 @@ void pinger_after_write(oio_req *req) { } -void pinger_write_ping(pinger_t* pinger) { +static void pinger_write_ping(pinger_t* pinger) { oio_req *req; req = (oio_req*)malloc(sizeof(*req)); oio_req_init(req, &pinger->handle, pinger_after_write); - if (oio_write2(req, (char*)&PING)) + if (oio_write2(req, (char*)&PING)) { FATAL(oio_write2 failed) + } + + puts("PING"); } -void pinger_after_read(oio_req* req, size_t nread) { + +static void pinger_after_read(oio_req* req, size_t nread) { unsigned int i; pinger_t* pinger; pinger = (pinger_t*)req->handle->data; if (nread == 0) { + puts("got EOF"); oio_close(&pinger->handle); return; } @@ -69,6 +74,7 @@ void pinger_after_read(oio_req* req, size_t nread) { ASSERT(pinger->buf.base[i] == PING[pinger->state]) pinger->state = (pinger->state + 1) % (sizeof(PING) - 1); if (pinger->state == 0) { + printf("PONG %d\n", pinger->pongs); pinger->pongs++; if (pinger->pongs < NUM_PINGS) { pinger_write_ping(pinger); @@ -84,6 +90,7 @@ void pinger_after_read(oio_req* req, size_t nread) { void pinger_try_read(pinger_t* pinger) { + oio_req_init(&pinger->read_req, &pinger->handle, pinger_after_read); oio_read(&pinger->read_req, &pinger->buf, 1); } @@ -98,7 +105,8 @@ void pinger_on_connect(oio_req *req, oio_err err) { } -int pinger_new() { +void pinger_new() { + int r; struct sockaddr_in client_addr = oio_ip4_addr("0.0.0.0", 0); struct sockaddr_in server_addr = oio_ip4_addr("127.0.0.1", TEST_PORT); pinger_t *pinger; @@ -106,31 +114,27 @@ int pinger_new() { pinger = (pinger_t*)malloc(sizeof(*pinger)); pinger->state = 0; pinger->pongs = 0; - pinger->buf.len = sizeof(pinger->read_buffer); + pinger->buf.len = BUFSIZE; pinger->buf.base = (char*)&pinger->read_buffer; /* Try to connec to the server and do NUM_PINGS ping-pongs. */ - if (oio_tcp_handle_init(&pinger->handle, pinger_on_close, (void*)pinger)) { - return -1; - } + r = oio_tcp_handle_init(&pinger->handle, pinger_on_close, (void*)pinger); + ASSERT(!r) /* We are never doing multiple reads/connects at a time anyway. */ /* so these handles can be pre-initialized. */ oio_req_init(&pinger->connect_req, &pinger->handle, pinger_on_connect); - oio_req_init(&pinger->read_req, &pinger->handle, pinger_after_read); oio_bind(&pinger->handle, (struct sockaddr*)&client_addr); - return oio_connect(&pinger->connect_req, (struct sockaddr*)&server_addr); + r = oio_connect(&pinger->connect_req, (struct sockaddr*)&server_addr); + ASSERT(!r) } TEST_IMPL(ping_pong) { oio_init(); - if (pinger_new()) { - return 2; - } - + pinger_new(); oio_run(); ASSERT(completed_pingers == 1) diff --git a/test/test-runner.c b/test/test-runner.c index 18bb7e03..94197a9e 100644 --- a/test/test-runner.c +++ b/test/test-runner.c @@ -2,7 +2,6 @@ #include "test.h" #include "test-runner.h" -#include #include #include