From e1154d70ceb0e6e30b6f3cd4fa879bef17a162e2 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Wed, 25 Apr 2012 00:15:59 +0200 Subject: [PATCH 1/6] Api for polling external sockets --- include/uv.h | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/include/uv.h b/include/uv.h index 97d419d2..78a2f156 100644 --- a/include/uv.h +++ b/include/uv.h @@ -164,6 +164,7 @@ typedef enum { #define XX(uc, lc) UV_##uc, UV_HANDLE_TYPE_MAP(XX) #undef XX + UV_POLL, UV_FILE, UV_HANDLE_TYPE_PRIVATE UV_HANDLE_TYPE_MAX @@ -189,6 +190,7 @@ typedef struct uv_tcp_s uv_tcp_t; typedef struct uv_udp_s uv_udp_t; typedef struct uv_pipe_s uv_pipe_t; typedef struct uv_tty_s uv_tty_t; +typedef struct uv_poll_s uv_poll_t; typedef struct uv_timer_s uv_timer_t; typedef struct uv_prepare_s uv_prepare_t; typedef struct uv_check_s uv_check_t; @@ -288,6 +290,7 @@ typedef void (*uv_connect_cb)(uv_connect_t* req, int status); typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); typedef void (*uv_connection_cb)(uv_stream_t* server, int status); typedef void (*uv_close_cb)(uv_handle_t* handle); +typedef void (*uv_poll_cb)(uv_poll_t* handle, int status, int events); typedef void (*uv_timer_cb)(uv_timer_t* handle, int status); /* TODO: do these really need a status argument? */ typedef void (*uv_async_cb)(uv_async_t* handle, int status); @@ -926,6 +929,74 @@ UV_EXTERN void uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, UV_EXTERN void uv_pipe_pending_instances(uv_pipe_t* handle, int count); +/* + * uv_poll_t is a subclass of uv_handle_t. + * + * The uv_poll watcher is used to watch file descriptors for readability and + * writability, similar to the purpose of poll(2). + * + * The purpose of uv_poll is to enable integrating external libraries that + * rely on the event loop to signal it about the socket status changes, like + * c-ares or libssh2. Using uv_poll_t for any other other purpose is not + * recommended; uv_tcp_t, uv_udp_t, etc. provide an implementation that is + * much faster and more scalable than what can be achieved with uv_poll_t, + * especially on Windows. + * + * It is possible that uv_poll occasionally signals that a file descriptor is + * readable or writable even when it isn't. The user should therefore always + * be prepared to handle EAGAIN or equivalent when it attempts to read from or + * write to the fd. + * + * It is not okay to have multiple active uv_poll watchers for the same socket. + * This can cause libuv to busyloop or otherwise malfunction. + * + * The user should not close a file descriptor while it is being polled by an + * active uv_poll watcher. This can cause the poll watcher to report an error, + * but it might also start polling another socket. However the fd can be safely + * closed immediately after a call to uv_poll_stop() or uv_close(). + * + * On windows only sockets can be polled with uv_poll. On unix any file + * descriptor that would be accepted by poll(2) can be used with uv_poll. + */ +struct uv_poll_s { + UV_HANDLE_FIELDS + uv_poll_cb poll_cb; + UV_POLL_PRIVATE_FIELDS +}; + +enum uv_poll_event { + UV_READABLE = 1, + UV_WRITABLE = 2 +}; + +/* Initialize the poll watcher using a file descriptor. */ +UV_EXTERN int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd); + +/* Initialize the poll watcher using a socket descriptor. On unix this is */ +/* identical to uv_poll_init. On windows it takes a SOCKET handle. */ +UV_EXTERN int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, + uv_platform_socket_t socket); + +/* + * Starts polling the file descriptor. `events` is a bitmask consisting made up + * of UV_READABLE and UV_WRITABLE. As soon as an event is detected the callback + * will be called with `status` set to 0, and the detected events set en the + * `events` field. + * + * If an error happens while polling status may be set to -1 and the error + * code can be retrieved with uv_last_error. The user should not close the + * socket while uv_poll is active. If the user does that anyway, the callback + * *may* be called reporting an error status, but this is not guaranteed. + * + * Calling uv_poll_start on an uv_poll watcher that is already active is fine. + * Doing so will update the events mask that is being watched for. + */ +UV_EXTERN int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb); + +/* Stops polling the file descriptor. */ +UV_EXTERN int uv_poll_stop(uv_poll_t* handle); + + /* * uv_prepare_t is a subclass of uv_handle_t. * @@ -1543,6 +1614,7 @@ struct uv_counters_s { uint64_t udp_init; uint64_t pipe_init; uint64_t tty_init; + uint64_t poll_init; uint64_t prepare_init; uint64_t check_init; uint64_t idle_init; From d7a71761c4ee6b7d02d98f7e3a6efdffbadcb32a Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Wed, 25 Apr 2012 00:25:01 +0200 Subject: [PATCH 2/6] Windows: implement uv_poll --- include/uv-private/uv-win.h | 23 ++ src/win/core.c | 10 + src/win/handle.c | 11 + src/win/internal.h | 13 + src/win/poll.c | 609 ++++++++++++++++++++++++++++++++++++ src/win/req.c | 4 + uv.gyp | 1 + 7 files changed, 671 insertions(+) create mode 100644 src/win/poll.c diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index f6545d6c..49a4bac4 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -146,6 +146,8 @@ typedef struct _AFD_POLL_INFO { AFD_POLL_HANDLE_INFO Handles[1]; } AFD_POLL_INFO, *PAFD_POLL_INFO; +#define UV_MSAFD_PROVIDER_COUNT 3 + /** * It should be possible to cast uv_buf_t[] to WSABUF[] @@ -158,6 +160,8 @@ typedef struct uv_buf_t { typedef int uv_file; +typedef SOCKET uv_platform_socket_t; + typedef HANDLE uv_thread_t; typedef CRITICAL_SECTION uv_mutex_t; @@ -219,6 +223,9 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_prepare_t* next_prepare_handle; \ uv_check_t* next_check_handle; \ uv_idle_t* next_idle_handle; \ + /* This handle holds the peer sockets for the fast variant of uv_poll_t */ \ + SOCKET poll_peer_sockets[UV_MSAFD_PROVIDER_COUNT]; \ + /* State used by uv_ares. */ \ ares_channel ares_chan; \ int ares_active_sockets; \ uv_timer_t ares_polling_timer; \ @@ -237,6 +244,7 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); UV_ARES_CLEANUP_REQ, \ UV_FS_EVENT_REQ, \ UV_GETADDRINFO_REQ, \ + UV_POLL_REQ, \ UV_PROCESS_EXIT, \ UV_PROCESS_CLOSE, \ UV_READ, \ @@ -386,6 +394,21 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); COORD saved_position; \ WORD saved_attributes; +#define UV_POLL_PRIVATE_FIELDS \ + SOCKET socket; \ + /* Used in fast mode */ \ + SOCKET peer_socket; \ + AFD_POLL_INFO afd_poll_info_1; \ + AFD_POLL_INFO afd_poll_info_2; \ + /* Used in fast and slow mode. */ \ + uv_req_t poll_req_1; \ + uv_req_t poll_req_2; \ + unsigned char submitted_events_1; \ + unsigned char submitted_events_2; \ + unsigned char mask_events_1; \ + unsigned char mask_events_2; \ + unsigned char events; + #define UV_TIMER_PRIVATE_FIELDS \ RB_ENTRY(uv_timer_s) tree_entry; \ int64_t due; \ diff --git a/src/win/core.c b/src/win/core.c index 1358cb29..e698d75c 100644 --- a/src/win/core.c +++ b/src/win/core.c @@ -84,6 +84,8 @@ static void uv_loop_init(uv_loop_t* loop) { loop->next_check_handle = NULL; loop->next_idle_handle = NULL; + memset(&loop->poll_peer_sockets, 0, sizeof loop->poll_peer_sockets); + loop->ares_active_sockets = 0; loop->ares_chan = NULL; @@ -130,6 +132,14 @@ uv_loop_t* uv_loop_new(void) { void uv_loop_delete(uv_loop_t* loop) { if (loop != &uv_default_loop_) { + int i; + for (i = 0; i < ARRAY_SIZE(loop->poll_peer_sockets); i++) { + SOCKET sock = loop->poll_peer_sockets[i]; + if (sock != 0 && sock != INVALID_SOCKET) { + closesocket(sock); + } + } + free(loop); } } diff --git a/src/win/handle.c b/src/win/handle.c index 797b0716..39bd7dd5 100644 --- a/src/win/handle.c +++ b/src/win/handle.c @@ -64,6 +64,9 @@ int uv_is_active(const uv_handle_t* handle) { case UV_CHECK: return (handle->flags & UV_HANDLE_ACTIVE) ? 1 : 0; + case UV_POLL: + return ((uv_poll_t*) handle)->events != 0; + default: return 1; } @@ -112,6 +115,10 @@ void uv_close(uv_handle_t* handle, uv_close_cb cb) { } return; + case UV_POLL: + uv_poll_close(handle->loop, (uv_poll_t*) handle); + return; + case UV_TIMER: uv_timer_stop((uv_timer_t*)handle); uv_want_endgame(loop, handle); @@ -195,6 +202,10 @@ void uv_process_endgames(uv_loop_t* loop) { uv_udp_endgame(loop, (uv_udp_t*) handle); break; + case UV_POLL: + uv_poll_endgame(loop, (uv_poll_t*) handle); + break; + case UV_TIMER: uv_timer_endgame(loop, (uv_timer_t*) handle); break; diff --git a/src/win/internal.h b/src/win/internal.h index 3ff4665d..0e647739 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -84,6 +84,9 @@ void uv_process_timers(uv_loop_t* loop); #define UV_HANDLE_TTY_SAVED_POSITION 0x02000000 #define UV_HANDLE_TTY_SAVED_ATTRIBUTES 0x04000000 +/* Only used by uv_poll_t handles. */ +#define UV_HANDLE_POLL_SLOW 0x02000000 + void uv_want_endgame(uv_loop_t* loop, uv_handle_t* handle); void uv_process_endgames(uv_loop_t* loop); @@ -230,6 +233,16 @@ void uv_process_tty_connect_req(uv_loop_t* loop, uv_tty_t* handle, void uv_tty_endgame(uv_loop_t* loop, uv_tty_t* handle); +/* + * Poll watchers + */ +void uv_process_poll_req(uv_loop_t* loop, uv_poll_t* handle, + uv_req_t* req); + +void uv_poll_close(uv_loop_t* loop, uv_poll_t* handle); +void uv_poll_endgame(uv_loop_t* loop, uv_poll_t* handle); + + /* * Loop watchers */ diff --git a/src/win/poll.c b/src/win/poll.c new file mode 100644 index 00000000..f41433cc --- /dev/null +++ b/src/win/poll.c @@ -0,0 +1,609 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + + +#include +#include + +#include "uv.h" +#include "../uv-common.h" +#include "internal.h" + + +static const GUID uv_msafd_provider_ids[UV_MSAFD_PROVIDER_COUNT] = { + {0xe70f1aa0, 0xab8b, 0x11cf, + {0x8c, 0xa3, 0x00, 0x80, 0x5f, 0x48, 0xa1, 0x92}}, + {0xf9eab0c0, 0x26d4, 0x11d0, + {0xbb, 0xbf, 0x00, 0xaa, 0x00, 0x6c, 0x34, 0xe4}}, + {0x9fc48064, 0x7298, 0x43e4, + {0xb7, 0xbd, 0x18, 0x1f, 0x20, 0x89, 0x79, 0x2a}} +}; + +typedef struct uv_single_fd_set_s { + unsigned int fd_count; + SOCKET fd_array[1]; +} uv_single_fd_set_t; + + +static void uv__fast_poll_submit_poll_req(uv_loop_t* loop, uv_poll_t* handle) { + uv_req_t* req; + AFD_POLL_INFO* afd_poll_info; + DWORD result; + + /* Find a yet unsubmitted req to submit. */ + if (handle->submitted_events_1 == 0) { + req = &handle->poll_req_1; + afd_poll_info = &handle->afd_poll_info_1; + handle->submitted_events_1 = handle->events; + handle->mask_events_1 = 0; + handle->mask_events_2 = handle->events; + } else if (handle->submitted_events_2 == 0) { + req = &handle->poll_req_2; + afd_poll_info = &handle->afd_poll_info_2; + handle->submitted_events_2 = handle->events; + handle->mask_events_1 = handle->events; + handle->mask_events_2 = 0; + } else { + assert(0); + } + + /* Setting Exclusive to TRUE makes the other poll request return if there */ + /* is any. */ + afd_poll_info->Exclusive = TRUE; + afd_poll_info->NumberOfHandles = 1; + afd_poll_info->Timeout.QuadPart = INT64_MAX; + afd_poll_info->Handles[0].Handle = (HANDLE) handle->socket; + afd_poll_info->Handles[0].Status = 0; + afd_poll_info->Handles[0].Events = 0; + + if (handle->events & UV_READABLE) { + afd_poll_info->Handles[0].Events |= AFD_POLL_RECEIVE | + AFD_POLL_DISCONNECT | AFD_POLL_ACCEPT | AFD_POLL_ABORT; + } + if (handle->events & UV_WRITABLE) { + afd_poll_info->Handles[0].Events |= AFD_POLL_SEND | AFD_POLL_CONNECT_FAIL; + } + + memset(&req->overlapped, 0, sizeof req->overlapped); + + result = uv_msafd_poll((SOCKET) handle->peer_socket, + afd_poll_info, + &req->overlapped); + if (result != 0 && WSAGetLastError() != WSA_IO_PENDING) { + /* Queue this req, reporting an error. */ + SET_REQ_ERROR(req, WSAGetLastError()); + uv_insert_pending_req(loop, req); + } +} + + +static int uv__fast_poll_cancel_poll_reqs(uv_loop_t* loop, uv_poll_t* handle) { + AFD_POLL_INFO afd_poll_info; + DWORD result; + HANDLE event; + OVERLAPPED overlapped; + + event = CreateEvent(NULL, TRUE, FALSE, NULL); + if (event == NULL) { + uv__set_sys_error(loop, GetLastError()); + return -1; + } + + afd_poll_info.Exclusive = TRUE; + afd_poll_info.NumberOfHandles = 1; + afd_poll_info.Timeout.QuadPart = INT64_MAX; + afd_poll_info.Handles[0].Handle = (HANDLE) handle->socket; + afd_poll_info.Handles[0].Status = 0; + afd_poll_info.Handles[0].Events = AFD_POLL_ALL; + + memset(&overlapped, 0, sizeof overlapped); + overlapped.hEvent = (HANDLE) ((uintptr_t) event & 1); + + result = uv_msafd_poll(handle->socket, + &afd_poll_info, + &overlapped); + + if (result == SOCKET_ERROR) { + DWORD error = WSAGetLastError(); + if (error != WSA_IO_PENDING) { + uv__set_sys_error(loop, WSAGetLastError()); + CloseHandle(event); + return -1; + } + } + + CloseHandle(event); + return 0; +} + + +static void uv__fast_poll_process_poll_req(uv_loop_t* loop, uv_poll_t* handle, + uv_req_t* req) { + unsigned char mask_events; + AFD_POLL_INFO* afd_poll_info; + + if (req == &handle->poll_req_1) { + afd_poll_info = &handle->afd_poll_info_1; + handle->submitted_events_1 = 0; + mask_events = handle->mask_events_1; + } else if (req == &handle->poll_req_2) { + afd_poll_info = &handle->afd_poll_info_2; + handle->submitted_events_2 = 0; + mask_events = handle->mask_events_2; + } else { + assert(0); + } + + /* Report an error unless the select was just interrupted. */ + if (!REQ_SUCCESS(req)) { + DWORD error = GET_REQ_SOCK_ERROR(req); + if (error != WSAEINTR && handle->events != 0) { + handle->events = 0; /* Stop the watcher */ + uv__set_sys_error(loop, error); + handle->poll_cb(handle, -1, 0); + } + + } else if (afd_poll_info->NumberOfHandles >= 1) { + unsigned char events = 0; + + if ((afd_poll_info->Handles[0].Events & (AFD_POLL_RECEIVE | + AFD_POLL_DISCONNECT | AFD_POLL_ACCEPT | AFD_POLL_ABORT)) != 0) { + events |= UV_READABLE; + } + if ((afd_poll_info->Handles[0].Events & (AFD_POLL_SEND | + AFD_POLL_CONNECT_FAIL)) != 0) { + events |= UV_WRITABLE; + } + + events &= handle->events & ~mask_events; + + if (afd_poll_info->Handles[0].Events & AFD_POLL_LOCAL_CLOSE) { + /* Stop polling. */ + handle->events = 0; + } + + if (events != 0) { + handle->poll_cb(handle, 0, events); + } + } + + if ((handle->events & ~(handle->submitted_events_1 | + handle->submitted_events_2)) != 0) { + uv__fast_poll_submit_poll_req(loop, handle); + } else if ((handle->flags & UV_HANDLE_CLOSING) && + handle->submitted_events_1 == 0 && + handle->submitted_events_2 == 0) { + uv_want_endgame(loop, (uv_handle_t*) handle); + } +} + + +static int uv__fast_poll_set(uv_loop_t* loop, uv_poll_t* handle, int events) { + assert(handle->type == UV_POLL); + assert(!(handle->flags & UV_HANDLE_CLOSING)); + assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0); + + handle->events = events; + if ((handle->events & ~(handle->submitted_events_1 | + handle->submitted_events_2)) != 0) { + uv__fast_poll_submit_poll_req(handle->loop, handle); + } + + return 0; +} + + +static void uv__fast_poll_close(uv_loop_t* loop, uv_poll_t* handle) { + handle->events = 0; + + if (handle->submitted_events_1 == 0 && + handle->submitted_events_2 == 0) { + uv_want_endgame(loop, (uv_handle_t*) handle); + } else { + /* Try to cancel outstanding poll requests. */ + if (pCancelIoEx) { + /* Use CancelIoEx to cancel poll requests if available. */ + if (handle->submitted_events_1) + pCancelIoEx((HANDLE) handle->socket, &handle->poll_req_1.overlapped); + if (handle->submitted_events_2) + pCancelIoEx((HANDLE) handle->socket, &handle->poll_req_2.overlapped); + } else if (handle->submitted_events_1 | handle->submitted_events_2) { + /* Execute another unique poll to force the others to return. */ + uv__fast_poll_cancel_poll_reqs(loop, handle); + } + } +} + + +static SOCKET uv__fast_poll_create_peer_socket(HANDLE iocp, + WSAPROTOCOL_INFOW* protocol_info) { + SOCKET sock = 0; + + sock = WSASocketW(protocol_info->iAddressFamily, + protocol_info->iSocketType, + protocol_info->iProtocol, + protocol_info, + 0, + WSA_FLAG_OVERLAPPED); + if (sock == INVALID_SOCKET) { + return INVALID_SOCKET; + } + + if (!SetHandleInformation((HANDLE) sock, HANDLE_FLAG_INHERIT, 0)) { + goto error; + }; + + if (CreateIoCompletionPort((HANDLE) sock, + iocp, + (ULONG_PTR) sock, + 0) == NULL) { + goto error; + } + + return sock; + + error: + closesocket(sock); + return INVALID_SOCKET; +} + + +static SOCKET uv__fast_poll_get_peer_socket(uv_loop_t* loop, + WSAPROTOCOL_INFOW* protocol_info) { + int index, i; + SOCKET peer_socket; + + index = -1; + for (i = 0; i < ARRAY_SIZE(uv_msafd_provider_ids); i++) { + if (memcmp((void*) &protocol_info->ProviderId, + (void*) &uv_msafd_provider_ids[i], + sizeof protocol_info->ProviderId) == 0) { + index = i; + } + } + + /* Check if the protocol uses an msafd socket. */ + if (index < 0) { + return INVALID_SOCKET; + } + + /* If we didn't (try) to create a peer socket yet, try to make one. Don't */ + /* try again if the peer socket creation failed earlier for the same */ + /* protocol. */ + peer_socket = loop->poll_peer_sockets[index]; + if (peer_socket == 0) { + peer_socket = uv__fast_poll_create_peer_socket(loop->iocp, protocol_info); + loop->poll_peer_sockets[index] = peer_socket; + } + + return peer_socket; +} + + +static DWORD WINAPI uv__slow_poll_thread_proc(void* arg) { + uv_req_t* req = (uv_req_t*) arg; + uv_poll_t* handle = (uv_poll_t*) req->data; + unsigned char events, reported_events; + int r; + uv_single_fd_set_t rfds, wfds, efds; + struct timeval timeout; + + assert(handle->type == UV_POLL); + assert(req->type == UV_POLL_REQ); + + if (req == &handle->poll_req_1) { + events = handle->submitted_events_1; + } else if (req == &handle->poll_req_2) { + events = handle->submitted_events_2; + } else { + assert(0); + } + + if (handle->events & UV_READABLE) { + rfds.fd_count = 1; + rfds.fd_array[0] = handle->socket; + } else { + rfds.fd_count = 0; + } + + if (handle->events & UV_WRITABLE) { + wfds.fd_count = 1; + wfds.fd_array[0] = handle->socket; + efds.fd_count = 1; + efds.fd_array[0] = handle->socket; + } else { + wfds.fd_count = 0; + efds.fd_count = 0; + } + + /* Make the select() time out after 3 minutes. If select() hangs because */ + /* the user closed the socket, we will at least not hang indefinitely. */ + timeout.tv_sec = 3 * 60; + timeout.tv_usec = 0; + + r = select(1, (fd_set*) &rfds, (fd_set*) &wfds, (fd_set*) &efds, &timeout); + if (r == SOCKET_ERROR) { + /* Queue this req, reporting an error. */ + SET_REQ_ERROR(&handle->poll_req_1, WSAGetLastError()); + POST_COMPLETION_FOR_REQ(handle->loop, req); + return 0; + } + + reported_events = 0; + + if (r > 0) { + if (rfds.fd_count > 0) { + assert(rfds.fd_count == 1); + assert(rfds.fd_array[0] == handle->socket); + reported_events |= UV_READABLE; + } + + if (wfds.fd_count > 0) { + assert(wfds.fd_count == 1); + assert(wfds.fd_array[0] == handle->socket); + reported_events |= UV_WRITABLE; + } else if (efds.fd_count > 0) { + assert(efds.fd_count == 1); + assert(efds.fd_array[0] == handle->socket); + reported_events |= UV_WRITABLE; + } + } + + SET_REQ_SUCCESS(req); + req->overlapped.InternalHigh = (DWORD) reported_events; + POST_COMPLETION_FOR_REQ(handle->loop, req); + + return 0; +} + + +static void uv__slow_poll_submit_poll_req(uv_loop_t* loop, uv_poll_t* handle) { + uv_req_t* req; + + /* Find a yet unsubmitted req to submit. */ + if (handle->submitted_events_1 == 0) { + req = &handle->poll_req_1; + handle->submitted_events_1 = handle->events; + handle->mask_events_1 = 0; + handle->mask_events_2 = handle->events; + } else if (handle->submitted_events_2 == 0) { + req = &handle->poll_req_2; + handle->submitted_events_2 = handle->events; + handle->mask_events_1 = handle->events; + handle->mask_events_2 = 0; + } else { + assert(0); + } + + if (!QueueUserWorkItem(uv__slow_poll_thread_proc, + (void*) req, + WT_EXECUTELONGFUNCTION)) { + /* Make this req pending, reporting an error. */ + SET_REQ_ERROR(req, GetLastError()); + uv_insert_pending_req(loop, req); + } +} + + + +static void uv__slow_poll_process_poll_req(uv_loop_t* loop, uv_poll_t* handle, + uv_req_t* req) { + unsigned char mask_events; + if (req == &handle->poll_req_1) { + handle->submitted_events_1 = 0; + mask_events = handle->mask_events_1; + } else if (req == &handle->poll_req_2) { + handle->submitted_events_2 = 0; + mask_events = handle->mask_events_2; + } else { + assert(0); + } + + if (!REQ_SUCCESS(req)) { + /* Error. */ + if (handle->events != 0) { + handle->events = 0; /* Stop the watcher */ + uv__set_sys_error(loop, GET_REQ_ERROR(req)); + handle->poll_cb(handle, -1, 0); + } + } else { + /* Got some events. */ + int events = req->overlapped.InternalHigh & handle->events & ~mask_events; + if (events != 0) { + handle->poll_cb(handle, 0, events); + } + } + + if ((handle->events & ~(handle->submitted_events_1 | + handle->submitted_events_2)) != 0) { + uv__slow_poll_submit_poll_req(loop, handle); + } else if ((handle->flags & UV_HANDLE_CLOSING) && + handle->submitted_events_1 == 0 && + handle->submitted_events_2 == 0) { + uv_want_endgame(loop, (uv_handle_t*) handle); + } +} + + +static int uv__slow_poll_set(uv_loop_t* loop, uv_poll_t* handle, int events) { + assert(handle->type == UV_POLL); + assert(!(handle->flags & UV_HANDLE_CLOSING)); + assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0); + + handle->events = events; + if ((handle->events & + ~(handle->submitted_events_1 | handle->submitted_events_2)) != 0) { + uv__slow_poll_submit_poll_req(handle->loop, handle); + } + + return 0; +} + + +static void uv__slow_poll_close(uv_loop_t* loop, uv_poll_t* handle) { + handle->events = 0; + + if (handle->submitted_events_1 == 0 && + handle->submitted_events_2 == 0) { + uv_want_endgame(loop, (uv_handle_t*) handle); + } +} + + +int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) { + return uv_poll_init_socket(loop, handle, (SOCKET) _get_osfhandle(fd)); +} + + +int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, + uv_platform_socket_t socket) { + WSAPROTOCOL_INFOW protocol_info; + int len; + SOCKET peer_socket, base_socket; + DWORD bytes; + + /* Try to obtain a base handle for the socket. This increases this chances */ + /* that we find an AFD handle and are able to use the fast poll mechanism. */ + /* This will always fail on windows XP/2k3, since they don't support the */ + /* SIO_BASE_HANDLE ioctl. */ +#ifndef NDEBUG + base_socket = INVALID_SOCKET; +#endif + + if (WSAIoctl(socket, + SIO_BASE_HANDLE, + NULL, + 0, + &base_socket, + sizeof base_socket, + &bytes, + NULL, + NULL) == 0) { + assert(base_socket != 0 && base_socket != INVALID_SOCKET); + socket = base_socket; + } + + handle->type = UV_POLL; + handle->socket = socket; + handle->loop = loop; + handle->flags = 0; + handle->events = 0; + + /* Obtain protocol information about the socket. */ + len = sizeof protocol_info; + if (getsockopt(socket, + SOL_SOCKET, + SO_PROTOCOL_INFOW, + (char*) &protocol_info, + &len) != 0) { + uv__set_sys_error(loop, WSAGetLastError()); + return -1; + } + + /* Get the peer socket that is needed to enable fast poll. If the returned */ + /* value is NULL, the protocol is not implemented by MSAFD and we'll have */ + /* to use slow mode. */ + peer_socket = uv__fast_poll_get_peer_socket(loop, &protocol_info); + + if (peer_socket != INVALID_SOCKET) { + /* Initialize fast poll specific fields. */ + handle->peer_socket = peer_socket; + } else { + /* Initialize slow poll specific fields. */ + handle->flags |= UV_HANDLE_POLL_SLOW; + } + + /* Intialize 2 poll reqs. */ + handle->submitted_events_1 = 0; + uv_req_init(loop, (uv_req_t*) &(handle->poll_req_1)); + handle->poll_req_1.type = UV_POLL_REQ; + handle->poll_req_1.data = handle; + + handle->submitted_events_2 = 0; + uv_req_init(loop, (uv_req_t*) &(handle->poll_req_2)); + handle->poll_req_2.type = UV_POLL_REQ; + handle->poll_req_2.data = handle; + + uv_ref(loop); + + loop->counters.handle_init++; + loop->counters.poll_init++; + + return 0; +} + + +int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb cb) { + if (!(handle->flags & UV_HANDLE_POLL_SLOW)) { + if (uv__fast_poll_set(handle->loop, handle, events) < 0) + return -1; + } else { + if (uv__slow_poll_set(handle->loop, handle, events) < 0) + return -1; + } + + handle->poll_cb = cb; + return 0; +} + + +int uv_poll_stop(uv_poll_t* handle) { + if (!(handle->flags & UV_HANDLE_POLL_SLOW)) { + return uv__fast_poll_set(handle->loop, handle, 0); + } else { + return uv__slow_poll_set(handle->loop, handle, 0); + } +} + + +void uv_process_poll_req(uv_loop_t* loop, uv_poll_t* handle, uv_req_t* req) { + if (!(handle->flags & UV_HANDLE_POLL_SLOW)) { + uv__fast_poll_process_poll_req(loop, handle, req); + } else { + uv__slow_poll_process_poll_req(loop, handle, req); + } +} + + +void uv_poll_close(uv_loop_t* loop, uv_poll_t* handle) { + if (!(handle->flags & UV_HANDLE_POLL_SLOW)) { + uv__fast_poll_close(loop, handle); + } else { + uv__slow_poll_close(loop, handle); + } +} + + +void uv_poll_endgame(uv_loop_t* loop, uv_poll_t* handle) { + assert(handle->flags & UV_HANDLE_CLOSING); + assert(!(handle->flags & UV_HANDLE_CLOSED)); + + assert(handle->submitted_events_1 == 0); + assert(handle->submitted_events_2 == 0); + + handle->flags |= UV_HANDLE_CLOSED; + + if (handle->close_cb) { + handle->close_cb((uv_handle_t*)handle); + } + + uv_unref(loop); +} diff --git a/src/win/req.c b/src/win/req.c index 65aa6c15..dcfbd9c5 100644 --- a/src/win/req.c +++ b/src/win/req.c @@ -135,6 +135,10 @@ void uv_process_reqs(uv_loop_t* loop) { uv_process_async_wakeup_req(loop, (uv_async_t*) req->data, req); break; + case UV_POLL_REQ: + uv_process_poll_req(loop, (uv_poll_t*) req->data, req); + break; + case UV_ARES_EVENT_REQ: uv_process_ares_event_req(loop, (uv_ares_action_t*) req->data, req); break; diff --git a/uv.gyp b/uv.gyp index d03ea568..ba9128eb 100644 --- a/uv.gyp +++ b/uv.gyp @@ -145,6 +145,7 @@ 'src/win/loop-watcher.c', 'src/win/pipe.c', 'src/win/thread.c', + 'src/win/poll.c', 'src/win/process.c', 'src/win/req.c', 'src/win/stream.c', From d60d94e0c36cc45ab32773e3a7c47173a784e1f9 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Thu, 3 May 2012 01:43:56 +0200 Subject: [PATCH 3/6] Unix: implement uv_poll --- config-unix.mk | 1 + include/uv-private/uv-unix.h | 7 ++ src/unix/core.c | 9 +++ src/unix/internal.h | 4 ++ src/unix/poll.c | 130 +++++++++++++++++++++++++++++++++++ uv.gyp | 1 + 6 files changed, 152 insertions(+) create mode 100644 src/unix/poll.c diff --git a/config-unix.mk b/config-unix.mk index cdcf5eaf..8d26fc44 100644 --- a/config-unix.mk +++ b/config-unix.mk @@ -37,6 +37,7 @@ OBJS += src/unix/fs.o OBJS += src/unix/idle.o OBJS += src/unix/loop.o OBJS += src/unix/pipe.o +OBJS += src/unix/poll.o OBJS += src/unix/prepare.o OBJS += src/unix/process.o OBJS += src/unix/stream.o diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index e190b853..08b2d120 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -45,6 +45,8 @@ typedef struct { typedef int uv_file; +typedef int uv_platform_socket_t; + #define UV_ONCE_INIT PTHREAD_ONCE_INIT typedef pthread_once_t uv_once_t; @@ -162,6 +164,11 @@ typedef void* uv_lib_t; const char* pipe_fname; /* strdup'ed */ +/* UV_POLL */ +#define UV_POLL_PRIVATE_FIELDS \ + ev_io io_watcher; + + /* UV_PREPARE */ \ #define UV_PREPARE_PRIVATE_FIELDS \ ev_prepare prepare_watcher; \ diff --git a/src/unix/core.c b/src/unix/core.c index 9602b642..c5a7da5f 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -107,6 +107,10 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) { uv__fs_event_close((uv_fs_event_t*)handle); break; + case UV_POLL: + uv__poll_close((uv_poll_t*)handle); + break; + default: assert(0); } @@ -249,6 +253,9 @@ void uv__finish_close(uv_handle_t* handle) { case UV_FS_EVENT: break; + case UV_POLL: + break; + default: assert(0); break; @@ -285,6 +292,8 @@ int64_t uv_now(uv_loop_t* loop) { int uv_is_active(const uv_handle_t* handle) { switch (handle->type) { + case UV_POLL: + return uv__poll_active((const uv_poll_t*)handle); case UV_CHECK: return uv__check_active((const uv_check_t*)handle); case UV_IDLE: diff --git a/src/unix/internal.h b/src/unix/internal.h index d6c499bb..724120d4 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -141,6 +141,10 @@ int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay); int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); void uv__pipe_accept(EV_P_ ev_io* watcher, int revents); +/* poll */ +void uv__poll_close(uv_poll_t* handle); +int uv__poll_active(const uv_poll_t* handle); + /* various */ int uv__check_active(const uv_check_t* handle); int uv__idle_active(const uv_idle_t* handle); diff --git a/src/unix/poll.c b/src/unix/poll.c new file mode 100644 index 00000000..ca248113 --- /dev/null +++ b/src/unix/poll.c @@ -0,0 +1,130 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "internal.h" + +#include +#include +#include + + +static void uv__poll_io(EV_P_ ev_io* watcher, int ev_events) { + uv_poll_t* handle = watcher->data; + int events; + + if (ev_events & EV_ERROR) { + /* An error happened. Libev has implicitly stopped the watcher, but we */ + /* need to fix the refcount. */ + uv_ref(handle->loop); + uv__set_sys_error(handle->loop, EBADF); + handle->poll_cb(handle, -1, 0); + return; + } + + assert(ev_events & (EV_READ | EV_WRITE)); + assert((ev_events & ~(EV_READ | EV_WRITE)) == 0); + + events = 0; + if (ev_events & EV_READ) + events |= UV_READABLE; + if (ev_events & EV_WRITE) + events |= UV_WRITABLE; + + handle->poll_cb(handle, 0, events); +} + + +int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) { + uv__handle_init(loop, (uv_handle_t*) handle, UV_POLL); + loop->counters.poll_init++; + + handle->fd = fd; + handle->poll_cb = NULL; + + ev_init(&handle->io_watcher, uv__poll_io); + handle->io_watcher.data = handle; + + return 0; +} + + +int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, + uv_platform_socket_t socket) { + return uv_poll_init(loop, handle, socket); +} + + +static void uv__poll_stop(uv_poll_t* handle) { + if (ev_is_active(&handle->io_watcher)) { + ev_io_stop(handle->loop->ev, &handle->io_watcher); + uv_ref(handle->loop); + } +} + + +int uv_poll_stop(uv_poll_t* handle) { + assert(!(handle->flags & (UV_CLOSING | UV_CLOSED))); + uv__poll_stop(handle); + return 0; +} + + +int uv_poll_start(uv_poll_t* handle, int events, uv_poll_cb poll_cb) { + int ev_events; + int was_active; + + assert((events & ~(UV_READABLE | UV_WRITABLE)) == 0); + assert(!(handle->flags & (UV_CLOSING | UV_CLOSED))); + + if (events == 0) { + uv__poll_stop(handle); + return 0; + } + + ev_events = 0; + if (events & UV_READABLE) + ev_events |= EV_READ; + if (events & UV_WRITABLE) + ev_events |= EV_WRITE; + + was_active = ev_is_active(&handle->io_watcher); + + ev_io_set(&handle->io_watcher, handle->fd, ev_events); + ev_io_start(handle->loop->ev, &handle->io_watcher); + + if (!was_active) + uv_unref(handle->loop); + + handle->poll_cb = poll_cb; + + return 0; +} + + +void uv__poll_close(uv_poll_t* handle) { + uv__poll_stop(handle); +} + + +int uv__poll_active(const uv_poll_t* handle) { + return ev_is_active(&handle->io_watcher); +} diff --git a/uv.gyp b/uv.gyp index ba9128eb..21a29e0c 100644 --- a/uv.gyp +++ b/uv.gyp @@ -199,6 +199,7 @@ 'src/unix/internal.h', 'src/unix/loop.c', 'src/unix/pipe.c', + 'src/unix/poll.c', 'src/unix/prepare.c', 'src/unix/process.c', 'src/unix/stream.c', From beaf7507002a9f94cce9b30cbbdb108a64d8a5cd Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Wed, 25 Apr 2012 00:26:21 +0200 Subject: [PATCH 4/6] Test: add tests for uv_poll --- test/test-list.h | 5 + test/test-poll.c | 573 +++++++++++++++++++++++++++++++++++++++++++++++ uv.gyp | 1 + 3 files changed, 579 insertions(+) create mode 100644 test/test-poll.c diff --git a/test/test-list.h b/test/test-list.h index 84782b2a..6961b6f2 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -159,6 +159,8 @@ TEST_DECLARE (strlcpy) TEST_DECLARE (strlcat) TEST_DECLARE (counters_init) TEST_DECLARE (dlerror) +TEST_DECLARE (poll_duplex) +TEST_DECLARE (poll_unidirectional) #ifdef _WIN32 TEST_DECLARE (spawn_detect_pipe_name_collisions_on_windows) TEST_DECLARE (argument_escaping) @@ -314,6 +316,9 @@ TASK_LIST_START TEST_ENTRY (getsockname_tcp) TEST_ENTRY (getsockname_udp) + TEST_ENTRY (poll_duplex) + TEST_ENTRY (poll_unidirectional) + TEST_ENTRY (spawn_exit_code) TEST_ENTRY (spawn_stdout) TEST_ENTRY (spawn_stdin) diff --git a/test/test-poll.c b/test/test-poll.c new file mode 100644 index 00000000..64f9e36c --- /dev/null +++ b/test/test-poll.c @@ -0,0 +1,573 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include + +#ifndef _WIN32 +# include +# include +# include +#endif + +#include "uv.h" +#include "task.h" + + +#define NUM_CLIENTS 5 +#define TRANSFER_BYTES (1 << 16) + +#undef MIN +#define MIN(a, b) (((a) < (b)) ? (a) : (b)); + + +typedef enum { + UNIDIRECTIONAL, + DUPLEX +} test_mode_t; + +typedef struct connection_context_s { + uv_poll_t poll_handle; + uv_timer_t timer_handle; + uv_platform_socket_t sock; + size_t read, sent; + int is_server_connection; + int open_handles; + int got_fin, sent_fin; + unsigned int events, delayed_events; +} connection_context_t; + +typedef struct server_context_s { + uv_poll_t poll_handle; + uv_platform_socket_t sock; + int connections; +} server_context_t; + + +static void delay_timer_cb(uv_timer_t* timer, int status); + + +static test_mode_t test_mode = DUPLEX; + +static int closed_connections = 0; + +static int valid_writable_wakeups = 0; +static int spurious_writable_wakeups = 0; + + +static int got_eagain() { +#ifdef _WIN32 + return WSAGetLastError() == WSAEWOULDBLOCK; +#else + return errno == EAGAIN + || errno == EINPROGRESS +#ifdef EWOULDBLOCK + || errno == EWOULDBLOCK; +#endif + ; +#endif +} + + +static void set_nonblocking(uv_platform_socket_t sock) { + int r; +#ifdef _WIN32 + unsigned long on = 1; + r = ioctlsocket(sock, FIONBIO, &on); + ASSERT(r == 0); +#else + int flags = fcntl(sock, F_GETFL, 0); + ASSERT(flags >= 0); + r = fcntl(sock, F_SETFL, flags | O_NONBLOCK); + ASSERT(r >= 0); +#endif +} + + +static uv_platform_socket_t create_nonblocking_bound_socket( + struct sockaddr_in bind_addr) { + uv_platform_socket_t sock; + int r; + + sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); +#ifdef _WIN32 + ASSERT(sock != INVALID_SOCKET); +#else + ASSERT(sock >= 0); +#endif + + set_nonblocking(sock); + +#ifndef _WIN32 + { + /* Allow reuse of the port. */ + int yes = 1; + r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes); + ASSERT(r == 0); + } +#endif + + r = bind(sock, (const struct sockaddr*) &bind_addr, sizeof bind_addr); + ASSERT(r == 0); + + return sock; +} + + +static void close_socket(uv_platform_socket_t sock) { + int r; +#ifdef _WIN32 + r = closesocket(sock); +#else + r = close(sock); +#endif + ASSERT(r == 0); +} + + +static connection_context_t* create_connection_context( + uv_platform_socket_t sock, int is_server_connection) { + int r; + connection_context_t* context; + + context = (connection_context_t*) malloc(sizeof *context); + ASSERT(context != NULL); + + context->sock = sock; + context->is_server_connection = is_server_connection; + context->read = 0; + context->sent = 0; + context->open_handles = 0; + context->events = 0; + context->delayed_events = 0; + context->got_fin = 0; + context->sent_fin = 0; + + r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock); + context->open_handles++; + context->poll_handle.data = context; + ASSERT(r == 0); + + r = uv_timer_init(uv_default_loop(), &context->timer_handle); + context->open_handles++; + context->timer_handle.data = context; + ASSERT(r == 0); + + return context; +} + + +static void connection_close_cb(uv_handle_t* handle) { + connection_context_t* context = (connection_context_t*) handle->data; + + if (--context->open_handles == 0) { + if (test_mode == DUPLEX || context->is_server_connection) { + ASSERT(context->read == TRANSFER_BYTES); + } else { + ASSERT(context->read == 0); + } + + if (test_mode == DUPLEX || !context->is_server_connection) { + ASSERT(context->sent == TRANSFER_BYTES); + } else { + ASSERT(context->sent == 0); + } + + closed_connections++; + + free(context); + } +} + + +static void destroy_connection_context(connection_context_t* context) { + uv_close((uv_handle_t*) &context->poll_handle, connection_close_cb); + uv_close((uv_handle_t*) &context->timer_handle, connection_close_cb); +} + + +static void connection_poll_cb(uv_poll_t* handle, int status, int events) { + connection_context_t* context = (connection_context_t*) handle->data; + int new_events; + int r; + + ASSERT(status == 0); + ASSERT(events & context->events); + ASSERT(!(events & ~context->events)); + + new_events = context->events; + + if (events & UV_READABLE) { + int action = rand() % 7; + + switch (action) { + case 0: + case 1: { + /* Read a couple of bytes. */ + static char buffer[74]; + r = recv(context->sock, buffer, sizeof buffer, 0); + ASSERT(r >= 0); + + if (r > 0) { + context->read += r; + } else { + /* Got FIN. */ + context->got_fin = 1; + new_events &= ~UV_READABLE; + } + + break; + } + + case 2: + case 3: { + /* Read until EAGAIN. */ + static char buffer[931]; + r = recv(context->sock, buffer, sizeof buffer, 0); + ASSERT(r >= 0); + + while (r > 0) { + context->read += r; + r = recv(context->sock, buffer, sizeof buffer, 0); + } + + if (r == 0) { + /* Got FIN. */ + context->got_fin = 1; + new_events &= ~UV_READABLE; + } else { + ASSERT(got_eagain()); + } + + break; + } + + case 4: + /* Ignore. */ + break; + + case 5: + /* Stop reading for a while. Restart in timer callback. */ + new_events &= ~UV_READABLE; + if (!uv_is_active((uv_handle_t*) &context->timer_handle)) { + context->delayed_events = UV_READABLE; + uv_timer_start(&context->timer_handle, delay_timer_cb, 10, 0); + } else { + context->delayed_events |= UV_READABLE; + } + break; + + case 6: + /* Fudge with the event mask. */ + uv_poll_start(&context->poll_handle, UV_WRITABLE, connection_poll_cb); + uv_poll_start(&context->poll_handle, UV_READABLE, connection_poll_cb); + context->events = UV_READABLE; + break; + + default: + ASSERT(0); + } + } + + if (events & UV_WRITABLE) { + if (context->sent < TRANSFER_BYTES && + !(test_mode == UNIDIRECTIONAL && context->is_server_connection)) { + /* We have to send more bytes. */ + int action = rand() % 7; + + switch (action) { + case 0: + case 1: { + /* Send a couple of bytes. */ + static char buffer[103]; + + int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); + ASSERT(send_bytes > 0); + + r = send(context->sock, buffer, send_bytes, 0); + + if (r < 0) { + ASSERT(got_eagain()); + spurious_writable_wakeups++; + break; + } + + ASSERT(r > 0); + context->sent += r; + valid_writable_wakeups++; + break; + } + + case 2: + case 3: { + /* Send until EAGAIN. */ + static char buffer[1234]; + + int send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); + ASSERT(send_bytes > 0); + + r = send(context->sock, buffer, send_bytes, 0); + + if (r < 0) { + ASSERT(got_eagain()); + spurious_writable_wakeups++; + break; + } + + ASSERT(r > 0); + valid_writable_wakeups++; + context->sent += r; + + while (context->sent < TRANSFER_BYTES) { + send_bytes = MIN(TRANSFER_BYTES - context->sent, sizeof buffer); + ASSERT(send_bytes > 0); + + r = send(context->sock, buffer, send_bytes, 0); + + if (r <= 0) break; + context->sent += r; + } + ASSERT(r > 0 || got_eagain()); + break; + } + + case 4: + /* Ignore. */ + break; + + case 5: + /* Stop sending for a while. Restart in timer callback. */ + new_events &= ~UV_WRITABLE; + if (!uv_is_active((uv_handle_t*) &context->timer_handle)) { + context->delayed_events = UV_WRITABLE; + uv_timer_start(&context->timer_handle, delay_timer_cb, 100, 0); + } else { + context->delayed_events |= UV_WRITABLE; + } + break; + + case 6: + /* Fudge with the event mask. */ + uv_poll_start(&context->poll_handle, + UV_READABLE, + connection_poll_cb); + uv_poll_start(&context->poll_handle, + UV_WRITABLE, + connection_poll_cb); + context->events = UV_WRITABLE; + break; + + default: + ASSERT(0); + } + + } else { + /* Nothing more to write. Send FIN. */ + int r; +#ifdef _WIN32 + r = shutdown(context->sock, SD_SEND); +#else + r = shutdown(context->sock, SHUT_WR); +#endif + ASSERT(r == 0); + context->sent_fin = 1; + new_events &= ~UV_WRITABLE; + } + } + + if (context->got_fin && context->sent_fin) { + /* Sent and received FIN. Close and destroy context. */ + close_socket(context->sock); + destroy_connection_context(context); + context->events = 0; + + } else if (new_events != context->events) { + /* Poll mask changed. Call uv_poll_start again. */ + context->events = new_events; + uv_poll_start(handle, new_events, connection_poll_cb); + } + + /* Assert that uv_is_active works correctly for poll handles. */ + if (context->events != 0) { + ASSERT(uv_is_active((uv_handle_t*) handle)); + } else { + ASSERT(!uv_is_active((uv_handle_t*) handle)); + } +} + + +static void delay_timer_cb(uv_timer_t* timer, int status) { + connection_context_t* context = (connection_context_t*) timer->data; + int r; + + /* Timer should auto stop. */ + ASSERT(!uv_is_active((uv_handle_t*) timer)); + + /* Add the requested events to the poll mask. */ + ASSERT(context->delayed_events != 0); + context->events |= context->delayed_events; + context->delayed_events = 0; + + r = uv_poll_start(&context->poll_handle, + context->events, + connection_poll_cb); + ASSERT(r == 0); +} + + +static server_context_t* create_server_context( + uv_platform_socket_t sock) { + int r; + server_context_t* context; + + context = (server_context_t*) malloc(sizeof *context); + ASSERT(context != NULL); + + context->sock = sock; + context->connections = 0; + + r = uv_poll_init_socket(uv_default_loop(), &context->poll_handle, sock); + context->poll_handle.data = context; + ASSERT(r == 0); + + return context; +} + + +static void server_close_cb(uv_handle_t* handle) { + server_context_t* context = (server_context_t*) handle->data; + free(context); +} + + +static void destroy_server_context(server_context_t* context) { + uv_close((uv_handle_t*) &context->poll_handle, server_close_cb); +} + + +static void server_poll_cb(uv_poll_t* handle, int status, int events) { + server_context_t* server_context = (server_context_t*) + handle->data; + connection_context_t* connection_context; + struct sockaddr_in addr; + socklen_t addr_len; + uv_platform_socket_t sock; + int r; + + addr_len = sizeof addr; + sock = accept(server_context->sock, (struct sockaddr*) &addr, &addr_len); +#ifdef _WIN32 + ASSERT(sock != INVALID_SOCKET); +#else + ASSERT(sock >= 0); +#endif + + set_nonblocking(sock); + + connection_context = create_connection_context(sock, 1); + connection_context->events = UV_READABLE | UV_WRITABLE; + r = uv_poll_start(&connection_context->poll_handle, + UV_READABLE | UV_WRITABLE, + connection_poll_cb); + ASSERT(r == 0); + + if (++server_context->connections == NUM_CLIENTS) { + close_socket(server_context->sock); + destroy_server_context(server_context); + } +} + + +static void start_server() { + uv_platform_socket_t sock; + server_context_t* context; + int r; + + sock = create_nonblocking_bound_socket(uv_ip4_addr("127.0.0.1", TEST_PORT)); + context = create_server_context(sock); + + r = listen(sock, 100); + ASSERT(r == 0); + + r = uv_poll_start(&context->poll_handle, UV_READABLE, server_poll_cb); + ASSERT(r == 0); +} + + +static void start_client() { + uv_platform_socket_t sock; + connection_context_t* context; + struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + int r; + + sock = create_nonblocking_bound_socket(uv_ip4_addr("0.0.0.0", 0)); + context = create_connection_context(sock, 0); + + context->events = UV_READABLE | UV_WRITABLE; + r = uv_poll_start(&context->poll_handle, + UV_READABLE | UV_WRITABLE, + connection_poll_cb); + ASSERT(r == 0); + + r = connect(sock, (struct sockaddr*) &server_addr, sizeof server_addr); + ASSERT(r == 0 || got_eagain()); +} + + +static void start_poll_test() { + int i, r; + +#ifdef _WIN32 + { + struct WSAData wsa_data; + r = WSAStartup(MAKEWORD(2, 2), &wsa_data); + ASSERT(r == 0); + } +#endif + + start_server(); + + for (i = 0; i < NUM_CLIENTS; i++) + start_client(); + + r = uv_run(uv_default_loop()); + ASSERT(r == 0); + + /* Assert that at most one percent of the writable wakeups was spurious. */ + ASSERT(spurious_writable_wakeups == 0 || + (valid_writable_wakeups + spurious_writable_wakeups) / + spurious_writable_wakeups > 100); + + ASSERT(closed_connections == NUM_CLIENTS * 2); +} + + +TEST_IMPL(poll_duplex) { + test_mode = DUPLEX; + start_poll_test(); + return 0; +} + + +TEST_IMPL(poll_unidirectional) { + test_mode = UNIDIRECTIONAL; + start_poll_test(); + return 0; +} diff --git a/uv.gyp b/uv.gyp index 21a29e0c..4b804dcb 100644 --- a/uv.gyp +++ b/uv.gyp @@ -328,6 +328,7 @@ 'test/test-pipe-bind-error.c', 'test/test-pipe-connect-error.c', 'test/test-platform-output.c', + 'test/test-poll.c', 'test/test-process-title.c', 'test/test-ref.c', 'test/test-shutdown-close.c', From 444ab19be8aee6f167e84a500e4d488d52d82277 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Wed, 25 Apr 2012 00:23:16 +0200 Subject: [PATCH 5/6] Benchmarks: add size of uv_poll_t to benchmark-sizes --- test/benchmark-sizes.c | 1 + 1 file changed, 1 insertion(+) diff --git a/test/benchmark-sizes.c b/test/benchmark-sizes.c index d75cb8e0..09e06e8f 100644 --- a/test/benchmark-sizes.c +++ b/test/benchmark-sizes.c @@ -36,5 +36,6 @@ BENCHMARK_IMPL(sizes) { LOGF("uv_async_t: %u bytes\n", (unsigned int) sizeof(uv_async_t)); LOGF("uv_timer_t: %u bytes\n", (unsigned int) sizeof(uv_timer_t)); LOGF("uv_process_t: %u bytes\n", (unsigned int) sizeof(uv_process_t)); + LOGF("uv_poll_t: %u bytes\n", (unsigned int) sizeof(uv_poll_t)); return 0; } From b9504f7987ce6e4979d1c3edc0aceb8f9d592409 Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Thu, 3 May 2012 16:02:21 +0200 Subject: [PATCH 6/6] Rename uv_platform_socket_t to uv_os_sock_t --- include/uv-private/uv-unix.h | 2 +- include/uv-private/uv-win.h | 2 +- include/uv.h | 2 +- src/unix/poll.c | 2 +- src/win/poll.c | 2 +- test/test-poll.c | 22 +++++++++++----------- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/include/uv-private/uv-unix.h b/include/uv-private/uv-unix.h index 08b2d120..4483cea3 100644 --- a/include/uv-private/uv-unix.h +++ b/include/uv-private/uv-unix.h @@ -45,7 +45,7 @@ typedef struct { typedef int uv_file; -typedef int uv_platform_socket_t; +typedef int uv_os_sock_t; #define UV_ONCE_INIT PTHREAD_ONCE_INIT diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 49a4bac4..16a9a035 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -160,7 +160,7 @@ typedef struct uv_buf_t { typedef int uv_file; -typedef SOCKET uv_platform_socket_t; +typedef SOCKET uv_os_sock_t; typedef HANDLE uv_thread_t; diff --git a/include/uv.h b/include/uv.h index 78a2f156..ec8a3e79 100644 --- a/include/uv.h +++ b/include/uv.h @@ -975,7 +975,7 @@ UV_EXTERN int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd); /* Initialize the poll watcher using a socket descriptor. On unix this is */ /* identical to uv_poll_init. On windows it takes a SOCKET handle. */ UV_EXTERN int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, - uv_platform_socket_t socket); + uv_os_sock_t socket); /* * Starts polling the file descriptor. `events` is a bitmask consisting made up diff --git a/src/unix/poll.c b/src/unix/poll.c index ca248113..74a2cb51 100644 --- a/src/unix/poll.c +++ b/src/unix/poll.c @@ -68,7 +68,7 @@ int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) { int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, - uv_platform_socket_t socket) { + uv_os_sock_t socket) { return uv_poll_init(loop, handle, socket); } diff --git a/src/win/poll.c b/src/win/poll.c index f41433cc..e0b9b5ad 100644 --- a/src/win/poll.c +++ b/src/win/poll.c @@ -474,7 +474,7 @@ int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) { int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle, - uv_platform_socket_t socket) { + uv_os_sock_t socket) { WSAPROTOCOL_INFOW protocol_info; int len; SOCKET peer_socket, base_socket; diff --git a/test/test-poll.c b/test/test-poll.c index 64f9e36c..0033f01f 100644 --- a/test/test-poll.c +++ b/test/test-poll.c @@ -46,7 +46,7 @@ typedef enum { typedef struct connection_context_s { uv_poll_t poll_handle; uv_timer_t timer_handle; - uv_platform_socket_t sock; + uv_os_sock_t sock; size_t read, sent; int is_server_connection; int open_handles; @@ -56,7 +56,7 @@ typedef struct connection_context_s { typedef struct server_context_s { uv_poll_t poll_handle; - uv_platform_socket_t sock; + uv_os_sock_t sock; int connections; } server_context_t; @@ -86,7 +86,7 @@ static int got_eagain() { } -static void set_nonblocking(uv_platform_socket_t sock) { +static void set_nonblocking(uv_os_sock_t sock) { int r; #ifdef _WIN32 unsigned long on = 1; @@ -101,9 +101,9 @@ static void set_nonblocking(uv_platform_socket_t sock) { } -static uv_platform_socket_t create_nonblocking_bound_socket( +static uv_os_sock_t create_nonblocking_bound_socket( struct sockaddr_in bind_addr) { - uv_platform_socket_t sock; + uv_os_sock_t sock; int r; sock = socket(AF_INET, SOCK_STREAM, IPPROTO_IP); @@ -131,7 +131,7 @@ static uv_platform_socket_t create_nonblocking_bound_socket( } -static void close_socket(uv_platform_socket_t sock) { +static void close_socket(uv_os_sock_t sock) { int r; #ifdef _WIN32 r = closesocket(sock); @@ -143,7 +143,7 @@ static void close_socket(uv_platform_socket_t sock) { static connection_context_t* create_connection_context( - uv_platform_socket_t sock, int is_server_connection) { + uv_os_sock_t sock, int is_server_connection) { int r; connection_context_t* context; @@ -433,7 +433,7 @@ static void delay_timer_cb(uv_timer_t* timer, int status) { static server_context_t* create_server_context( - uv_platform_socket_t sock) { + uv_os_sock_t sock) { int r; server_context_t* context; @@ -468,7 +468,7 @@ static void server_poll_cb(uv_poll_t* handle, int status, int events) { connection_context_t* connection_context; struct sockaddr_in addr; socklen_t addr_len; - uv_platform_socket_t sock; + uv_os_sock_t sock; int r; addr_len = sizeof addr; @@ -496,7 +496,7 @@ static void server_poll_cb(uv_poll_t* handle, int status, int events) { static void start_server() { - uv_platform_socket_t sock; + uv_os_sock_t sock; server_context_t* context; int r; @@ -512,7 +512,7 @@ static void start_server() { static void start_client() { - uv_platform_socket_t sock; + uv_os_sock_t sock; connection_context_t* context; struct sockaddr_in server_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); int r;