unix: rework uv_cancel() api

Bert Belder informs me that the current approach, where a request is
cancelled immediately, is impossible to implement on Windows.

Rework the API to always invoke the "done" callback with a UV_ECANCELED
error code.
Ben Noordhuis 2012-12-12 15:12:20 +01:00
parent 731adacad2
commit 92fb84b751
7 changed files with 122 additions and 53 deletions
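In practice, the reworked contract looks like the following minimal sketch.
This is not part of the commit; it assumes the uv_last_error()-era API of
this period, and the callback names are hypothetical:

#include "uv.h"

static void work_cb(uv_work_t* req) {
  /* Runs on a threadpool thread. Never runs at all if uv_cancel()
   * wins the race while the request is still sitting in the queue. */
}

static void after_work_cb(uv_work_t* req, int status) {
  /* Always invoked now, even for cancelled requests. Only at this
   * point is it safe to free the request. */
  if (status == -1 && uv_last_error(req->loop).code == UV_ECANCELED) {
    /* The request was cancelled before a worker picked it up. */
  }
}

int main(void) {
  uv_work_t req;

  if (uv_queue_work(uv_default_loop(), &req, work_cb, after_work_cb))
    return 1;

  if (uv_cancel((uv_req_t*) &req) != 0) {
    /* Too late: a worker thread already started running work_cb. */
  }

  return uv_run(uv_default_loop());
}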

View File

@@ -330,7 +330,7 @@ typedef void (*uv_exit_cb)(uv_process_t*, int exit_status, int term_signal);
 typedef void (*uv_walk_cb)(uv_handle_t* handle, void* arg);
 typedef void (*uv_fs_cb)(uv_fs_t* req);
 typedef void (*uv_work_cb)(uv_work_t* req);
-typedef void (*uv_after_work_cb)(uv_work_t* req);
+typedef void (*uv_after_work_cb)(uv_work_t* req, int status);
 typedef void (*uv_getaddrinfo_cb)(uv_getaddrinfo_t* req,
                                   int status,
                                   struct addrinfo* res);
@@ -1394,6 +1394,17 @@ UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req,
  * Only cancellation of uv_fs_t, uv_getaddrinfo_t and uv_work_t requests is
  * currently supported.
  *
+ * Cancelled requests have their callbacks invoked some time in the future.
+ * It's _not_ safe to free the memory associated with the request until your
+ * callback is called.
+ *
+ * Here is how cancellation is reported to your callback:
+ *
+ * - A uv_fs_t request has its req->errorno field set to UV_ECANCELED.
+ *
+ * - A uv_work_t or uv_getaddrinfo_t request has its callback invoked with
+ *   status == -1 and uv_last_error(loop).code == UV_ECANCELED.
+ *
  * This function is currently only implemented on UNIX platforms. On Windows,
  * it always returns -1.
  */
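For illustration, callbacks that follow the reporting rules documented above
might look like this sketch (hypothetical names; the status value is checked
loosely to stay within what the comment guarantees):

#include "uv.h"

static void fs_cb(uv_fs_t* req) {
  /* A uv_fs_t reports cancellation through its errorno field. */
  if (req->errorno == UV_ECANCELED) {
    /* Request was cancelled; clean up and free as usual. */
  }
  uv_fs_req_cleanup(req);
}

static void getaddrinfo_cb(uv_getaddrinfo_t* req,
                           int status,
                           struct addrinfo* res) {
  /* A uv_getaddrinfo_t reports cancellation through the loop's last
   * error; res is not valid in that case. */
  if (status != 0 && uv_last_error(req->loop).code == UV_ECANCELED) {
    /* Lookup was cancelled. */
  }
}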

View File

@@ -522,16 +522,17 @@ static void uv__fs_done(struct uv__work* w, int status) {
   req = container_of(w, uv_fs_t, work_req);
   uv__req_unregister(req->loop, req);

-  if (status != 0) {
-    uv_fs_req_cleanup(req);
-    return;
-  }
-
   if (req->errorno != 0) {
     req->errorno = uv_translate_sys_error(req->errorno);
     uv__set_artificial_error(req->loop, req->errorno);
   }

+  if (status == -UV_ECANCELED) {
+    assert(req->errorno == 0);
+    req->errorno = UV_ECANCELED;
+    uv__set_artificial_error(req->loop, UV_ECANCELED);
+  }
+
   if (req->cb != NULL)
     req->cb(req);
 }

View File

@@ -67,9 +67,6 @@ static void uv__getaddrinfo_done(struct uv__work* w, int status) {
   req->service = NULL;
   req->hostname = NULL;

-  if (status != 0)
-    return;
-
   if (req->retcode == 0) {
     /* OK */
 #if EAI_NODATA /* FreeBSD deprecated EAI_NODATA */
@@ -87,6 +84,12 @@ static void uv__getaddrinfo_done(struct uv__work* w, int status) {
     req->loop->last_err.sys_errno_ = req->retcode;
   }

+  if (status == -UV_ECANCELED) {
+    assert(req->retcode == 0);
+    req->retcode = UV_ECANCELED;
+    uv__set_artificial_error(req->loop, UV_ECANCELED);
+  }
+
   req->cb(req, req->retcode, res);
 }

View File

@@ -20,6 +20,7 @@
  */

 #include "internal.h"
+#include <stdlib.h>

 static uv_once_t once = UV_ONCE_INIT;
 static uv_cond_t cond;
@@ -30,6 +31,11 @@ static ngx_queue_t wq;
 static volatile int initialized;


+static void uv__cancelled(struct uv__work* w) {
+  abort();
+}
+
+
 /* To avoid deadlock with uv_cancel() it's crucial that the worker
  * never holds the global mutex and the loop-local mutex at the same time.
  */
@@ -149,8 +155,10 @@ int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {
   if (!cancelled)
     return -1;

-  ngx_queue_init(&w->wq);
-  w->done(w, -UV_ECANCELED);
+  w->work = uv__cancelled;
+  uv_mutex_lock(&loop->wq_mutex);
+  ngx_queue_insert_tail(&loop->wq, &w->wq);
+  uv_mutex_unlock(&loop->wq_mutex);

   return 0;
 }
@@ -161,6 +169,7 @@ void uv__work_done(uv_async_t* handle, int status) {
   uv_loop_t* loop;
   ngx_queue_t* q;
   ngx_queue_t wq;
+  int err;

   loop = container_of(handle, uv_loop_t, wq_async);
   ngx_queue_init(&wq);
@@ -177,7 +186,8 @@
     ngx_queue_remove(q);

     w = container_of(q, struct uv__work, wq);
-    w->done(w, 0);
+    err = (w->work == uv__cancelled) ? -UV_ECANCELED : 0;
+    w->done(w, err);
   }
 }
@@ -190,15 +200,18 @@ static void uv__queue_work(struct uv__work* w) {
 static void uv__queue_done(struct uv__work* w, int status) {
-  uv_work_t* req = container_of(w, uv_work_t, work_req);
+  uv_work_t* req;

+  req = container_of(w, uv_work_t, work_req);
   uv__req_unregister(req->loop, req);

-  if (status != 0)
+  if (req->after_work_cb == NULL)
     return;

-  if (req->after_work_cb)
-    req->after_work_cb(req);
+  if (status == -UV_ECANCELED)
+    uv__set_artificial_error(req->loop, UV_ECANCELED);
+
+  req->after_work_cb(req, status ? -1 : 0);
 }
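Taken together, the hunks above replace the old synchronous completion:
uv__work_cancel() no longer runs the done callback itself, but tags the
request with the uv__cancelled sentinel and re-queues it on the loop's
completion queue, where uv__work_done() spots the sentinel and reports
-UV_ECANCELED from the loop thread. A reduced, self-contained sketch of that
sentinel pattern (illustrative names, not the actual implementation):

#include <assert.h>
#include <stdio.h>

struct job {
  void (*work)(struct job*);
  void (*done)(struct job*, int status);
};

/* Sentinel work function: installed in place of the real work so the
 * drain loop can recognize a cancelled job; it must never execute. */
static void cancelled(struct job* j) {
  (void) j;
  assert(0 && "cancelled job must never run");
}

static void drain(struct job* j) {
  /* Comparing function pointers is how uv__work_done() tells a
   * cancelled job apart from a completed one. */
  int status = (j->work == cancelled) ? -1 : 0;  /* -1 stands in for -UV_ECANCELED */
  j->done(j, status);
}

static void my_done(struct job* j, int status) {
  (void) j;
  printf("done, status=%d\n", status);  /* prints -1 for the cancelled job */
}

int main(void) {
  struct job j = { cancelled, my_done };  /* as if uv_cancel() had succeeded */
  drain(&j);
  return 0;
}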

View File

@@ -78,5 +78,5 @@ int uv_cancel(uv_req_t* req) {
 void uv_process_work_req(uv_loop_t* loop, uv_work_t* req) {
   uv__req_unregister(loop, req);
   if(req->after_work_cb)
-    req->after_work_cb(req);
+    req->after_work_cb(req, 0);
 }

View File

@@ -41,8 +41,12 @@ static uv_cond_t signal_cond;
 static uv_mutex_t signal_mutex;
 static uv_mutex_t wait_mutex;
 static unsigned num_threads;
+static unsigned fs_cb_called;
+static unsigned work_cb_called;
 static unsigned done_cb_called;
+static unsigned done2_cb_called;
 static unsigned timer_cb_called;
+static unsigned getaddrinfo_cb_called;


 static void work_cb(uv_work_t* req) {
@@ -52,10 +56,12 @@ static void work_cb(uv_work_t* req) {
   uv_mutex_lock(&wait_mutex);
   uv_mutex_unlock(&wait_mutex);
+
+  work_cb_called++;
 }


-static void done_cb(uv_work_t* req) {
+static void done_cb(uv_work_t* req, int status) {
   done_cb_called++;
   free(req);
 }
@@ -82,7 +88,6 @@ static void saturate_threadpool(void) {
      */
     if (uv_cond_timedwait(&signal_cond, &signal_mutex, 350 * 1e6)) {
       ASSERT(0 == uv_cancel((uv_req_t*) req));
-      free(req);
       break;
     }
   }
@@ -96,15 +101,40 @@
 static void cleanup_threadpool(void) {
-  ASSERT(done_cb_called == num_threads);
+  ASSERT(done_cb_called == num_threads + 1);  /* +1 == cancelled work req. */
+  ASSERT(work_cb_called == num_threads);

   uv_cond_destroy(&signal_cond);
   uv_mutex_destroy(&signal_mutex);
   uv_mutex_destroy(&wait_mutex);
 }


-static void fail_cb(/* empty */) {
-  ASSERT(0 && "fail_cb called");
+static void fs_cb(uv_fs_t* req) {
+  ASSERT(req->errorno == UV_ECANCELED);
+  uv_fs_req_cleanup(req);
+  fs_cb_called++;
 }


+static void getaddrinfo_cb(uv_getaddrinfo_t* req,
+                           int status,
+                           struct addrinfo* res) {
+  ASSERT(UV_ECANCELED == uv_last_error(req->loop).code);
+  ASSERT(UV_ECANCELED == status);
+  getaddrinfo_cb_called++;
+}
+
+
+static void work2_cb(uv_work_t* req) {
+  ASSERT(0 && "work2_cb called");
+}
+
+
+static void done2_cb(uv_work_t* req, int status) {
+  ASSERT(uv_last_error(req->loop).code == UV_ECANCELED);
+  ASSERT(status == -1);
+  done2_cb_called++;
+}
@@ -131,15 +161,23 @@ TEST_IMPL(threadpool_cancel_getaddrinfo) {
   struct cancel_info ci;
   struct addrinfo hints;
   uv_loop_t* loop;
+  int r;

   INIT_CANCEL_INFO(&ci, reqs);
   loop = uv_default_loop();
   saturate_threadpool();

-  ASSERT(0 == uv_getaddrinfo(loop, reqs + 0, fail_cb, "fail", NULL, NULL));
-  ASSERT(0 == uv_getaddrinfo(loop, reqs + 1, fail_cb, NULL, "fail", NULL));
-  ASSERT(0 == uv_getaddrinfo(loop, reqs + 2, fail_cb, "fail", "fail", NULL));
-  ASSERT(0 == uv_getaddrinfo(loop, reqs + 3, fail_cb, "fail", NULL, &hints));
+  r = uv_getaddrinfo(loop, reqs + 0, getaddrinfo_cb, "fail", NULL, NULL);
+  ASSERT(r == 0);
+
+  r = uv_getaddrinfo(loop, reqs + 1, getaddrinfo_cb, NULL, "fail", NULL);
+  ASSERT(r == 0);
+
+  r = uv_getaddrinfo(loop, reqs + 2, getaddrinfo_cb, "fail", "fail", NULL);
+  ASSERT(r == 0);
+
+  r = uv_getaddrinfo(loop, reqs + 3, getaddrinfo_cb, "fail", NULL, &hints);
+  ASSERT(r == 0);

   ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
   ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
@@ -163,12 +201,13 @@ TEST_IMPL(threadpool_cancel_work) {
   saturate_threadpool();

   for (i = 0; i < ARRAY_SIZE(reqs); i++)
-    ASSERT(0 == uv_queue_work(loop, reqs + i, fail_cb, NULL));
+    ASSERT(0 == uv_queue_work(loop, reqs + i, work2_cb, done2_cb));

   ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
   ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
   ASSERT(0 == uv_run(loop));
   ASSERT(1 == timer_cb_called);
+  ASSERT(ARRAY_SIZE(reqs) == done2_cb_called);

   cleanup_threadpool();
@@ -188,36 +227,37 @@ TEST_IMPL(threadpool_cancel_fs) {
   /* Needs to match ARRAY_SIZE(fs_reqs). */
   n = 0;
-  ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fail_cb));
-  ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fail_cb));
-  ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fail_cb));
-  ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fail_cb));
-  ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fail_cb));
-  ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fail_cb));
-  ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fail_cb));
-  ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
-  ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fail_cb));
-  ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fail_cb));
-  ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fail_cb));
-  ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fail_cb));
-  ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fail_cb));
-  ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fail_cb));
-  ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fail_cb));
-  ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fail_cb));
-  ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fail_cb));
+  ASSERT(0 == uv_fs_chmod(loop, reqs + n++, "/", 0, fs_cb));
+  ASSERT(0 == uv_fs_chown(loop, reqs + n++, "/", 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_close(loop, reqs + n++, 0, fs_cb));
+  ASSERT(0 == uv_fs_fchmod(loop, reqs + n++, 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_fchown(loop, reqs + n++, 0, 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_fdatasync(loop, reqs + n++, 0, fs_cb));
+  ASSERT(0 == uv_fs_fstat(loop, reqs + n++, 0, fs_cb));
+  ASSERT(0 == uv_fs_fsync(loop, reqs + n++, 0, fs_cb));
+  ASSERT(0 == uv_fs_ftruncate(loop, reqs + n++, 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_futime(loop, reqs + n++, 0, 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_link(loop, reqs + n++, "/", "/", fs_cb));
+  ASSERT(0 == uv_fs_lstat(loop, reqs + n++, "/", fs_cb));
+  ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
+  ASSERT(0 == uv_fs_open(loop, reqs + n++, "/", 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_read(loop, reqs + n++, 0, NULL, 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_readdir(loop, reqs + n++, "/", 0, fs_cb));
+  ASSERT(0 == uv_fs_readlink(loop, reqs + n++, "/", fs_cb));
+  ASSERT(0 == uv_fs_rename(loop, reqs + n++, "/", "/", fs_cb));
+  ASSERT(0 == uv_fs_mkdir(loop, reqs + n++, "/", 0, fs_cb));
+  ASSERT(0 == uv_fs_sendfile(loop, reqs + n++, 0, 0, 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_stat(loop, reqs + n++, "/", fs_cb));
+  ASSERT(0 == uv_fs_symlink(loop, reqs + n++, "/", "/", 0, fs_cb));
+  ASSERT(0 == uv_fs_unlink(loop, reqs + n++, "/", fs_cb));
+  ASSERT(0 == uv_fs_utime(loop, reqs + n++, "/", 0, 0, fs_cb));
+  ASSERT(0 == uv_fs_write(loop, reqs + n++, 0, NULL, 0, 0, fs_cb));
   ASSERT(n == ARRAY_SIZE(reqs));

   ASSERT(0 == uv_timer_init(loop, &ci.timer_handle));
   ASSERT(0 == uv_timer_start(&ci.timer_handle, timer_cb, 10, 0));
   ASSERT(0 == uv_run(loop));
+  ASSERT(n == fs_cb_called);
   ASSERT(1 == timer_cb_called);

   cleanup_threadpool();

View File

@@ -35,7 +35,8 @@ static void work_cb(uv_work_t* req) {
 }


-static void after_work_cb(uv_work_t* req) {
+static void after_work_cb(uv_work_t* req, int status) {
+  ASSERT(status == 0);
   ASSERT(req == &work_req);
   ASSERT(req->data == &data);
   after_work_cb_count++;