diff --git a/include/uv-win.h b/include/uv-win.h index 07aeabb3..a3ad4577 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -48,7 +48,8 @@ typedef struct uv_buf_t { UV_ARES_CLEANUP_REQ, \ UV_GETADDRINFO_REQ, \ UV_PROCESS_EXIT, \ - UV_PROCESS_CLOSE + UV_PROCESS_CLOSE, \ + UV_UDP_RECV #define UV_REQ_PRIVATE_FIELDS \ union { \ @@ -69,6 +70,9 @@ typedef struct uv_buf_t { #define UV_SHUTDOWN_PRIVATE_FIELDS \ /* empty */ +#define UV_UDP_SEND_PRIVATE_FIELDS \ + /* empty */ + #define UV_PRIVATE_REQ_TYPES \ typedef struct uv_pipe_accept_s { \ UV_REQ_FIELDS \ @@ -114,7 +118,15 @@ typedef struct uv_buf_t { struct { uv_tcp_connection_fields }; \ }; -#define UV_UDP_PRIVATE_FIELDS +#define UV_UDP_PRIVATE_FIELDS \ + SOCKET socket; \ + unsigned int reqs_pending; \ + uv_req_t recv_req; \ + uv_buf_t recv_buffer; \ + struct sockaddr_storage recv_from; \ + int recv_from_len; \ + uv_udp_recv_cb recv_cb; \ + uv_alloc_cb alloc_cb; #define uv_pipe_server_fields \ uv_pipe_accept_t accept_reqs[4]; \ diff --git a/include/uv.h b/include/uv.h index 040d423a..00b286fc 100644 --- a/include/uv.h +++ b/include/uv.h @@ -491,6 +491,7 @@ struct uv_udp_s { struct uv_udp_send_s { UV_REQ_FIELDS uv_udp_t* handle; + uv_udp_send_cb cb; UV_UDP_SEND_PRIVATE_FIELDS }; diff --git a/src/win/error.c b/src/win/error.c index 36868b58..1e00fc6e 100644 --- a/src/win/error.c +++ b/src/win/error.c @@ -109,6 +109,7 @@ uv_err_code uv_translate_sys_error(int sys_errno) { case WSAEINVAL: return UV_EINVAL; case ERROR_TOO_MANY_OPEN_FILES: return UV_EMFILE; case WSAEMFILE: return UV_EMFILE; + case WSAEMSGSIZE: return UV_EMSGSIZE; case ERROR_NETWORK_UNREACHABLE: return UV_ENETUNREACH; case WSAENETUNREACH: return UV_ENETUNREACH; case ERROR_OUTOFMEMORY: return UV_ENOMEM; diff --git a/src/win/handle.c b/src/win/handle.c index 715cec46..2a1d5974 100644 --- a/src/win/handle.c +++ b/src/win/handle.c @@ -42,6 +42,7 @@ int uv_is_active(uv_handle_t* handle) { void uv_close(uv_handle_t* handle, uv_close_cb cb) { uv_tcp_t* tcp; uv_pipe_t* pipe; + uv_udp_t* udp; uv_process_t* process; if (handle->flags & UV_HANDLE_CLOSING) { @@ -77,6 +78,15 @@ void uv_close(uv_handle_t* handle, uv_close_cb cb) { } return; + case UV_UDP: + udp = (uv_udp_t*) handle; + uv_udp_recv_stop(udp); + closesocket(udp->socket); + if (udp->reqs_pending == 0) { + uv_want_endgame(handle); + } + return; + case UV_TIMER: uv_timer_stop((uv_timer_t*)handle); uv_want_endgame(handle); @@ -143,6 +153,10 @@ void uv_process_endgames() { uv_pipe_endgame((uv_pipe_t*)handle); break; + case UV_UDP: + uv_udp_endgame((uv_udp_t*) handle); + break; + case UV_TIMER: uv_timer_endgame((uv_timer_t*)handle); break; diff --git a/src/win/internal.h b/src/win/internal.h index 54ee9772..97f30f16 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -177,6 +177,15 @@ void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req); void uv_tcp_endgame(uv_tcp_t* handle); +/* + * UDP + */ +void uv_udp_endgame(uv_udp_t* handle); + +void uv_process_udp_recv_req(uv_udp_t* handle, uv_req_t* req); +void uv_process_udp_send_req(uv_udp_t* handle, uv_udp_send_t* req); + + /* * Pipes */ diff --git a/src/win/req.c b/src/win/req.c index b2ab962d..5b88ff9b 100644 --- a/src/win/req.c +++ b/src/win/req.c @@ -117,6 +117,15 @@ void uv_process_reqs() { (uv_pipe_t*) ((uv_shutdown_t*) req)->handle, (uv_shutdown_t*) req); break; + case UV_UDP_RECV: + uv_process_udp_recv_req((uv_udp_t*) req->data, req); + break; + + case UV_UDP_SEND: + uv_process_udp_send_req(((uv_udp_send_t*) req)->handle, + (uv_udp_send_t*) req); + break; + case UV_WAKEUP: uv_process_async_wakeup_req((uv_async_t*) req->data, req); break; diff --git a/src/win/udp.c b/src/win/udp.c new file mode 100644 index 00000000..0428f97b --- /dev/null +++ b/src/win/udp.c @@ -0,0 +1,558 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include + +#include "uv.h" +#include "../uv-common.h" +#include "internal.h" +#include + +#if 0 +/* + * Threshold of active udp streams for which to preallocate udp read buffers. + */ +const unsigned int uv_active_udp_streams_threshold = 0; + +/* A zero-size buffer for use by uv_udp_read */ +static char uv_zero_[] = ""; +#endif + +/* Counter to keep track of active udp streams */ +static unsigned int active_udp_streams = 0; + + +static int uv_udp_set_socket(uv_udp_t* handle, SOCKET socket) { + DWORD yes = 1; + + assert(handle->socket == INVALID_SOCKET); + + /* Set the socket to nonblocking mode */ + if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) { + uv_set_sys_error(WSAGetLastError()); + return -1; + } + + /* Make the socket non-inheritable */ + if (!SetHandleInformation((HANDLE)socket, HANDLE_FLAG_INHERIT, 0)) { + uv_set_sys_error(GetLastError()); + return -1; + } + + /* Associate it with the I/O completion port. */ + /* Use uv_handle_t pointer as completion key. */ + if (CreateIoCompletionPort((HANDLE)socket, + LOOP->iocp, + (ULONG_PTR)socket, + 0) == NULL) { + uv_set_sys_error(GetLastError()); + return -1; + } + + if (pSetFileCompletionNotificationModes) { + if (!pSetFileCompletionNotificationModes((HANDLE)socket, FILE_SKIP_SET_EVENT_ON_HANDLE | + FILE_SKIP_COMPLETION_PORT_ON_SUCCESS)) { + uv_set_sys_error(GetLastError()); + return -1; + } + + handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP; + } + + handle->socket = socket; + + return 0; +} + + +int uv_udp_init(uv_udp_t* handle) { + handle->type = UV_UDP; + handle->socket = INVALID_SOCKET; + handle->reqs_pending = 0; + handle->flags = 0; + + uv_req_init((uv_req_t*) &(handle->recv_req)); + handle->recv_req.type = UV_UDP_RECV; + handle->recv_req.data = handle; + + uv_ref(); + + uv_counters()->handle_init++; + uv_counters()->udp_init++; + + return 0; +} + + +void uv_udp_endgame(uv_udp_t* handle) { + if (handle->flags & UV_HANDLE_CLOSING && + handle->reqs_pending == 0) { + assert(!(handle->flags & UV_HANDLE_CLOSED)); + handle->flags |= UV_HANDLE_CLOSED; + + if (handle->close_cb) { + handle->close_cb((uv_handle_t*)handle); + } + + uv_unref(); + } +} + + +static int uv__bind(uv_udp_t* handle, int domain, struct sockaddr* addr, + int addrsize, unsigned int flags) { + DWORD err; + int r; + SOCKET sock; + + if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) { + /* UV_UDP_IPV6ONLY is supported only for IPV6 sockets */ + uv_set_sys_error(UV_EINVAL); + } + + if (handle->socket == INVALID_SOCKET) { + sock = socket(domain, SOCK_DGRAM, 0); + if (sock == INVALID_SOCKET) { + uv_set_sys_error(WSAGetLastError()); + return -1; + } + + if (uv_udp_set_socket(handle, sock) == -1) { + closesocket(sock); + return -1; + } + } + + if (domain == AF_INET6 && !(flags & UV_UDP_IPV6ONLY)) { + DWORD off = 0; + /* On windows IPV6ONLY is on by default. */ + /* If the user doesn't specify it libuv turns it off. */ + + /* TODO: how to handle errors? This may fail if there is no ipv4 stack */ + /* available, or when run on XP/2003 which have no support for dualstack */ + /* sockets. For now we're silently ignoring the error. */ + setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, (const char*) &off, sizeof off); + } + + r = bind(handle->socket, addr, addrsize); + + if (r == SOCKET_ERROR) { + err = WSAGetLastError(); + uv_set_sys_error(WSAGetLastError()); + return -1; + } + + handle->flags |= UV_HANDLE_BOUND; + + return 0; +} + + +int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned int flags) { + if (addr.sin_family != AF_INET) { + uv_set_sys_error(WSAEFAULT); + return -1; + } + + return uv__bind(handle, + AF_INET, + (struct sockaddr*) &addr, + sizeof(struct sockaddr_in), + flags); +} + + +int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned int flags) { + if (addr.sin6_family != AF_INET6) { + uv_set_sys_error(WSAEFAULT); + return -1; + } + + if (uv_allow_ipv6) { + handle->flags |= UV_HANDLE_IPV6; + return uv__bind(handle, + AF_INET6, + (struct sockaddr*) &addr, + sizeof(struct sockaddr_in6), + flags); + } else { + uv_new_sys_error(WSAEAFNOSUPPORT); + return -1; + } +} + + +static void uv_udp_queue_recv(uv_udp_t* handle) { + uv_req_t* req; + uv_buf_t buf; + DWORD bytes, flags; + int result; + + assert(handle->flags & UV_HANDLE_READING); + assert(!(handle->flags & UV_HANDLE_READ_PENDING)); + + req = &handle->recv_req; + memset(&req->overlapped, 0, sizeof(req->overlapped)); + + /* + * Preallocate a read buffer if the number of active streams is below + * the threshold. + */ +#if 0 + if (active_udp_streams < uv_active_udp_streams_threshold) { + handle->flags &= ~UV_HANDLE_ZERO_READ; +#endif + handle->recv_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536); + assert(handle->recv_buffer.len > 0); + + buf = handle->recv_buffer; + memset(&handle->recv_from, 0, sizeof handle->recv_from); + handle->recv_from_len = sizeof handle->recv_from; + flags = 0; + + result = WSARecvFrom(handle->socket, + (WSABUF*) &buf, + 1, + &bytes, + &flags, + (struct sockaddr*) &handle->recv_from, + &handle->recv_from_len, + &req->overlapped, + NULL); + + if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { + /* Process the req without IOCP. */ + handle->flags |= UV_HANDLE_READ_PENDING; + req->overlapped.InternalHigh = bytes; + handle->reqs_pending++; + uv_insert_pending_req(req); + } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { + /* The req will be processed with IOCP. */ + handle->flags |= UV_HANDLE_READ_PENDING; + handle->reqs_pending++; + } else { + /* Make this req pending reporting an error. */ + SET_REQ_ERROR(req, WSAGetLastError()); + uv_insert_pending_req(req); + handle->reqs_pending++; + } +#if 0 + } else { + handle->flags |= UV_HANDLE_ZERO_READ; + + buf.base = (char*) uv_zero_; + buf.len = 0; + flags = MSG_PARTIAL; + + result = WSARecv(handle->socket, + (WSABUF*) &buf, + 1, + &bytes, + &flags, + &req->overlapped, + NULL); + + if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { + /* Process the req without IOCP. */ + handle->flags |= UV_HANDLE_READ_PENDING; + req->overlapped.InternalHigh = bytes; + handle->reqs_pending++; + uv_insert_pending_req(req); + } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { + /* The req will be processed with IOCP. */ + handle->flags |= UV_HANDLE_READ_PENDING; + handle->reqs_pending++; + } else { + /* Make this req pending reporting an error. */ + SET_REQ_ERROR(req, WSAGetLastError()); + uv_insert_pending_req(req); + handle->reqs_pending++; + } + } +#endif +} + + +int uv_udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb, uv_udp_recv_cb recv_cb) { + if (handle->flags & UV_HANDLE_READING) { + uv_set_sys_error(WSAEALREADY); + return -1; + } + + if (!(handle->flags & UV_HANDLE_BOUND) && + uv_udp_bind(handle, uv_addr_ip4_any_, 0) < 0) { + return -1; + } + + handle->flags |= UV_HANDLE_READING; + active_udp_streams++; + + handle->recv_cb = recv_cb; + handle->alloc_cb = alloc_cb; + + /* If reading was stopped and then started again, there could stell be a */ + /* recv request pending. */ + if (!(handle->flags & UV_HANDLE_READ_PENDING)) + uv_udp_queue_recv(handle); + + return 0; +} + + +int uv_udp_recv_stop(uv_udp_t* handle) { + if (handle->flags & UV_HANDLE_READING) { + handle->flags &= ~UV_HANDLE_READING; + active_udp_streams--; + } + + return 0; +} + + +int uv_udp_connect6(uv_connect_t* req, uv_udp_t* handle, + struct sockaddr_in6 address, uv_connect_cb cb) { + int addrsize = sizeof(struct sockaddr_in6); + BOOL success; + DWORD bytes; + + if (!uv_allow_ipv6) { + uv_new_sys_error(WSAEAFNOSUPPORT); + return -1; + } + + if (address.sin6_family != AF_INET6) { + uv_set_sys_error(WSAEFAULT); + return -1; + } + + if (!(handle->flags & UV_HANDLE_BOUND) && + uv_udp_bind6(handle, uv_addr_ip6_any_, 0) < 0) + return -1; + + uv_req_init((uv_req_t*) req); + req->type = UV_CONNECT; + req->handle = (uv_stream_t*) handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); + + success = pConnectEx6(handle->socket, + (struct sockaddr*) &address, + addrsize, + NULL, + 0, + &bytes, + &req->overlapped); + + if (UV_SUCCEEDED_WITHOUT_IOCP(success)) { + handle->reqs_pending++; + uv_insert_pending_req((uv_req_t*)req); + } else if (UV_SUCCEEDED_WITH_IOCP(success)) { + handle->reqs_pending++; + } else { + uv_set_sys_error(WSAGetLastError()); + return -1; + } + + return 0; +} + + +static int uv__udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], + int bufcnt, struct sockaddr* addr, int addr_len, uv_udp_send_cb cb) { + DWORD result, bytes; + + uv_req_init((uv_req_t*) req); + req->type = UV_UDP_SEND; + req->handle = handle; + req->cb = cb; + memset(&req->overlapped, 0, sizeof(req->overlapped)); + + result = WSASendTo(handle->socket, + (WSABUF*)bufs, + bufcnt, + &bytes, + 0, + addr, + addr_len, + &req->overlapped, + NULL); + + if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) { + /* Request completed immediately. */ + req->queued_bytes = 0; + handle->reqs_pending++; + uv_insert_pending_req((uv_req_t*)req); + } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) { + /* Request queued by the kernel. */ + req->queued_bytes = uv_count_bufs(bufs, bufcnt); + handle->reqs_pending++; + } else { + /* Send failed due to an error. */ + uv_set_sys_error(WSAGetLastError()); + return -1; + } + + return 0; +} + + +int uv_udp_send(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], + int bufcnt, struct sockaddr_in addr, uv_udp_send_cb cb) { + + if (!(handle->flags & UV_HANDLE_BOUND) && + uv_udp_bind(handle, uv_addr_ip4_any_, 0) < 0) { + return -1; + } + + return uv__udp_send(req, + handle, + bufs, + bufcnt, + (struct sockaddr*) &addr, + sizeof addr, + cb); +} + + +int uv_udp_send6(uv_udp_send_t* req, uv_udp_t* handle, uv_buf_t bufs[], + int bufcnt, struct sockaddr_in6 addr, uv_udp_send_cb cb) { + + if (!(handle->flags & UV_HANDLE_BOUND) && + uv_udp_bind6(handle, uv_addr_ip6_any_, 0) < 0) { + return -1; + } + + return uv__udp_send(req, + handle, + bufs, + bufcnt, + (struct sockaddr*) &addr, + sizeof addr, + cb); +} + + +void uv_process_udp_recv_req(uv_udp_t* handle, uv_req_t* req) { + uv_buf_t buf; + int partial; + + assert(handle->type == UV_UDP); + + handle->flags &= ~UV_HANDLE_READ_PENDING; + + if (!REQ_SUCCESS(req) && + GET_REQ_STATUS(req) != STATUS_RECEIVE_EXPEDITED) { + /* An error occurred doing the read. */ + if ((handle->flags & UV_HANDLE_READING)) { + LOOP->last_error = GET_REQ_UV_SOCK_ERROR(req); + uv_udp_recv_stop(handle); +#if 0 + buf = (handle->flags & UV_HANDLE_ZERO_READ) ? + uv_buf_init(NULL, 0) : handle->recv_buffer; +#else + buf = handle->recv_buffer; +#endif + handle->recv_cb(handle, -1, buf, NULL, 0); + } + goto done; + } + +#if 0 + if (!(handle->flags & UV_HANDLE_ZERO_READ)) { +#endif + /* Successful read */ + partial = (GET_REQ_STATUS(req) == STATUS_RECEIVE_EXPEDITED); + handle->recv_cb(handle, + req->overlapped.InternalHigh, + handle->recv_buffer, + (struct sockaddr*) &handle->recv_from, + partial ? UV_UDP_PARTIAL : 0); +#if 0 + } else { + DWORD bytes, err, flags; + struct sockaddr_storage from; + int from_len; + + /* Do a nonblocking receive */ + /* TODO: try to read multiple datagrams at once. FIONREAD maybe? */ + buf = handle->alloc_cb((uv_handle_t*) handle, 65536); + assert(buf.len > 0); + + memset(&from, 0, sizeof from); + from_len = sizeof from; + flags = MSG_PARTIAL; + + if (WSARecvFrom(handle->socket, + (WSABUF*)&buf, + 1, + &bytes, + &flags, + (struct sockaddr*) &from, + &from_len, + NULL, + NULL) != SOCKET_ERROR) { + + /* Message received */ + handle->recv_cb(handle, + bytes, + buf, + (struct sockaddr*) &from, + (flags & MSG_PARTIAL) ? UV_UDP_PARTIAL : 0); + } else { + err = WSAGetLastError(); + if (err == WSAEWOULDBLOCK) { + uv_set_sys_error(WSAEWOULDBLOCK); + handle->recv_cb(handle, 0, buf, NULL, 0); + } else { + /* Ouch! serious error. */ + uv_set_sys_error(err); + handle->recv_cb(handle, -1, buf, NULL, 0); + } + } + } +#endif + +done: + /* Post another read if still reading and not closing. */ + if ((handle->flags & UV_HANDLE_READING) && + !(handle->flags & UV_HANDLE_READ_PENDING)) { + uv_udp_queue_recv(handle); + } + + DECREASE_PENDING_REQ_COUNT(handle); +} + + +void uv_process_udp_send_req(uv_udp_t* handle, uv_udp_send_t* req) { + assert(handle->type == UV_UDP); + + if (req->cb) { + if (REQ_SUCCESS(req)) { + req->cb(req, 0); + } else { + LOOP->last_error = GET_REQ_UV_SOCK_ERROR(req); + req->cb(req, -1); + } + } + + DECREASE_PENDING_REQ_COUNT(handle); +} + diff --git a/uv.gyp b/uv.gyp index e202e13b..ca90b826 100644 --- a/uv.gyp +++ b/uv.gyp @@ -109,6 +109,7 @@ 'src/win/stream.c', 'src/win/tcp.c', 'src/win/timer.c', + 'src/win/udp.c', 'src/win/util.c', 'src/win/winapi.c', 'src/win/winapi.h', @@ -232,6 +233,9 @@ 'test/test-tcp-writealot.c', 'test/test-timer-again.c', 'test/test-timer.c', + 'test/test-udp-dgram-too-big.c', + 'test/test-udp-ipv6.c', + 'test/test-udp-send-and-recv.c', ], 'conditions': [ [ 'OS=="win"', { @@ -269,6 +273,7 @@ 'test/benchmark-pump.c', 'test/benchmark-sizes.c', 'test/benchmark-spawn.c', + 'test/benchmark-udp-packet-storm.c', 'test/dns-server.c', 'test/echo-server.c', 'test/run-benchmarks.c',