shutdown half-implemented on unix

This commit is contained in:
Ryan Dahl 2011-05-03 12:38:15 -07:00
parent a8e4c0bc6b
commit 7de6861146
4 changed files with 74 additions and 13 deletions

View File

@ -47,9 +47,11 @@ void oio_finish_close(oio_handle* handle);
/* flags */
enum {
OIO_CLOSING = 0x00000001,
OIO_CLOSED = 0x00000002,
OIO_READING = 0x00000004,
OIO_CLOSING = 0x00000001, /* oio_close() called but not finished. */
OIO_CLOSED = 0x00000002, /* close(2) finished. */
OIO_READING = 0x00000004, /* oio_read_start() called. */
OIO_SHUTTING = 0x00000008, /* oio_shutdown() called but not complete. */
OIO_SHUT = 0x00000010, /* Write side closed. */
};
@ -471,13 +473,10 @@ void oio__write(oio_handle* handle) {
handle->write_queue_size -= n;
n = 0;
assert(buf->base > 0x100);
/* There is more to write. Break and ensure the watcher is pending. */
break;
} else {
assert(buf->base > 0x100);
/* Finished writing the buf at index req->write_index. */
req->write_index++;
@ -501,11 +500,32 @@ void oio__write(oio_handle* handle) {
cb(req, 0);
}
if (ngx_queue_empty(&handle->write_queue)) {
if (!ngx_queue_empty(&handle->write_queue)) {
assert(handle->write_queue_size > 0);
} else {
/* Write queue drained. */
assert(handle->write_queue_size == 0);
ev_io_stop(EV_DEFAULT_ &handle->write_watcher);
} else {
assert(handle->write_queue_size > 0);
if (oio_flag_is_set(handle, OIO_SHUTTING) &&
!oio_flag_is_set(handle, OIO_CLOSING) &&
!oio_flag_is_set(handle, OIO_SHUT)) {
assert(handle->shutdown_req);
req = handle->shutdown_req;
oio_shutdown_cb cb = req->cb;
if (shutdown(handle->fd, SHUT_WR)) {
/* Error. Nothing we can do, close the handle. */
oio_err_new(req, errno);
oio_close(handle);
if (cb) cb(req, -1);
} else {
oio_err_new(handle, 0);
oio_flag_set(handle, OIO_SHUT);
if (cb) cb(req, 0);
}
}
}
return;
@ -569,6 +589,25 @@ void oio__read(oio_handle* handle) {
}
int oio_shutdown(oio_req* req) {
oio_handle* handle = req->handle;
assert(handle->fd >= 0);
if (oio_flag_is_set(handle, OIO_SHUT) ||
oio_flag_is_set(handle, OIO_CLOSED) ||
oio_flag_is_set(handle, OIO_CLOSING)) {
return -1;
}
handle->shutdown_req = req;
req->type = OIO_SHUTDOWN;
oio_flag_set(handle, OIO_SHUTTING);
return 0;
}
void oio__tcp_io(EV_P_ ev_io* watcher, int revents) {
oio_handle* handle = watcher->data;
assert(watcher == &handle->read_watcher ||

View File

@ -53,6 +53,7 @@ typedef struct {
oio_accept_cb accept_cb; \
int accepted_fd; \
oio_req *connect_req; \
oio_req *shutdown_req; \
ev_io read_watcher; \
ev_io write_watcher; \
ev_idle next_watcher; \

1
oio.h
View File

@ -52,6 +52,7 @@ typedef oio_buf (*oio_alloc_cb)(oio_handle* handle, size_t suggested_size);
typedef void (*oio_read_cb)(oio_handle *handle, int nread, oio_buf buf);
typedef void (*oio_write_cb)(oio_req* req, int status);
typedef void (*oio_connect_cb)(oio_req* req, int status);
typedef void (*oio_shutdown_cb)(oio_req* req, int status);
typedef void (*oio_accept_cb)(oio_handle* handle);
typedef void (*oio_close_cb)(oio_handle* handle, int status);
typedef void (*oio_timer_cb)(oio_req* req, int64_t skew, int status);

View File

@ -35,6 +35,7 @@
static char* send_buffer;
static int shutdown_cb_called = 0;
static int connect_cb_called = 0;
static int write_cb_called = 0;
static int close_cb_called = 0;
@ -54,11 +55,26 @@ static void close_cb(oio_handle* handle, int status) {
}
static void shutdown_cb(oio_req* req, int status) {
ASSERT(req);
ASSERT(status == 0);
/* Now we wait for the EOF */
shutdown_cb_called++;
/* We should have had all the writes called already. */
ASSERT(write_cb_called == WRITES);
free(req);
}
static void read_cb(oio_handle* handle, int nread, oio_buf buf) {
ASSERT(handle != NULL);
if (nread < 0) {
ASSERT(oio_last_error().code == OIO_EOF);
printf("GOT EOF\n");
if (buf.base) {
free(buf.base);
@ -70,10 +86,6 @@ static void read_cb(oio_handle* handle, int nread, oio_buf buf) {
bytes_received_done += nread;
/* TODO: fix this when we support sending EOF. */
if (bytes_received_done == TOTAL_BYTES)
oio_close(handle);
free(buf.base);
}
@ -123,6 +135,13 @@ static void connect_cb(oio_req* req, int status) {
ASSERT(r == 0);
}
/* Shutdown on drain. FIXME: dealloc req? */
req = (oio_req*) malloc(sizeof(oio_req));
ASSERT(req != NULL);
oio_req_init(req, handle, shutdown_cb);
r = oio_shutdown(req);
ASSERT(r == 0);
/* Start reading */
req = (oio_req*)malloc(sizeof *req);
ASSERT(req != NULL);
@ -165,6 +184,7 @@ TEST_IMPL(tcp_writealot) {
oio_run();
ASSERT(shutdown_cb_called == 1);
ASSERT(connect_cb_called == 1);
ASSERT(write_cb_called == WRITES);
ASSERT(close_cb_called == 1);