win: optimize udp receive performance (#3807)
Do at most 32 nonblocking udp receive in a row. Fixes: https://github.com/libuv/libuv/issues/3704
This commit is contained in:
parent
8a1f378f05
commit
dff3f8ccab
@ -648,6 +648,7 @@ if(LIBUV_BUILD_TESTS)
|
||||
test/test-udp-sendmmsg-error.c
|
||||
test/test-udp-send-unreachable.c
|
||||
test/test-udp-try-send.c
|
||||
test/test-udp-recv-in-a-row.c
|
||||
test/test-uname.c
|
||||
test/test-walk-handles.c
|
||||
test/test-watcher-cross-stop.c)
|
||||
|
||||
@ -314,6 +314,7 @@ test_run_tests_SOURCES = test/blackhole-server.c \
|
||||
test/test-udp-sendmmsg-error.c \
|
||||
test/test-udp-send-unreachable.c \
|
||||
test/test-udp-try-send.c \
|
||||
test/test-udp-recv-in-a-row.c \
|
||||
test/test-uname.c \
|
||||
test/test-walk-handles.c \
|
||||
test/test-watcher-cross-stop.c
|
||||
|
||||
@ -441,57 +441,68 @@ void uv__process_udp_recv_req(uv_loop_t* loop, uv_udp_t* handle,
|
||||
DWORD bytes, err, flags;
|
||||
struct sockaddr_storage from;
|
||||
int from_len;
|
||||
int count;
|
||||
|
||||
/* Do a nonblocking receive.
|
||||
* TODO: try to read multiple datagrams at once. FIONREAD maybe? */
|
||||
buf = uv_buf_init(NULL, 0);
|
||||
handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
|
||||
if (buf.base == NULL || buf.len == 0) {
|
||||
handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
|
||||
goto done;
|
||||
}
|
||||
assert(buf.base != NULL);
|
||||
/* Prevent loop starvation when the data comes in as fast as
|
||||
* (or faster than) we can read it. */
|
||||
count = 32;
|
||||
|
||||
memset(&from, 0, sizeof from);
|
||||
from_len = sizeof from;
|
||||
do {
|
||||
/* Do at most `count` nonblocking receive. */
|
||||
buf = uv_buf_init(NULL, 0);
|
||||
handle->alloc_cb((uv_handle_t*) handle, UV__UDP_DGRAM_MAXSIZE, &buf);
|
||||
if (buf.base == NULL || buf.len == 0) {
|
||||
handle->recv_cb(handle, UV_ENOBUFS, &buf, NULL, 0);
|
||||
goto done;
|
||||
}
|
||||
|
||||
flags = 0;
|
||||
memset(&from, 0, sizeof from);
|
||||
from_len = sizeof from;
|
||||
|
||||
if (WSARecvFrom(handle->socket,
|
||||
(WSABUF*)&buf,
|
||||
1,
|
||||
&bytes,
|
||||
&flags,
|
||||
(struct sockaddr*) &from,
|
||||
&from_len,
|
||||
NULL,
|
||||
NULL) != SOCKET_ERROR) {
|
||||
flags = 0;
|
||||
|
||||
/* Message received */
|
||||
handle->recv_cb(handle, bytes, &buf, (const struct sockaddr*) &from, 0);
|
||||
} else {
|
||||
err = WSAGetLastError();
|
||||
if (err == WSAEMSGSIZE) {
|
||||
/* Message truncated */
|
||||
handle->recv_cb(handle,
|
||||
bytes,
|
||||
&buf,
|
||||
(const struct sockaddr*) &from,
|
||||
UV_UDP_PARTIAL);
|
||||
} else if (err == WSAEWOULDBLOCK) {
|
||||
/* Kernel buffer empty */
|
||||
handle->recv_cb(handle, 0, &buf, NULL, 0);
|
||||
} else if (err == WSAECONNRESET || err == WSAENETRESET) {
|
||||
/* WSAECONNRESET/WSANETRESET is ignored because this just indicates
|
||||
* that a previous sendto operation failed.
|
||||
*/
|
||||
handle->recv_cb(handle, 0, &buf, NULL, 0);
|
||||
if (WSARecvFrom(handle->socket,
|
||||
(WSABUF*)&buf,
|
||||
1,
|
||||
&bytes,
|
||||
&flags,
|
||||
(struct sockaddr*) &from,
|
||||
&from_len,
|
||||
NULL,
|
||||
NULL) != SOCKET_ERROR) {
|
||||
|
||||
/* Message received */
|
||||
err = ERROR_SUCCESS;
|
||||
handle->recv_cb(handle, bytes, &buf, (const struct sockaddr*) &from, 0);
|
||||
} else {
|
||||
/* Any other error that we want to report back to the user. */
|
||||
uv_udp_recv_stop(handle);
|
||||
handle->recv_cb(handle, uv_translate_sys_error(err), &buf, NULL, 0);
|
||||
err = WSAGetLastError();
|
||||
if (err == WSAEMSGSIZE) {
|
||||
/* Message truncated */
|
||||
handle->recv_cb(handle,
|
||||
bytes,
|
||||
&buf,
|
||||
(const struct sockaddr*) &from,
|
||||
UV_UDP_PARTIAL);
|
||||
} else if (err == WSAEWOULDBLOCK) {
|
||||
/* Kernel buffer empty */
|
||||
handle->recv_cb(handle, 0, &buf, NULL, 0);
|
||||
} else if (err == WSAECONNRESET || err == WSAENETRESET) {
|
||||
/* WSAECONNRESET/WSANETRESET is ignored because this just indicates
|
||||
* that a previous sendto operation failed.
|
||||
*/
|
||||
handle->recv_cb(handle, 0, &buf, NULL, 0);
|
||||
} else {
|
||||
/* Any other error that we want to report back to the user. */
|
||||
uv_udp_recv_stop(handle);
|
||||
handle->recv_cb(handle, uv_translate_sys_error(err), &buf, NULL, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
while (err == ERROR_SUCCESS &&
|
||||
count-- > 0 &&
|
||||
/* The recv_cb callback may decide to pause or close the handle. */
|
||||
(handle->flags & UV_HANDLE_READING) &&
|
||||
!(handle->flags & UV_HANDLE_READ_PENDING));
|
||||
}
|
||||
|
||||
done:
|
||||
|
||||
@ -185,6 +185,7 @@ TEST_DECLARE (udp_open)
|
||||
TEST_DECLARE (udp_open_twice)
|
||||
TEST_DECLARE (udp_open_bound)
|
||||
TEST_DECLARE (udp_open_connect)
|
||||
TEST_DECLARE (udp_recv_in_a_row)
|
||||
#ifndef _WIN32
|
||||
TEST_DECLARE (udp_send_unix)
|
||||
#endif
|
||||
@ -770,6 +771,7 @@ TASK_LIST_START
|
||||
TEST_ENTRY (udp_multicast_ttl)
|
||||
TEST_ENTRY (udp_sendmmsg_error)
|
||||
TEST_ENTRY (udp_try_send)
|
||||
TEST_ENTRY (udp_recv_in_a_row)
|
||||
|
||||
TEST_ENTRY (udp_open)
|
||||
TEST_ENTRY (udp_open_twice)
|
||||
|
||||
121
test/test-udp-recv-in-a-row.c
Normal file
121
test/test-udp-recv-in-a-row.c
Normal file
@ -0,0 +1,121 @@
|
||||
/* Copyright The libuv project and contributors. All rights reserved.
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to
|
||||
* deal in the Software without restriction, including without limitation the
|
||||
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
* sell copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
* IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "uv.h"
|
||||
#include "task.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
|
||||
static uv_udp_t server;
|
||||
static uv_udp_t client;
|
||||
static uv_check_t check_handle;
|
||||
static uv_buf_t buf;
|
||||
static struct sockaddr_in addr;
|
||||
static char send_data[10];
|
||||
static int check_cb_called;
|
||||
|
||||
#define N 5
|
||||
static int recv_cnt;
|
||||
|
||||
static void alloc_cb(uv_handle_t* handle,
|
||||
size_t suggested_size,
|
||||
uv_buf_t* buf) {
|
||||
static char slab[sizeof(send_data)];
|
||||
buf->base = slab;
|
||||
buf->len = sizeof(slab);
|
||||
}
|
||||
|
||||
static void sv_recv_cb(uv_udp_t* handle,
|
||||
ssize_t nread,
|
||||
const uv_buf_t* rcvbuf,
|
||||
const struct sockaddr* addr,
|
||||
unsigned flags) {
|
||||
if (++ recv_cnt < N) {
|
||||
ASSERT_EQ(sizeof(send_data), nread);
|
||||
} else {
|
||||
ASSERT_EQ(0, nread);
|
||||
}
|
||||
}
|
||||
|
||||
static void check_cb(uv_check_t* handle) {
|
||||
ASSERT_PTR_EQ(&check_handle, handle);
|
||||
|
||||
/**
|
||||
* sv_recv_cb() is called with nread set to zero to indicate
|
||||
* there is no more udp packet in the kernel, so the actual
|
||||
* recv_cnt is one larger than N.
|
||||
*/
|
||||
ASSERT_EQ(N+1, recv_cnt);
|
||||
check_cb_called = 1;
|
||||
|
||||
/* we are done */
|
||||
ASSERT_EQ(0, uv_check_stop(handle));
|
||||
uv_close((uv_handle_t*) &client, NULL);
|
||||
uv_close((uv_handle_t*) &check_handle, NULL);
|
||||
uv_close((uv_handle_t*) &server, NULL);
|
||||
}
|
||||
|
||||
|
||||
TEST_IMPL(udp_recv_in_a_row) {
|
||||
int i, r;
|
||||
|
||||
ASSERT_EQ(0, uv_check_init(uv_default_loop(), &check_handle));
|
||||
ASSERT_EQ(0, uv_check_start(&check_handle, check_cb));
|
||||
|
||||
ASSERT_EQ(0, uv_ip4_addr("127.0.0.1", TEST_PORT, &addr));
|
||||
|
||||
ASSERT_EQ(0, uv_udp_init(uv_default_loop(), &server));
|
||||
ASSERT_EQ(0, uv_udp_bind(&server, (const struct sockaddr*) &addr, 0));
|
||||
ASSERT_EQ(0, uv_udp_recv_start(&server, alloc_cb, sv_recv_cb));
|
||||
|
||||
ASSERT_EQ(0, uv_udp_init(uv_default_loop(), &client));
|
||||
|
||||
/* send N-1 udp packets */
|
||||
buf = uv_buf_init(send_data, sizeof(send_data));
|
||||
for (i = 0; i < N - 1; i ++) {
|
||||
r = uv_udp_try_send(&client,
|
||||
&buf,
|
||||
1,
|
||||
(const struct sockaddr*) &addr);
|
||||
ASSERT_EQ(sizeof(send_data), r);
|
||||
}
|
||||
|
||||
/* send an empty udp packet */
|
||||
buf = uv_buf_init(NULL, 0);
|
||||
r = uv_udp_try_send(&client,
|
||||
&buf,
|
||||
1,
|
||||
(const struct sockaddr*) &addr);
|
||||
ASSERT_EQ(0, r);
|
||||
|
||||
/* check_cb() asserts that the N packets can be received
|
||||
* before it gets called.
|
||||
*/
|
||||
|
||||
ASSERT_EQ(0, uv_run(uv_default_loop(), UV_RUN_DEFAULT));
|
||||
|
||||
ASSERT(check_cb_called);
|
||||
|
||||
MAKE_VALGRIND_HAPPY();
|
||||
return 0;
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user