diff --git a/CMakeLists.txt b/CMakeLists.txt index ce086f4a..40c808b8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -693,6 +693,7 @@ if(LIBUV_BUILD_TESTS) test/test-udp-send-unreachable.c test/test-udp-try-send.c test/test-udp-recv-in-a-row.c + test/test-udp-reuseport.c test/test-uname.c test/test-walk-handles.c test/test-watcher-cross-stop.c) diff --git a/Makefile.am b/Makefile.am index 585934bd..9379b671 100644 --- a/Makefile.am +++ b/Makefile.am @@ -326,6 +326,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \ test/test-udp-send-unreachable.c \ test/test-udp-try-send.c \ test/test-udp-recv-in-a-row.c \ + test/test-udp-reuseport.c \ test/test-uname.c \ test/test-walk-handles.c \ test/test-watcher-cross-stop.c diff --git a/docs/src/tcp.rst b/docs/src/tcp.rst index 5b5453b0..571b2930 100644 --- a/docs/src/tcp.rst +++ b/docs/src/tcp.rst @@ -129,7 +129,8 @@ API .. note:: ``UV_TCP_REUSEPORT`` flag is available only on Linux 3.9+, DragonFlyBSD 3.6+, - FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ at the moment. + FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ at the moment. On other platforms + this function will return an UV_ENOTSUP error. .. c:function:: int uv_tcp_getsockname(const uv_tcp_t* handle, struct sockaddr* name, int* namelen) diff --git a/docs/src/udp.rst b/docs/src/udp.rst index 9e075d7b..57d4c77e 100644 --- a/docs/src/udp.rst +++ b/docs/src/udp.rst @@ -28,19 +28,21 @@ Data types /* Disables dual stack mode. */ UV_UDP_IPV6ONLY = 1, /* - * Indicates message was truncated because read buffer was too small. The - * remainder was discarded by the OS. Used in uv_udp_recv_cb. - */ + * Indicates message was truncated because read buffer was too small. The + * remainder was discarded by the OS. Used in uv_udp_recv_cb. + */ UV_UDP_PARTIAL = 2, /* - * Indicates if SO_REUSEADDR will be set when binding the handle in - * uv_udp_bind. - * This sets the SO_REUSEPORT socket flag on the BSDs and OS X. On other - * Unix platforms, it sets the SO_REUSEADDR flag. 
What that means is that - * multiple threads or processes can bind to the same address without error - * (provided they all set the flag) but only the last one to bind will receive - * any traffic, in effect "stealing" the port from the previous listener. - */ + * Indicates if SO_REUSEADDR will be set when binding the handle. + * This sets the SO_REUSEPORT socket flag on the BSDs (except for + * DragonFlyBSD), OS X, and other platforms where SO_REUSEPORTs don't + * have the capability of load balancing, as the opposite of what + * UV_UDP_REUSEPORT would do. On other Unix platforms, it sets the + * SO_REUSEADDR flag. What that means is that multiple threads or + * processes can bind to the same address without error (provided + * they all set the flag) but only the last one to bind will receive + * any traffic, in effect "stealing" the port from the previous listener. + */ UV_UDP_REUSEADDR = 4, /* * Indicates that the message was received by recvmmsg, so the buffer provided @@ -62,8 +64,20 @@ Data types */ UV_UDP_LINUX_RECVERR = 32, /* - * Indicates that recvmmsg should be used, if available. - */ + * Indicates if SO_REUSEPORT will be set when binding the handle. + * This sets the SO_REUSEPORT socket option on supported platforms. + * Unlike UV_UDP_REUSEADDR, this flag will make multiple threads or + * processes that are binding to the same address and port "share" + * the port, which means incoming datagrams are distributed across + * the receiving sockets among threads or processes. + * + * This flag is available only on Linux 3.9+, DragonFlyBSD 3.6+, + * FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ for now. + */ + UV_UDP_REUSEPORT = 64, + /* + * Indicates that recvmmsg should be used, if available. + */ UV_UDP_RECVMMSG = 256 }; @@ -186,11 +200,24 @@ API with the address and port to bind to. :param flags: Indicate how the socket will be bound, - ``UV_UDP_IPV6ONLY``, ``UV_UDP_REUSEADDR``, and ``UV_UDP_RECVERR`` - are supported. 
+ ``UV_UDP_IPV6ONLY``, ``UV_UDP_REUSEADDR``, ``UV_UDP_REUSEPORT``, + and ``UV_UDP_RECVERR`` are supported. :returns: 0 on success, or an error code < 0 on failure. + .. versionchanged:: 1.49.0 added the ``UV_UDP_REUSEPORT`` flag. + + .. note:: + ``UV_UDP_REUSEPORT`` flag is available only on Linux 3.9+, DragonFlyBSD 3.6+, + FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ at the moment. On other platforms + this function will return an UV_ENOTSUP error. + For platforms where `SO_REUSEPORT`s have the capability of load balancing, + specifying both ``UV_UDP_REUSEADDR`` and ``UV_UDP_REUSEPORT`` in flags is allowed + and `SO_REUSEPORT` will always override the behavior of `SO_REUSEADDR`. + For platforms where `SO_REUSEPORT`s don't have the capability of load balancing, + specifying both ``UV_UDP_REUSEADDR`` and ``UV_UDP_REUSEPORT`` in flags will fail, + returning an UV_ENOTSUP error. + .. c:function:: int uv_udp_connect(uv_udp_t* handle, const struct sockaddr* addr) Associate the UDP handle to a remote address and port, so every diff --git a/include/uv.h b/include/uv.h index 3b6b7224..f1d976ba 100644 --- a/include/uv.h +++ b/include/uv.h @@ -656,10 +656,13 @@ enum uv_udp_flags { UV_UDP_PARTIAL = 2, /* * Indicates if SO_REUSEADDR will be set when binding the handle. - * This sets the SO_REUSEPORT socket flag on the BSDs and OS X. On other - * Unix platforms, it sets the SO_REUSEADDR flag. What that means is that - * multiple threads or processes can bind to the same address without error - * (provided they all set the flag) but only the last one to bind will receive + * This sets the SO_REUSEPORT socket flag on the BSDs (except for + * DragonFlyBSD), OS X, and other platforms where SO_REUSEPORTs don't + * have the capability of load balancing, as the opposite of what + * UV_UDP_REUSEPORT would do. On other Unix platforms, it sets the + * SO_REUSEADDR flag. 
What that means is that multiple threads or + * processes can bind to the same address without error (provided + * they all set the flag) but only the last one to bind will receive * any traffic, in effect "stealing" the port from the previous listener. */ UV_UDP_REUSEADDR = 4, @@ -682,6 +685,18 @@ enum uv_udp_flags { * This flag is no-op on platforms other than Linux. */ UV_UDP_LINUX_RECVERR = 32, + /* + * Indicates if SO_REUSEPORT will be set when binding the handle. + * This sets the SO_REUSEPORT socket option on supported platforms. + * Unlike UV_UDP_REUSEADDR, this flag will make multiple threads or + * processes that are binding to the same address and port "share" + * the port, which means incoming datagrams are distributed across + * the receiving sockets among threads or processes. + * + * This flag is available only on Linux 3.9+, DragonFlyBSD 3.6+, + * FreeBSD 12.0+, Solaris 11.4, and AIX 7.2.5+ for now. + */ + UV_UDP_REUSEPORT = 64, /* * Indicates that recvmmsg should be used, if available. */ diff --git a/src/unix/core.c b/src/unix/core.c index 8ea606c7..25249bcb 100644 --- a/src/unix/core.c +++ b/src/unix/core.c @@ -1910,3 +1910,48 @@ unsigned int uv_available_parallelism(void) { return (unsigned) rc; #endif /* __linux__ */ } + +int uv__sock_reuseport(int fd) { + int on = 1; +#if defined(__FreeBSD__) && __FreeBSD__ >= 12 && defined(SO_REUSEPORT_LB) + /* FreeBSD 12 introduced a new socket option named SO_REUSEPORT_LB + * with the capability of load balancing, it's the substitution of + * the SO_REUSEPORTs on Linux and DragonFlyBSD. 
*/ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, &on, sizeof(on))) + return UV__ERR(errno); +#elif (defined(__linux__) || \ + defined(_AIX73) || \ + (defined(__DragonFly__) && __DragonFly_version >= 300600) || \ + (defined(__sun) && defined(SO_FLOW_NAME))) && \ + defined(SO_REUSEPORT) + /* On Linux 3.9+, the SO_REUSEPORT implementation distributes connections + * evenly across all of the threads (or processes) that are blocked in + * accept() on the same port. As with TCP, SO_REUSEPORT distributes datagrams + * evenly across all of the receiving threads (or process). + * + * DragonFlyBSD 3.6.0 extended SO_REUSEPORT to distribute workload to + * available sockets, which made it the equivalent of Linux's SO_REUSEPORT. + * + * AIX 7.2.5 added the feature that would add the capability to distribute + * incoming connections or datagrams across all listening ports for SO_REUSEPORT. + * + * Solaris 11 supported SO_REUSEPORT, but it's implemented only for + * binding to the same address and port, without load balancing. + * Solaris 11.4 extended SO_REUSEPORT with the capability of load balancing. + * Since it's impossible to detect the Solaris 11.4 version via OS macros, + * we check for the presence of the socket option SO_FLOW_NAME that was first + * introduced in Solaris 11.4. */ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on))) + return UV__ERR(errno); +#else + (void) (fd); + (void) (on); + /* SO_REUSEPORTs do not have the capability of load balancing on platforms + * other than those mentioned above. The semantics are completely different, + * therefore we shouldn't enable it, but fail this operation to indicate that + * UV_[TCP/UDP]_REUSEPORT is not supported on these platforms. 
*/ + return UV_ENOTSUP; +#endif + + return 0; +} diff --git a/src/unix/internal.h b/src/unix/internal.h index 3ad37052..1c0c88c7 100644 --- a/src/unix/internal.h +++ b/src/unix/internal.h @@ -243,6 +243,7 @@ int uv__close(int fd); /* preserves errno */ int uv__close_nocheckstdio(int fd); int uv__close_nocancel(int fd); int uv__socket(int domain, int type, int protocol); +int uv__sock_reuseport(int fd); ssize_t uv__recvmsg(int fd, struct msghdr *msg, int flags); void uv__make_close_pending(uv_handle_t* handle); int uv__getiovmax(void); diff --git a/src/unix/tcp.c b/src/unix/tcp.c index 5b8df37f..eba8c99f 100644 --- a/src/unix/tcp.c +++ b/src/unix/tcp.c @@ -148,50 +148,6 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) { } -static int uv__tcp_reuseport(int fd) { - int on = 1; -#if defined(__FreeBSD__) && __FreeBSD__ >= 12 && defined(SO_REUSEPORT_LB) - /* FreeBSD 12 introduced a new socket option named SO_REUSEPORT_LB - * with the capability of load balancing, it's the substitution of - * the SO_REUSEPORTs on Linux and DragonFlyBSD. */ - if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT_LB, &on, sizeof(on))) - return UV__ERR(errno); -#elif (defined(__linux__) || \ - defined(_AIX73) || \ - (defined(__DragonFly__) && __DragonFly_version >= 300600) || \ - (defined(__sun) && defined(SO_FLOW_NAME))) && \ - defined(SO_REUSEPORT) - /* On Linux 3.9+, the SO_REUSEPORT implementation distributes connections - * evenly across all of the threads (or processes) that are blocked in - * accept() on the same port. - * - * DragonFlyBSD 3.6.0 extended SO_REUSEPORT to distribute workload to - * available sockets, which made it the equivalent of Linux's SO_REUSEPORT. - * - * AIX 7.2.5 added the feature that would add the capability to distribute - * incoming connections across all listening ports for SO_REUSEPORT. - * - * Solaris 11 supported SO_REUSEPORT, but it's implemented only for - * binding to the same address and port, without load balancing. 
- * Solaris 11.4 extended SO_REUSEPORT with the capability of load balancing. - * Since it's impossible to detect the Solaris 11.4 version via OS macros, - * so we check the presence of the socket option SO_FLOW_NAME that was first - * introduced to Solaris 11.4. */ - if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &on, sizeof(on))) - return UV__ERR(errno); -#else - (void) (fd); - (void) (on); - /* SO_REUSEPORTs do not have the capability of load balancing on platforms - * other than those mentioned above. The semantics are completely different, - * therefore we shouldn't enable it, but fail this operation to indicate that - * UV_TCP_REUSEPORT is not supported on these platforms. */ - return UV_ENOTSUP; -#endif - - return 0; -} - int uv__tcp_bind(uv_tcp_t* tcp, const struct sockaddr* addr, unsigned int addrlen, @@ -212,7 +168,7 @@ int uv__tcp_bind(uv_tcp_t* tcp, return UV__ERR(errno); if (flags & UV_TCP_REUSEPORT) { - err = uv__tcp_reuseport(tcp->io_watcher.fd); + err = uv__sock_reuseport(tcp->io_watcher.fd); if (err) return err; } diff --git a/src/unix/udp.c b/src/unix/udp.c index e398509b..4e75a019 100644 --- a/src/unix/udp.c +++ b/src/unix/udp.c @@ -434,17 +434,20 @@ static void uv__udp_sendmsg(uv_udp_t* handle) { } /* On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional - * refinements for programs that use multicast. + * refinements for programs that use multicast. Therefore we preferentially + * set SO_REUSEPORT over SO_REUSEADDR here, but we set SO_REUSEPORT only + * when that socket option doesn't have the capability of load balancing. + * Otherwise, we fall back to SO_REUSEADDR. * - * Linux as of 3.9 and DragonflyBSD 3.6 have the SO_REUSEPORT socket option but - * with semantics that are different from the BSDs: it _shares_ the port rather - * than steals it from the current listener. While useful, it's not something we - * can emulate on other platforms so we don't enable it. 
+ * Linux as of 3.9, DragonflyBSD 3.6, AIX 7.2.5 have the SO_REUSEPORT socket + * option but with semantics that are different from the BSDs: it _shares_ + * the port rather than steals it from the current listener. While useful, + * it's not something we can emulate on other platforms so we don't enable it. * * zOS does not support getsockname with SO_REUSEPORT option when using * AF_UNIX. */ -static int uv__set_reuse(int fd) { +static int uv__sock_reuseaddr(int fd) { int yes; yes = 1; @@ -461,7 +464,7 @@ static int uv__set_reuse(int fd) { return UV__ERR(errno); } #elif defined(SO_REUSEPORT) && !defined(__linux__) && !defined(__GNU__) && \ - !defined(__sun__) && !defined(__DragonFly__) + !defined(__sun__) && !defined(__DragonFly__) && !defined(_AIX73) if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes))) return UV__ERR(errno); #else @@ -504,7 +507,8 @@ int uv__udp_bind(uv_udp_t* handle, int fd; /* Check for bad flags. */ - if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | UV_UDP_LINUX_RECVERR)) + if (flags & ~(UV_UDP_IPV6ONLY | UV_UDP_REUSEADDR | + UV_UDP_REUSEPORT | UV_UDP_LINUX_RECVERR)) return UV_EINVAL; /* Cannot set IPv6-only mode on non-IPv6 socket. */ @@ -527,7 +531,13 @@ int uv__udp_bind(uv_udp_t* handle, } if (flags & UV_UDP_REUSEADDR) { - err = uv__set_reuse(fd); + err = uv__sock_reuseaddr(fd); + if (err) + return err; + } + + if (flags & UV_UDP_REUSEPORT) { + err = uv__sock_reuseport(fd); if (err) return err; } @@ -1049,7 +1059,7 @@ int uv_udp_open(uv_udp_t* handle, uv_os_sock_t sock) { if (err) return err; - err = uv__set_reuse(sock); + err = uv__sock_reuseaddr(sock); if (err) return err; diff --git a/src/win/udp.c b/src/win/udp.c index eab53842..16531195 100644 --- a/src/win/udp.c +++ b/src/win/udp.c @@ -200,6 +200,12 @@ static int uv__udp_maybe_bind(uv_udp_t* handle, if (handle->flags & UV_HANDLE_BOUND) return 0; + /* There is no SO_REUSEPORT on Windows, Windows only knows SO_REUSEADDR. 
+ * So we just return an error directly when UV_UDP_REUSEPORT is requested + * for binding the socket. */ + if (flags & UV_UDP_REUSEPORT) + return ERROR_NOT_SUPPORTED; + if ((flags & UV_UDP_IPV6ONLY) && addr->sa_family != AF_INET6) { /* UV_UDP_IPV6ONLY is supported only for IPV6 sockets */ return ERROR_INVALID_PARAMETER; diff --git a/test/test-list.h b/test/test-list.h index ad4593d8..d600aef2 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -190,6 +190,7 @@ TEST_DECLARE (udp_open_twice) TEST_DECLARE (udp_open_bound) TEST_DECLARE (udp_open_connect) TEST_DECLARE (udp_recv_in_a_row) +TEST_DECLARE (udp_reuseport) #ifndef _WIN32 TEST_DECLARE (udp_send_unix) #endif @@ -804,6 +805,7 @@ TASK_LIST_START TEST_ENTRY (udp_sendmmsg_error) TEST_ENTRY (udp_try_send) TEST_ENTRY (udp_recv_in_a_row) + TEST_ENTRY (udp_reuseport) TEST_ENTRY (udp_open) TEST_ENTRY (udp_open_twice) diff --git a/test/test-tcp-reuseport.c b/test/test-tcp-reuseport.c index 2e5adec2..f108b9bb 100644 --- a/test/test-tcp-reuseport.c +++ b/test/test-tcp-reuseport.c @@ -69,8 +69,8 @@ static unsigned int thread_loop2_accepted; static unsigned int connected; static uv_loop_t* main_loop; -static uv_loop_t* thread_loop1; -static uv_loop_t* thread_loop2; +static uv_loop_t thread_loop1; +static uv_loop_t thread_loop2; static uv_tcp_t thread_handle1; static uv_tcp_t thread_handle2; static uv_timer_t thread_timer_handle1; @@ -92,9 +92,9 @@ static void ticktack(uv_timer_t* timer) { if (done) { uv_close((uv_handle_t*) timer, NULL); - if (timer->loop == thread_loop1) + if (timer->loop == &thread_loop1) uv_close((uv_handle_t*) &thread_handle1, NULL); - if (timer->loop == thread_loop2) + if (timer->loop == &thread_loop2) uv_close((uv_handle_t*) &thread_handle2, NULL); } } @@ -110,10 +110,10 @@ static void on_connection(uv_stream_t* server, int status) ASSERT_OK(uv_accept(server, (uv_stream_t*) client)); uv_close((uv_handle_t*) client, on_close); - if (server->loop == thread_loop1) + if (server->loop == &thread_loop1) 
thread_loop1_accepted++; - if (server->loop == thread_loop2) + if (server->loop == &thread_loop2) thread_loop2_accepted++; uv_mutex_lock(&mutex); @@ -130,17 +130,6 @@ static void on_connect(uv_connect_t* req, int status) { uv_close((uv_handle_t*) req->handle, NULL); } -static void run_event_loop(void* arg) { - int r; - uv_loop_t* loop = (uv_loop_t*) arg; - ASSERT(loop == thread_loop1 || loop == thread_loop2); - - /* Notify the main thread to start connecting. */ - uv_sem_post(&semaphore); - r = uv_run(loop, UV_RUN_DEFAULT); - ASSERT_OK(r); -} - static void create_listener(uv_loop_t* loop, uv_tcp_t* handle) { struct sockaddr_in addr; int r; @@ -157,12 +146,40 @@ static void create_listener(uv_loop_t* loop, uv_tcp_t* handle) { ASSERT_OK(r); } +static void run_event_loop(void* arg) { + int r; + uv_tcp_t* handle; + uv_timer_t* timer; + uv_loop_t* loop = (uv_loop_t*) arg; + ASSERT(loop == &thread_loop1 || loop == &thread_loop2); + + if (loop == &thread_loop1) { + handle = &thread_handle1; + timer = &thread_timer_handle1; + } else { + handle = &thread_handle2; + timer = &thread_timer_handle2; + } + + create_listener(loop, handle); + r = uv_timer_init(loop, timer); + ASSERT_OK(r); + r = uv_timer_start(timer, ticktack, 0, 10); + ASSERT_OK(r); + + /* Notify the main thread to start connecting. */ + uv_sem_post(&semaphore); + r = uv_run(loop, UV_RUN_DEFAULT); + ASSERT_OK(r); +} + TEST_IMPL(tcp_reuseport) { struct sockaddr_in addr; int r; int i; r = uv_mutex_init(&mutex); + ASSERT_OK(r); r = uv_sem_init(&semaphore, 0); ASSERT_OK(r); @@ -170,25 +187,13 @@ TEST_IMPL(tcp_reuseport) { main_loop = uv_default_loop(); ASSERT_NOT_NULL(main_loop); - /* Create listener per event loop. 
*/ - - thread_loop1 = uv_loop_new(); - ASSERT_NOT_NULL(thread_loop1); - create_listener(thread_loop1, &thread_handle1); - uv_timer_init(thread_loop1, &thread_timer_handle1); - uv_timer_start(&thread_timer_handle1, ticktack, 0, 10); - - thread_loop2 = uv_loop_new(); - ASSERT_NOT_NULL(thread_loop2); - create_listener(thread_loop2, &thread_handle2); - uv_timer_init(thread_loop2, &thread_timer_handle2); - uv_timer_start(&thread_timer_handle2, ticktack, 0, 10); - /* Run event loops of listeners in separate threads. */ + uv_loop_init(&thread_loop1); + uv_loop_init(&thread_loop2); uv_thread_t thread_loop_id1; uv_thread_t thread_loop_id2; - uv_thread_create(&thread_loop_id1, run_event_loop, thread_loop1); - uv_thread_create(&thread_loop_id2, run_event_loop, thread_loop2); + uv_thread_create(&thread_loop_id1, run_event_loop, &thread_loop1); + uv_thread_create(&thread_loop_id2, run_event_loop, &thread_loop2); /* Wait until all threads to poll for accepting connections * before we start to connect. Otherwise the incoming connections @@ -233,8 +238,8 @@ TEST_IMPL(tcp_reuseport) { uv_sem_destroy(&semaphore); - uv_loop_delete(thread_loop1); - uv_loop_delete(thread_loop2); + uv_loop_close(&thread_loop1); + uv_loop_close(&thread_loop2); MAKE_VALGRIND_HAPPY(main_loop); return 0; diff --git a/test/test-udp-reuseport.c b/test/test-udp-reuseport.c new file mode 100644 index 00000000..7d4db408 --- /dev/null +++ b/test/test-udp-reuseport.c @@ -0,0 +1,287 @@ +/* Copyright libuv project contributors. All rights reserved. 
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. 
+ */ + +#include "uv.h" +#include "task.h" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#if !defined(__linux__) && !defined(__FreeBSD__) && \ + !defined(__DragonFly__) && !defined(__sun) && !defined(_AIX73) + +TEST_IMPL(udp_reuseport) { + struct sockaddr_in addr1, addr2, addr3; + uv_loop_t* loop; + uv_udp_t handle1, handle2, handle3; + int r; + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr1)); + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT_2, &addr2)); + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT_3, &addr3)); + + loop = uv_default_loop(); + ASSERT_NOT_NULL(loop); + + r = uv_udp_init(loop, &handle1); + ASSERT_OK(r); + + r = uv_udp_bind(&handle1, (const struct sockaddr*) &addr1, UV_UDP_REUSEADDR); + ASSERT_OK(r); + + r = uv_udp_init(loop, &handle2); + ASSERT_OK(r); + + r = uv_udp_bind(&handle2, (const struct sockaddr*) &addr2, UV_UDP_REUSEPORT); + ASSERT_EQ(r, UV_ENOTSUP); + + r = uv_udp_init(loop, &handle3); + ASSERT_OK(r); + + /* For platforms where SO_REUSEPORTs don't have the capability of + * load balancing, specifying both UV_UDP_REUSEADDR and UV_UDP_REUSEPORT + * in flags will fail, returning an UV_ENOTSUP error. 
*/ + r = uv_udp_bind(&handle3, (const struct sockaddr*) &addr3, + UV_UDP_REUSEADDR | UV_UDP_REUSEPORT); + ASSERT_EQ(r, UV_ENOTSUP); + + MAKE_VALGRIND_HAPPY(loop); + + return 0; +} + +#else + +#define NUM_RECEIVING_THREADS 2 +#define MAX_UDP_DATAGRAMS 10 + +static uv_udp_t udp_send_handles[MAX_UDP_DATAGRAMS]; +static uv_udp_send_t udp_send_requests[MAX_UDP_DATAGRAMS]; + +static uv_sem_t semaphore; + +static uv_mutex_t mutex; +static unsigned int received; + +static unsigned int thread_loop1_recv; +static unsigned int thread_loop2_recv; +static unsigned int sent; + +static uv_loop_t* main_loop; +static uv_loop_t thread_loop1; +static uv_loop_t thread_loop2; +static uv_udp_t thread_handle1; +static uv_udp_t thread_handle2; +static uv_timer_t thread_timer_handle1; +static uv_timer_t thread_timer_handle2; + +static void alloc_cb(uv_handle_t* handle, + size_t suggested_size, + uv_buf_t* buf) { + buf->base = malloc(suggested_size); + buf->len = (int) suggested_size; +} + +static void ticktack(uv_timer_t* timer) { + int done = 0; + + ASSERT(timer == &thread_timer_handle1 || timer == &thread_timer_handle2); + + uv_mutex_lock(&mutex); + if (received == MAX_UDP_DATAGRAMS) { + done = 1; + } + uv_mutex_unlock(&mutex); + + if (done) { + uv_close((uv_handle_t*) timer, NULL); + if (timer->loop == &thread_loop1) + uv_close((uv_handle_t*) &thread_handle1, NULL); + if (timer->loop == &thread_loop2) + uv_close((uv_handle_t*) &thread_handle2, NULL); + } +} + +static void on_recv(uv_udp_t* handle, + ssize_t nr, + const uv_buf_t* buf, + const struct sockaddr* addr, + unsigned flags) { + ASSERT_OK(flags); + ASSERT(handle == &thread_handle1 || handle == &thread_handle2); + + ASSERT_GE(nr, 0); + + if (nr == 0) { + ASSERT_NULL(addr); + free(buf->base); + return; + } + + ASSERT_NOT_NULL(addr); + ASSERT_EQ(5, nr); + ASSERT(!memcmp("Hello", buf->base, nr)); + free(buf->base); + + if (handle->loop == &thread_loop1) + thread_loop1_recv++; + + if (handle->loop == &thread_loop2) + 
thread_loop2_recv++; + + uv_mutex_lock(&mutex); + received++; + uv_mutex_unlock(&mutex); +} + +static void on_send(uv_udp_send_t* req, int status) { + ASSERT_OK(status); + ASSERT_PTR_EQ(req->handle->loop, main_loop); + + if (++sent == MAX_UDP_DATAGRAMS) + uv_close((uv_handle_t*) req->handle, NULL); +} + +static void bind_socket_and_prepare_recv(uv_loop_t* loop, uv_udp_t* handle) { + struct sockaddr_in addr; + int r; + + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + + r = uv_udp_init(loop, handle); + ASSERT_OK(r); + + /* For platforms where SO_REUSEPORTs have the capability of + * load balancing, specifying both UV_UDP_REUSEADDR and + * UV_UDP_REUSEPORT in flags is allowed and SO_REUSEPORT will + * always override the behavior of SO_REUSEADDR. */ + r = uv_udp_bind(handle, (const struct sockaddr*) &addr, + UV_UDP_REUSEADDR | UV_UDP_REUSEPORT); + ASSERT_OK(r); + + r = uv_udp_recv_start(handle, alloc_cb, on_recv); + ASSERT_OK(r); +} + +static void run_event_loop(void* arg) { + int r; + uv_udp_t* handle; + uv_timer_t* timer; + uv_loop_t* loop = (uv_loop_t*) arg; + ASSERT(loop == &thread_loop1 || loop == &thread_loop2); + + if (loop == &thread_loop1) { + handle = &thread_handle1; + timer = &thread_timer_handle1; + } else { + handle = &thread_handle2; + timer = &thread_timer_handle2; + } + + bind_socket_and_prepare_recv(loop, handle); + r = uv_timer_init(loop, timer); + ASSERT_OK(r); + r = uv_timer_start(timer, ticktack, 0, 10); + ASSERT_OK(r); + + /* Notify the main thread to start sending data. */ + uv_sem_post(&semaphore); + r = uv_run(loop, UV_RUN_DEFAULT); + ASSERT_OK(r); +} + +TEST_IMPL(udp_reuseport) { + struct sockaddr_in addr; + uv_buf_t buf; + int r; + int i; + + r = uv_mutex_init(&mutex); + ASSERT_OK(r); + + r = uv_sem_init(&semaphore, 0); + ASSERT_OK(r); + + main_loop = uv_default_loop(); + ASSERT_NOT_NULL(main_loop); + + /* Run event loops of receiving sockets in separate threads. 
*/ + uv_loop_init(&thread_loop1); + uv_loop_init(&thread_loop2); + uv_thread_t thread_loop_id1; + uv_thread_t thread_loop_id2; + uv_thread_create(&thread_loop_id1, run_event_loop, &thread_loop1); + uv_thread_create(&thread_loop_id2, run_event_loop, &thread_loop2); + + /* Wait until all threads are polling for incoming datagrams + * before we start sending. Otherwise the incoming datagrams + * might not be distributed across all receiving threads. */ + for (i = 0; i < NUM_RECEIVING_THREADS; i++) + uv_sem_wait(&semaphore); + /* Now we know all threads are up and entering the uv_run(), + * but we still sleep a little bit just for dual fail-safe. */ + uv_sleep(100); + + /* Start sending datagrams to the peers. */ + buf = uv_buf_init("Hello", 5); + ASSERT_OK(uv_ip4_addr("127.0.0.1", TEST_PORT, &addr)); + for (i = 0; i < MAX_UDP_DATAGRAMS; i++) { + r = uv_udp_init(main_loop, &udp_send_handles[i]); + ASSERT_OK(r); + r = uv_udp_send(&udp_send_requests[i], + &udp_send_handles[i], + &buf, + 1, + (const struct sockaddr*) &addr, + on_send); + ASSERT_OK(r); + } + + r = uv_run(main_loop, UV_RUN_DEFAULT); + ASSERT_OK(r); + + /* Wait for all threads to exit. */ + uv_thread_join(&thread_loop_id1); + uv_thread_join(&thread_loop_id2); + + /* Verify if each receiving socket per event loop received datagrams + * and the amount of received datagrams matches the one of sent datagrams. + */ + ASSERT_EQ(received, MAX_UDP_DATAGRAMS); + ASSERT_EQ(sent, MAX_UDP_DATAGRAMS); + ASSERT_GT(thread_loop1_recv, 0); + ASSERT_GT(thread_loop2_recv, 0); + ASSERT_EQ(thread_loop1_recv + thread_loop2_recv, sent); + + /* Clean up. */ + uv_mutex_destroy(&mutex); + + uv_sem_destroy(&semaphore); + + uv_loop_close(&thread_loop1); + uv_loop_close(&thread_loop2); + MAKE_VALGRIND_HAPPY(main_loop); + + return 0; +} + +#endif