diff --git a/include/uv.h b/include/uv.h index f3d70231..a68a42f1 100644 --- a/include/uv.h +++ b/include/uv.h @@ -59,6 +59,12 @@ extern "C" { #include #include +/* Internal type, do not use. */ +struct uv__queue { + struct uv__queue* next; + struct uv__queue* prev; +}; + #if defined(_WIN32) # include "uv/win.h" #else @@ -459,7 +465,7 @@ struct uv_shutdown_s { uv_handle_type type; \ /* private */ \ uv_close_cb close_cb; \ - void* handle_queue[2]; \ + struct uv__queue handle_queue; \ union { \ int fd; \ void* reserved[4]; \ @@ -1849,7 +1855,7 @@ struct uv_loop_s { void* data; /* Loop reference counting. */ unsigned int active_handles; - void* handle_queue[2]; + struct uv__queue handle_queue; union { void* unused; unsigned int count; diff --git a/include/uv/darwin.h b/include/uv/darwin.h index d2264158..06962bfd 100644 --- a/include/uv/darwin.h +++ b/include/uv/darwin.h @@ -40,7 +40,7 @@ void* cf_state; \ uv_mutex_t cf_mutex; \ uv_sem_t cf_sem; \ - void* cf_signals[2]; \ + struct uv__queue cf_signals; \ #define UV_PLATFORM_FS_EVENT_FIELDS \ uv__io_t event_watcher; \ @@ -48,8 +48,8 @@ int realpath_len; \ int cf_flags; \ uv_async_t* cf_cb; \ - void* cf_events[2]; \ - void* cf_member[2]; \ + struct uv__queue cf_events; \ + struct uv__queue cf_member; \ int cf_error; \ uv_mutex_t cf_mutex; \ diff --git a/include/uv/linux.h b/include/uv/linux.h index 9b38405a..9f22f8cf 100644 --- a/include/uv/linux.h +++ b/include/uv/linux.h @@ -28,7 +28,7 @@ int inotify_fd; \ #define UV_PLATFORM_FS_EVENT_FIELDS \ - void* watchers[2]; \ + struct uv__queue watchers; \ int wd; \ #endif /* UV_LINUX_H */ diff --git a/include/uv/threadpool.h b/include/uv/threadpool.h index 9708ebdd..24ce916f 100644 --- a/include/uv/threadpool.h +++ b/include/uv/threadpool.h @@ -31,7 +31,7 @@ struct uv__work { void (*work)(struct uv__work *w); void (*done)(struct uv__work *w, int status); struct uv_loop_s* loop; - void* wq[2]; + struct uv__queue wq; }; #endif /* UV_THREADPOOL_H_ */ diff --git a/include/uv/unix.h b/include/uv/unix.h index 0b1fdac2..d1a5f043 100644 --- a/include/uv/unix.h +++ b/include/uv/unix.h @@ -92,8 +92,8 @@ typedef struct uv__io_s uv__io_t; struct uv__io_s { uv__io_cb cb; - void* pending_queue[2]; - void* watcher_queue[2]; + struct uv__queue pending_queue; + struct uv__queue watcher_queue; unsigned int pevents; /* Pending event mask i.e. mask at next tick. */ unsigned int events; /* Current event mask. */ int fd; @@ -220,21 +220,21 @@ typedef struct { #define UV_LOOP_PRIVATE_FIELDS \ unsigned long flags; \ int backend_fd; \ - void* pending_queue[2]; \ - void* watcher_queue[2]; \ + struct uv__queue pending_queue; \ + struct uv__queue watcher_queue; \ uv__io_t** watchers; \ unsigned int nwatchers; \ unsigned int nfds; \ - void* wq[2]; \ + struct uv__queue wq; \ uv_mutex_t wq_mutex; \ uv_async_t wq_async; \ uv_rwlock_t cloexec_lock; \ uv_handle_t* closing_handles; \ - void* process_handles[2]; \ - void* prepare_handles[2]; \ - void* check_handles[2]; \ - void* idle_handles[2]; \ - void* async_handles[2]; \ + struct uv__queue process_handles; \ + struct uv__queue prepare_handles; \ + struct uv__queue check_handles; \ + struct uv__queue idle_handles; \ + struct uv__queue async_handles; \ void (*async_unused)(void); /* TODO(bnoordhuis) Remove in libuv v2. */ \ uv__io_t async_io_watcher; \ int async_wfd; \ @@ -257,7 +257,7 @@ typedef struct { #define UV_PRIVATE_REQ_TYPES /* empty */ #define UV_WRITE_PRIVATE_FIELDS \ - void* queue[2]; \ + struct uv__queue queue; \ unsigned int write_index; \ uv_buf_t* bufs; \ unsigned int nbufs; \ @@ -265,12 +265,12 @@ typedef struct { uv_buf_t bufsml[4]; \ #define UV_CONNECT_PRIVATE_FIELDS \ - void* queue[2]; \ + struct uv__queue queue; \ #define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */ #define UV_UDP_SEND_PRIVATE_FIELDS \ - void* queue[2]; \ + struct uv__queue queue; \ struct sockaddr_storage addr; \ unsigned int nbufs; \ uv_buf_t* bufs; \ @@ -286,8 +286,8 @@ typedef struct { uv_connect_t *connect_req; \ uv_shutdown_t *shutdown_req; \ uv__io_t io_watcher; \ - void* write_queue[2]; \ - void* write_completed_queue[2]; \ + struct uv__queue write_queue; \ + struct uv__queue write_completed_queue; \ uv_connection_cb connection_cb; \ int delayed_error; \ int accepted_fd; \ @@ -300,8 +300,8 @@ typedef struct { uv_alloc_cb alloc_cb; \ uv_udp_recv_cb recv_cb; \ uv__io_t io_watcher; \ - void* write_queue[2]; \ - void* write_completed_queue[2]; \ + struct uv__queue write_queue; \ + struct uv__queue write_completed_queue; \ #define UV_PIPE_PRIVATE_FIELDS \ const char* pipe_fname; /* strdup'ed */ @@ -311,19 +311,19 @@ typedef struct { #define UV_PREPARE_PRIVATE_FIELDS \ uv_prepare_cb prepare_cb; \ - void* queue[2]; \ + struct uv__queue queue; \ #define UV_CHECK_PRIVATE_FIELDS \ uv_check_cb check_cb; \ - void* queue[2]; \ + struct uv__queue queue; \ #define UV_IDLE_PRIVATE_FIELDS \ uv_idle_cb idle_cb; \ - void* queue[2]; \ + struct uv__queue queue; \ #define UV_ASYNC_PRIVATE_FIELDS \ uv_async_cb async_cb; \ - void* queue[2]; \ + struct uv__queue queue; \ int pending; \ #define UV_TIMER_PRIVATE_FIELDS \ @@ -352,7 +352,7 @@ typedef struct { int retcode; #define UV_PROCESS_PRIVATE_FIELDS \ - void* queue[2]; \ + struct uv__queue queue; \ int status; \ #define UV_FS_PRIVATE_FIELDS \ diff --git a/include/uv/win.h b/include/uv/win.h index 92a95fa1..6f8c4729 100644 --- a/include/uv/win.h +++ b/include/uv/win.h @@ -357,7 +357,7 @@ typedef struct { /* Counter to started timer */ \ uint64_t timer_counter; \ /* Threadpool */ \ - void* wq[2]; \ + struct uv__queue wq; \ uv_mutex_t wq_mutex; \ uv_async_t wq_async; @@ -486,7 +486,7 @@ typedef struct { uint32_t payload_remaining; \ uint64_t dummy; /* TODO: retained for ABI compat; remove this in v2.x. */ \ } ipc_data_frame; \ - void* ipc_xfer_queue[2]; \ + struct uv__queue ipc_xfer_queue; \ int ipc_xfer_queue_length; \ uv_write_t* non_overlapped_writes_tail; \ CRITICAL_SECTION readfile_thread_lock; \ diff --git a/src/queue.h b/src/queue.h index ff3540a0..5f8489e9 100644 --- a/src/queue.h +++ b/src/queue.h @@ -18,91 +18,73 @@ #include -typedef void *QUEUE[2]; +#define uv__queue_data(pointer, type, field) \ + ((type*) ((char*) (pointer) - offsetof(type, field))) -/* Private macros. */ -#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0])) -#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1])) -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) +#define uv__queue_foreach(q, h) \ + for ((q) = (h)->next; (q) != (h); (q) = (q)->next) -/* Public macros. */ -#define QUEUE_DATA(ptr, type, field) \ - ((type *) ((char *) (ptr) - offsetof(type, field))) +static inline void uv__queue_init(struct uv__queue* q) { + q->next = q; + q->prev = q; +} -/* Important note: mutating the list while QUEUE_FOREACH is - * iterating over its elements results in undefined behavior. - */ -#define QUEUE_FOREACH(q, h) \ - for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) +static inline int uv__queue_empty(const struct uv__queue* q) { + return q == q->next; +} -#define QUEUE_EMPTY(q) \ - ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q)) +static inline struct uv__queue* uv__queue_head(const struct uv__queue* q) { + return q->next; +} -#define QUEUE_HEAD(q) \ - (QUEUE_NEXT(q)) +static inline struct uv__queue* uv__queue_next(const struct uv__queue* q) { + return q->next; +} -#define QUEUE_INIT(q) \ - do { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } \ - while (0) +static inline void uv__queue_add(struct uv__queue* h, struct uv__queue* n) { + h->prev->next = n->next; + n->next->prev = h->prev; + h->prev = n->prev; + h->prev->next = h; +} -#define QUEUE_ADD(h, n) \ - do { \ - QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ - QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV(h) = QUEUE_PREV(n); \ - QUEUE_PREV_NEXT(h) = (h); \ - } \ - while (0) +static inline void uv__queue_split(struct uv__queue* h, + struct uv__queue* q, + struct uv__queue* n) { + n->prev = h->prev; + n->prev->next = n; + n->next = q; + h->prev = q->prev; + h->prev->next = h; + q->prev = n; +} -#define QUEUE_SPLIT(h, q, n) \ - do { \ - QUEUE_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(n) = (n); \ - QUEUE_NEXT(n) = (q); \ - QUEUE_PREV(h) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(h) = (h); \ - QUEUE_PREV(q) = (n); \ - } \ - while (0) +static inline void uv__queue_move(struct uv__queue* h, struct uv__queue* n) { + if (uv__queue_empty(h)) + uv__queue_init(n); + else + uv__queue_split(h, h->next, n); +} -#define QUEUE_MOVE(h, n) \ - do { \ - if (QUEUE_EMPTY(h)) \ - QUEUE_INIT(n); \ - else { \ - QUEUE* q = QUEUE_HEAD(h); \ - QUEUE_SPLIT(h, q, n); \ - } \ - } \ - while (0) +static inline void uv__queue_insert_head(struct uv__queue* h, + struct uv__queue* q) { + q->next = h->next; + q->prev = h; + q->next->prev = q; + h->next = q; +} -#define QUEUE_INSERT_HEAD(h, q) \ - do { \ - QUEUE_NEXT(q) = QUEUE_NEXT(h); \ - QUEUE_PREV(q) = (h); \ - QUEUE_NEXT_PREV(q) = (q); \ - QUEUE_NEXT(h) = (q); \ - } \ - while (0) +static inline void uv__queue_insert_tail(struct uv__queue* h, + struct uv__queue* q) { + q->next = h; + q->prev = h->prev; + q->prev->next = q; + h->prev = q; +} -#define QUEUE_INSERT_TAIL(h, q) \ - do { \ - QUEUE_NEXT(q) = (h); \ - QUEUE_PREV(q) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(q) = (q); \ - QUEUE_PREV(h) = (q); \ - } \ - while (0) - -#define QUEUE_REMOVE(q) \ - do { \ - QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ - QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ - } \ - while (0) +static inline void uv__queue_remove(struct uv__queue* q) { + q->prev->next = q->next; + q->next->prev = q->prev; +} #endif /* QUEUE_H_ */ diff --git a/src/threadpool.c b/src/threadpool.c index 51962bf0..dbef67f2 100644 --- a/src/threadpool.c +++ b/src/threadpool.c @@ -37,10 +37,10 @@ static unsigned int slow_io_work_running; static unsigned int nthreads; static uv_thread_t* threads; static uv_thread_t default_threads[4]; -static QUEUE exit_message; -static QUEUE wq; -static QUEUE run_slow_work_message; -static QUEUE slow_io_pending_wq; +static struct uv__queue exit_message; +static struct uv__queue wq; +static struct uv__queue run_slow_work_message; +static struct uv__queue slow_io_pending_wq; static unsigned int slow_work_thread_threshold(void) { return (nthreads + 1) / 2; @@ -56,7 +56,7 @@ static void uv__cancelled(struct uv__work* w) { */ static void worker(void* arg) { struct uv__work* w; - QUEUE* q; + struct uv__queue* q; int is_slow_work; uv_sem_post((uv_sem_t*) arg); @@ -68,49 +68,49 @@ static void worker(void* arg) { /* Keep waiting while either no work is present or only slow I/O and we're at the threshold for that. */ - while (QUEUE_EMPTY(&wq) || - (QUEUE_HEAD(&wq) == &run_slow_work_message && - QUEUE_NEXT(&run_slow_work_message) == &wq && + while (uv__queue_empty(&wq) || + (uv__queue_head(&wq) == &run_slow_work_message && + uv__queue_next(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; } - q = QUEUE_HEAD(&wq); + q = uv__queue_head(&wq); if (q == &exit_message) { uv_cond_signal(&cond); uv_mutex_unlock(&mutex); break; } - QUEUE_REMOVE(q); - QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */ + uv__queue_remove(q); + uv__queue_init(q); /* Signal uv_cancel() that the work req is executing. */ is_slow_work = 0; if (q == &run_slow_work_message) { /* If we're at the slow I/O threshold, re-schedule until after all other work in the queue is done. */ if (slow_io_work_running >= slow_work_thread_threshold()) { - QUEUE_INSERT_TAIL(&wq, q); + uv__queue_insert_tail(&wq, q); continue; } /* If we encountered a request to run slow I/O work but there is none to run, that means it's cancelled => Start over. */ - if (QUEUE_EMPTY(&slow_io_pending_wq)) + if (uv__queue_empty(&slow_io_pending_wq)) continue; is_slow_work = 1; slow_io_work_running++; - q = QUEUE_HEAD(&slow_io_pending_wq); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + q = uv__queue_head(&slow_io_pending_wq); + uv__queue_remove(q); + uv__queue_init(q); /* If there is more slow I/O work, schedule it to be run as well. */ - if (!QUEUE_EMPTY(&slow_io_pending_wq)) { - QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); + if (!uv__queue_empty(&slow_io_pending_wq)) { + uv__queue_insert_tail(&wq, &run_slow_work_message); if (idle_threads > 0) uv_cond_signal(&cond); } @@ -118,13 +118,13 @@ static void worker(void* arg) { uv_mutex_unlock(&mutex); - w = QUEUE_DATA(q, struct uv__work, wq); + w = uv__queue_data(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ - QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); + uv__queue_insert_tail(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); @@ -139,12 +139,12 @@ static void worker(void* arg) { } -static void post(QUEUE* q, enum uv__work_kind kind) { +static void post(struct uv__queue* q, enum uv__work_kind kind) { uv_mutex_lock(&mutex); if (kind == UV__WORK_SLOW_IO) { /* Insert into a separate queue. */ - QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); - if (!QUEUE_EMPTY(&run_slow_work_message)) { + uv__queue_insert_tail(&slow_io_pending_wq, q); + if (!uv__queue_empty(&run_slow_work_message)) { /* Running slow I/O tasks is already scheduled => Nothing to do here. The worker that runs said other task will schedule this one as well. */ uv_mutex_unlock(&mutex); @@ -153,7 +153,7 @@ static void post(QUEUE* q, enum uv__work_kind kind) { q = &run_slow_work_message; } - QUEUE_INSERT_TAIL(&wq, q); + uv__queue_insert_tail(&wq, q); if (idle_threads > 0) uv_cond_signal(&cond); uv_mutex_unlock(&mutex); @@ -220,9 +220,9 @@ static void init_threads(void) { if (uv_mutex_init(&mutex)) abort(); - QUEUE_INIT(&wq); - QUEUE_INIT(&slow_io_pending_wq); - QUEUE_INIT(&run_slow_work_message); + uv__queue_init(&wq); + uv__queue_init(&slow_io_pending_wq); + uv__queue_init(&run_slow_work_message); if (uv_sem_init(&sem, 0)) abort(); @@ -285,9 +285,9 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { uv_mutex_lock(&mutex); uv_mutex_lock(&w->loop->wq_mutex); - cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL; + cancelled = !uv__queue_empty(&w->wq) && w->work != NULL; if (cancelled) - QUEUE_REMOVE(&w->wq); + uv__queue_remove(&w->wq); uv_mutex_unlock(&w->loop->wq_mutex); uv_mutex_unlock(&mutex); @@ -297,7 +297,7 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { w->work = uv__cancelled; uv_mutex_lock(&loop->wq_mutex); - QUEUE_INSERT_TAIL(&loop->wq, &w->wq); + uv__queue_insert_tail(&loop->wq, &w->wq); uv_async_send(&loop->wq_async); uv_mutex_unlock(&loop->wq_mutex); @@ -308,21 +308,21 @@ static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { void uv__work_done(uv_async_t* handle) { struct uv__work* w; uv_loop_t* loop; - QUEUE* q; - QUEUE wq; + struct uv__queue* q; + struct uv__queue wq; int err; int nevents; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); - QUEUE_MOVE(&loop->wq, &wq); + uv__queue_move(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); nevents = 0; - while (!QUEUE_EMPTY(&wq)) { - q = QUEUE_HEAD(&wq); - QUEUE_REMOVE(q); + while (!uv__queue_empty(&wq)) { + q = uv__queue_head(&wq); + uv__queue_remove(q); w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; diff --git a/src/unix/aix.c b/src/unix/aix.c index f1afbed4..3af3009a 100644 --- a/src/unix/aix.c +++ b/src/unix/aix.c @@ -136,7 +136,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { struct pollfd pqry; struct pollfd* pe; struct poll_ctl pc; - QUEUE* q; + struct uv__queue* q; uv__io_t* w; uint64_t base; uint64_t diff; @@ -151,18 +151,18 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { int reset_timeout; if (loop->nfds == 0) { - assert(QUEUE_EMPTY(&loop->watcher_queue)); + assert(uv__queue_empty(&loop->watcher_queue)); return; } lfields = uv__get_internal_fields(loop); - while (!QUEUE_EMPTY(&loop->watcher_queue)) { - q = QUEUE_HEAD(&loop->watcher_queue); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + while (!uv__queue_empty(&loop->watcher_queue)) { + q = uv__queue_head(&loop->watcher_queue); + uv__queue_remove(q); + uv__queue_init(q); - w = QUEUE_DATA(q, uv__io_t, watcher_queue); + w = uv__queue_data(q, uv__io_t, watcher_queue); assert(w->pevents != 0); assert(w->fd >= 0); assert(w->fd < (int) loop->nwatchers); diff --git a/src/unix/async.c b/src/unix/async.c index 5751b6d0..0ff2669e 100644 --- a/src/unix/async.c +++ b/src/unix/async.c @@ -55,7 +55,7 @@ int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { handle->pending = 0; handle->u.fd = 0; /* This will be used as a busy flag. */ - QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); + uv__queue_insert_tail(&loop->async_handles, &handle->queue); uv__handle_start(handle); return 0; @@ -124,7 +124,7 @@ static void uv__async_spin(uv_async_t* handle) { void uv__async_close(uv_async_t* handle) { uv__async_spin(handle); - QUEUE_REMOVE(&handle->queue); + uv__queue_remove(&handle->queue); uv__handle_stop(handle); } @@ -132,8 +132,8 @@ void uv__async_close(uv_async_t* handle) { static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { char buf[1024]; ssize_t r; - QUEUE queue; - QUEUE* q; + struct uv__queue queue; + struct uv__queue* q; uv_async_t* h; _Atomic int *pending; @@ -157,13 +157,13 @@ static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { abort(); } - QUEUE_MOVE(&loop->async_handles, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - h = QUEUE_DATA(q, uv_async_t, queue); + uv__queue_move(&loop->async_handles, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + h = uv__queue_data(q, uv_async_t, queue); - QUEUE_REMOVE(q); - QUEUE_INSERT_TAIL(&loop->async_handles, q); + uv__queue_remove(q); + uv__queue_insert_tail(&loop->async_handles, q); /* Atomically fetch and clear pending flag */ pending = (_Atomic int*) &h->pending; @@ -241,8 +241,8 @@ static int uv__async_start(uv_loop_t* loop) { void uv__async_stop(uv_loop_t* loop) { - QUEUE queue; - QUEUE* q; + struct uv__queue queue; + struct uv__queue* q; uv_async_t* h; if (loop->async_io_watcher.fd == -1) @@ -251,13 +251,13 @@ void uv__async_stop(uv_loop_t* loop) { /* Make sure no other thread is accessing the async handle fd after the loop * cleanup. */ - QUEUE_MOVE(&loop->async_handles, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - h = QUEUE_DATA(q, uv_async_t, queue); + uv__queue_move(&loop->async_handles, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + h = uv__queue_data(q, uv_async_t, queue); - QUEUE_REMOVE(q); - QUEUE_INSERT_TAIL(&loop->async_handles, q); + uv__queue_remove(q); + uv__queue_insert_tail(&loop->async_handles, q); uv__async_spin(h); } @@ -275,20 +275,20 @@ void uv__async_stop(uv_loop_t* loop) { int uv__async_fork(uv_loop_t* loop) { - QUEUE queue; - QUEUE* q; + struct uv__queue queue; + struct uv__queue* q; uv_async_t* h; if (loop->async_io_watcher.fd == -1) /* never started */ return 0; - QUEUE_MOVE(&loop->async_handles, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - h = QUEUE_DATA(q, uv_async_t, queue); + uv__queue_move(&loop->async_handles, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + h = uv__queue_data(q, uv_async_t, queue); - QUEUE_REMOVE(q); - QUEUE_INSERT_TAIL(&loop->async_handles, q); + uv__queue_remove(q); + uv__queue_insert_tail(&loop->async_handles, q); /* The state of any thread that set pending is now likely corrupt in this * child because the user called fork, so just clear these flags and move diff --git a/src/unix/core.c b/src/unix/core.c index 9c0b3f99..2f8395f2 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -344,7 +344,7 @@ static void uv__finish_close(uv_handle_t* handle) { } uv__handle_unref(handle); - QUEUE_REMOVE(&handle->handle_queue); + uv__queue_remove(&handle->handle_queue); if (handle->close_cb) { handle->close_cb(handle); @@ -380,7 +380,7 @@ int uv_backend_fd(const uv_loop_t* loop) { static int uv__loop_alive(const uv_loop_t* loop) { return uv__has_active_handles(loop) || uv__has_active_reqs(loop) || - !QUEUE_EMPTY(&loop->pending_queue) || + !uv__queue_empty(&loop->pending_queue) || loop->closing_handles != NULL; } @@ -389,8 +389,8 @@ static int uv__backend_timeout(const uv_loop_t* loop) { if (loop->stop_flag == 0 && /* uv__loop_alive(loop) && */ (uv__has_active_handles(loop) || uv__has_active_reqs(loop)) && - QUEUE_EMPTY(&loop->pending_queue) && - QUEUE_EMPTY(&loop->idle_handles) && + uv__queue_empty(&loop->pending_queue) && + uv__queue_empty(&loop->idle_handles) && (loop->flags & UV_LOOP_REAP_CHILDREN) == 0 && loop->closing_handles == NULL) return uv__next_timeout(loop); @@ -399,7 +399,7 @@ static int uv__backend_timeout(const uv_loop_t* loop) { int uv_backend_timeout(const uv_loop_t* loop) { - if (QUEUE_EMPTY(&loop->watcher_queue)) + if (uv__queue_empty(&loop->watcher_queue)) return uv__backend_timeout(loop); /* Need to call uv_run to update the backend fd state. */ return 0; @@ -432,7 +432,8 @@ int uv_run(uv_loop_t* loop, uv_run_mode mode) { while (r != 0 && loop->stop_flag == 0) { can_sleep = - QUEUE_EMPTY(&loop->pending_queue) && QUEUE_EMPTY(&loop->idle_handles); + uv__queue_empty(&loop->pending_queue) && + uv__queue_empty(&loop->idle_handles); uv__run_pending(loop); uv__run_idle(loop); @@ -448,7 +449,7 @@ int uv_run(uv_loop_t* loop, uv_run_mode mode) { /* Process immediate callbacks (e.g. write_cb) a small fixed number of * times to avoid loop starvation.*/ - for (r = 0; r < 8 && !QUEUE_EMPTY(&loop->pending_queue); r++) + for (r = 0; r < 8 && !uv__queue_empty(&loop->pending_queue); r++) uv__run_pending(loop); /* Run one final update on the provider_idle_time in case uv__io_poll @@ -827,17 +828,17 @@ int uv_fileno(const uv_handle_t* handle, uv_os_fd_t* fd) { static void uv__run_pending(uv_loop_t* loop) { - QUEUE* q; - QUEUE pq; + struct uv__queue* q; + struct uv__queue pq; uv__io_t* w; - QUEUE_MOVE(&loop->pending_queue, &pq); + uv__queue_move(&loop->pending_queue, &pq); - while (!QUEUE_EMPTY(&pq)) { - q = QUEUE_HEAD(&pq); - QUEUE_REMOVE(q); - QUEUE_INIT(q); - w = QUEUE_DATA(q, uv__io_t, pending_queue); + while (!uv__queue_empty(&pq)) { + q = uv__queue_head(&pq); + uv__queue_remove(q); + uv__queue_init(q); + w = uv__queue_data(q, uv__io_t, pending_queue); w->cb(loop, w, POLLOUT); } } @@ -892,8 +893,8 @@ static void maybe_resize(uv_loop_t* loop, unsigned int len) { void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) { assert(cb != NULL); assert(fd >= -1); - QUEUE_INIT(&w->pending_queue); - QUEUE_INIT(&w->watcher_queue); + uv__queue_init(&w->pending_queue); + uv__queue_init(&w->watcher_queue); w->cb = cb; w->fd = fd; w->events = 0; @@ -919,8 +920,8 @@ void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) { return; #endif - if (QUEUE_EMPTY(&w->watcher_queue)) - QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue); + if (uv__queue_empty(&w->watcher_queue)) + uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); if (loop->watchers[w->fd] == NULL) { loop->watchers[w->fd] = w; @@ -945,8 +946,8 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) { w->pevents &= ~events; if (w->pevents == 0) { - QUEUE_REMOVE(&w->watcher_queue); - QUEUE_INIT(&w->watcher_queue); + uv__queue_remove(&w->watcher_queue); + uv__queue_init(&w->watcher_queue); w->events = 0; if (w == loop->watchers[w->fd]) { @@ -955,14 +956,14 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) { loop->nfds--; } } - else if (QUEUE_EMPTY(&w->watcher_queue)) - QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue); + else if (uv__queue_empty(&w->watcher_queue)) + uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); } void uv__io_close(uv_loop_t* loop, uv__io_t* w) { uv__io_stop(loop, w, POLLIN | POLLOUT | UV__POLLRDHUP | UV__POLLPRI); - QUEUE_REMOVE(&w->pending_queue); + uv__queue_remove(&w->pending_queue); /* Remove stale events for this file descriptor */ if (w->fd != -1) @@ -971,8 +972,8 @@ void uv__io_close(uv_loop_t* loop, uv__io_t* w) { void uv__io_feed(uv_loop_t* loop, uv__io_t* w) { - if (QUEUE_EMPTY(&w->pending_queue)) - QUEUE_INSERT_TAIL(&loop->pending_queue, &w->pending_queue); + if (uv__queue_empty(&w->pending_queue)) + uv__queue_insert_tail(&loop->pending_queue, &w->pending_queue); } diff --git a/src/unix/fsevents.c b/src/unix/fsevents.c index 0535b454..df703f36 100644 --- a/src/unix/fsevents.c +++ b/src/unix/fsevents.c @@ -80,13 +80,13 @@ enum uv__cf_loop_signal_type_e { typedef enum uv__cf_loop_signal_type_e uv__cf_loop_signal_type_t; struct uv__cf_loop_signal_s { - QUEUE member; + struct uv__queue member; uv_fs_event_t* handle; uv__cf_loop_signal_type_t type; }; struct uv__fsevents_event_s { - QUEUE member; + struct uv__queue member; int events; char path[1]; }; @@ -98,7 +98,7 @@ struct uv__cf_loop_state_s { FSEventStreamRef fsevent_stream; uv_sem_t fsevent_sem; uv_mutex_t fsevent_mutex; - void* fsevent_handles[2]; + struct uv__queue fsevent_handles; unsigned int fsevent_handle_count; }; @@ -150,22 +150,22 @@ static void (*pFSEventStreamStop)(FSEventStreamRef); #define UV__FSEVENTS_PROCESS(handle, block) \ do { \ - QUEUE events; \ - QUEUE* q; \ + struct uv__queue events; \ + struct uv__queue* q; \ uv__fsevents_event_t* event; \ int err; \ uv_mutex_lock(&(handle)->cf_mutex); \ /* Split-off all events and empty original queue */ \ - QUEUE_MOVE(&(handle)->cf_events, &events); \ + uv__queue_move(&(handle)->cf_events, &events); \ /* Get error (if any) and zero original one */ \ err = (handle)->cf_error; \ (handle)->cf_error = 0; \ uv_mutex_unlock(&(handle)->cf_mutex); \ /* Loop through events, deallocating each after processing */ \ - while (!QUEUE_EMPTY(&events)) { \ - q = QUEUE_HEAD(&events); \ - event = QUEUE_DATA(q, uv__fsevents_event_t, member); \ - QUEUE_REMOVE(q); \ + while (!uv__queue_empty(&events)) { \ + q = uv__queue_head(&events); \ + event = uv__queue_data(q, uv__fsevents_event_t, member); \ + uv__queue_remove(q); \ /* NOTE: Checking uv__is_active() is required here, because handle \ * callback may close handle and invoking it after it will lead to \ * incorrect behaviour */ \ @@ -193,14 +193,14 @@ static void uv__fsevents_cb(uv_async_t* cb) { /* Runs in CF thread, pushed event into handle's event list */ static void uv__fsevents_push_event(uv_fs_event_t* handle, - QUEUE* events, + struct uv__queue* events, int err) { assert(events != NULL || err != 0); uv_mutex_lock(&handle->cf_mutex); /* Concatenate two queues */ if (events != NULL) - QUEUE_ADD(&handle->cf_events, events); + uv__queue_add(&handle->cf_events, events); /* Propagate error */ if (err != 0) @@ -224,12 +224,12 @@ static void uv__fsevents_event_cb(const FSEventStreamRef streamRef, char* path; char* pos; uv_fs_event_t* handle; - QUEUE* q; + struct uv__queue* q; uv_loop_t* loop; uv__cf_loop_state_t* state; uv__fsevents_event_t* event; FSEventStreamEventFlags flags; - QUEUE head; + struct uv__queue head; loop = info; state = loop->cf_state; @@ -238,9 +238,9 @@ static void uv__fsevents_event_cb(const FSEventStreamRef streamRef, /* For each handle */ uv_mutex_lock(&state->fsevent_mutex); - QUEUE_FOREACH(q, &state->fsevent_handles) { - handle = QUEUE_DATA(q, uv_fs_event_t, cf_member); - QUEUE_INIT(&head); + uv__queue_foreach(q, &state->fsevent_handles) { + handle = uv__queue_data(q, uv_fs_event_t, cf_member); + uv__queue_init(&head); /* Process and filter out events */ for (i = 0; i < numEvents; i++) { @@ -318,10 +318,10 @@ static void uv__fsevents_event_cb(const FSEventStreamRef streamRef, event->events = UV_CHANGE; } - QUEUE_INSERT_TAIL(&head, &event->member); + uv__queue_insert_tail(&head, &event->member); } - if (!QUEUE_EMPTY(&head)) + if (!uv__queue_empty(&head)) uv__fsevents_push_event(handle, &head, 0); } uv_mutex_unlock(&state->fsevent_mutex); @@ -403,7 +403,7 @@ static void uv__fsevents_destroy_stream(uv__cf_loop_state_t* state) { static void uv__fsevents_reschedule(uv__cf_loop_state_t* state, uv_loop_t* loop, uv__cf_loop_signal_type_t type) { - QUEUE* q; + struct uv__queue* q; uv_fs_event_t* curr; CFArrayRef cf_paths; CFStringRef* paths; @@ -446,9 +446,9 @@ static void uv__fsevents_reschedule(uv__cf_loop_state_t* state, q = &state->fsevent_handles; for (; i < path_count; i++) { - q = QUEUE_NEXT(q); + q = uv__queue_next(q); assert(q != &state->fsevent_handles); - curr = QUEUE_DATA(q, uv_fs_event_t, cf_member); + curr = uv__queue_data(q, uv_fs_event_t, cf_member); assert(curr->realpath != NULL); paths[i] = @@ -486,8 +486,8 @@ final: /* Broadcast error to all handles */ uv_mutex_lock(&state->fsevent_mutex); - QUEUE_FOREACH(q, &state->fsevent_handles) { - curr = QUEUE_DATA(q, uv_fs_event_t, cf_member); + uv__queue_foreach(q, &state->fsevent_handles) { + curr = uv__queue_data(q, uv_fs_event_t, cf_member); uv__fsevents_push_event(curr, NULL, err); } uv_mutex_unlock(&state->fsevent_mutex); @@ -606,7 +606,7 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { if (err) goto fail_sem_init; - QUEUE_INIT(&loop->cf_signals); + uv__queue_init(&loop->cf_signals); err = uv_sem_init(&state->fsevent_sem, 0); if (err) @@ -616,7 +616,7 @@ static int uv__fsevents_loop_init(uv_loop_t* loop) { if (err) goto fail_fsevent_mutex_init; - QUEUE_INIT(&state->fsevent_handles); + uv__queue_init(&state->fsevent_handles); state->fsevent_need_reschedule = 0; state->fsevent_handle_count = 0; @@ -675,7 +675,7 @@ fail_mutex_init: void uv__fsevents_loop_delete(uv_loop_t* loop) { uv__cf_loop_signal_t* s; uv__cf_loop_state_t* state; - QUEUE* q; + struct uv__queue* q; if (loop->cf_state == NULL) return; @@ -688,10 +688,10 @@ void uv__fsevents_loop_delete(uv_loop_t* loop) { uv_mutex_destroy(&loop->cf_mutex); /* Free any remaining data */ - while (!QUEUE_EMPTY(&loop->cf_signals)) { - q = QUEUE_HEAD(&loop->cf_signals); - s = QUEUE_DATA(q, uv__cf_loop_signal_t, member); - QUEUE_REMOVE(q); + while (!uv__queue_empty(&loop->cf_signals)) { + q = uv__queue_head(&loop->cf_signals); + s = uv__queue_data(q, uv__cf_loop_signal_t, member); + uv__queue_remove(q); uv__free(s); } @@ -735,22 +735,22 @@ static void* uv__cf_loop_runner(void* arg) { static void uv__cf_loop_cb(void* arg) { uv_loop_t* loop; uv__cf_loop_state_t* state; - QUEUE* item; - QUEUE split_head; + struct uv__queue* item; + struct uv__queue split_head; uv__cf_loop_signal_t* s; loop = arg; state = loop->cf_state; uv_mutex_lock(&loop->cf_mutex); - QUEUE_MOVE(&loop->cf_signals, &split_head); + uv__queue_move(&loop->cf_signals, &split_head); uv_mutex_unlock(&loop->cf_mutex); - while (!QUEUE_EMPTY(&split_head)) { - item = QUEUE_HEAD(&split_head); - QUEUE_REMOVE(item); + while (!uv__queue_empty(&split_head)) { + item = uv__queue_head(&split_head); + uv__queue_remove(item); - s = QUEUE_DATA(item, uv__cf_loop_signal_t, member); + s = uv__queue_data(item, uv__cf_loop_signal_t, member); /* This was a termination signal */ if (s->handle == NULL) @@ -778,7 +778,7 @@ int uv__cf_loop_signal(uv_loop_t* loop, item->type = type; uv_mutex_lock(&loop->cf_mutex); - QUEUE_INSERT_TAIL(&loop->cf_signals, &item->member); + uv__queue_insert_tail(&loop->cf_signals, &item->member); state = loop->cf_state; assert(state != NULL); @@ -807,7 +807,7 @@ int uv__fsevents_init(uv_fs_event_t* handle) { handle->realpath_len = strlen(handle->realpath); /* Initialize event queue */ - QUEUE_INIT(&handle->cf_events); + uv__queue_init(&handle->cf_events); handle->cf_error = 0; /* @@ -832,7 +832,7 @@ int uv__fsevents_init(uv_fs_event_t* handle) { /* Insert handle into the list */ state = handle->loop->cf_state; uv_mutex_lock(&state->fsevent_mutex); - QUEUE_INSERT_TAIL(&state->fsevent_handles, &handle->cf_member); + uv__queue_insert_tail(&state->fsevent_handles, &handle->cf_member); state->fsevent_handle_count++; state->fsevent_need_reschedule = 1; uv_mutex_unlock(&state->fsevent_mutex); @@ -872,7 +872,7 @@ int uv__fsevents_close(uv_fs_event_t* handle) { /* Remove handle from the list */ state = handle->loop->cf_state; uv_mutex_lock(&state->fsevent_mutex); - QUEUE_REMOVE(&handle->cf_member); + uv__queue_remove(&handle->cf_member); state->fsevent_handle_count--; state->fsevent_need_reschedule = 1; uv_mutex_unlock(&state->fsevent_mutex); diff --git a/src/unix/kqueue.c b/src/unix/kqueue.c index 82916d65..b78242d3 100644 --- a/src/unix/kqueue.c +++ b/src/unix/kqueue.c @@ -133,7 +133,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { struct timespec spec; unsigned int nevents; unsigned int revents; - QUEUE* q; + struct uv__queue* q; uv__io_t* w; uv_process_t* process; sigset_t* pset; @@ -152,19 +152,19 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { int reset_timeout; if (loop->nfds == 0) { - assert(QUEUE_EMPTY(&loop->watcher_queue)); + assert(uv__queue_empty(&loop->watcher_queue)); return; } lfields = uv__get_internal_fields(loop); nevents = 0; - while (!QUEUE_EMPTY(&loop->watcher_queue)) { - q = QUEUE_HEAD(&loop->watcher_queue); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + while (!uv__queue_empty(&loop->watcher_queue)) { + q = uv__queue_head(&loop->watcher_queue); + uv__queue_remove(q); + uv__queue_init(q); - w = QUEUE_DATA(q, uv__io_t, watcher_queue); + w = uv__queue_data(q, uv__io_t, watcher_queue); assert(w->pevents != 0); assert(w->fd >= 0); assert(w->fd < (int) loop->nwatchers); @@ -307,8 +307,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { /* Handle kevent NOTE_EXIT results */ if (ev->filter == EVFILT_PROC) { - QUEUE_FOREACH(q, &loop->process_handles) { - process = QUEUE_DATA(q, uv_process_t, queue); + uv__queue_foreach(q, &loop->process_handles) { + process = uv__queue_data(q, uv_process_t, queue); if (process->pid == fd) { process->flags |= UV_HANDLE_REAP; loop->flags |= UV_LOOP_REAP_CHILDREN; diff --git a/src/unix/linux.c b/src/unix/linux.c index 9432e854..183e781b 100644 --- a/src/unix/linux.c +++ b/src/unix/linux.c @@ -266,7 +266,7 @@ STATIC_ASSERT(EPOLL_CTL_MOD < 4); struct watcher_list { RB_ENTRY(watcher_list) entry; - QUEUE watchers; + struct uv__queue watchers; int iterating; char* path; int wd; @@ -701,7 +701,7 @@ static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou, req->work_req.loop = loop; req->work_req.work = NULL; req->work_req.done = NULL; - QUEUE_INIT(&req->work_req.wq); + uv__queue_init(&req->work_req.wq); uv__req_register(loop, req); iou->in_flight++; @@ -1224,7 +1224,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { struct uv__iou* ctl; struct uv__iou* iou; int real_timeout; - QUEUE* q; + struct uv__queue* q; uv__io_t* w; sigset_t* sigmask; sigset_t sigset; @@ -1270,11 +1270,11 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { memset(&e, 0, sizeof(e)); - while (!QUEUE_EMPTY(&loop->watcher_queue)) { - q = QUEUE_HEAD(&loop->watcher_queue); - w = QUEUE_DATA(q, uv__io_t, watcher_queue); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + while (!uv__queue_empty(&loop->watcher_queue)) { + q = uv__queue_head(&loop->watcher_queue); + w = uv__queue_data(q, uv__io_t, watcher_queue); + uv__queue_remove(q); + uv__queue_init(q); op = EPOLL_CTL_MOD; if (w->events == 0) @@ -2229,8 +2229,8 @@ static int uv__inotify_fork(uv_loop_t* loop, struct watcher_list* root) { struct watcher_list* tmp_watcher_list_iter; struct watcher_list* watcher_list; struct watcher_list tmp_watcher_list; - QUEUE queue; - QUEUE* q; + struct uv__queue queue; + struct uv__queue* q; uv_fs_event_t* handle; char* tmp_path; @@ -2242,41 +2242,41 @@ static int uv__inotify_fork(uv_loop_t* loop, struct watcher_list* root) { */ loop->inotify_watchers = root; - QUEUE_INIT(&tmp_watcher_list.watchers); + uv__queue_init(&tmp_watcher_list.watchers); /* Note that the queue we use is shared with the start and stop() - * functions, making QUEUE_FOREACH unsafe to use. So we use the - * QUEUE_MOVE trick to safely iterate. Also don't free the watcher + * functions, making uv__queue_foreach unsafe to use. So we use the + * uv__queue_move trick to safely iterate. Also don't free the watcher * list until we're done iterating. c.f. uv__inotify_read. */ RB_FOREACH_SAFE(watcher_list, watcher_root, uv__inotify_watchers(loop), tmp_watcher_list_iter) { watcher_list->iterating = 1; - QUEUE_MOVE(&watcher_list->watchers, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - handle = QUEUE_DATA(q, uv_fs_event_t, watchers); + uv__queue_move(&watcher_list->watchers, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + handle = uv__queue_data(q, uv_fs_event_t, watchers); /* It's critical to keep a copy of path here, because it * will be set to NULL by stop() and then deallocated by * maybe_free_watcher_list */ tmp_path = uv__strdup(handle->path); assert(tmp_path != NULL); - QUEUE_REMOVE(q); - QUEUE_INSERT_TAIL(&watcher_list->watchers, q); + uv__queue_remove(q); + uv__queue_insert_tail(&watcher_list->watchers, q); uv_fs_event_stop(handle); - QUEUE_INSERT_TAIL(&tmp_watcher_list.watchers, &handle->watchers); + uv__queue_insert_tail(&tmp_watcher_list.watchers, &handle->watchers); handle->path = tmp_path; } watcher_list->iterating = 0; maybe_free_watcher_list(watcher_list, loop); } - QUEUE_MOVE(&tmp_watcher_list.watchers, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - QUEUE_REMOVE(q); - handle = QUEUE_DATA(q, uv_fs_event_t, watchers); + uv__queue_move(&tmp_watcher_list.watchers, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + uv__queue_remove(q); + handle = uv__queue_data(q, uv_fs_event_t, watchers); tmp_path = handle->path; handle->path = NULL; err = uv_fs_event_start(handle, handle->cb, tmp_path, 0); @@ -2298,7 +2298,7 @@ static struct watcher_list* find_watcher(uv_loop_t* loop, int wd) { static void maybe_free_watcher_list(struct watcher_list* w, uv_loop_t* loop) { /* if the watcher_list->watchers is being iterated over, we can't free it. */ - if ((!w->iterating) && QUEUE_EMPTY(&w->watchers)) { + if ((!w->iterating) && uv__queue_empty(&w->watchers)) { /* No watchers left for this path. Clean up. */ RB_REMOVE(watcher_root, uv__inotify_watchers(loop), w); inotify_rm_watch(loop->inotify_fd, w->wd); @@ -2313,8 +2313,8 @@ static void uv__inotify_read(uv_loop_t* loop, const struct inotify_event* e; struct watcher_list* w; uv_fs_event_t* h; - QUEUE queue; - QUEUE* q; + struct uv__queue queue; + struct uv__queue* q; const char* path; ssize_t size; const char *p; @@ -2357,7 +2357,7 @@ static void uv__inotify_read(uv_loop_t* loop, * What can go wrong? * A callback could call uv_fs_event_stop() * and the queue can change under our feet. - * So, we use QUEUE_MOVE() trick to safely iterate over the queue. + * So, we use uv__queue_move() trick to safely iterate over the queue. * And we don't free the watcher_list until we're done iterating. * * First, @@ -2365,13 +2365,13 @@ static void uv__inotify_read(uv_loop_t* loop, * not to free watcher_list. */ w->iterating = 1; - QUEUE_MOVE(&w->watchers, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - h = QUEUE_DATA(q, uv_fs_event_t, watchers); + uv__queue_move(&w->watchers, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + h = uv__queue_data(q, uv_fs_event_t, watchers); - QUEUE_REMOVE(q); - QUEUE_INSERT_TAIL(&w->watchers, q); + uv__queue_remove(q); + uv__queue_insert_tail(&w->watchers, q); h->cb(h, path, events, 0); } @@ -2433,13 +2433,13 @@ int uv_fs_event_start(uv_fs_event_t* handle, w->wd = wd; w->path = memcpy(w + 1, path, len); - QUEUE_INIT(&w->watchers); + uv__queue_init(&w->watchers); w->iterating = 0; RB_INSERT(watcher_root, uv__inotify_watchers(loop), w); no_insert: uv__handle_start(handle); - QUEUE_INSERT_TAIL(&w->watchers, &handle->watchers); + uv__queue_insert_tail(&w->watchers, &handle->watchers); handle->path = w->path; handle->cb = cb; handle->wd = wd; @@ -2460,7 +2460,7 @@ int uv_fs_event_stop(uv_fs_event_t* handle) { handle->wd = -1; handle->path = NULL; uv__handle_stop(handle); - QUEUE_REMOVE(&handle->watchers); + uv__queue_remove(&handle->watchers); maybe_free_watcher_list(w, handle->loop); diff --git a/src/unix/loop-watcher.c b/src/unix/loop-watcher.c index b8c1c2a7..2db8b515 100644 --- a/src/unix/loop-watcher.c +++ b/src/unix/loop-watcher.c @@ -32,7 +32,7 @@ int uv_##name##_start(uv_##name##_t* handle, uv_##name##_cb cb) { \ if (uv__is_active(handle)) return 0; \ if (cb == NULL) return UV_EINVAL; \ - QUEUE_INSERT_HEAD(&handle->loop->name##_handles, &handle->queue); \ + uv__queue_insert_head(&handle->loop->name##_handles, &handle->queue); \ handle->name##_cb = cb; \ uv__handle_start(handle); \ return 0; \ @@ -40,21 +40,21 @@ \ int uv_##name##_stop(uv_##name##_t* handle) { \ if (!uv__is_active(handle)) return 0; \ - QUEUE_REMOVE(&handle->queue); \ + uv__queue_remove(&handle->queue); \ uv__handle_stop(handle); \ return 0; \ } \ \ void uv__run_##name(uv_loop_t* loop) { \ uv_##name##_t* h; \ - QUEUE queue; \ - QUEUE* q; \ - QUEUE_MOVE(&loop->name##_handles, &queue); \ - while (!QUEUE_EMPTY(&queue)) { \ - q = QUEUE_HEAD(&queue); \ - h = QUEUE_DATA(q, uv_##name##_t, queue); \ - QUEUE_REMOVE(q); \ - QUEUE_INSERT_TAIL(&loop->name##_handles, q); \ + struct uv__queue queue; \ + struct uv__queue* q; \ + uv__queue_move(&loop->name##_handles, &queue); \ + while (!uv__queue_empty(&queue)) { \ + q = uv__queue_head(&queue); \ + h = uv__queue_data(q, uv_##name##_t, queue); \ + uv__queue_remove(q); \ + uv__queue_insert_tail(&loop->name##_handles, q); \ h->name##_cb(h); \ } \ } \ diff --git a/src/unix/loop.c b/src/unix/loop.c index 90a51b33..a9468e8e 100644 --- a/src/unix/loop.c +++ b/src/unix/loop.c @@ -50,20 +50,20 @@ int uv_loop_init(uv_loop_t* loop) { sizeof(lfields->loop_metrics.metrics)); heap_init((struct heap*) &loop->timer_heap); - QUEUE_INIT(&loop->wq); - QUEUE_INIT(&loop->idle_handles); - QUEUE_INIT(&loop->async_handles); - QUEUE_INIT(&loop->check_handles); - QUEUE_INIT(&loop->prepare_handles); - QUEUE_INIT(&loop->handle_queue); + uv__queue_init(&loop->wq); + uv__queue_init(&loop->idle_handles); + uv__queue_init(&loop->async_handles); + uv__queue_init(&loop->check_handles); + uv__queue_init(&loop->prepare_handles); + uv__queue_init(&loop->handle_queue); loop->active_handles = 0; loop->active_reqs.count = 0; loop->nfds = 0; loop->watchers = NULL; loop->nwatchers = 0; - QUEUE_INIT(&loop->pending_queue); - QUEUE_INIT(&loop->watcher_queue); + uv__queue_init(&loop->pending_queue); + uv__queue_init(&loop->watcher_queue); loop->closing_handles = NULL; uv__update_time(loop); @@ -85,7 +85,7 @@ int uv_loop_init(uv_loop_t* loop) { err = uv__process_init(loop); if (err) goto fail_signal_init; - QUEUE_INIT(&loop->process_handles); + uv__queue_init(&loop->process_handles); err = uv_rwlock_init(&loop->cloexec_lock); if (err) @@ -152,9 +152,9 @@ int uv_loop_fork(uv_loop_t* loop) { if (w == NULL) continue; - if (w->pevents != 0 && QUEUE_EMPTY(&w->watcher_queue)) { + if (w->pevents != 0 && uv__queue_empty(&w->watcher_queue)) { w->events = 0; /* Force re-registration in uv__io_poll. */ - QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue); + uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); } } @@ -180,7 +180,7 @@ void uv__loop_close(uv_loop_t* loop) { } uv_mutex_lock(&loop->wq_mutex); - assert(QUEUE_EMPTY(&loop->wq) && "thread pool work queue not empty!"); + assert(uv__queue_empty(&loop->wq) && "thread pool work queue not empty!"); assert(!uv__has_active_reqs(loop)); uv_mutex_unlock(&loop->wq_mutex); uv_mutex_destroy(&loop->wq_mutex); @@ -192,8 +192,8 @@ void uv__loop_close(uv_loop_t* loop) { uv_rwlock_destroy(&loop->cloexec_lock); #if 0 - assert(QUEUE_EMPTY(&loop->pending_queue)); - assert(QUEUE_EMPTY(&loop->watcher_queue)); + assert(uv__queue_empty(&loop->pending_queue)); + assert(uv__queue_empty(&loop->watcher_queue)); assert(loop->nfds == 0); #endif diff --git a/src/unix/os390-syscalls.c b/src/unix/os390-syscalls.c index 5861aaaa..7f90c270 100644 --- a/src/unix/os390-syscalls.c +++ b/src/unix/os390-syscalls.c @@ -27,7 +27,7 @@ #include #include -static QUEUE global_epoll_queue; +static struct uv__queue global_epoll_queue; static uv_mutex_t global_epoll_lock; static uv_once_t once = UV_ONCE_INIT; @@ -178,18 +178,18 @@ static void after_fork(void) { static void child_fork(void) { - QUEUE* q; + struct uv__queue* q; uv_once_t child_once = UV_ONCE_INIT; /* reset once */ memcpy(&once, &child_once, sizeof(child_once)); /* reset epoll list */ - while (!QUEUE_EMPTY(&global_epoll_queue)) { + while (!uv__queue_empty(&global_epoll_queue)) { uv__os390_epoll* lst; - q = QUEUE_HEAD(&global_epoll_queue); - QUEUE_REMOVE(q); - lst = QUEUE_DATA(q, uv__os390_epoll, member); + q = uv__queue_head(&global_epoll_queue); + uv__queue_remove(q); + lst = uv__queue_data(q, uv__os390_epoll, member); uv__free(lst->items); lst->items = NULL; lst->size = 0; @@ -201,7 +201,7 @@ static void child_fork(void) { static void epoll_init(void) { - QUEUE_INIT(&global_epoll_queue); + uv__queue_init(&global_epoll_queue); if (uv_mutex_init(&global_epoll_lock)) abort(); @@ -225,7 +225,7 @@ uv__os390_epoll* epoll_create1(int flags) { lst->items[lst->size - 1].revents = 0; uv_once(&once, epoll_init); uv_mutex_lock(&global_epoll_lock); - QUEUE_INSERT_TAIL(&global_epoll_queue, &lst->member); + uv__queue_insert_tail(&global_epoll_queue, &lst->member); uv_mutex_unlock(&global_epoll_lock); } @@ -352,14 +352,14 @@ int epoll_wait(uv__os390_epoll* lst, struct epoll_event* events, int epoll_file_close(int fd) { - QUEUE* q; + struct uv__queue* q; uv_once(&once, epoll_init); uv_mutex_lock(&global_epoll_lock); - QUEUE_FOREACH(q, &global_epoll_queue) { + uv__queue_foreach(q, &global_epoll_queue) { uv__os390_epoll* lst; - lst = QUEUE_DATA(q, uv__os390_epoll, member); + lst = uv__queue_data(q, uv__os390_epoll, member); if (fd < lst->size && lst->items != NULL && lst->items[fd].fd != -1) lst->items[fd].fd = -1; } @@ -371,7 +371,7 @@ int epoll_file_close(int fd) { void epoll_queue_close(uv__os390_epoll* lst) { /* Remove epoll instance from global queue */ uv_mutex_lock(&global_epoll_lock); - QUEUE_REMOVE(&lst->member); + uv__queue_remove(&lst->member); uv_mutex_unlock(&global_epoll_lock); /* Free resources */ diff --git a/src/unix/os390-syscalls.h b/src/unix/os390-syscalls.h index 9f504171..d5f3bcf8 100644 --- a/src/unix/os390-syscalls.h +++ b/src/unix/os390-syscalls.h @@ -45,7 +45,7 @@ struct epoll_event { }; typedef struct { - QUEUE member; + struct uv__queue member; struct pollfd* items; unsigned long size; int msg_queue; diff --git a/src/unix/os390.c b/src/unix/os390.c index a87c2d77..bbd37692 100644 --- a/src/unix/os390.c +++ b/src/unix/os390.c @@ -815,7 +815,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { uv__os390_epoll* ep; int have_signals; int real_timeout; - QUEUE* q; + struct uv__queue* q; uv__io_t* w; uint64_t base; int count; @@ -827,19 +827,19 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { int reset_timeout; if (loop->nfds == 0) { - assert(QUEUE_EMPTY(&loop->watcher_queue)); + assert(uv__queue_empty(&loop->watcher_queue)); return; } lfields = uv__get_internal_fields(loop); - while (!QUEUE_EMPTY(&loop->watcher_queue)) { + while (!uv__queue_empty(&loop->watcher_queue)) { uv_stream_t* stream; - q = QUEUE_HEAD(&loop->watcher_queue); - QUEUE_REMOVE(q); - QUEUE_INIT(q); - w = QUEUE_DATA(q, uv__io_t, watcher_queue); + q = uv__queue_head(&loop->watcher_queue); + uv__queue_remove(q); + uv__queue_init(q); + w = uv__queue_data(q, uv__io_t, watcher_queue); assert(w->pevents != 0); assert(w->fd >= 0); diff --git a/src/unix/pipe.c b/src/unix/pipe.c index 610b09b3..3b7bc881 100644 --- a/src/unix/pipe.c +++ b/src/unix/pipe.c @@ -230,7 +230,7 @@ out: uv__req_init(handle->loop, req, UV_CONNECT); req->handle = (uv_stream_t*)handle; req->cb = cb; - QUEUE_INIT(&req->queue); + uv__queue_init(&req->queue); /* Force callback to run on next tick in case of error. */ if (err) diff --git a/src/unix/posix-poll.c b/src/unix/posix-poll.c index 7e7de868..2e016c2f 100644 --- a/src/unix/posix-poll.c +++ b/src/unix/posix-poll.c @@ -137,7 +137,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { sigset_t set; uint64_t time_base; uint64_t time_diff; - QUEUE* q; + struct uv__queue* q; uv__io_t* w; size_t i; unsigned int nevents; @@ -149,19 +149,19 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { int reset_timeout; if (loop->nfds == 0) { - assert(QUEUE_EMPTY(&loop->watcher_queue)); + assert(uv__queue_empty(&loop->watcher_queue)); return; } lfields = uv__get_internal_fields(loop); /* Take queued watchers and add their fds to our poll fds array. */ - while (!QUEUE_EMPTY(&loop->watcher_queue)) { - q = QUEUE_HEAD(&loop->watcher_queue); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + while (!uv__queue_empty(&loop->watcher_queue)) { + q = uv__queue_head(&loop->watcher_queue); + uv__queue_remove(q); + uv__queue_init(q); - w = QUEUE_DATA(q, uv__io_t, watcher_queue); + w = uv__queue_data(q, uv__io_t, watcher_queue); assert(w->pevents != 0); assert(w->fd >= 0); assert(w->fd < (int) loop->nwatchers); diff --git a/src/unix/process.c b/src/unix/process.c index c4fb322d..dbb4a1dc 100644 --- a/src/unix/process.c +++ b/src/unix/process.c @@ -108,17 +108,17 @@ void uv__wait_children(uv_loop_t* loop) { int status; int options; pid_t pid; - QUEUE pending; - QUEUE* q; - QUEUE* h; + struct uv__queue pending; + struct uv__queue* q; + struct uv__queue* h; - QUEUE_INIT(&pending); + uv__queue_init(&pending); h = &loop->process_handles; - q = QUEUE_HEAD(h); + q = uv__queue_head(h); while (q != h) { - process = QUEUE_DATA(q, uv_process_t, queue); - q = QUEUE_NEXT(q); + process = uv__queue_data(q, uv_process_t, queue); + q = uv__queue_next(q); #ifndef UV_USE_SIGCHLD if ((process->flags & UV_HANDLE_REAP) == 0) @@ -149,18 +149,18 @@ void uv__wait_children(uv_loop_t* loop) { assert(pid == process->pid); process->status = status; - QUEUE_REMOVE(&process->queue); - QUEUE_INSERT_TAIL(&pending, &process->queue); + uv__queue_remove(&process->queue); + uv__queue_insert_tail(&pending, &process->queue); } h = &pending; - q = QUEUE_HEAD(h); + q = uv__queue_head(h); while (q != h) { - process = QUEUE_DATA(q, uv_process_t, queue); - q = QUEUE_NEXT(q); + process = uv__queue_data(q, uv_process_t, queue); + q = uv__queue_next(q); - QUEUE_REMOVE(&process->queue); - QUEUE_INIT(&process->queue); + uv__queue_remove(&process->queue); + uv__queue_init(&process->queue); uv__handle_stop(process); if (process->exit_cb == NULL) @@ -176,7 +176,7 @@ void uv__wait_children(uv_loop_t* loop) { process->exit_cb(process, exit_status, term_signal); } - assert(QUEUE_EMPTY(&pending)); + assert(uv__queue_empty(&pending)); } /* @@ -978,7 +978,7 @@ int uv_spawn(uv_loop_t* loop, UV_PROCESS_WINDOWS_VERBATIM_ARGUMENTS))); uv__handle_init(loop, (uv_handle_t*)process, UV_PROCESS); - QUEUE_INIT(&process->queue); + uv__queue_init(&process->queue); process->status = 0; stdio_count = options->stdio_count; @@ -1041,7 +1041,7 @@ int uv_spawn(uv_loop_t* loop, process->pid = pid; process->exit_cb = options->exit_cb; - QUEUE_INSERT_TAIL(&loop->process_handles, &process->queue); + uv__queue_insert_tail(&loop->process_handles, &process->queue); uv__handle_start(process); } @@ -1103,10 +1103,10 @@ int uv_kill(int pid, int signum) { void uv__process_close(uv_process_t* handle) { - QUEUE_REMOVE(&handle->queue); + uv__queue_remove(&handle->queue); uv__handle_stop(handle); #ifdef UV_USE_SIGCHLD - if (QUEUE_EMPTY(&handle->loop->process_handles)) + if (uv__queue_empty(&handle->loop->process_handles)) uv_signal_stop(&handle->loop->child_watcher); #endif } diff --git a/src/unix/signal.c b/src/unix/signal.c index bb70523f..63aba5a6 100644 --- a/src/unix/signal.c +++ b/src/unix/signal.c @@ -291,16 +291,16 @@ int uv__signal_loop_fork(uv_loop_t* loop) { void uv__signal_loop_cleanup(uv_loop_t* loop) { - QUEUE* q; + struct uv__queue* q; /* Stop all the signal watchers that are still attached to this loop. This * ensures that the (shared) signal tree doesn't contain any invalid entries * entries, and that signal handlers are removed when appropriate. - * It's safe to use QUEUE_FOREACH here because the handles and the handle + * It's safe to use uv__queue_foreach here because the handles and the handle * queue are not modified by uv__signal_stop(). */ - QUEUE_FOREACH(q, &loop->handle_queue) { - uv_handle_t* handle = QUEUE_DATA(q, uv_handle_t, handle_queue); + uv__queue_foreach(q, &loop->handle_queue) { + uv_handle_t* handle = uv__queue_data(q, uv_handle_t, handle_queue); if (handle->type == UV_SIGNAL) uv__signal_stop((uv_signal_t*) handle); diff --git a/src/unix/stream.c b/src/unix/stream.c index 03f92b50..28c4d546 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -94,8 +94,8 @@ void uv__stream_init(uv_loop_t* loop, stream->accepted_fd = -1; stream->queued_fds = NULL; stream->delayed_error = 0; - QUEUE_INIT(&stream->write_queue); - QUEUE_INIT(&stream->write_completed_queue); + uv__queue_init(&stream->write_queue); + uv__queue_init(&stream->write_completed_queue); stream->write_queue_size = 0; if (loop->emfile_fd == -1) { @@ -439,15 +439,15 @@ int uv__stream_open(uv_stream_t* stream, int fd, int flags) { void uv__stream_flush_write_queue(uv_stream_t* stream, int error) { uv_write_t* req; - QUEUE* q; - while (!QUEUE_EMPTY(&stream->write_queue)) { - q = QUEUE_HEAD(&stream->write_queue); - QUEUE_REMOVE(q); + struct uv__queue* q; + while (!uv__queue_empty(&stream->write_queue)) { + q = uv__queue_head(&stream->write_queue); + uv__queue_remove(q); - req = QUEUE_DATA(q, uv_write_t, queue); + req = uv__queue_data(q, uv_write_t, queue); req->error = error; - QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); + uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); } } @@ -627,7 +627,7 @@ static void uv__drain(uv_stream_t* stream) { uv_shutdown_t* req; int err; - assert(QUEUE_EMPTY(&stream->write_queue)); + assert(uv__queue_empty(&stream->write_queue)); if (!(stream->flags & UV_HANDLE_CLOSING)) { uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); uv__stream_osx_interrupt_select(stream); @@ -714,7 +714,7 @@ static void uv__write_req_finish(uv_write_t* req) { uv_stream_t* stream = req->handle; /* Pop the req off tcp->write_queue. */ - QUEUE_REMOVE(&req->queue); + uv__queue_remove(&req->queue); /* Only free when there was no error. On error, we touch up write_queue_size * right before making the callback. The reason we don't do that right away @@ -731,7 +731,7 @@ static void uv__write_req_finish(uv_write_t* req) { /* Add it to the write_completed_queue where it will have its * callback called in the near future. */ - QUEUE_INSERT_TAIL(&stream->write_completed_queue, &req->queue); + uv__queue_insert_tail(&stream->write_completed_queue, &req->queue); uv__io_feed(stream->loop, &stream->io_watcher); } @@ -837,7 +837,7 @@ static int uv__try_write(uv_stream_t* stream, } static void uv__write(uv_stream_t* stream) { - QUEUE* q; + struct uv__queue* q; uv_write_t* req; ssize_t n; int count; @@ -851,11 +851,11 @@ static void uv__write(uv_stream_t* stream) { count = 32; for (;;) { - if (QUEUE_EMPTY(&stream->write_queue)) + if (uv__queue_empty(&stream->write_queue)) return; - q = QUEUE_HEAD(&stream->write_queue); - req = QUEUE_DATA(q, uv_write_t, queue); + q = uv__queue_head(&stream->write_queue); + req = uv__queue_data(q, uv_write_t, queue); assert(req->handle == stream); n = uv__try_write(stream, @@ -899,19 +899,19 @@ error: static void uv__write_callbacks(uv_stream_t* stream) { uv_write_t* req; - QUEUE* q; - QUEUE pq; + struct uv__queue* q; + struct uv__queue pq; - if (QUEUE_EMPTY(&stream->write_completed_queue)) + if (uv__queue_empty(&stream->write_completed_queue)) return; - QUEUE_MOVE(&stream->write_completed_queue, &pq); + uv__queue_move(&stream->write_completed_queue, &pq); - while (!QUEUE_EMPTY(&pq)) { + while (!uv__queue_empty(&pq)) { /* Pop a req off write_completed_queue. */ - q = QUEUE_HEAD(&pq); - req = QUEUE_DATA(q, uv_write_t, queue); - QUEUE_REMOVE(q); + q = uv__queue_head(&pq); + req = uv__queue_data(q, uv_write_t, queue); + uv__queue_remove(q); uv__req_unregister(stream->loop, req); if (req->bufs != NULL) { @@ -1174,7 +1174,7 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) { stream->shutdown_req = req; stream->flags &= ~UV_HANDLE_WRITABLE; - if (QUEUE_EMPTY(&stream->write_queue)) + if (uv__queue_empty(&stream->write_queue)) uv__io_feed(stream->loop, &stream->io_watcher); return 0; @@ -1227,7 +1227,7 @@ static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { uv__write_callbacks(stream); /* Write queue drained. */ - if (QUEUE_EMPTY(&stream->write_queue)) + if (uv__queue_empty(&stream->write_queue)) uv__drain(stream); } } @@ -1270,7 +1270,7 @@ static void uv__stream_connect(uv_stream_t* stream) { stream->connect_req = NULL; uv__req_unregister(stream->loop, req); - if (error < 0 || QUEUE_EMPTY(&stream->write_queue)) { + if (error < 0 || uv__queue_empty(&stream->write_queue)) { uv__io_stop(stream->loop, &stream->io_watcher, POLLOUT); } @@ -1352,7 +1352,7 @@ int uv_write2(uv_write_t* req, req->handle = stream; req->error = 0; req->send_handle = send_handle; - QUEUE_INIT(&req->queue); + uv__queue_init(&req->queue); req->bufs = req->bufsml; if (nbufs > ARRAY_SIZE(req->bufsml)) @@ -1367,7 +1367,7 @@ int uv_write2(uv_write_t* req, stream->write_queue_size += uv__count_bufs(bufs, nbufs); /* Append the request to write_queue. */ - QUEUE_INSERT_TAIL(&stream->write_queue, &req->queue); + uv__queue_insert_tail(&stream->write_queue, &req->queue); /* If the queue was empty when this function began, we should attempt to * do the write immediately. Otherwise start the write_watcher and wait diff --git a/src/unix/sunos.c b/src/unix/sunos.c index 75b6fbad..2d6bae79 100644 --- a/src/unix/sunos.c +++ b/src/unix/sunos.c @@ -148,7 +148,7 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { struct port_event events[1024]; struct port_event* pe; struct timespec spec; - QUEUE* q; + struct uv__queue* q; uv__io_t* w; sigset_t* pset; sigset_t set; @@ -166,16 +166,16 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { int reset_timeout; if (loop->nfds == 0) { - assert(QUEUE_EMPTY(&loop->watcher_queue)); + assert(uv__queue_empty(&loop->watcher_queue)); return; } - while (!QUEUE_EMPTY(&loop->watcher_queue)) { - q = QUEUE_HEAD(&loop->watcher_queue); - QUEUE_REMOVE(q); - QUEUE_INIT(q); + while (!uv__queue_empty(&loop->watcher_queue)) { + q = uv__queue_head(&loop->watcher_queue); + uv__queue_remove(q); + uv__queue_init(q); - w = QUEUE_DATA(q, uv__io_t, watcher_queue); + w = uv__queue_data(q, uv__io_t, watcher_queue); assert(w->pevents != 0); if (port_associate(loop->backend_fd, @@ -316,8 +316,8 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { continue; /* Disabled by callback. */ /* Events Ports operates in oneshot mode, rearm timer on next run. */ - if (w->pevents != 0 && QUEUE_EMPTY(&w->watcher_queue)) - QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue); + if (w->pevents != 0 && uv__queue_empty(&w->watcher_queue)) + uv__queue_insert_tail(&loop->watcher_queue, &w->watcher_queue); } uv__metrics_inc_events(loop, nevents); diff --git a/src/unix/tcp.c b/src/unix/tcp.c index ab4e06c2..d6c848f4 100644 --- a/src/unix/tcp.c +++ b/src/unix/tcp.c @@ -124,7 +124,7 @@ int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* tcp, unsigned int flags) { if (domain != AF_UNSPEC) { err = new_socket(tcp, domain, 0); if (err) { - QUEUE_REMOVE(&tcp->handle_queue); + uv__queue_remove(&tcp->handle_queue); if (tcp->io_watcher.fd != -1) uv__close(tcp->io_watcher.fd); tcp->io_watcher.fd = -1; @@ -252,7 +252,7 @@ out: uv__req_init(handle->loop, req, UV_CONNECT); req->cb = cb; req->handle = (uv_stream_t*) handle; - QUEUE_INIT(&req->queue); + uv__queue_init(&req->queue); handle->connect_req = req; uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); diff --git a/src/unix/tty.c b/src/unix/tty.c index 7a5390c1..d099bdb3 100644 --- a/src/unix/tty.c +++ b/src/unix/tty.c @@ -222,7 +222,7 @@ skip: int rc = r; if (newfd != -1) uv__close(newfd); - QUEUE_REMOVE(&tty->handle_queue); + uv__queue_remove(&tty->handle_queue); do r = fcntl(fd, F_SETFL, saved_flags); while (r == -1 && errno == EINTR); diff --git a/src/unix/udp.c b/src/unix/udp.c index f556808f..c2814512 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -62,18 +62,18 @@ void uv__udp_close(uv_udp_t* handle) { void uv__udp_finish_close(uv_udp_t* handle) { uv_udp_send_t* req; - QUEUE* q; + struct uv__queue* q; assert(!uv__io_active(&handle->io_watcher, POLLIN | POLLOUT)); assert(handle->io_watcher.fd == -1); - while (!QUEUE_EMPTY(&handle->write_queue)) { - q = QUEUE_HEAD(&handle->write_queue); - QUEUE_REMOVE(q); + while (!uv__queue_empty(&handle->write_queue)) { + q = uv__queue_head(&handle->write_queue); + uv__queue_remove(q); - req = QUEUE_DATA(q, uv_udp_send_t, queue); + req = uv__queue_data(q, uv_udp_send_t, queue); req->status = UV_ECANCELED; - QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); + uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); } uv__udp_run_completed(handle); @@ -90,16 +90,16 @@ void uv__udp_finish_close(uv_udp_t* handle) { static void uv__udp_run_completed(uv_udp_t* handle) { uv_udp_send_t* req; - QUEUE* q; + struct uv__queue* q; assert(!(handle->flags & UV_HANDLE_UDP_PROCESSING)); handle->flags |= UV_HANDLE_UDP_PROCESSING; - while (!QUEUE_EMPTY(&handle->write_completed_queue)) { - q = QUEUE_HEAD(&handle->write_completed_queue); - QUEUE_REMOVE(q); + while (!uv__queue_empty(&handle->write_completed_queue)) { + q = uv__queue_head(&handle->write_completed_queue); + uv__queue_remove(q); - req = QUEUE_DATA(q, uv_udp_send_t, queue); + req = uv__queue_data(q, uv_udp_send_t, queue); uv__req_unregister(handle->loop, req); handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs); @@ -121,7 +121,7 @@ static void uv__udp_run_completed(uv_udp_t* handle) { req->send_cb(req, req->status); } - if (QUEUE_EMPTY(&handle->write_queue)) { + if (uv__queue_empty(&handle->write_queue)) { /* Pending queue and completion queue empty, stop watcher. */ uv__io_stop(handle->loop, &handle->io_watcher, POLLOUT); if (!uv__io_active(&handle->io_watcher, POLLIN)) @@ -280,20 +280,20 @@ static void uv__udp_sendmsg(uv_udp_t* handle) { uv_udp_send_t* req; struct mmsghdr h[20]; struct mmsghdr* p; - QUEUE* q; + struct uv__queue* q; ssize_t npkts; size_t pkts; size_t i; - if (QUEUE_EMPTY(&handle->write_queue)) + if (uv__queue_empty(&handle->write_queue)) return; write_queue_drain: - for (pkts = 0, q = QUEUE_HEAD(&handle->write_queue); + for (pkts = 0, q = uv__queue_head(&handle->write_queue); pkts < ARRAY_SIZE(h) && q != &handle->write_queue; - ++pkts, q = QUEUE_HEAD(q)) { + ++pkts, q = uv__queue_head(q)) { assert(q != NULL); - req = QUEUE_DATA(q, uv_udp_send_t, queue); + req = uv__queue_data(q, uv_udp_send_t, queue); assert(req != NULL); p = &h[pkts]; @@ -325,16 +325,16 @@ write_queue_drain: if (npkts < 1) { if (errno == EAGAIN || errno == EWOULDBLOCK || errno == ENOBUFS) return; - for (i = 0, q = QUEUE_HEAD(&handle->write_queue); + for (i = 0, q = uv__queue_head(&handle->write_queue); i < pkts && q != &handle->write_queue; - ++i, q = QUEUE_HEAD(&handle->write_queue)) { + ++i, q = uv__queue_head(&handle->write_queue)) { assert(q != NULL); - req = QUEUE_DATA(q, uv_udp_send_t, queue); + req = uv__queue_data(q, uv_udp_send_t, queue); assert(req != NULL); req->status = UV__ERR(errno); - QUEUE_REMOVE(&req->queue); - QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); + uv__queue_remove(&req->queue); + uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); } uv__io_feed(handle->loop, &handle->io_watcher); return; @@ -343,11 +343,11 @@ write_queue_drain: /* Safety: npkts known to be >0 below. Hence cast from ssize_t * to size_t safe. */ - for (i = 0, q = QUEUE_HEAD(&handle->write_queue); + for (i = 0, q = uv__queue_head(&handle->write_queue); i < (size_t)npkts && q != &handle->write_queue; - ++i, q = QUEUE_HEAD(&handle->write_queue)) { + ++i, q = uv__queue_head(&handle->write_queue)) { assert(q != NULL); - req = QUEUE_DATA(q, uv_udp_send_t, queue); + req = uv__queue_data(q, uv_udp_send_t, queue); assert(req != NULL); req->status = req->bufs[0].len; @@ -357,25 +357,25 @@ write_queue_drain: * why we don't handle partial writes. Just pop the request * off the write queue and onto the completed queue, done. */ - QUEUE_REMOVE(&req->queue); - QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); + uv__queue_remove(&req->queue); + uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); } /* couldn't batch everything, continue sending (jump to avoid stack growth) */ - if (!QUEUE_EMPTY(&handle->write_queue)) + if (!uv__queue_empty(&handle->write_queue)) goto write_queue_drain; uv__io_feed(handle->loop, &handle->io_watcher); #else /* __linux__ || ____FreeBSD__ */ uv_udp_send_t* req; struct msghdr h; - QUEUE* q; + struct uv__queue* q; ssize_t size; - while (!QUEUE_EMPTY(&handle->write_queue)) { - q = QUEUE_HEAD(&handle->write_queue); + while (!uv__queue_empty(&handle->write_queue)) { + q = uv__queue_head(&handle->write_queue); assert(q != NULL); - req = QUEUE_DATA(q, uv_udp_send_t, queue); + req = uv__queue_data(q, uv_udp_send_t, queue); assert(req != NULL); memset(&h, 0, sizeof h); @@ -414,8 +414,8 @@ write_queue_drain: * why we don't handle partial writes. Just pop the request * off the write queue and onto the completed queue, done. */ - QUEUE_REMOVE(&req->queue); - QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue); + uv__queue_remove(&req->queue); + uv__queue_insert_tail(&handle->write_completed_queue, &req->queue); uv__io_feed(handle->loop, &handle->io_watcher); } #endif /* __linux__ || ____FreeBSD__ */ @@ -729,7 +729,7 @@ int uv__udp_send(uv_udp_send_t* req, memcpy(req->bufs, bufs, nbufs * sizeof(bufs[0])); handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs); handle->send_queue_count++; - QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue); + uv__queue_insert_tail(&handle->write_queue, &req->queue); uv__handle_start(handle); if (empty_queue && !(handle->flags & UV_HANDLE_UDP_PROCESSING)) { @@ -739,7 +739,7 @@ int uv__udp_send(uv_udp_send_t* req, * away. In such cases the `io_watcher` has to be queued for asynchronous * write. */ - if (!QUEUE_EMPTY(&handle->write_queue)) + if (!uv__queue_empty(&handle->write_queue)) uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); } else { uv__io_start(handle->loop, &handle->io_watcher, POLLOUT); @@ -1007,8 +1007,8 @@ int uv__udp_init_ex(uv_loop_t* loop, handle->send_queue_size = 0; handle->send_queue_count = 0; uv__io_init(&handle->io_watcher, uv__udp_io, fd); - QUEUE_INIT(&handle->write_queue); - QUEUE_INIT(&handle->write_completed_queue); + uv__queue_init(&handle->write_queue); + uv__queue_init(&handle->write_completed_queue); return 0; } diff --git a/src/uv-common.c b/src/uv-common.c index cec771fa..916f3f4e 100644 --- a/src/uv-common.c +++ b/src/uv-common.c @@ -533,17 +533,17 @@ int uv_udp_recv_stop(uv_udp_t* handle) { void uv_walk(uv_loop_t* loop, uv_walk_cb walk_cb, void* arg) { - QUEUE queue; - QUEUE* q; + struct uv__queue queue; + struct uv__queue* q; uv_handle_t* h; - QUEUE_MOVE(&loop->handle_queue, &queue); - while (!QUEUE_EMPTY(&queue)) { - q = QUEUE_HEAD(&queue); - h = QUEUE_DATA(q, uv_handle_t, handle_queue); + uv__queue_move(&loop->handle_queue, &queue); + while (!uv__queue_empty(&queue)) { + q = uv__queue_head(&queue); + h = uv__queue_data(q, uv_handle_t, handle_queue); - QUEUE_REMOVE(q); - QUEUE_INSERT_TAIL(&loop->handle_queue, q); + uv__queue_remove(q); + uv__queue_insert_tail(&loop->handle_queue, q); if (h->flags & UV_HANDLE_INTERNAL) continue; walk_cb(h, arg); @@ -553,14 +553,14 @@ void uv_walk(uv_loop_t* loop, uv_walk_cb walk_cb, void* arg) { static void uv__print_handles(uv_loop_t* loop, int only_active, FILE* stream) { const char* type; - QUEUE* q; + struct uv__queue* q; uv_handle_t* h; if (loop == NULL) loop = uv_default_loop(); - QUEUE_FOREACH(q, &loop->handle_queue) { - h = QUEUE_DATA(q, uv_handle_t, handle_queue); + uv__queue_foreach(q, &loop->handle_queue) { + h = uv__queue_data(q, uv_handle_t, handle_queue); if (only_active && !uv__is_active(h)) continue; @@ -846,7 +846,7 @@ uv_loop_t* uv_loop_new(void) { int uv_loop_close(uv_loop_t* loop) { - QUEUE* q; + struct uv__queue* q; uv_handle_t* h; #ifndef NDEBUG void* saved_data; @@ -855,8 +855,8 @@ int uv_loop_close(uv_loop_t* loop) { if (uv__has_active_reqs(loop)) return UV_EBUSY; - QUEUE_FOREACH(q, &loop->handle_queue) { - h = QUEUE_DATA(q, uv_handle_t, handle_queue); + uv__queue_foreach(q, &loop->handle_queue) { + h = uv__queue_data(q, uv_handle_t, handle_queue); if (!(h->flags & UV_HANDLE_INTERNAL)) return UV_EBUSY; } diff --git a/src/uv-common.h b/src/uv-common.h index c0f6daf1..cd57e5a3 100644 --- a/src/uv-common.h +++ b/src/uv-common.h @@ -323,7 +323,7 @@ void uv__threadpool_cleanup(void); (h)->loop = (loop_); \ (h)->type = (type_); \ (h)->flags = UV_HANDLE_REF; /* Ref the loop when active. */ \ - QUEUE_INSERT_TAIL(&(loop_)->handle_queue, &(h)->handle_queue); \ + uv__queue_insert_tail(&(loop_)->handle_queue, &(h)->handle_queue); \ uv__handle_platform_init(h); \ } \ while (0) diff --git a/src/win/core.c b/src/win/core.c index 426edb18..2ae8aa7d 100644 --- a/src/win/core.c +++ b/src/win/core.c @@ -255,8 +255,8 @@ int uv_loop_init(uv_loop_t* loop) { loop->time = 0; uv_update_time(loop); - QUEUE_INIT(&loop->wq); - QUEUE_INIT(&loop->handle_queue); + uv__queue_init(&loop->wq); + uv__queue_init(&loop->handle_queue); loop->active_reqs.count = 0; loop->active_handles = 0; @@ -358,7 +358,7 @@ void uv__loop_close(uv_loop_t* loop) { } uv_mutex_lock(&loop->wq_mutex); - assert(QUEUE_EMPTY(&loop->wq) && "thread pool work queue not empty!"); + assert(uv__queue_empty(&loop->wq) && "thread pool work queue not empty!"); assert(!uv__has_active_reqs(loop)); uv_mutex_unlock(&loop->wq_mutex); uv_mutex_destroy(&loop->wq_mutex); diff --git a/src/win/handle-inl.h b/src/win/handle-inl.h index 5c843c24..4722e857 100644 --- a/src/win/handle-inl.h +++ b/src/win/handle-inl.h @@ -75,7 +75,7 @@ #define uv__handle_close(handle) \ do { \ - QUEUE_REMOVE(&(handle)->handle_queue); \ + uv__queue_remove(&(handle)->handle_queue); \ uv__active_handle_rm((uv_handle_t*) (handle)); \ \ (handle)->flags |= UV_HANDLE_CLOSED; \ diff --git a/src/win/pipe.c b/src/win/pipe.c index 787ba105..6044259c 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -55,7 +55,7 @@ static const int pipe_prefix_len = sizeof(pipe_prefix) - 1; typedef struct { uv__ipc_socket_xfer_type_t xfer_type; uv__ipc_socket_xfer_info_t xfer_info; - QUEUE member; + struct uv__queue member; } uv__ipc_xfer_queue_item_t; /* IPC frame header flags. */ @@ -111,7 +111,7 @@ int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) { handle->name = NULL; handle->pipe.conn.ipc_remote_pid = 0; handle->pipe.conn.ipc_data_frame.payload_remaining = 0; - QUEUE_INIT(&handle->pipe.conn.ipc_xfer_queue); + uv__queue_init(&handle->pipe.conn.ipc_xfer_queue); handle->pipe.conn.ipc_xfer_queue_length = 0; handle->ipc = ipc; handle->pipe.conn.non_overlapped_writes_tail = NULL; @@ -637,13 +637,13 @@ void uv__pipe_endgame(uv_loop_t* loop, uv_pipe_t* handle) { if (handle->flags & UV_HANDLE_CONNECTION) { /* Free pending sockets */ - while (!QUEUE_EMPTY(&handle->pipe.conn.ipc_xfer_queue)) { - QUEUE* q; + while (!uv__queue_empty(&handle->pipe.conn.ipc_xfer_queue)) { + struct uv__queue* q; SOCKET socket; - q = QUEUE_HEAD(&handle->pipe.conn.ipc_xfer_queue); - QUEUE_REMOVE(q); - xfer_queue_item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); + q = uv__queue_head(&handle->pipe.conn.ipc_xfer_queue); + uv__queue_remove(q); + xfer_queue_item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member); /* Materialize socket and close it */ socket = WSASocketW(FROM_PROTOCOL_INFO, @@ -1062,20 +1062,20 @@ int uv__pipe_accept(uv_pipe_t* server, uv_stream_t* client) { uv_loop_t* loop = server->loop; uv_pipe_t* pipe_client; uv_pipe_accept_t* req; - QUEUE* q; + struct uv__queue* q; uv__ipc_xfer_queue_item_t* item; int err; if (server->ipc) { - if (QUEUE_EMPTY(&server->pipe.conn.ipc_xfer_queue)) { + if (uv__queue_empty(&server->pipe.conn.ipc_xfer_queue)) { /* No valid pending sockets. */ return WSAEWOULDBLOCK; } - q = QUEUE_HEAD(&server->pipe.conn.ipc_xfer_queue); - QUEUE_REMOVE(q); + q = uv__queue_head(&server->pipe.conn.ipc_xfer_queue); + uv__queue_remove(q); server->pipe.conn.ipc_xfer_queue_length--; - item = QUEUE_DATA(q, uv__ipc_xfer_queue_item_t, member); + item = uv__queue_data(q, uv__ipc_xfer_queue_item_t, member); err = uv__tcp_xfer_import( (uv_tcp_t*) client, item->xfer_type, &item->xfer_info); @@ -1829,7 +1829,7 @@ static void uv__pipe_queue_ipc_xfer_info( item->xfer_type = xfer_type; item->xfer_info = *xfer_info; - QUEUE_INSERT_TAIL(&handle->pipe.conn.ipc_xfer_queue, &item->member); + uv__queue_insert_tail(&handle->pipe.conn.ipc_xfer_queue, &item->member); handle->pipe.conn.ipc_xfer_queue_length++; } diff --git a/src/win/tcp.c b/src/win/tcp.c index 6b282e0b..187f36e2 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -175,14 +175,14 @@ int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) { sock = socket(domain, SOCK_STREAM, 0); if (sock == INVALID_SOCKET) { err = WSAGetLastError(); - QUEUE_REMOVE(&handle->handle_queue); + uv__queue_remove(&handle->handle_queue); return uv_translate_sys_error(err); } err = uv__tcp_set_socket(handle->loop, handle, sock, domain, 0); if (err) { closesocket(sock); - QUEUE_REMOVE(&handle->handle_queue); + uv__queue_remove(&handle->handle_queue); return uv_translate_sys_error(err); } diff --git a/src/win/udp.c b/src/win/udp.c index 8a982d19..eab53842 100644 --- a/src/win/udp.c +++ b/src/win/udp.c @@ -146,14 +146,14 @@ int uv__udp_init_ex(uv_loop_t* loop, sock = socket(domain, SOCK_DGRAM, 0); if (sock == INVALID_SOCKET) { err = WSAGetLastError(); - QUEUE_REMOVE(&handle->handle_queue); + uv__queue_remove(&handle->handle_queue); return uv_translate_sys_error(err); } err = uv__udp_set_socket(handle->loop, handle, sock, domain); if (err) { closesocket(sock); - QUEUE_REMOVE(&handle->handle_queue); + uv__queue_remove(&handle->handle_queue); return uv_translate_sys_error(err); } } diff --git a/test/test-queue-foreach-delete.c b/test/test-queue-foreach-delete.c index 75d63f5c..4fe8aece 100644 --- a/test/test-queue-foreach-delete.c +++ b/test/test-queue-foreach-delete.c @@ -29,7 +29,7 @@ * The idea behind the test is as follows. * Certain handle types are stored in a queue internally. * Extra care should be taken for removal of a handle from the queue while iterating over the queue. - * (i.e., QUEUE_REMOVE() called within QUEUE_FOREACH()) + * (i.e., uv__queue_remove() called within uv__queue_foreach()) * This usually happens when someone closes or stops a handle from within its callback. * So we need to check that we haven't screwed the queue on close/stop. * To do so we do the following (for each handle type): @@ -54,7 +54,8 @@ * wrong foreach "next" | * * 4. The callback for handle #1 shouldn't be called because the handle #1 is stopped in the previous step. - * However, if QUEUE_REMOVE() is not handled properly within QUEUE_FOREACH(), the callback _will_ be called. + * However, if uv__queue_remove() is not handled properly within uv__queue_foreach(), the callback _will_ + * be called. */ static const unsigned first_handle_number_idle = 2;