API change: oio_tcp_handle_init() required before oio_accept()

Fixes close_cb_stack on UNIX.
This commit is contained in:
Ryan Dahl 2011-04-17 22:49:58 -07:00
parent d3ba74a681
commit 2ba25c8249
5 changed files with 59 additions and 34 deletions

9
TODO
View File

@ -1,9 +0,0 @@
- "test-runner ping_pong" should run just the ping pong test with the echo
server process.
- FATAL should take a string
- ASSERT and FATAL should require semicolons.
- Should oio_req need to be initialized before each use? Currently in Linux
this is necessary.

View File

@ -26,6 +26,7 @@ size_t strnlen (register const char* s, size_t maxlen) {
void oio_tcp_io(EV_P_ ev_io* watcher, int revents);
void oio__next(EV_P_ ev_idle* watcher, int revents);
void oio_tcp_connect(oio_handle* handle);
int oio_tcp_open(oio_handle*, int fd);
void oio_finish_close(oio_handle* handle);
@ -78,12 +79,12 @@ struct sockaddr_in oio_ip4_addr(char *ip, int port) {
int oio_close(oio_handle* handle) {
oio_flag_set(handle, OIO_CLOSING);
if (!ev_is_active(&handle->write_watcher)) {
ev_io_start(EV_DEFAULT_ &handle->write_watcher);
}
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
ev_feed_fd_event(EV_DEFAULT_ handle->fd, EV_WRITE);
assert(ev_is_pending(&handle->write_watcher));
ev_idle_start(EV_DEFAULT_ &handle->next_watcher);
ev_feed_event(EV_DEFAULT_ &handle->next_watcher, EV_IDLE);
assert(ev_is_pending(&handle->next_watcher));
return 0;
}
@ -105,13 +106,21 @@ int oio_tcp_handle_init(oio_handle *handle, oio_close_cb close_cb,
handle->close_cb = close_cb;
handle->data = data;
handle->flags = 0;
handle->connect_req = NULL;
handle->accepted_fd = -1;
handle->fd = -1;
ngx_queue_init(&handle->read_reqs);
ev_init(&handle->next_watcher, oio__next);
handle->next_watcher.data = handle;
ev_init(&handle->read_watcher, oio_tcp_io);
handle->read_watcher.data = handle;
ev_init(&handle->write_watcher, oio_tcp_io);
handle->write_watcher.data = handle;
return 0;
}
@ -154,37 +163,41 @@ int oio_bind(oio_handle* handle, struct sockaddr* addr) {
int oio_tcp_open(oio_handle* handle, int fd) {
/* Set non-blocking, etc */
assert(fd >= 0);
handle->fd = fd;
/* Set non-blocking. */
int yes = 1;
int r = fcntl(fd, F_SETFL, O_NONBLOCK);
assert(r == 0);
/* Reuse the port address. */
r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int));
assert(r == 0);
handle->fd = fd;
/* Initialize the queue structure for oio_read() requests. */
ngx_queue_init(&handle->read_reqs);
ev_io_init(&handle->read_watcher, oio_tcp_io, fd, EV_READ);
ev_io_init(&handle->write_watcher, oio_tcp_io, fd, EV_WRITE);
/* Associate the fd with each ev_io watcher. */
ev_io_set(&handle->read_watcher, fd, EV_READ);
ev_io_set(&handle->write_watcher, fd, EV_WRITE);
handle->read_watcher.data = handle;
handle->write_watcher.data = handle;
/* These should have been set up by oio_tcp_handle_init. */
assert(handle->next_watcher.data == handle);
assert(handle->write_watcher.data == handle);
assert(handle->read_watcher.data == handle);
return 0;
}
void oio_server_io(EV_P_ ev_io* watcher, int revents) {
void oio__server_io(EV_P_ ev_io* watcher, int revents) {
oio_handle* handle = watcher->data;
assert(watcher == &handle->read_watcher ||
watcher == &handle->write_watcher);
assert(revents == EV_READ);
if (oio_flag_is_set(handle, OIO_CLOSING)) {
oio_finish_close(handle);
return;
}
assert(!oio_flag_is_set(handle, OIO_CLOSING));
if (handle->accepted_fd >= 0) {
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
@ -251,20 +264,25 @@ int oio_listen(oio_handle* handle, int backlog, oio_accept_cb cb) {
}
handle->accept_cb = cb;
ev_io_init(&handle->read_watcher, oio_server_io, handle->fd, EV_READ);
/* Start listening for connections. */
ev_io_set(&handle->read_watcher, handle->fd, EV_READ);
ev_set_cb(&handle->read_watcher, oio__server_io);
ev_io_start(EV_DEFAULT_ &handle->read_watcher);
handle->read_watcher.data = handle;
return 0;
}
void oio_finish_close(oio_handle* handle) {
assert(oio_flag_is_set(handle, OIO_CLOSING));
assert(!oio_flag_is_set(handle, OIO_CLOSED));
oio_flag_set(handle, OIO_CLOSED);
ev_io_stop(EV_DEFAULT_ &handle->read_watcher);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
ev_idle_stop(EV_DEFAULT_ &handle->next_watcher);
close(handle->fd);
handle->fd = -1;
@ -298,6 +316,19 @@ int oio_read_reqs_empty(oio_handle* handle) {
}
void oio__next(EV_P_ ev_idle* watcher, int revents) {
oio_handle* handle = watcher->data;
assert(watcher == &handle->next_watcher);
assert(revents == EV_IDLE);
/* For now this function is only to handle the closing event, but we might
* put more stuff here later.
*/
assert(oio_flag_is_set(handle, OIO_CLOSING));
oio_finish_close(handle);
}
void oio__read(oio_handle* handle) {
int errorno;
assert(handle->fd >= 0);
@ -368,10 +399,7 @@ void oio_tcp_io(EV_P_ ev_io* watcher, int revents) {
assert(handle->fd >= 0);
if (oio_flag_is_set(handle, OIO_CLOSING)) {
oio_finish_close(handle);
return;
}
assert(!oio_flag_is_set(handle, OIO_CLOSING));
if (handle->connect_req) {
oio_tcp_connect(handle);

View File

@ -35,6 +35,7 @@ typedef struct {
oio_req *connect_req; \
ev_io read_watcher; \
ev_io write_watcher; \
ev_idle next_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t read_reqs;

1
oio.h
View File

@ -60,6 +60,7 @@ struct oio_handle_s {
oio_handle_private_fields
};
struct oio_req_s {
/* read-only */
oio_req_type type;

View File

@ -64,8 +64,12 @@ void on_close(oio_handle* peer, oio_err err) {
void on_accept(oio_handle* server) {
peer_t* p = (peer_t*)calloc(sizeof(peer_t), 1);
if (oio_tcp_handle_accept(server, &p->handle, on_close, (void*)p))
int r = oio_tcp_handle_init(&p->handle, on_close, (void*)p);
ASSERT(!r)
if (oio_tcp_handle_accept(server, &p->handle, on_close, (void*)p)) {
FATAL(oio_tcp_handle_accept failed)
}
p->buf.base = (char*)&p->read_buffer;