From 52c8a8617de020a14b87477a15ddaf107ff34445 Mon Sep 17 00:00:00 2001 From: Ben Noordhuis Date: Mon, 26 Nov 2012 03:12:38 +0100 Subject: [PATCH] unix: add uv_cancel() --- include/uv-private/uv-unix.h | 2 +- include/uv.h | 13 ++ src/unix/fs.c | 9 +- src/unix/getaddrinfo.c | 9 +- src/unix/internal.h | 2 +- src/unix/threadpool.c | 67 +++++++++- src/win/threadpool.c | 5 + test/test-list.h | 6 + test/test-threadpool-cancel.c | 226 ++++++++++++++++++++++++++++++++++ uv.gyp | 1 + 10 files changed, 331 insertions(+), 9 deletions(-) create mode 100644 test/test-threadpool-cancel.c diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 683a9c92..a92e8e67 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -60,7 +60,7 @@ struct uv__io_s { struct uv__work { void (*work)(struct uv__work *w); - void (*done)(struct uv__work *w); + void (*done)(struct uv__work *w, int status); struct uv_loop_s* loop; ngx_queue_t wq; }; diff --git a/include/uv.h b/include/uv.h index d2a5e09c..b4fc3333 100644 --- a/include/uv.h +++ b/include/uv.h @@ -1386,6 +1386,19 @@ struct uv_work_s { UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb); +/* Cancel a pending request. Fails if the request is executing or has finished + * executing. + * + * Returns 0 on success, -1 on error. The loop error code is not touched. + * + * Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is + * currently supported. + * + * This function is currently only implemented on UNIX platforms. On Windows, + * it always returns -1. 
+ */ +UV_EXTERN int uv_cancel(uv_req_t* req); + struct uv_cpu_info_s { char* model; diff --git a/src/unix/fs.c b/src/unix/fs.c index 1957fc14..493bdc2f 100644 --- a/src/unix/fs.c +++ b/src/unix/fs.c @@ -90,7 +90,7 @@ } \ else { \ uv__fs_work(&(req)->work_req); \ - uv__fs_done(&(req)->work_req); \ + uv__fs_done(&(req)->work_req, 0); \ return (req)->result; \ } \ } \ @@ -516,12 +516,17 @@ static void uv__fs_work(struct uv__work* w) { } -static void uv__fs_done(struct uv__work* w) { +static void uv__fs_done(struct uv__work* w, int status) { uv_fs_t* req; req = container_of(w, uv_fs_t, work_req); uv__req_unregister(req->loop, req); + if (status != 0) { + uv_fs_req_cleanup(req); + return; + } + if (req->errorno != 0) { req->errorno = uv_translate_sys_error(req->errorno); uv__set_artificial_error(req->loop, req->errorno); diff --git a/src/unix/getaddrinfo.c b/src/unix/getaddrinfo.c index bf2fef42..d021f46f 100644 --- a/src/unix/getaddrinfo.c +++ b/src/unix/getaddrinfo.c @@ -37,7 +37,7 @@ static void uv__getaddrinfo_work(struct uv__work* w) { } -static void uv__getaddrinfo_done(struct uv__work* w) { +static void uv__getaddrinfo_done(struct uv__work* w, int status) { uv_getaddrinfo_t* req = container_of(w, uv_getaddrinfo_t, work_req); struct addrinfo *res = req->res; #if __sun @@ -63,6 +63,13 @@ static void uv__getaddrinfo_done(struct uv__work* w) { else assert(0); + req->hints = NULL; + req->service = NULL; + req->hostname = NULL; + + if (status != 0) + return; + if (req->retcode == 0) { /* OK */ #if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */ diff --git a/src/unix/internal.h b/src/unix/internal.h index a82b8619..7561427b 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -174,7 +174,7 @@ void uv__signal_loop_cleanup(); void uv__work_submit(uv_loop_t* loop, struct uv__work *w, void (*work)(struct uv__work *w), - void (*done)(struct uv__work *w)); + void (*done)(struct uv__work *w, int status)); void uv__work_done(uv_async_t* handle, int status); /* platform 
specific */ diff --git a/src/unix/threadpool.c b/src/unix/threadpool.c index 9f56229c..3f6fe380 100644 --- a/src/unix/threadpool.c +++ b/src/unix/threadpool.c @@ -30,6 +30,9 @@ static ngx_queue_t wq; static volatile int initialized; +/* To avoid deadlock with uv_cancel() it's crucial that the worker + * never holds the global mutex and the loop-local mutex at the same time. + */ static void worker(void* arg) { struct uv__work* w; ngx_queue_t* q; @@ -46,8 +49,11 @@ static void worker(void* arg) { if (q == &exit_message) uv_cond_signal(&cond); - else + else { ngx_queue_remove(q); + ngx_queue_init(q); /* Signal uv_cancel() that the work req is + executing. */ + } uv_mutex_unlock(&mutex); @@ -58,6 +64,8 @@ static void worker(void* arg) { w->work(w); uv_mutex_lock(&w->loop->wq_mutex); + w->work = NULL; /* Signal uv_cancel() that the work req is done + executing. */ ngx_queue_insert_tail(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); @@ -116,7 +124,7 @@ static void cleanup(void) { void uv__work_submit(uv_loop_t* loop, struct uv__work* w, void (*work)(struct uv__work* w), - void (*done)(struct uv__work* w)) { + void (*done)(struct uv__work* w, int status)) { uv_once(&once, init_once); w->loop = loop; w->work = work; @@ -125,6 +133,29 @@ void uv__work_submit(uv_loop_t* loop, } +int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { + int cancelled; + + uv_mutex_lock(&mutex); + uv_mutex_lock(&w->loop->wq_mutex); + + cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL; + if (cancelled) + ngx_queue_remove(&w->wq); + + uv_mutex_unlock(&w->loop->wq_mutex); + uv_mutex_unlock(&mutex); + + if (!cancelled) + return -1; + + ngx_queue_init(&w->wq); + w->done(w, -UV_ECANCELED); + + return 0; +} + + void uv__work_done(uv_async_t* handle, int status) { struct uv__work* w; uv_loop_t* loop; @@ -146,7 +177,7 @@ void uv__work_done(uv_async_t* handle, int status) { ngx_queue_remove(q); w = container_of(q, struct uv__work, 
wq); - w->done(w); + w->done(w, 0); } } @@ -158,11 +189,14 @@ static void uv__queue_work(struct uv__work* w) { } -static void uv__queue_done(struct uv__work* w) { +static void uv__queue_done(struct uv__work* w, int status) { uv_work_t* req = container_of(w, uv_work_t, work_req); uv__req_unregister(req->loop, req); + if (status != 0) + return; + if (req->after_work_cb) req->after_work_cb(req); } @@ -182,3 +216,28 @@ int uv_queue_work(uv_loop_t* loop, uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done); return 0; } + + +int uv_cancel(uv_req_t* req) { + struct uv__work* wreq; + uv_loop_t* loop; + + switch (req->type) { + case UV_FS: + loop = ((uv_fs_t*) req)->loop; + wreq = &((uv_fs_t*) req)->work_req; + break; + case UV_GETADDRINFO: + loop = ((uv_getaddrinfo_t*) req)->loop; + wreq = &((uv_getaddrinfo_t*) req)->work_req; + break; + case UV_WORK: + loop = ((uv_work_t*) req)->loop; + wreq = &((uv_work_t*) req)->work_req; + break; + default: + return -1; + } + + return uv__work_cancel(loop, req, wreq); +} diff --git a/src/win/threadpool.c b/src/win/threadpool.c index 5118fd94..2452bdb0 100644 --- a/src/win/threadpool.c +++ b/src/win/threadpool.c @@ -70,6 +70,11 @@ int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, } +int uv_cancel(uv_req_t* req) { + return -1; +} + + void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) { uv__req_unregister(loop, req); if(req->after_work_cb) diff --git a/test/test-list.h b/test/test-list.h index f7f3c190..6b43ea19 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -187,6 +187,9 @@ TEST_DECLARE (fs_rename_to_existing_file) TEST_DECLARE (threadpool_queue_work_simple) TEST_DECLARE (threadpool_queue_work_einval) TEST_DECLARE (threadpool_multiple_event_loops) +TEST_DECLARE (threadpool_cancel_getaddrinfo) +TEST_DECLARE (threadpool_cancel_work) +TEST_DECLARE (threadpool_cancel_fs) TEST_DECLARE (thread_mutex) TEST_DECLARE (thread_rwlock) TEST_DECLARE (thread_create) @@ -454,6 +457,9 @@ 
TASK_LIST_START TEST_ENTRY (threadpool_queue_work_simple) TEST_ENTRY (threadpool_queue_work_einval) TEST_ENTRY (threadpool_multiple_event_loops) + TEST_ENTRY (threadpool_cancel_getaddrinfo) + TEST_ENTRY (threadpool_cancel_work) + TEST_ENTRY (threadpool_cancel_fs) TEST_ENTRY (thread_mutex) TEST_ENTRY (thread_rwlock) TEST_ENTRY (thread_create) diff --git a/test/test-threadpool-cancel.c b/test/test-threadpool-cancel.c new file mode 100644 index 00000000..e009e0c0 --- /dev/null +++ b/test/test-threadpool-cancel.c @@ -0,0 +1,226 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ + +#include "uv.h" +#include "task.h" + +#define INIT_CANCEL_INFO(ci, what) \ + do { \ + (ci)->reqs = (what); \ + (ci)->nreqs = ARRAY_SIZE(what); \ + (ci)->stride = sizeof((what)[0]); \ + } \ + while (0) + +struct cancel_info { + void* reqs; + unsigned nreqs; + unsigned stride; + uv_timer_t timer_handle; +}; + +static uv_cond_t signal_cond; +static uv_mutex_t signal_mutex; +static uv_mutex_t wait_mutex; +static unsigned num_threads; +static unsigned done_cb_called; +static unsigned timer_cb_called; + + +static void work_cb(uv_work_t* req) { + uv_mutex_lock(&signal_mutex); + uv_cond_signal(&signal_cond); + uv_mutex_unlock(&signal_mutex); + + uv_mutex_lock(&wait_mutex); + uv_mutex_unlock(&wait_mutex); +} + + +static void done_cb(uv_work_t* req) { + done_cb_called++; + free(req); +} + + +static void saturate_threadpool(void) { + uv_work_t* req; + + ASSERT(0 == uv_cond_init(&signal_cond)); + ASSERT(0 == uv_mutex_init(&signal_mutex)); + ASSERT(0 == uv_mutex_init(&wait_mutex)); + + uv_mutex_lock(&signal_mutex); + uv_mutex_lock(&wait_mutex); + + for (num_threads = 0; /* empty */; num_threads++) { + req = malloc(sizeof(*req)); + ASSERT(req != NULL); + ASSERT(0 == uv_queue_work(uv_default_loop(), req, work_cb, done_cb)); + + /* Expect to get signalled within 350 ms, otherwise assume that + * the thread pool is saturated. As with any timing dependent test, + * this is obviously not ideal. 
+ */ + if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) { + ASSERT(0 == uv_cancel((uv_req_t*) req)); + free(req); + break; + } + } +} + + +static void unblock_threadpool(void) { + uv_mutex_unlock(&signal_mutex); + uv_mutex_unlock(&wait_mutex); +} + + +static void cleanup_threadpool(void) { + ASSERT(done_cb_called == num_threads); + uv_cond_destroy(&signal_cond); + uv_mutex_destroy(&signal_mutex); + uv_mutex_destroy(&wait_mutex); +} + + +static void fail_cb(/* empty */) { + ASSERT(0 && "fail_cb called"); +} + + +static void timer_cb(uv_timer_t* handle, int status) { + struct cancel_info* ci; + uv_req_t* req; + unsigned i; + + ci = container_of(handle, struct cancel_info, timer_handle); + + for (i = 0; i < ci->nreqs; i++) { + req = (uv_req_t*) ((char*) ci->reqs + i * ci->stride); + ASSERT(0 == uv_cancel(req)); + } + + uv_close((uv_handle_t*) &ci->timer_handle, NULL); + unblock_threadpool(); + timer_cb_called++; +} + + +TEST_IMPL(threadpool_cancel_getaddrinfo) { + uv_getaddrinfo_t reqs[4]; + struct cancel_info ci; + struct addrinfo hints; + uv_loop_t* loop; + + INIT_CANCEL_INFO(&ci, reqs); + loop = uv_default_loop(); + saturate_threadpool(); + + ASSERT(0 == uv_getaddrinfo(loop, reqs + 0, fail_cb, "fail", NULL, NULL)); + ASSERT(0 == uv_getaddrinfo(loop, reqs + 1, fail_cb, NULL, "fail", NULL)); + ASSERT(0 == uv_getaddrinfo(loop, reqs + 2, fail_cb, "fail", "fail", NULL)); + ASSERT(0 == uv_getaddrinfo(loop, reqs + 3, fail_cb, "fail", NULL, &hints)); + + ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); + ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); + ASSERT(0 == uv_run(loop)); + ASSERT(1 == timer_cb_called); + + cleanup_threadpool(); + + return 0; +} + + +TEST_IMPL(threadpool_cancel_work) { + struct cancel_info ci; + uv_work_t reqs[16]; + uv_loop_t* loop; + unsigned i; + + INIT_CANCEL_INFO(&ci, reqs); + loop = uv_default_loop(); + saturate_threadpool(); + + for (i = 0; i < ARRAY_SIZE(reqs); i++) + ASSERT(0 == uv_queue_work(loop, reqs + 
i, fail_cb, NULL));
+
+  ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
+  ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
+  ASSERT(0 == uv_run(loop));
+  ASSERT(1 == timer_cb_called);
+
+  cleanup_threadpool();
+
+  return 0;
+}
+
+
+TEST_IMPL(threadpool_cancel_fs) {
+  struct cancel_info ci;
+  uv_fs_t reqs[25];
+  uv_loop_t* loop;
+  unsigned n;
+
+  INIT_CANCEL_INFO(&ci, reqs);
+  loop = uv_default_loop();
+  saturate_threadpool();
+
+  /* Needs to match ARRAY_SIZE(reqs). */
+  n = 0;
+  ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fail_cb));
+  ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fail_cb));
+  ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fail_cb));
+  ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fail_cb));
+  ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fail_cb));
+  ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fail_cb));
+  ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fail_cb));
+  ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
+  ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fail_cb));
+  ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fail_cb));
+  ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fail_cb));
+  ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
+  ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fail_cb));
+  ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fail_cb));
+  ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fail_cb));
+  ASSERT(0 == uv_fs_utime(loop, 
reqs + n++, "/", 0, 0, fail_cb)); + ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fail_cb)); + ASSERT(n == ARRAY_SIZE(reqs)); + + ASSERT(0 == uv_timer_init(loop, &ci.timer_handle)); + ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0)); + ASSERT(0 == uv_run(loop)); + ASSERT(1 == timer_cb_called); + + cleanup_threadpool(); + + return 0; +} diff --git a/uv.gyp b/uv.gyp index 93683291..17f3a51b 100644 --- a/uv.gyp +++ b/uv.gyp @@ -300,6 +300,7 @@ 'test/test-tcp-writealot.c', 'test/test-tcp-unexpected-read.c', 'test/test-threadpool.c', + 'test/test-threadpool-cancel.c', 'test/test-mutexes.c', 'test/test-thread.c', 'test/test-barrier.c',