linux: ping-pong test passes.

This commit is contained in:
Ryan Dahl 2011-04-16 14:04:45 -07:00
parent c8bdf15f5d
commit 42cefd934e
5 changed files with 135 additions and 117 deletions

View File

@ -1,3 +1,5 @@
CFLAGS=-ansi -g -Wall
LINKFLAGS=-g -lm
all: oio.a test/test-runner
TESTS=test/echo-server.c \
@ -8,22 +10,16 @@ TESTS=test/echo-server.c \
test/test-timeout.c
test/test-runner: test/*.h test/test-runner.c test/test-runner-unix.c $(TESTS) oio.a
$(CC) -ansi -g -lm -o test/test-runner test/test-runner.c test/test-runner-unix.c $(TESTS) oio.a
test/echo-demo: test/echo-demo.c test/echo.o oio.a
$(CC) -ansi -g -o test/echo-demo test/echo-demo.c test/echo.o oio.a -lm
test/test-ping-pong: test/test-ping-pong.c test/echo.o oio.a
$(CC) -ansi -g -o test/test-ping-pong test/test-ping-pong.c test/echo.o oio.a -lm
$(CC) $(CFLAGS) $(LINKFLAGS) -o test/test-runner test/test-runner.c test/test-runner-unix.c $(TESTS) oio.a
oio.a: oio-unix.o ev/ev.o
$(AR) rcs oio.a oio-unix.o ev/ev.o
oio-unix.o: oio-unix.c oio.h oio-unix.h
$(CC) -ansi -g -c oio-unix.c -o oio-unix.o
$(CC) $(CFLAGS) -c oio-unix.c -o oio-unix.o
test/echo.o: test/echo.c test/echo.h
$(CC) -ansi -g -c test/echo.c -o test/echo.o
$(CC) $(CFLAGS) -c test/echo.c -o test/echo.o
ev/ev.o: ev/config.h ev/ev.c
$(MAKE) -C ev

View File

@ -26,7 +26,7 @@ size_t strnlen (register const char* s, size_t maxlen) {
void oio_tcp_io(EV_P_ ev_io* watcher, int revents);
void oio_tcp_connect(oio_handle* handle, oio_req* req);
void oio_tcp_connect(oio_handle* handle);
int oio_tcp_open(oio_handle*, int fd);
void oio_finish_close(oio_handle* handle);
@ -49,7 +49,7 @@ void oio_flag_unset(oio_handle* handle, int flag) {
int oio_flag_is_set(oio_handle* handle, int flag) {
return handle->flags & flag;
return (handle->flags & flag) != 0;
}
@ -78,12 +78,12 @@ struct sockaddr_in oio_ip4_addr(char *ip, int port) {
int oio_close(oio_handle* handle) {
oio_flag_set(handle, OIO_CLOSING);
if (!ev_is_active(&handle->read_watcher)) {
ev_io_init(&handle->read_watcher, oio_tcp_io, handle->fd, EV_READ);
ev_io_start(EV_DEFAULT_ &handle->read_watcher);
if (!ev_is_active(&handle->write_watcher)) {
ev_io_start(EV_DEFAULT_ &handle->write_watcher);
}
ev_feed_fd_event(EV_DEFAULT_ handle->fd, EV_READ | EV_WRITE);
ev_feed_fd_event(EV_DEFAULT_ handle->fd, EV_WRITE);
assert(ev_is_pending(&handle->write_watcher));
return 0;
}
@ -104,22 +104,14 @@ int oio_tcp_handle_init(oio_handle *handle, oio_close_cb close_cb,
handle->type = OIO_TCP;
handle->close_cb = close_cb;
handle->data = data;
handle->accepted_fd = -1;
handle->flags = 0;
handle->connect_req = NULL;
handle->accepted_fd = -1;
handle->fd = -1;
ngx_queue_init(&handle->read_reqs);
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
oio_err_new(handle, errno);
return -1;
}
if (oio_tcp_open(handle, fd)) {
close(fd);
return -2;
}
return 0;
}
@ -129,6 +121,19 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) {
int domain;
int r;
if (handle->fd <= 0) {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
oio_err_new(handle, errno);
return -1;
}
if (oio_tcp_open(handle, fd)) {
close(fd);
return -2;
}
}
assert(handle->fd >= 0);
if (addr->sa_family == AF_INET) {
@ -148,20 +153,13 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) {
}
int oio_tcp_init_fd(int fd) {
int r;
int oio_tcp_open(oio_handle* handle, int fd) {
/* Set non-blocking, etc */
int yes = 1;
r = fcntl(fd, F_SETFL, O_NONBLOCK);
int r = fcntl(fd, F_SETFL, O_NONBLOCK);
assert(r == 0);
r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
assert(r == 0);
return 0;
}
int oio_tcp_open(oio_handle* handle, int fd) {
/* Set non-blocking, etc */
oio_tcp_init_fd(fd);
handle->fd = fd;
@ -179,7 +177,8 @@ int oio_tcp_open(oio_handle* handle, int fd) {
void oio_server_io(EV_P_ ev_io* watcher, int revents) {
oio_handle* handle = watcher->data;
assert(watcher == &handle->read_watcher ||
watcher == &handle->write_watcher);
assert(revents == EV_READ);
if (oio_flag_is_set(handle, OIO_CLOSING)) {
@ -193,8 +192,8 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) {
}
while (1) {
struct sockaddr addr;
socklen_t addrlen;
struct sockaddr addr = { 0 };
socklen_t addrlen = 0;
assert(handle->accepted_fd < 0);
int fd = accept(handle->fd, &addr, &addrlen);
@ -212,16 +211,12 @@ void oio_server_io(EV_P_ ev_io* watcher, int revents) {
}
} else {
if (!handle->accept_cb) {
close(fd);
} else {
handle->accepted_fd = fd;
handle->accept_cb(handle);
if (handle->accepted_fd >= 0) {
/* The user hasn't yet accepted called oio_tcp_handle_accept() */
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
return;
}
handle->accepted_fd = fd;
handle->accept_cb(handle);
if (handle->accepted_fd >= 0) {
/* The user hasn't yet accepted called oio_tcp_handle_accept() */
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
return;
}
}
}
@ -304,12 +299,15 @@ int oio_read_reqs_empty(oio_handle* handle) {
void oio__read(oio_handle* handle) {
int errorno;
assert(handle->fd >= 0);
/* TODO: should probably while(1) here until EAGAIN */
/* Get the request at the head of the read_reqs queue. */
oio_req* req = oio_read_reqs_head(handle);
if (!req) {
ev_io_stop(EV_DEFAULT_ &(handle->read_watcher));
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
return;
}
@ -325,28 +323,30 @@ void oio__read(oio_handle* handle) {
/* Now do the actual read. */
ssize_t nread = readv(handle->fd, iov, iovcnt);
errorno = errno;
oio_read_cb cb = req->cb;
if (nread < 0) {
if (errno == EAGAIN) {
if (errorno == EAGAIN) {
/* Just wait for the next one. */
assert(ev_is_active(&(handle->read_watcher)));
assert(ev_is_active(&handle->read_watcher));
ev_io_start(EV_DEFAULT_ &handle->read_watcher);
} else {
oio_err err = oio_err_new(handle, errno);
oio_err err = oio_err_new(handle, errorno);
if (cb) {
cb(req, 0);
}
handle->err = errno;
handle->err = err;
oio_close(handle);
}
} else {
/* Successful read */
/* First pop the req off handle->read_reqs */
ngx_queue_remove(&(req->read_reqs));
ngx_queue_remove(&req->read_reqs);
free(req->read_bufs);
free(req->read_bufs); /* FIXME: we should not be allocing for each read */
req->read_bufs = NULL;
/* NOTE: call callback AFTER freeing the request data. */
@ -355,7 +355,7 @@ void oio__read(oio_handle* handle) {
}
if (oio_read_reqs_empty(handle)) {
ev_io_stop(EV_DEFAULT_ &(handle->read_watcher));
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
}
}
}
@ -363,6 +363,8 @@ void oio__read(oio_handle* handle) {
void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
oio_handle* handle = watcher->data;
assert(watcher == &handle->read_watcher ||
watcher == &handle->write_watcher);
assert(handle->fd >= 0);
@ -372,14 +374,15 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
}
if (handle->connect_req) {
oio_tcp_connect(handle, handle->connect_req);
oio_tcp_connect(handle);
} else {
if (revents & EV_READ) {
oio__read(handle);
}
if (revents & EV_WRITE) {
assert(0 && "Queued writes are not supported");
/* ignore for now */
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
}
}
}
@ -390,26 +393,26 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
* In order to determine if we've errored out or succeeded must call
* getsockopt.
*/
void oio_tcp_connect(oio_handle* handle, oio_req* req) {
void oio_tcp_connect(oio_handle* handle) {
assert(handle->fd >= 0);
oio_req* req = handle->connect_req;
assert(req);
int error;
int errorsize = sizeof(int);
socklen_t errorsize = sizeof(int);
getsockopt(handle->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
if (!error) {
ev_io_init(&handle->write_watcher, oio_tcp_io, handle->fd, EV_WRITE);
ev_set_cb(&handle->read_watcher, oio_tcp_io);
ev_io_start(EV_DEFAULT_ &handle->read_watcher);
/* Successful connection */
handle->connect_req = NULL;
oio_connect_cb connect_cb = req->cb;
if (connect_cb) {
connect_cb(req, oio_err_new(handle, 0));
connect_cb(req, 0);
}
req = NULL;
} else if (error == EINPROGRESS) {
/* Still connecting. */
return;
@ -430,8 +433,21 @@ void oio_tcp_connect(oio_handle* handle, oio_req* req) {
int oio_connect(oio_req *req, struct sockaddr* addr) {
oio_handle* handle = req->handle;
if (handle->fd <= 0) {
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
oio_err_new(handle, errno);
return -1;
}
if (oio_tcp_open(handle, fd)) {
close(fd);
return -2;
}
}
req->type = OIO_CONNECT;
ngx_queue_init(&(req->read_reqs));
ngx_queue_init(&req->read_reqs);
if (handle->connect_req) {
return oio_err_new(handle, EALREADY);
@ -443,31 +459,17 @@ int oio_connect(oio_req *req, struct sockaddr* addr) {
handle->connect_req = req;
int addrsize;
if (addr->sa_family == AF_INET) {
addrsize = sizeof(struct sockaddr_in);
handle->fd = socket(AF_INET, SOCK_STREAM, 0);
} else if (addr->sa_family == AF_INET6) {
addrsize = sizeof(struct sockaddr_in6);
handle->fd = socket(AF_INET6, SOCK_STREAM, 0);
} else {
assert(0);
return -1;
}
/* socket(2) failed */
if (handle->fd < 0) {
return oio_err_new(handle, errno);
}
int addrsize = sizeof(struct sockaddr_in);
int r = connect(handle->fd, addr, addrsize);
if (r != 0 && errno != EINPROGRESS) {
return oio_err_new(handle, r);
}
ev_io_init(&(handle->read_watcher), oio_tcp_io, handle->fd, EV_READ);
ev_io_init(&(handle->write_watcher), oio_tcp_io, handle->fd, EV_WRITE);
ev_io_start(EV_DEFAULT_ &(handle->read_watcher));
assert(handle->write_watcher.data == handle);
ev_io_start(EV_DEFAULT_ &handle->write_watcher);
return oio_err_new(handle, r);
return 0;
}
@ -475,13 +477,16 @@ int oio_write(oio_req *req, oio_buf* bufs, int bufcnt) {
oio_handle* handle = req->handle;
assert(handle->fd >= 0);
ssize_t r;
int errorno;
ngx_queue_init(&(req->read_reqs));
req->type = OIO_WRITE;
r = writev(handle->fd, (struct iovec*)bufs, bufcnt);
errorno = errno;
if (r < 0) {
assert(errorno != EAGAIN && "write queueing not yet supported");
return oio_err_new(handle, r);
} else {
if (req && req->cb) {
@ -510,22 +515,27 @@ int oio_timeout(oio_req *req, int64_t timeout) {
int oio_read(oio_req *req, oio_buf* bufs, int bufcnt) {
oio_handle* handle = req->handle;
ssize_t nread = -1;
errno = EAGAIN;
int errorno = EAGAIN;
oio_read_cb cb = req->cb;
assert(handle->fd >= 0);
if (ngx_queue_empty(&handle->read_reqs)) {
/* Attempt to read immediately. */
ssize_t nread = readv(handle->fd, (struct iovec*) bufs, bufcnt);
nread = readv(handle->fd, (struct iovec*) bufs, bufcnt);
errorno = errno;
}
/* The request should have been just initialized. Therefore the
* ngx_queue_t for read_reqs should be empty.
*/
assert(ngx_queue_empty(&req->read_reqs));
assert(req->type == OIO_UNKNOWN_REQ);
req->type = OIO_READ;
ngx_queue_init(&(req->read_reqs));
if (nread < 0 && errno != EAGAIN) {
if (nread < 0 && errorno != EAGAIN) {
/* Real error. */
oio_err err = oio_err_new(handle, errno);
oio_err err = oio_err_new(handle, errorno);
if (cb) {
cb(req, nread);
@ -544,7 +554,7 @@ int oio_read(oio_req *req, oio_buf* bufs, int bufcnt) {
/* Either we never read anything, or we got EAGAIN. */
assert(!ngx_queue_empty(&handle->read_reqs) ||
(nread < 0 && errno == EAGAIN));
(nread < 0 && errorno == EAGAIN));
/* Two possible states:
* - EAGAIN, meaning the socket is not wriable currently. We must wait for
@ -558,13 +568,20 @@ int oio_read(oio_req *req, oio_buf* bufs, int bufcnt) {
/* Copy the bufs data over into our oio_req struct. This is so the user can
* free the oio_buf array. The actual data inside the oio_bufs is however
* owned by the user and cannot be deallocated until the read completes.
*
* TODO: instead of mallocing here - just have a fixed number of oio_bufs
* included in the oio_req object.
*/
req->read_bufs = malloc(sizeof(oio_buf) * bufcnt);
memcpy(req->read_bufs, bufs, bufcnt * sizeof(oio_buf));
req->read_bufcnt = bufcnt;
/* Append the request to read_reqs. */
ngx_queue_insert_tail(&(handle->read_reqs), &(req->read_reqs));
ngx_queue_insert_tail(&handle->read_reqs, &req->read_reqs);
assert(!ngx_queue_empty(&handle->read_reqs));
assert(handle->read_watcher.data == handle);
assert(handle->read_watcher.fd == handle->fd);
ev_io_start(EV_DEFAULT_ &handle->read_watcher);
@ -583,5 +600,5 @@ void oio_req_init(oio_req* req, oio_handle* handle, void* cb) {
req->type = OIO_UNKNOWN_REQ;
req->cb = cb;
req->handle = handle;
ngx_queue_init(&(req->read_reqs));
ngx_queue_init(&req->read_reqs);
}

View File

@ -10,6 +10,7 @@ typedef struct {
oio_req req;
oio_buf buf;
char read_buffer[BUFSIZE];
int msg;
} peer_t;
oio_handle server;
@ -28,17 +29,17 @@ void after_write(oio_req* req) {
void after_read(oio_req* req, size_t nread) {
peer_t* peer;
peer_t* peer = req->data;
if (nread == 0) {
oio_close(req->handle);
} else {
peer = (peer_t*) req->data;
peer->buf.len = nread;
oio_req_init(&peer->req, &peer->handle, after_write);
peer->req.data = peer;
if (oio_write(&peer->req, &peer->buf, 1))
if (oio_write(&peer->req, &peer->buf, 1)) {
FATAL(oio_write failed)
}
}
}
@ -47,8 +48,9 @@ void try_read(peer_t* peer) {
peer->buf.len = BUFSIZE;
oio_req_init(&peer->req, &peer->handle, after_read);
peer->req.data = peer;
if (oio_read(&peer->req, &peer->buf, 1))
if (oio_read(&peer->req, &peer->buf, 1)) {
FATAL(oio_read failed)
}
}
@ -60,7 +62,7 @@ void on_close(oio_handle* peer, oio_err err) {
void on_accept(oio_handle* server) {
peer_t* p = (peer_t*)malloc(sizeof(peer_t));
peer_t* p = (peer_t*)calloc(sizeof(peer_t), 1);
if (oio_tcp_handle_accept(server, &p->handle, on_close, (void*)p))
FATAL(oio_tcp_handle_accept failed)

View File

@ -43,23 +43,28 @@ void pinger_after_write(oio_req *req) {
}
void pinger_write_ping(pinger_t* pinger) {
static void pinger_write_ping(pinger_t* pinger) {
oio_req *req;
req = (oio_req*)malloc(sizeof(*req));
oio_req_init(req, &pinger->handle, pinger_after_write);
if (oio_write2(req, (char*)&PING))
if (oio_write2(req, (char*)&PING)) {
FATAL(oio_write2 failed)
}
puts("PING");
}
void pinger_after_read(oio_req* req, size_t nread) {
static void pinger_after_read(oio_req* req, size_t nread) {
unsigned int i;
pinger_t* pinger;
pinger = (pinger_t*)req->handle->data;
if (nread == 0) {
puts("got EOF");
oio_close(&pinger->handle);
return;
}
@ -69,6 +74,7 @@ void pinger_after_read(oio_req* req, size_t nread) {
ASSERT(pinger->buf.base[i] == PING[pinger->state])
pinger->state = (pinger->state + 1) % (sizeof(PING) - 1);
if (pinger->state == 0) {
printf("PONG %d\n", pinger->pongs);
pinger->pongs++;
if (pinger->pongs < NUM_PINGS) {
pinger_write_ping(pinger);
@ -84,6 +90,7 @@ void pinger_after_read(oio_req* req, size_t nread) {
void pinger_try_read(pinger_t* pinger) {
oio_req_init(&pinger->read_req, &pinger->handle, pinger_after_read);
oio_read(&pinger->read_req, &pinger->buf, 1);
}
@ -98,7 +105,8 @@ void pinger_on_connect(oio_req *req, oio_err err) {
}
int pinger_new() {
void pinger_new() {
int r;
struct sockaddr_in client_addr = oio_ip4_addr("0.0.0.0", 0);
struct sockaddr_in server_addr = oio_ip4_addr("127.0.0.1", TEST_PORT);
pinger_t *pinger;
@ -106,31 +114,27 @@ int pinger_new() {
pinger = (pinger_t*)malloc(sizeof(*pinger));
pinger->state = 0;
pinger->pongs = 0;
pinger->buf.len = sizeof(pinger->read_buffer);
pinger->buf.len = BUFSIZE;
pinger->buf.base = (char*)&pinger->read_buffer;
/* Try to connec to the server and do NUM_PINGS ping-pongs. */
if (oio_tcp_handle_init(&pinger->handle, pinger_on_close, (void*)pinger)) {
return -1;
}
r = oio_tcp_handle_init(&pinger->handle, pinger_on_close, (void*)pinger);
ASSERT(!r)
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
oio_req_init(&pinger->connect_req, &pinger->handle, pinger_on_connect);
oio_req_init(&pinger->read_req, &pinger->handle, pinger_after_read);
oio_bind(&pinger->handle, (struct sockaddr*)&client_addr);
return oio_connect(&pinger->connect_req, (struct sockaddr*)&server_addr);
r = oio_connect(&pinger->connect_req, (struct sockaddr*)&server_addr);
ASSERT(!r)
}
TEST_IMPL(ping_pong) {
oio_init();
if (pinger_new()) {
return 2;
}
pinger_new();
oio_run();
ASSERT(completed_pingers == 1)

View File

@ -2,7 +2,6 @@
#include "test.h"
#include "test-runner.h"
#include <stdio.h>
#include <stdio.h>
#include <string.h>