unix,win: limit concurrent DNS calls to nthreads/2
If `nthreads / 2` (rounded up) DNS calls are outstanding, queue more work of that kind instead of letting it take over more positions in the thread pool, blocking other work such as the (usually much faster) file system I/O or user-scheduled work. Fixes: https://github.com/nodejs/node/issues/8436 PR-URL: https://github.com/libuv/libuv/pull/1845 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: Santiago Gimeno <santiago.gimeno@gmail.com>
This commit is contained in:
parent
69c43d987b
commit
90891b4232
@ -33,12 +33,18 @@ static uv_once_t once = UV_ONCE_INIT;
|
||||
static uv_cond_t cond;
|
||||
static uv_mutex_t mutex;
|
||||
static unsigned int idle_threads;
|
||||
static unsigned int slow_io_work_running;
|
||||
static unsigned int nthreads;
|
||||
static uv_thread_t* threads;
|
||||
static uv_thread_t default_threads[4];
|
||||
static QUEUE exit_message;
|
||||
static QUEUE wq;
|
||||
static QUEUE run_slow_work_message;
|
||||
static QUEUE slow_io_pending_wq;
|
||||
|
||||
static unsigned int slow_work_thread_threshold(void) {
|
||||
return (nthreads + 1) / 2;
|
||||
}
|
||||
|
||||
static void uv__cancelled(struct uv__work* w) {
|
||||
abort();
|
||||
@ -51,6 +57,7 @@ static void uv__cancelled(struct uv__work* w) {
|
||||
static void worker(void* arg) {
|
||||
struct uv__work* w;
|
||||
QUEUE* q;
|
||||
int is_slow_work;
|
||||
|
||||
uv_sem_post((uv_sem_t*) arg);
|
||||
arg = NULL;
|
||||
@ -58,31 +65,65 @@ static void worker(void* arg) {
|
||||
for (;;) {
|
||||
uv_mutex_lock(&mutex);
|
||||
|
||||
while (QUEUE_EMPTY(&wq)) {
|
||||
wait_for_work:
|
||||
/* Keep waiting while either no work is present or only slow I/O
|
||||
and we're at the threshold for that. */
|
||||
while (QUEUE_EMPTY(&wq) ||
|
||||
(QUEUE_HEAD(&wq) == &run_slow_work_message &&
|
||||
QUEUE_NEXT(&run_slow_work_message) == &wq &&
|
||||
slow_io_work_running >= slow_work_thread_threshold())) {
|
||||
idle_threads += 1;
|
||||
uv_cond_wait(&cond, &mutex);
|
||||
idle_threads -= 1;
|
||||
}
|
||||
|
||||
q = QUEUE_HEAD(&wq);
|
||||
|
||||
if (q == &exit_message)
|
||||
if (q == &exit_message) {
|
||||
uv_cond_signal(&cond);
|
||||
else {
|
||||
uv_mutex_unlock(&mutex);
|
||||
break;
|
||||
}
|
||||
|
||||
QUEUE_REMOVE(q);
|
||||
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */
|
||||
|
||||
is_slow_work = 0;
|
||||
if (q == &run_slow_work_message) {
|
||||
/* If we're at the slow I/O threshold, re-schedule until after all
|
||||
other work in the queue is done. */
|
||||
if (slow_io_work_running >= slow_work_thread_threshold()) {
|
||||
QUEUE_INSERT_TAIL(&wq, q);
|
||||
goto wait_for_work;
|
||||
}
|
||||
|
||||
/* If we encountered a request to run slow I/O work but there is none
|
||||
to run, that means it's cancelled => Start over. */
|
||||
if (QUEUE_EMPTY(&slow_io_pending_wq))
|
||||
goto wait_for_work;
|
||||
|
||||
is_slow_work = 1;
|
||||
slow_io_work_running++;
|
||||
|
||||
q = QUEUE_HEAD(&slow_io_pending_wq);
|
||||
QUEUE_REMOVE(q);
|
||||
QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
|
||||
executing. */
|
||||
QUEUE_INIT(q);
|
||||
|
||||
/* If there is more slow I/O work, schedule it to be run as well. */
|
||||
if (!QUEUE_EMPTY(&slow_io_pending_wq)) {
|
||||
QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);
|
||||
if (idle_threads > 0)
|
||||
uv_cond_signal(&cond);
|
||||
}
|
||||
}
|
||||
|
||||
uv_mutex_unlock(&mutex);
|
||||
|
||||
if (q == &exit_message)
|
||||
break;
|
||||
|
||||
w = QUEUE_DATA(q, struct uv__work, wq);
|
||||
w->work(w);
|
||||
|
||||
uv_mutex_lock(&w->loop->wq_mutex);
|
||||
if (is_slow_work)
|
||||
slow_io_work_running--;
|
||||
w->work = NULL; /* Signal uv_cancel() that the work req is done
|
||||
executing. */
|
||||
QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
|
||||
@ -92,8 +133,20 @@ static void worker(void* arg) {
|
||||
}
|
||||
|
||||
|
||||
static void post(QUEUE* q) {
|
||||
static void post(QUEUE* q, enum uv__work_kind kind) {
|
||||
uv_mutex_lock(&mutex);
|
||||
if (kind == UV__WORK_SLOW_IO) {
|
||||
/* Insert into a separate queue. */
|
||||
QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);
|
||||
if (!QUEUE_EMPTY(&run_slow_work_message)) {
|
||||
/* Running slow I/O tasks is already scheduled => Nothing to do here.
|
||||
The worker that runs said other task will schedule this one as well. */
|
||||
uv_mutex_unlock(&mutex);
|
||||
return;
|
||||
}
|
||||
q = &run_slow_work_message;
|
||||
}
|
||||
|
||||
QUEUE_INSERT_TAIL(&wq, q);
|
||||
if (idle_threads > 0)
|
||||
uv_cond_signal(&cond);
|
||||
@ -108,7 +161,7 @@ UV_DESTRUCTOR(static void cleanup(void)) {
|
||||
if (nthreads == 0)
|
||||
return;
|
||||
|
||||
post(&exit_message);
|
||||
post(&exit_message, UV__WORK_CPU);
|
||||
|
||||
for (i = 0; i < nthreads; i++)
|
||||
if (uv_thread_join(threads + i))
|
||||
@ -156,6 +209,8 @@ static void init_threads(void) {
|
||||
abort();
|
||||
|
||||
QUEUE_INIT(&wq);
|
||||
QUEUE_INIT(&slow_io_pending_wq);
|
||||
QUEUE_INIT(&run_slow_work_message);
|
||||
|
||||
if (uv_sem_init(&sem, 0))
|
||||
abort();
|
||||
@ -194,13 +249,14 @@ static void init_once(void) {
|
||||
|
||||
void uv__work_submit(uv_loop_t* loop,
|
||||
struct uv__work* w,
|
||||
enum uv__work_kind kind,
|
||||
void (*work)(struct uv__work* w),
|
||||
void (*done)(struct uv__work* w, int status)) {
|
||||
uv_once(&once, init_once);
|
||||
w->loop = loop;
|
||||
w->work = work;
|
||||
w->done = done;
|
||||
post(&w->wq);
|
||||
post(&w->wq, kind);
|
||||
}
|
||||
|
||||
|
||||
@ -284,7 +340,11 @@ int uv_queue_work(uv_loop_t* loop,
|
||||
req->loop = loop;
|
||||
req->work_cb = work_cb;
|
||||
req->after_work_cb = after_work_cb;
|
||||
uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);
|
||||
uv__work_submit(loop,
|
||||
&req->work_req,
|
||||
UV__WORK_CPU,
|
||||
uv__queue_work,
|
||||
uv__queue_done);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
@ -120,7 +120,11 @@
|
||||
do { \
|
||||
if (cb != NULL) { \
|
||||
uv__req_register(loop, req); \
|
||||
uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \
|
||||
uv__work_submit(loop, \
|
||||
&req->work_req, \
|
||||
UV__WORK_FAST_IO, \
|
||||
uv__fs_work, \
|
||||
uv__fs_done); \
|
||||
return 0; \
|
||||
} \
|
||||
else { \
|
||||
|
||||
@ -186,6 +186,7 @@ int uv_getaddrinfo(uv_loop_t* loop,
|
||||
if (cb) {
|
||||
uv__work_submit(loop,
|
||||
&req->work_req,
|
||||
UV__WORK_SLOW_IO,
|
||||
uv__getaddrinfo_work,
|
||||
uv__getaddrinfo_done);
|
||||
return 0;
|
||||
|
||||
@ -109,6 +109,7 @@ int uv_getnameinfo(uv_loop_t* loop,
|
||||
if (getnameinfo_cb) {
|
||||
uv__work_submit(loop,
|
||||
&req->work_req,
|
||||
UV__WORK_SLOW_IO,
|
||||
uv__getnameinfo_work,
|
||||
uv__getnameinfo_done);
|
||||
return 0;
|
||||
|
||||
@ -164,8 +164,15 @@ void uv__fs_poll_close(uv_fs_poll_t* handle);
|
||||
|
||||
int uv__getaddrinfo_translate_error(int sys_err); /* EAI_* error. */
|
||||
|
||||
enum uv__work_kind {
|
||||
UV__WORK_CPU,
|
||||
UV__WORK_FAST_IO,
|
||||
UV__WORK_SLOW_IO
|
||||
};
|
||||
|
||||
void uv__work_submit(uv_loop_t* loop,
|
||||
struct uv__work *w,
|
||||
enum uv__work_kind kind,
|
||||
void (*work)(struct uv__work *w),
|
||||
void (*done)(struct uv__work *w, int status));
|
||||
|
||||
|
||||
@ -55,7 +55,11 @@
|
||||
do { \
|
||||
if (cb != NULL) { \
|
||||
uv__req_register(loop, req); \
|
||||
uv__work_submit(loop, &req->work_req, uv__fs_work, uv__fs_done); \
|
||||
uv__work_submit(loop, \
|
||||
&req->work_req, \
|
||||
UV__WORK_FAST_IO, \
|
||||
uv__fs_work, \
|
||||
uv__fs_done); \
|
||||
return 0; \
|
||||
} else { \
|
||||
uv__fs_work(&req->work_req); \
|
||||
|
||||
@ -368,6 +368,7 @@ int uv_getaddrinfo(uv_loop_t* loop,
|
||||
if (getaddrinfo_cb) {
|
||||
uv__work_submit(loop,
|
||||
&req->work_req,
|
||||
UV__WORK_SLOW_IO,
|
||||
uv__getaddrinfo_work,
|
||||
uv__getaddrinfo_done);
|
||||
return 0;
|
||||
|
||||
@ -145,6 +145,7 @@ int uv_getnameinfo(uv_loop_t* loop,
|
||||
if (getnameinfo_cb) {
|
||||
uv__work_submit(loop,
|
||||
&req->work_req,
|
||||
UV__WORK_SLOW_IO,
|
||||
uv__getnameinfo_work,
|
||||
uv__getnameinfo_done);
|
||||
return 0;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user