poll: add UV_DISCONNECT event

It allows detecting the remote socket closing the connection. It's
emitted when `EPOLLRDHUP`(Linux), `EV_EOF`(BSD), `POLLRDHUP`(Solaris,
AIX) and `AFD_POLL_DISCONNECT`(Windows) events are received.

PR-URL: https://github.com/libuv/libuv/pull/691
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Reviewed-By: Fedor Indutny <fedor@indutny.com>
Reviewed-By: Saúl Ibarra Corretgé <saghul@gmail.com>
This commit is contained in:
Santiago Gimeno 2016-01-11 23:00:57 +01:00 committed by Saúl Ibarra Corretgé
parent 42ebae18d6
commit c7c8e916b8
9 changed files with 74 additions and 29 deletions

View File

@ -4,8 +4,8 @@
:c:type:`uv_poll_t` --- Poll handle
===================================
Poll handles are used to watch file descriptors for readability and
writability, similar to the purpose of :man:`poll(2)`.
Poll handles are used to watch file descriptors for readability,
writability and disconnection similar to the purpose of :man:`poll(2)`.
The purpose of poll handles is to enable integrating external libraries that
rely on the event loop to signal it about the socket status changes, like
@ -51,7 +51,8 @@ Data types
enum uv_poll_event {
UV_READABLE = 1,
UV_WRITABLE = 2
UV_WRITABLE = 2,
UV_DISCONNECT = 4
};
@ -82,10 +83,14 @@ API
.. c:function:: int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb)
Starts polling the file descriptor. `events` is a bitmask consisting made up
of UV_READABLE and UV_WRITABLE. As soon as an event is detected the callback
will be called with `status` set to 0, and the detected events set on the
of UV_READABLE, UV_WRITABLE and UV_DISCONNECT. As soon as an event is detected
the callback will be called with `status` set to 0, and the detected events set on the
`events` field.
The UV_DISCONNECT event is optional in the sense that it may not be
reported and the user is free to ignore it, but it can help optimize the shutdown
path because an extra read or write call might be avoided.
If an error happens while polling, `status` will be < 0 and corresponds
with one of the UV_E* error codes (see :ref:`errors`). The user should
not close the socket while the handle is active. If the user does that
@ -96,6 +101,8 @@ API
Calling :c:func:`uv_poll_start` on a handle that is already active is fine. Doing so
will update the events mask that is being watched for.
.. versionchanged:: 1.9.0 Added the UV_DISCONNECT event.
.. c:function:: int uv_poll_stop(uv_poll_t* poll)
Stop polling the file descriptor, the callback will no longer be called.

View File

@ -714,7 +714,8 @@ struct uv_poll_s {
enum uv_poll_event {
UV_READABLE = 1,
UV_WRITABLE = 2
UV_WRITABLE = 2,
UV_DISCONNECT = 4
};
UV_EXTERN int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd);

View File

@ -825,7 +825,7 @@ void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(0 == (events & ~(UV__POLLIN | UV__POLLOUT)));
assert(0 == (events & ~(UV__POLLIN | UV__POLLOUT | UV__POLLRDHUP)));
assert(0 != events);
assert(w->fd >= 0);
assert(w->fd < INT_MAX);
@ -858,7 +858,7 @@ void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
assert(0 == (events & ~(UV__POLLIN | UV__POLLOUT)));
assert(0 == (events & ~(UV__POLLIN | UV__POLLOUT | UV__POLLRDHUP)));
assert(0 != events);
if (w->fd == -1)
@ -890,7 +890,7 @@ void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
void uv__io_close(uv_loop_t* loop, uv__io_t* w) {
uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT);
uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT | UV__POLLRDHUP);
QUEUE_REMOVE(&w->pending_queue);
/* Remove stale events for this file descriptor */
@ -905,7 +905,7 @@ void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
int uv__io_active(const uv__io_t* w, unsigned int events) {
assert(0 == (events & ~(UV__POLLIN | UV__POLLOUT)));
assert(0 == (events & ~(UV__POLLIN | UV__POLLOUT | UV__POLLRDHUP)));
assert(0 != events);
return 0 != (w->pevents & events);
}

View File

@ -90,17 +90,19 @@
#endif
#if defined(__linux__)
# define UV__POLLIN UV__EPOLLIN
# define UV__POLLOUT UV__EPOLLOUT
# define UV__POLLERR UV__EPOLLERR
# define UV__POLLHUP UV__EPOLLHUP
# define UV__POLLIN UV__EPOLLIN
# define UV__POLLOUT UV__EPOLLOUT
# define UV__POLLERR UV__EPOLLERR
# define UV__POLLHUP UV__EPOLLHUP
# define UV__POLLRDHUP UV__EPOLLRDHUP
#endif
#if defined(__sun) || defined(_AIX)
# define UV__POLLIN POLLIN
# define UV__POLLOUT POLLOUT
# define UV__POLLERR POLLERR
# define UV__POLLHUP POLLHUP
# define UV__POLLIN POLLIN
# define UV__POLLOUT POLLOUT
# define UV__POLLERR POLLERR
# define UV__POLLHUP POLLHUP
# define UV__POLLRDHUP POLLRDHUP
#endif
#ifndef UV__POLLIN
@ -119,6 +121,10 @@
# define UV__POLLHUP 8
#endif
#ifndef UV__POLLRDHUP
# define UV__POLLRDHUP 0x200
#endif
#if !defined(O_CLOEXEC) && defined(__FreeBSD__)
/*
* It may be that we are just missing `__POSIX_VISIBLE >= 200809`.

View File

@ -259,6 +259,9 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
if (ev->flags & EV_ERROR)
revents |= UV__POLLERR;
if ((ev->flags & EV_EOF) && (w->pevents & UV__POLLRDHUP))
revents |= UV__POLLRDHUP;
if (revents == 0)
continue;

View File

@ -76,6 +76,7 @@
#define UV__EPOLLOUT 4
#define UV__EPOLLERR 8
#define UV__EPOLLHUP 16
#define UV__EPOLLRDHUP 0x2000
#define UV__EPOLLONESHOT 0x40000000
#define UV__EPOLLET 0x80000000

View File

@ -34,7 +34,7 @@ static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
handle = container_of(w, uv_poll_t, io_watcher);
if (events & UV__POLLERR) {
uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT);
uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT | UV__POLLRDHUP);
uv__handle_stop(handle);
handle->poll_cb(handle, -EBADF, 0);
return;
@ -45,6 +45,8 @@ static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
pevents |= UV_READABLE;
if (events & UV__POLLOUT)
pevents |= UV_WRITABLE;
if (events & UV__POLLRDHUP)
pevents |= UV_DISCONNECT;
handle->poll_cb(handle, 0, pevents);
}
@ -75,7 +77,9 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,
static void uv__poll_stop(uv_poll_t* handle) {
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLIN | UV__POLLOUT);
uv__io_stop(handle->loop,
&handle->io_watcher,
UV__POLLIN | UV__POLLOUT | UV__POLLRDHUP);
uv__handle_stop(handle);
}
@ -90,7 +94,7 @@ int uv_poll_stop(uv_poll_t* handle) {
int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
int events;
assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
assert((pevents & ~(UV_READABLE | UV_WRITABLE | UV_DISCONNECT)) == 0);
assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));
uv__poll_stop(handle);
@ -103,6 +107,8 @@ int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
events |= UV__POLLIN;
if (pevents & UV_WRITABLE)
events |= UV__POLLOUT;
if (pevents & UV_DISCONNECT)
events |= UV__POLLRDHUP;
uv__io_start(handle->loop, &handle->io_watcher, events);
uv__handle_start(handle);

View File

@ -91,7 +91,11 @@ static void uv__fast_poll_submit_poll_req(uv_loop_t* loop, uv_poll_t* handle) {
handle->mask_events_1 = handle->events;
handle->mask_events_2 = 0;
} else {
assert(0);
/* Just wait until there's an unsubmitted req. */
/* This will happen almost immediately as one of the 2 outstanding */
/* requests is about to return. When this happens, */
/* uv__fast_poll_process_poll_req will be called, and the pending */
/* events, if needed, will be processed in a subsequent request. */
return;
}
@ -107,6 +111,10 @@ static void uv__fast_poll_submit_poll_req(uv_loop_t* loop, uv_poll_t* handle) {
if (handle->events & UV_READABLE) {
afd_poll_info->Handles[0].Events |= AFD_POLL_RECEIVE |
AFD_POLL_DISCONNECT | AFD_POLL_ACCEPT | AFD_POLL_ABORT;
} else {
if (handle->events & UV_DISCONNECT) {
afd_poll_info->Handles[0].Events |= AFD_POLL_DISCONNECT;
}
}
if (handle->events & UV_WRITABLE) {
afd_poll_info->Handles[0].Events |= AFD_POLL_SEND | AFD_POLL_CONNECT_FAIL;
@ -184,6 +192,9 @@ static void uv__fast_poll_process_poll_req(uv_loop_t* loop, uv_poll_t* handle,
if ((afd_poll_info->Handles[0].Events & (AFD_POLL_RECEIVE |
AFD_POLL_DISCONNECT | AFD_POLL_ACCEPT | AFD_POLL_ABORT)) != 0) {
events |= UV_READABLE;
if ((afd_poll_info->Handles[0].Events & AFD_POLL_DISCONNECT) != 0) {
events |= UV_DISCONNECT;
}
}
if ((afd_poll_info->Handles[0].Events & (AFD_POLL_SEND |
AFD_POLL_CONNECT_FAIL)) != 0) {
@ -218,7 +229,7 @@ static void uv__fast_poll_process_poll_req(uv_loop_t* loop, uv_poll_t* handle,
static int uv__fast_poll_set(uv_loop_t* loop, uv_poll_t* handle, int events) {
assert(handle->type == UV_POLL);
assert(!(handle->flags & UV__HANDLE_CLOSING));
assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0);
assert((events & ~(UV_READABLE | UV_WRITABLE | UV_DISCONNECT)) == 0);
handle->events = events;

View File

@ -51,7 +51,7 @@ typedef struct connection_context_s {
size_t read, sent;
int is_server_connection;
int open_handles;
int got_fin, sent_fin;
int got_fin, sent_fin, got_disconnect;
unsigned int events, delayed_events;
} connection_context_t;
@ -72,6 +72,8 @@ static int closed_connections = 0;
static int valid_writable_wakeups = 0;
static int spurious_writable_wakeups = 0;
static int disconnects = 0;
static int got_eagain(void) {
#ifdef _WIN32
@ -142,6 +144,7 @@ static connection_context_t* create_connection_context(
context->delayed_events = 0;
context->got_fin = 0;
context->sent_fin = 0;
context->got_disconnect = 0;
r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock);
context->open_handles++;
@ -375,7 +378,13 @@ static void connection_poll_cb(uv_poll_t* handle, int status, int events) {
}
}
if (context->got_fin && context->sent_fin) {
if (events & UV_DISCONNECT) {
context->got_disconnect = 1;
++disconnects;
new_events &= ~UV_DISCONNECT;
}
if (context->got_fin && context->sent_fin && context->got_disconnect) {
/* Sent and received FIN. Close and destroy context. */
close_socket(context->sock);
destroy_connection_context(context);
@ -463,9 +472,9 @@ static void server_poll_cb(uv_poll_t* handle, int status, int events) {
#endif
connection_context = create_connection_context(sock, 1);
connection_context->events = UV_READABLE | UV_WRITABLE;
connection_context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
r = uv_poll_start(&connection_context->poll_handle,
UV_READABLE | UV_WRITABLE,
UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
connection_poll_cb);
ASSERT(r == 0);
@ -507,9 +516,9 @@ static void start_client(void) {
sock = create_bound_socket(addr);
context = create_connection_context(sock, 0);
context->events = UV_READABLE | UV_WRITABLE;
context->events = UV_READABLE | UV_WRITABLE | UV_DISCONNECT;
r = uv_poll_start(&context->poll_handle,
UV_READABLE | UV_WRITABLE,
UV_READABLE | UV_WRITABLE | UV_DISCONNECT,
connection_poll_cb);
ASSERT(r == 0);
@ -543,6 +552,7 @@ static void start_poll_test(void) {
spurious_writable_wakeups > 20);
ASSERT(closed_connections == NUM_CLIENTS * 2);
ASSERT(disconnects == NUM_CLIENTS * 2);
MAKE_VALGRIND_HAPPY();
}