unix: add uv_cancel()

This commit is contained in:
Ben Noordhuis 2012-11-26 03:12:38 +01:00
parent a385ae4f59
commit 52c8a8617d
10 changed files with 331 additions and 9 deletions

View File

@ -60,7 +60,7 @@ struct uv__io_s {
struct uv__work {
void (*work)(struct uv__work *w);
void (*done)(struct uv__work *w);
void (*done)(struct uv__work *w, int status);
struct uv_loop_s* loop;
ngx_queue_t wq;
};

View File

@ -1386,6 +1386,19 @@ struct uv_work_s {
UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req,
uv_work_cb work_cb, uv_after_work_cb after_work_cb);
/* Cancel a pending request. Fails if the request is executing or has finished
* executing.
*
* Returns 0 on success, -1 on error. The loop error code is not touched.
*
* Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is
* currently supported.
*
* This function is currently only implemented on UNIX platforms. On Windows,
* it always returns -1.
*/
UV_EXTERN int uv_cancel(uv_req_t* req);
struct uv_cpu_info_s {
char* model;

View File

@ -90,7 +90,7 @@
} \
else { \
uv__fs_work(&(req)->work_req); \
uv__fs_done(&(req)->work_req); \
uv__fs_done(&(req)->work_req, 0); \
return (req)->result; \
} \
} \
@ -516,12 +516,17 @@ static void uv__fs_work(struct uv__work* w) {
}
static void uv__fs_done(struct uv__work* w) {
static void uv__fs_done(struct uv__work* w, int status) {
uv_fs_t* req;
req = container_of(w, uv_fs_t, work_req);
uv__req_unregister(req->loop, req);
if (status != 0) {
uv_fs_req_cleanup(req);
return;
}
if (req->errorno != 0) {
req->errorno = uv_translate_sys_error(req->errorno);
uv__set_artificial_error(req->loop, req->errorno);

View File

@ -37,7 +37,7 @@ static void uv__getaddrinfo_work(struct uv__work* w) {
}
static void uv__getaddrinfo_done(struct uv__work* w) {
static void uv__getaddrinfo_done(struct uv__work* w, int status) {
uv_getaddrinfo_t* req = container_of(w, uv_getaddrinfo_t, work_req);
struct addrinfo *res = req->res;
#if __sun
@ -63,6 +63,13 @@ static void uv__getaddrinfo_done(struct uv__work* w) {
else
assert(0);
req->hints = NULL;
req->service = NULL;
req->hostname = NULL;
if (status != 0)
return;
if (req->retcode == 0) {
/* OK */
#if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */

View File

@ -174,7 +174,7 @@ void uv__signal_loop_cleanup();
void uv__work_submit(uv_loop_t* loop,
struct uv__work *w,
void (*work)(struct uv__work *w),
void (*done)(struct uv__work *w));
void (*done)(struct uv__work *w, int status));
void uv__work_done(uv_async_t* handle, int status);
/* platform specific */

View File

@ -30,6 +30,9 @@ static ngx_queue_t wq;
static volatile int initialized;
/* To avoid deadlock with uv_cancel() it's crucial that the worker
* never holds the global mutex and the loop-local mutex at the same time.
*/
static void worker(void* arg) {
struct uv__work* w;
ngx_queue_t* q;
@ -46,8 +49,11 @@ static void worker(void* arg) {
if (q == &exit_message)
uv_cond_signal(&cond);
else
else {
ngx_queue_remove(q);
ngx_queue_init(q); /* Signal uv_cancel() that the work req is
executing. */
}
uv_mutex_unlock(&mutex);
@ -58,6 +64,8 @@ static void worker(void* arg) {
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
ngx_queue_insert_tail(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
@ -116,7 +124,7 @@ static void cleanup(void) {
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
void (*done)(struct uv__work* w)) {
void (*done)(struct uv__work* w, int status)) {
uv_once(&once, init_once);
w->loop = loop;
w->work = work;
@ -125,6 +133,29 @@ void uv__work_submit(uv_loop_t* loop,
}
int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
int cancelled;
uv_mutex_lock(&mutex);
uv_mutex_lock(&w->loop->wq_mutex);
cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL;
if (cancelled)
ngx_queue_remove(&w->wq);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_unlock(&mutex);
if (!cancelled)
return -1;
ngx_queue_init(&w->wq);
w->done(w, -UV_ECANCELED);
return 0;
}
void uv__work_done(uv_async_t* handle, int status) {
struct uv__work* w;
uv_loop_t* loop;
@ -146,7 +177,7 @@ void uv__work_done(uv_async_t* handle, int status) {
ngx_queue_remove(q);
w = container_of(q, struct uv__work, wq);
w->done(w);
w->done(w, 0);
}
}
@ -158,11 +189,14 @@ static void uv__queue_work(struct uv__work* w) {
}
static void uv__queue_done(struct uv__work* w) {
static void uv__queue_done(struct uv__work* w, int status) {
uv_work_t* req = container_of(w, uv_work_t, work_req);
uv__req_unregister(req->loop, req);
if (status != 0)
return;
if (req->after_work_cb)
req->after_work_cb(req);
}
@ -182,3 +216,28 @@ int uv_queue_work(uv_loop_t* loop,
uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
return 0;
}
int uv_cancel(uv_req_t* req) {
struct uv__work* wreq;
uv_loop_t* loop;
switch (req->type) {
case UV_FS:
loop = ((uv_fs_t*) req)->loop;
wreq = &((uv_fs_t*) req)->work_req;
break;
case UV_GETADDRINFO:
loop = ((uv_getaddrinfo_t*) req)->loop;
wreq = &((uv_getaddrinfo_t*) req)->work_req;
break;
case UV_WORK:
loop = ((uv_work_t*) req)->loop;
wreq = &((uv_work_t*) req)->work_req;
break;
default:
return -1;
}
return uv__work_cancel(loop, req, wreq);
}

View File

@ -70,6 +70,11 @@ int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb,
}
int uv_cancel(uv_req_t* req) {
return -1;
}
void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) {
uv__req_unregister(loop, req);
if(req->after_work_cb)

View File

@ -187,6 +187,9 @@ TEST_DECLARE (fs_rename_to_existing_file)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_multiple_event_loops)
TEST_DECLARE (threadpool_cancel_getaddrinfo)
TEST_DECLARE (threadpool_cancel_work)
TEST_DECLARE (threadpool_cancel_fs)
TEST_DECLARE (thread_mutex)
TEST_DECLARE (thread_rwlock)
TEST_DECLARE (thread_create)
@ -454,6 +457,9 @@ TASK_LIST_START
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY (threadpool_multiple_event_loops)
TEST_ENTRY (threadpool_cancel_getaddrinfo)
TEST_ENTRY (threadpool_cancel_work)
TEST_ENTRY (threadpool_cancel_fs)
TEST_ENTRY (thread_mutex)
TEST_ENTRY (thread_rwlock)
TEST_ENTRY (thread_create)

View File

@ -0,0 +1,226 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
#define INIT_CANCEL_INFO(ci, what) \
do { \
(ci)->reqs = (what); \
(ci)->nreqs = ARRAY_SIZE(what); \
(ci)->stride = sizeof((what)[0]); \
} \
while (0)
struct cancel_info {
void* reqs;
unsigned nreqs;
unsigned stride;
uv_timer_t timer_handle;
};
static uv_cond_t signal_cond;
static uv_mutex_t signal_mutex;
static uv_mutex_t wait_mutex;
static unsigned num_threads;
static unsigned done_cb_called;
static unsigned timer_cb_called;
static void work_cb(uv_work_t* req) {
uv_mutex_lock(&signal_mutex);
uv_cond_signal(&signal_cond);
uv_mutex_unlock(&signal_mutex);
uv_mutex_lock(&wait_mutex);
uv_mutex_unlock(&wait_mutex);
}
static void done_cb(uv_work_t* req) {
done_cb_called++;
free(req);
}
static void saturate_threadpool(void) {
uv_work_t* req;
ASSERT(0 == uv_cond_init(&signal_cond));
ASSERT(0 == uv_mutex_init(&signal_mutex));
ASSERT(0 == uv_mutex_init(&wait_mutex));
uv_mutex_lock(&signal_mutex);
uv_mutex_lock(&wait_mutex);
for (num_threads = 0; /* empty */; num_threads++) {
req = malloc(sizeof(*req));
ASSERT(req != NULL);
ASSERT(0 == uv_queue_work(uv_default_loop(), req, work_cb, done_cb));
/* Expect to get signalled within 350 ms, otherwise assume that
* the thread pool is saturated. As with any timing dependent test,
* this is obviously not ideal.
*/
if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) {
ASSERT(0 == uv_cancel((uv_req_t*) req));
free(req);
break;
}
}
}
static void unblock_threadpool(void) {
uv_mutex_unlock(&signal_mutex);
uv_mutex_unlock(&wait_mutex);
}
static void cleanup_threadpool(void) {
ASSERT(done_cb_called == num_threads);
uv_cond_destroy(&signal_cond);
uv_mutex_destroy(&signal_mutex);
uv_mutex_destroy(&wait_mutex);
}
static void fail_cb(/* empty */) {
ASSERT(0 && "fail_cb called");
}
static void timer_cb(uv_timer_t* handle, int status) {
struct cancel_info* ci;
uv_req_t* req;
unsigned i;
ci = container_of(handle, struct cancel_info, timer_handle);
for (i = 0; i < ci->nreqs; i++) {
req = (uv_req_t*) ((char*) ci->reqs + i * ci->stride);
ASSERT(0 == uv_cancel(req));
}
uv_close((uv_handle_t*) &ci->timer_handle, NULL);
unblock_threadpool();
timer_cb_called++;
}
TEST_IMPL(threadpool_cancel_getaddrinfo) {
uv_getaddrinfo_t reqs[4];
struct cancel_info ci;
struct addrinfo hints;
uv_loop_t* loop;
INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
ASSERT(0 == uv_getaddrinfo(loop, reqs + 0, fail_cb, "fail", NULL, NULL));
ASSERT(0 == uv_getaddrinfo(loop, reqs + 1, fail_cb, NULL, "fail", NULL));
ASSERT(0 == uv_getaddrinfo(loop, reqs + 2, fail_cb, "fail", "fail", NULL));
ASSERT(0 == uv_getaddrinfo(loop, reqs + 3, fail_cb, "fail", NULL, &hints));
ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(1 == timer_cb_called);
cleanup_threadpool();
return 0;
}
TEST_IMPL(threadpool_cancel_work) {
struct cancel_info ci;
uv_work_t reqs[16];
uv_loop_t* loop;
unsigned i;
INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
for (i = 0; i < ARRAY_SIZE(reqs); i++)
ASSERT(0 == uv_queue_work(loop, reqs + i, fail_cb, NULL));
ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(1 == timer_cb_called);
cleanup_threadpool();
return 0;
}
TEST_IMPL(threadpool_cancel_fs) {
struct cancel_info ci;
uv_fs_t reqs[25];
uv_loop_t* loop;
unsigned n;
INIT_CANCEL_INFO(&ci, reqs);
loop = uv_default_loop();
saturate_threadpool();
/* Needs to match ARRAY_SIZE(fs_reqs). */
n = 0;
ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fail_cb));
ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fail_cb));
ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fail_cb));
ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fail_cb));
ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fail_cb));
ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fail_cb));
ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fail_cb));
ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fail_cb));
ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fail_cb));
ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fail_cb));
ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fail_cb));
ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fail_cb));
ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fail_cb));
ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fail_cb));
ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fail_cb));
ASSERT(n == ARRAY_SIZE(reqs));
ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
ASSERT(0 == uv_run(loop));
ASSERT(1 == timer_cb_called);
cleanup_threadpool();
return 0;
}

1
uv.gyp
View File

@ -300,6 +300,7 @@
'test/test-tcp-writealot.c',
'test/test-tcp-unexpected-read.c',
'test/test-threadpool.c',
'test/test-threadpool-cancel.c',
'test/test-mutexes.c',
'test/test-thread.c',
'test/test-barrier.c',