From 431195c94483d9de61bdd0825a60659480a5f023 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Thu, 8 Sep 2011 19:10:00 +0200 Subject: [PATCH] unix: run callbacks of pending writes when handle is closed --- src/unix/core.c | 2 ++ src/unix/internal.h | 1 + src/unix/stream.c | 29 +++++++++++++++++++++++++++++ 3 files changed, 32 insertions(+) diff --git a/src/unix/core.c b/src/unix/core.c index 45fef16b..8569eae7 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -235,6 +235,8 @@ void uv__finish_close(uv_handle_t* handle) { case UV_TCP: assert(!ev_is_active(&((uv_stream_t*)handle)->read_watcher)); assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher)); + assert(((uv_stream_t*)handle)->fd == -1); + uv__stream_destroy((uv_stream_t*)handle); break; case UV_UDP: diff --git a/src/unix/internal.h b/src/unix/internal.h index 024a9b37..9d62e6f3 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -81,6 +81,7 @@ void uv_fatal_error(const int errorno, const char* syscall); /* stream */ int uv__stream_open(uv_stream_t*, int fd, int flags); +void uv__stream_destroy(uv_stream_t* stream); void uv__stream_io(EV_P_ ev_io* watcher, int revents); void uv__server_io(EV_P_ ev_io* watcher, int revents); int uv__accept(int sockfd, struct sockaddr* saddr, socklen_t len); diff --git a/src/unix/stream.c b/src/unix/stream.c index 0b5a2a4c..c6aa56c2 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -75,6 +75,35 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { } +/* Clears out the write queue, invokes the callbacks attached + * to each write request. Used when a stream is destroyed. + */ +static void uv__clear_queue(ngx_queue_t* wq, int status, uv_err_code code) { + uv_write_t* req; + ngx_queue_t* q; + + while (!ngx_queue_empty(wq)) { + q = ngx_queue_head(wq); + ngx_queue_remove(q); + + req = ngx_queue_data(q, uv_write_t, queue); + if (req->cb) { + uv_err_new(req->handle->loop, code); + req->cb(req, status); + } + + if (req->bufs != req->bufsml) + free(req->bufs); + } +} + + +void uv__stream_destroy(uv_stream_t* stream) { + uv__clear_queue(&stream->write_queue, -1, UV_EINTR); + uv__clear_queue(&stream->write_completed_queue, 0, UV_OK); +} + + void uv__server_io(EV_P_ ev_io* watcher, int revents) { int fd; struct sockaddr_storage addr;