bench: add tcp accept benchmarks
commit b74b1c4c56
parent cc1c1912ca
@@ -33,6 +33,10 @@ BENCHMARK_DECLARE (tcp_pump1_client)
BENCHMARK_DECLARE (pipe_pump100_client)
BENCHMARK_DECLARE (pipe_pump1_client)

BENCHMARK_DECLARE (tcp_multi_accept2)
BENCHMARK_DECLARE (tcp_multi_accept4)
BENCHMARK_DECLARE (tcp_multi_accept8)

/* Run until X packets have been sent/received. */
BENCHMARK_DECLARE (udp_pummel_1v1)
BENCHMARK_DECLARE (udp_pummel_1v10)
@@ -112,6 +116,10 @@ TASK_LIST_START
BENCHMARK_ENTRY (pipe_pound_1000)
BENCHMARK_HELPER (pipe_pound_1000, pipe_echo_server)

BENCHMARK_ENTRY (tcp_multi_accept2)
BENCHMARK_ENTRY (tcp_multi_accept4)
BENCHMARK_ENTRY (tcp_multi_accept8)

BENCHMARK_ENTRY (udp_pummel_1v1)
BENCHMARK_ENTRY (udp_pummel_1v10)
BENCHMARK_ENTRY (udp_pummel_1v100)
test/benchmark-multi-accept.c (new file, 409 lines)

@@ -0,0 +1,409 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */

#include "task.h"
#include "uv.h"

#define IPC_PIPE_NAME TEST_PIPENAME
#define NUM_CONNECTS (250 * 1000)

union stream_handle {
  uv_pipe_t pipe;
  uv_tcp_t tcp;
};

/* Use as (uv_stream_t *) &handle_storage -- it's kind of clunky but it
 * avoids aliasing warnings.
 */
typedef unsigned char handle_storage_t[sizeof(union stream_handle)];
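/* For example, sv_connection_cb below allocates a handle_storage_t, runs
 * uv_tcp_init() or uv_pipe_init() on it through the appropriate cast, then
 * accepts into it as a (uv_stream_t*).
 */
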
/* Used for passing around the listen handle, not part of the benchmark proper.
 * We have an overabundance of server types here. It works like this:
 *
 *  1. The main thread starts an IPC pipe server.
 *  2. The worker threads connect to the IPC server and obtain a listen handle.
 *  3. The worker threads start accepting requests on the listen handle.
 *  4. The main thread starts connecting repeatedly.
 *
 * Step #4 should perhaps be farmed out over several threads.
 */
struct ipc_server_ctx {
  handle_storage_t server_handle;
  unsigned int num_connects;
  uv_pipe_t ipc_pipe;
};

struct ipc_peer_ctx {
  handle_storage_t peer_handle;
  uv_write_t write_req;
};

struct ipc_client_ctx {
  uv_connect_t connect_req;
  uv_stream_t* server_handle;
  uv_pipe_t ipc_pipe;
  char scratch[16];
};

/* Used in the actual benchmark. */
struct server_ctx {
  handle_storage_t server_handle;
  unsigned int num_connects;
  uv_async_t async_handle;
  uv_thread_t thread_id;
  uv_sem_t semaphore;
};

struct client_ctx {
  handle_storage_t client_handle;
  unsigned int num_connects;
  uv_connect_t connect_req;
};
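/* The callbacks below recover these context structs from embedded handle and
 * request pointers via container_of from task.h. A minimal sketch, assuming
 * the conventional offsetof-based definition:
 *
 *   #define container_of(ptr, type, member) \
 *     ((type*) ((char*) (ptr) - offsetof(type, member)))
 */
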
static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status);
static void ipc_write_cb(uv_write_t* req, int status);
static void ipc_close_cb(uv_handle_t* handle);
static void ipc_connect_cb(uv_connect_t* req, int status);
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
                         ssize_t nread,
                         uv_buf_t buf,
                         uv_handle_type type);
static uv_buf_t ipc_alloc_cb(uv_handle_t* handle, size_t suggested_size);

static void sv_async_cb(uv_async_t* handle, int status);
static void sv_connection_cb(uv_stream_t* server_handle, int status);

static void cl_connect_cb(uv_connect_t* req, int status);
static void cl_close_cb(uv_handle_t* handle);

static struct sockaddr_in listen_addr;

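/* Main-thread side of the handle handoff: accept a worker's connection on
 * the IPC pipe and send it the listening socket as ancillary data on a
 * one-shot "PING" write; close the IPC server once every worker is served.
 */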
static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) {
  struct ipc_server_ctx* sc;
  struct ipc_peer_ctx* pc;
  uv_loop_t* loop;
  uv_buf_t buf;

  loop = ipc_pipe->loop;
  buf = uv_buf_init("PING", 4);
  sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe);
  pc = calloc(1, sizeof(*pc));
  ASSERT(pc != NULL);

  if (ipc_pipe->type == UV_TCP)
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle));
  else if (ipc_pipe->type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle));
  ASSERT(0 == uv_write2(&pc->write_req,
                        (uv_stream_t*) &pc->peer_handle,
                        &buf,
                        1,
                        (uv_stream_t*) &sc->server_handle,
                        ipc_write_cb));

  if (--sc->num_connects == 0)
    uv_close((uv_handle_t*) ipc_pipe, NULL);
}


static void ipc_write_cb(uv_write_t* req, int status) {
  struct ipc_peer_ctx* ctx;
  ctx = container_of(req, struct ipc_peer_ctx, write_req);
  uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb);
}


static void ipc_close_cb(uv_handle_t* handle) {
  struct ipc_peer_ctx* ctx;
  ctx = container_of(handle, struct ipc_peer_ctx, peer_handle);
  free(ctx);
}


static void ipc_connect_cb(uv_connect_t* req, int status) {
  struct ipc_client_ctx* ctx;
  ctx = container_of(req, struct ipc_client_ctx, connect_req);
  ASSERT(0 == status);
  ASSERT(0 == uv_read2_start((uv_stream_t*) &ctx->ipc_pipe,
                             ipc_alloc_cb,
                             ipc_read2_cb));
}


static uv_buf_t ipc_alloc_cb(uv_handle_t* handle, size_t suggested_size) {
  struct ipc_client_ctx* ctx;
  ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe);
  return uv_buf_init(ctx->scratch, sizeof(ctx->scratch));
}

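/* Worker-side counterpart: initialize ctx->server_handle to match the
 * pending handle's type, accept it off the IPC pipe, then close the pipe,
 * which has served its purpose.
 */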
static void ipc_read2_cb(uv_pipe_t* ipc_pipe,
                         ssize_t nread,
                         uv_buf_t buf,
                         uv_handle_type type) {
  struct ipc_client_ctx* ctx;
  uv_loop_t* loop;

  ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe);
  loop = ipc_pipe->loop;

  if (type == UV_TCP)
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle));
  else if (type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept((uv_stream_t*) &ctx->ipc_pipe, ctx->server_handle));
  uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL);
}

/* Set up an IPC pipe server that hands out listen sockets to the worker
 * threads. It's kind of cumbersome for such a simple operation, maybe we
 * should revive uv_import() and uv_export().
 */
static void send_listen_handles(uv_handle_type type,
                                unsigned int num_servers,
                                struct server_ctx* servers) {
  struct ipc_server_ctx ctx;
  uv_loop_t* loop;
  unsigned int i;

  loop = uv_default_loop();
  ctx.num_connects = num_servers;

  if (type == UV_TCP) {
    ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle));
    ASSERT(0 == uv_tcp_bind((uv_tcp_t*) &ctx.server_handle, listen_addr));
  }
  else if (type == UV_NAMED_PIPE) {
    ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &ctx.server_handle, 0));
    ASSERT(0 == uv_pipe_bind((uv_pipe_t*) &ctx.server_handle, IPC_PIPE_NAME));
  }
  else
    ASSERT(0);

  ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
  ASSERT(0 == uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME));
  ASSERT(0 == uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb));

  for (i = 0; i < num_servers; i++)
    uv_sem_post(&servers[i].semaphore);
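  /* First uv_run() serves the workers until the last one has received its
   * handle and ipc_connection_cb closes the IPC pipe; the second run, after
   * closing the bound server handle, drains the pending close callbacks.
   */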
  ASSERT(0 == uv_run(loop));
  uv_close((uv_handle_t*) &ctx.server_handle, NULL);
  ASSERT(0 == uv_run(loop));

  for (i = 0; i < num_servers; i++)
    uv_sem_wait(&servers[i].semaphore);
}


static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) {
  struct ipc_client_ctx ctx;

  ctx.server_handle = server_handle;
  ctx.server_handle->data = "server handle";

  ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1));
  uv_pipe_connect(&ctx.connect_req,
                  &ctx.ipc_pipe,
                  IPC_PIPE_NAME,
                  ipc_connect_cb);
  ASSERT(0 == uv_run(loop));
}

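/* Worker thread entry point: run a private loop, block until the main
 * thread's IPC server is up, fetch the shared listen handle, then accept
 * connections until sv_async_cb shuts the loop down.
 */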
static void server_cb(void *arg) {
  struct server_ctx *ctx;
  uv_loop_t* loop;

  ctx = arg;
  loop = uv_loop_new();
  ASSERT(loop != NULL);

  ASSERT(0 == uv_async_init(loop, &ctx->async_handle, sv_async_cb));
  uv_unref((uv_handle_t*) &ctx->async_handle);

  /* Wait until the main thread is ready. */
  uv_sem_wait(&ctx->semaphore);
  get_listen_handle(loop, (uv_stream_t*) &ctx->server_handle);
  uv_sem_post(&ctx->semaphore);

  /* Now start the actual benchmark. */
  ASSERT(0 == uv_listen((uv_stream_t*) &ctx->server_handle,
                        128,
                        sv_connection_cb));
  ASSERT(0 == uv_run(loop));

  uv_loop_delete(loop);
}


static void sv_async_cb(uv_async_t* handle, int status) {
  struct server_ctx* ctx;
  ctx = container_of(handle, struct server_ctx, async_handle);
  uv_close((uv_handle_t*) &ctx->server_handle, NULL);
  uv_close((uv_handle_t*) &ctx->async_handle, NULL);
}

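/* The benchmark hot path: accept each incoming connection, close it right
 * away and count it against this thread.
 */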
static void sv_connection_cb(uv_stream_t* server_handle, int status) {
  handle_storage_t* storage;
  struct server_ctx* ctx;

  ctx = container_of(server_handle, struct server_ctx, server_handle);
  ASSERT(status == 0);

  storage = malloc(sizeof(*storage));
  ASSERT(storage != NULL);

  if (server_handle->type == UV_TCP)
    ASSERT(0 == uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage));
  else if (server_handle->type == UV_NAMED_PIPE)
    ASSERT(0 == uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0));
  else
    ASSERT(0);

  ASSERT(0 == uv_accept(server_handle, (uv_stream_t*) storage));
  uv_close((uv_handle_t*) storage, (uv_close_cb) free);
  ctx->num_connects++;
}


static void cl_connect_cb(uv_connect_t* req, int status) {
  struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req);
  uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb);
  ASSERT(status == 0);
}

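/* Once the previous connection is fully closed, open the next one until
 * this client's share of NUM_CONNECTS is exhausted.
 */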
static void cl_close_cb(uv_handle_t* handle) {
  struct client_ctx* ctx;

  ctx = container_of(handle, struct client_ctx, client_handle);

  if (--ctx->num_connects == 0)
    return;

  ASSERT(0 == uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle));
  ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
                             (uv_tcp_t*) &ctx->client_handle,
                             listen_addr,
                             cl_connect_cb));
}

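/* Benchmark driver: start num_servers accept threads, hand each of them the
 * same listen socket, then pump NUM_CONNECTS connections through num_clients
 * rotating client slots and report overall and per-thread accept rates.
 */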
static int test_tcp(unsigned int num_servers, unsigned int num_clients) {
  struct server_ctx* servers;
  struct client_ctx* clients;
  uv_loop_t* loop;
  uv_tcp_t* handle;
  unsigned int i;
  double time;

  listen_addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
  loop = uv_default_loop();

  servers = calloc(num_servers, sizeof(servers[0]));
  clients = calloc(num_clients, sizeof(clients[0]));
  ASSERT(servers != NULL);
  ASSERT(clients != NULL);

  /* We're making the assumption here that from the perspective of the
   * OS scheduler, threads are functionally equivalent to and interchangeable
   * with full-blown processes.
   */
  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    ASSERT(0 == uv_sem_init(&ctx->semaphore, 0));
    ASSERT(0 == uv_thread_create(&ctx->thread_id, server_cb, ctx));
  }

  send_listen_handles(UV_TCP, num_servers, servers);

  for (i = 0; i < num_clients; i++) {
    struct client_ctx* ctx = clients + i;
    ctx->num_connects = NUM_CONNECTS / num_clients;
    handle = (uv_tcp_t*) &ctx->client_handle;
    handle->data = "client handle";
    ASSERT(0 == uv_tcp_init(loop, handle));
    ASSERT(0 == uv_tcp_connect(&ctx->connect_req,
                               handle,
                               listen_addr,
                               cl_connect_cb));
  }
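  /* Time the whole connect storm: uv_run() on the default loop returns once
   * every client has worked through its share of the connections.
   */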
  {
    uint64_t t = uv_hrtime();
    ASSERT(0 == uv_run(loop));
    t = uv_hrtime() - t;
    time = t / 1e9;
  }
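  /* Benchmark is done; wake each worker so it closes its listen handle and
   * its loop can exit, then reap the threads.
   */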
  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    uv_async_send(&ctx->async_handle);
    ASSERT(0 == uv_thread_join(&ctx->thread_id));
    uv_sem_destroy(&ctx->semaphore);
  }

  printf("accept%u: %.0f accepts/sec (%u total)\n",
         num_servers,
         NUM_CONNECTS / time,
         NUM_CONNECTS);

  for (i = 0; i < num_servers; i++) {
    struct server_ctx* ctx = servers + i;
    printf("  thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n",
           i,
           ctx->num_connects / time,
           ctx->num_connects,
           ctx->num_connects * 100.0 / NUM_CONNECTS);
  }

  free(clients);
  free(servers);
  uv_loop_delete(uv_default_loop()); /* Silence valgrind. */

  return 0;
}


BENCHMARK_IMPL(tcp_multi_accept2) {
  return test_tcp(2, 40);
}


BENCHMARK_IMPL(tcp_multi_accept4) {
  return test_tcp(4, 40);
}


BENCHMARK_IMPL(tcp_multi_accept8) {
  return test_tcp(8, 40);
}
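The three wrappers differ only in the number of accept threads; all spread the same 250,000 connections over 40 client slots. Once built, the new benchmarks are selected by name through libuv's benchmark runner (typically something like `run-benchmarks tcp_multi_accept4`; the exact binary name depends on the build setup of this era).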