unix: avoid iterating over all async handles
This commit is contained in:
parent
1e32cb01b5
commit
209abbab27
@ -151,6 +151,7 @@ typedef struct {
|
||||
ngx_queue_t async_handles; \
|
||||
uv__io_t async_watcher; \
|
||||
int async_pipefd[2]; \
|
||||
volatile sig_atomic_t async_sweep_needed; \
|
||||
/* RB_HEAD(uv__timers, uv_timer_s) */ \
|
||||
struct uv__timers { \
|
||||
struct uv_timer_s* rbh_root; \
|
||||
|
||||
@ -81,11 +81,25 @@ int uv_async_send(uv_async_t* handle) {
|
||||
return 0; /* already pending */
|
||||
|
||||
do
|
||||
r = write(handle->loop->async_pipefd[1], "x", 1);
|
||||
r = write(handle->loop->async_pipefd[1], &handle, sizeof(&handle));
|
||||
while (r == -1 && errno == EINTR);
|
||||
|
||||
if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
|
||||
return uv__set_sys_error(handle->loop, errno);
|
||||
if (r == -1) {
|
||||
/* If pipe is full, event loop should sweep all handles */
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
handle->loop->async_sweep_needed = 1;
|
||||
|
||||
/* Pipe might be drained at this stage, which means that
|
||||
* async_sweep_needed flag wasn't seen yet. Try writing pointer
|
||||
* once again to ensure that pipe isn't drained yet, or to invoke
|
||||
* uv__async_io.
|
||||
*/
|
||||
do
|
||||
r = write(handle->loop->async_pipefd[1], &handle, sizeof(&handle));
|
||||
while (r == -1 && errno == EINTR);
|
||||
} else
|
||||
return uv__set_sys_error(handle->loop, errno);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -114,34 +128,75 @@ static int uv__async_init(uv_loop_t* loop) {
|
||||
}
|
||||
|
||||
|
||||
#define ASYNC_CB(a) \
|
||||
if (((a)->flags & (UV_CLOSING | UV_CLOSED)) == 0 && (a)->pending) { \
|
||||
(a)->pending = 0; \
|
||||
(a)->async_cb((a), 0); \
|
||||
}
|
||||
|
||||
|
||||
static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) {
|
||||
char buf[1024];
|
||||
ngx_queue_t* q;
|
||||
uv_async_t* h;
|
||||
ssize_t r;
|
||||
ssize_t i;
|
||||
ssize_t bytes;
|
||||
ssize_t end;
|
||||
|
||||
bytes = 0;
|
||||
while (1) {
|
||||
r = read(loop->async_pipefd[0], buf, sizeof(buf));
|
||||
r = read(loop->async_pipefd[0], buf + bytes, sizeof(buf) - bytes);
|
||||
|
||||
if (r == sizeof(buf))
|
||||
if (r == -1) {
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||
/* Spin if there're partial data in buffer already */
|
||||
if (bytes)
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
|
||||
abort();
|
||||
}
|
||||
|
||||
bytes += r;
|
||||
|
||||
/* Round up end */
|
||||
end = (bytes / sizeof(h)) * sizeof(h);
|
||||
|
||||
/* If we need to sweep all handles anyway - skip this loop */
|
||||
if (!loop->async_sweep_needed) {
|
||||
for (i = 0; i < end; i += sizeof(h)) {
|
||||
h = *((uv_async_t**) (buf + i));
|
||||
ASYNC_CB(h)
|
||||
}
|
||||
}
|
||||
|
||||
bytes -= end;
|
||||
|
||||
/* Partial read happened */
|
||||
if (bytes) {
|
||||
memmove(buf, buf + end, bytes);
|
||||
/* Spin */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (r != -1)
|
||||
if (r != sizeof(buf))
|
||||
break;
|
||||
|
||||
if (errno == EAGAIN || errno == EWOULDBLOCK)
|
||||
break;
|
||||
|
||||
if (errno == EINTR)
|
||||
continue;
|
||||
|
||||
abort();
|
||||
}
|
||||
|
||||
/* Fast path: no sweep is required */
|
||||
if (!loop->async_sweep_needed)
|
||||
return;
|
||||
|
||||
/* Slow path: sweep all handles */
|
||||
loop->async_sweep_needed = 0;
|
||||
|
||||
ngx_queue_foreach(q, &loop->async_handles) {
|
||||
h = ngx_queue_data(q, uv_async_t, queue);
|
||||
if (!h->pending) continue;
|
||||
h->pending = 0;
|
||||
h->async_cb(h, 0);
|
||||
ASYNC_CB(h)
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,6 +52,7 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
|
||||
loop->time = uv_hrtime() / 1000000;
|
||||
loop->async_pipefd[0] = -1;
|
||||
loop->async_pipefd[1] = -1;
|
||||
loop->async_sweep_needed = 0;
|
||||
loop->signal_pipefd[0] = -1;
|
||||
loop->signal_pipefd[1] = -1;
|
||||
loop->emfile_fd = -1;
|
||||
|
||||
Loading…
Reference in New Issue
Block a user