unix: fix test-callback-stack

This commit is contained in:
Ryan Dahl 2011-05-04 13:56:50 -07:00
parent 6a4ead0eac
commit 54c9436501
2 changed files with 53 additions and 26 deletions

View File

@ -388,6 +388,35 @@ void oio__next(EV_P_ ev_idle* watcher, int revents) {
} }
static void oio__drain(oio_handle* handle) {
assert(!oio_write_queue_head(handle));
assert(handle->write_queue_size == 0);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
/* Shutdown? */
if (oio_flag_is_set(handle, OIO_SHUTTING) &&
!oio_flag_is_set(handle, OIO_CLOSING) &&
!oio_flag_is_set(handle, OIO_SHUT)) {
assert(handle->shutdown_req);
oio_req* req = handle->shutdown_req;
oio_shutdown_cb cb = req->cb;
if (shutdown(handle->fd, SHUT_WR)) {
/* Error. Nothing we can do, close the handle. */
oio_err_new(handle, errno);
oio_close(handle);
if (cb) cb(req, -1);
} else {
oio_err_new(handle, 0);
oio_flag_set(handle, OIO_SHUT);
if (cb) cb(req, 0);
}
}
}
void oio__write(oio_handle* handle) { void oio__write(oio_handle* handle) {
assert(handle->fd >= 0); assert(handle->fd >= 0);
@ -396,8 +425,8 @@ void oio__write(oio_handle* handle) {
/* Get the request at the head of the queue. */ /* Get the request at the head of the queue. */
oio_req* req = oio_write_queue_head(handle); oio_req* req = oio_write_queue_head(handle);
if (!req) { if (!req) {
/* This probably shouldn't happen. Maybe assert(0) here. */ assert(handle->write_queue_size == 0);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher); oio__drain(handle);
return; return;
} }
@ -477,28 +506,7 @@ void oio__write(oio_handle* handle) {
assert(handle->write_queue_size > 0); assert(handle->write_queue_size > 0);
} else { } else {
/* Write queue drained. */ /* Write queue drained. */
assert(handle->write_queue_size == 0); oio__drain(handle);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
if (oio_flag_is_set(handle, OIO_SHUTTING) &&
!oio_flag_is_set(handle, OIO_CLOSING) &&
!oio_flag_is_set(handle, OIO_SHUT)) {
assert(handle->shutdown_req);
req = handle->shutdown_req;
oio_shutdown_cb cb = req->cb;
if (shutdown(handle->fd, SHUT_WR)) {
/* Error. Nothing we can do, close the handle. */
oio_err_new(handle, errno);
oio_close(handle);
if (cb) cb(req, -1);
} else {
oio_err_new(handle, 0);
oio_flag_set(handle, OIO_SHUT);
if (cb) cb(req, 0);
}
}
} }
return; return;
@ -577,6 +585,8 @@ int oio_shutdown(oio_req* req) {
oio_flag_set(handle, OIO_SHUTTING); oio_flag_set(handle, OIO_SHUTTING);
ev_io_start(EV_DEFAULT_UC_ &handle->write_watcher);
return 0; return 0;
} }

View File

@ -61,7 +61,14 @@ void shutdown_cb(oio_req* req, int status) {
void read_cb(oio_handle* handle, int nread, oio_buf buf) { void read_cb(oio_handle* handle, int nread, oio_buf buf) {
ASSERT(nested == 0 && "read_cb must be called from a fresh stack"); ASSERT(nested == 0 && "read_cb must be called from a fresh stack");
if (nread == -1) { printf("Read. nread == %d\n", nread);
free(buf.base);
if (nread == 0) {
ASSERT(oio_last_error().code == OIO_EAGAIN);
return;
} else if (nread == -1) {
ASSERT(oio_last_error().code == OIO_EOF); ASSERT(oio_last_error().code == OIO_EOF);
nested++; nested++;
@ -74,7 +81,6 @@ void read_cb(oio_handle* handle, int nread, oio_buf buf) {
} }
bytes_received += nread; bytes_received += nread;
free(buf.base);
/* We call shutdown here because when bytes_received == sizeof MESSAGE */ /* We call shutdown here because when bytes_received == sizeof MESSAGE */
/* there will be no more data sent nor received, so here it would be */ /* there will be no more data sent nor received, so here it would be */
@ -83,6 +89,9 @@ void read_cb(oio_handle* handle, int nread, oio_buf buf) {
if (bytes_received == sizeof MESSAGE) { if (bytes_received == sizeof MESSAGE) {
nested++; nested++;
oio_req_init(&shutdown_req, handle, shutdown_cb); oio_req_init(&shutdown_req, handle, shutdown_cb);
puts("Shutdown");
if (oio_shutdown(&shutdown_req)) { if (oio_shutdown(&shutdown_req)) {
FATAL("oio_shutdown failed"); FATAL("oio_shutdown failed");
} }
@ -95,6 +104,8 @@ void timeout_cb(oio_req* req, int64_t skew, int status) {
ASSERT(status == 0); ASSERT(status == 0);
ASSERT(nested == 0 && "timeout_cb must be called from a fresh stack"); ASSERT(nested == 0 && "timeout_cb must be called from a fresh stack");
puts("Timeout complete. Now read data...");
nested++; nested++;
if (oio_read_start(&client, read_cb)) { if (oio_read_start(&client, read_cb)) {
FATAL("oio_read_start failed"); FATAL("oio_read_start failed");
@ -109,6 +120,8 @@ void write_cb(oio_req* req, int status) {
ASSERT(status == 0); ASSERT(status == 0);
ASSERT(nested == 0 && "write_cb must be called from a fresh stack"); ASSERT(nested == 0 && "write_cb must be called from a fresh stack");
puts("Data written. 500ms timeout...");
/* After the data has been sent, we're going to wait for a while, then */ /* After the data has been sent, we're going to wait for a while, then */
/* start reading. This makes us certain that the message has been echoed */ /* start reading. This makes us certain that the message has been echoed */
/* back to our receive buffer when we start reading. This maximizes the */ /* back to our receive buffer when we start reading. This maximizes the */
@ -127,6 +140,8 @@ void write_cb(oio_req* req, int status) {
void connect_cb(oio_req* req, int status) { void connect_cb(oio_req* req, int status) {
oio_buf buf; oio_buf buf;
puts("Connected. Write some data to echo server...");
ASSERT(status == 0); ASSERT(status == 0);
ASSERT(nested == 0 && "connect_cb must be called from a fresh stack"); ASSERT(nested == 0 && "connect_cb must be called from a fresh stack");
@ -165,6 +180,8 @@ TEST_IMPL(callback_stack) {
FATAL("oio_tcp_init failed"); FATAL("oio_tcp_init failed");
} }
puts("Connecting...");
nested++; nested++;
oio_req_init(&connect_req, &client, connect_cb); oio_req_init(&connect_req, &client, connect_cb);
if (oio_connect(&connect_req, (struct sockaddr*) &addr)) { if (oio_connect(&connect_req, (struct sockaddr*) &addr)) {