From f0ebf0c7fc060cf85d18b856df03a12a34eb64b5 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 27 May 2011 01:32:38 -0700 Subject: [PATCH] Split pump bench into two processes. And add a pump test with 1 client. --- test/benchmark-list.h | 10 +- test/benchmark-pump.c | 208 +++++++++++++++++++++++++++--------------- 2 files changed, 141 insertions(+), 77 deletions(-) diff --git a/test/benchmark-list.h b/test/benchmark-list.h index d90475e9..33ef97e2 100644 --- a/test/benchmark-list.h +++ b/test/benchmark-list.h @@ -21,7 +21,9 @@ BENCHMARK_DECLARE (sizes) BENCHMARK_DECLARE (ping_pongs) -BENCHMARK_DECLARE (pump) +BENCHMARK_DECLARE (pump100_client) +BENCHMARK_DECLARE (pump1_client) +HELPER_DECLARE (pump_server) HELPER_DECLARE (echo_server) TASK_LIST_START @@ -30,5 +32,9 @@ TASK_LIST_START BENCHMARK_ENTRY (ping_pongs) BENCHMARK_HELPER (ping_pongs, echo_server) - BENCHMARK_ENTRY (pump) + BENCHMARK_ENTRY (pump100_client) + BENCHMARK_HELPER (pump100_client, pump_server) + + BENCHMARK_ENTRY (pump1_client) + BENCHMARK_HELPER (pump1_client, pump_server) TASK_LIST_END diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index e9b3031c..72daa331 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -26,7 +26,7 @@ #include -#define TARGET_CONNECTIONS 100 +static int TARGET_CONNECTIONS; #define WRITE_BUFFER_SIZE 8192 #define MAX_SIMULTANEOUS_CONNECTS 100 @@ -45,12 +45,14 @@ static uv_buf_t buf_alloc(uv_handle_t* handle, size_t size); static void buf_free(uv_buf_t uv_buf_t); +static uv_handle_t server; static struct sockaddr_in listen_addr; static struct sockaddr_in connect_addr; static int64_t start_time; static int max_connect_socket = 0; +static int max_read_sockets = 0; static int read_sockets = 0; static int write_sockets = 0; @@ -63,8 +65,10 @@ static int stats_left = 0; static char write_buffer[WRITE_BUFFER_SIZE]; -static uv_handle_t read_handles[TARGET_CONNECTIONS]; -static uv_handle_t write_handles[TARGET_CONNECTIONS]; +/* Make this as large as you need. */ +#define MAX_WRITE_HANDLES 1000 + +static uv_handle_t write_handles[MAX_WRITE_HANDLES]; static uv_handle_t timer_handle; @@ -79,9 +83,8 @@ static void show_stats(uv_handle_t *handle, int status) { int64_t diff; #if PRINT_STATS - LOGF("connections: %d, read: %.1f gbit/s, write: %.1f gbit/s\n", - read_sockets, - gbit(nrecv, STATS_INTERVAL), + LOGF("connections: %d, write: %.1f gbit/s\n", + write_sockets, gbit(nsent, STATS_INTERVAL)); #endif @@ -91,8 +94,8 @@ static void show_stats(uv_handle_t *handle, int status) { uv_update_time(); diff = uv_now() - start_time; - LOGF("pump_%d: %.1f gbit/s\n", read_sockets, - gbit(nrecv_total, diff)); + LOGF("pump%d_client: %.1f gbit/s\n", write_sockets, + gbit(nsent_total, diff)); exit(0); } @@ -103,8 +106,38 @@ static void show_stats(uv_handle_t *handle, int status) { } -void close_cb(uv_handle_t* handle, int status) { +static void read_show_stats() { + int64_t diff; + + uv_update_time(); + diff = uv_now() - start_time; + + LOGF("pump%d_server: %.1f gbit/s\n", max_read_sockets, + gbit(nrecv_total, diff)); +} + + + +void write_sockets_close_cb(uv_handle_t* handle, int status) { ASSERT(status == 0); + /* If any client closes, the process is done. */ + exit(0); +} + + +void read_sockets_close_cb(uv_handle_t* handle, int status) { + ASSERT(status == 0); + + free(handle); + read_sockets--; + + /* If it's past the first second and everyone has closed their connection + * Then print stats. + */ + if (uv_now() - start_time > 1000 && read_sockets == 0) { + read_show_stats(); + uv_close(&server); + } } @@ -114,15 +147,27 @@ static void start_stats_collection() { /* Show-stats timer */ stats_left = STATS_COUNT; - r = uv_timer_init(&timer_handle, close_cb, NULL); + r = uv_timer_init(&timer_handle, NULL, NULL); ASSERT(r == 0); r = uv_timer_start(&timer_handle, show_stats, STATS_INTERVAL, STATS_INTERVAL); ASSERT(r == 0); + + uv_update_time(); + start_time = uv_now(); } static void read_cb(uv_handle_t* handle, int bytes, uv_buf_t buf) { - ASSERT(bytes >= 0); + if (nrecv_total == 0) { + ASSERT(start_time == 0); + uv_update_time(); + start_time = uv_now(); + } + + if (bytes < 0) { + uv_close(handle); + return; + } buf_free(buf); @@ -162,11 +207,19 @@ static void do_write(uv_handle_t* handle) { } } -static void maybe_start_writing() { + +static void connect_cb(uv_req_t* req, int status) { int i; - if (read_sockets == TARGET_CONNECTIONS && - write_sockets == TARGET_CONNECTIONS) { + if (status) LOG(uv_strerror(uv_last_error())); + ASSERT(status == 0); + + write_sockets++; + req_free(req); + + maybe_connect_some(); + + if (write_sockets == TARGET_CONNECTIONS) { start_stats_collection(); /* Yay! start writing */ @@ -177,86 +230,42 @@ static void maybe_start_writing() { } -static void connect_cb(uv_req_t* req, int status) { - if (status) LOG(uv_strerror(uv_last_error())); - ASSERT(status == 0); - - write_sockets++; - req_free(req); - - maybe_connect_some(); - maybe_start_writing(); -} - - -static void do_connect(uv_handle_t* handle, struct sockaddr* addr) { +static void maybe_connect_some() { uv_req_t* req; + uv_handle_t* handle; int r; - r = uv_tcp_init(handle, close_cb, NULL); - ASSERT(r == 0); - - req = req_alloc(); - uv_req_init(req, handle, connect_cb); - r = uv_connect(req, addr); - ASSERT(r == 0); -} - - -static void maybe_connect_some() { while (max_connect_socket < TARGET_CONNECTIONS && max_connect_socket < write_sockets + MAX_SIMULTANEOUS_CONNECTS) { - do_connect(&write_handles[max_connect_socket++], - (struct sockaddr*) &connect_addr); + handle = &write_handles[max_connect_socket++]; + + r = uv_tcp_init(handle, write_sockets_close_cb, NULL); + ASSERT(r == 0); + + req = req_alloc(); + uv_req_init(req, handle, connect_cb); + r = uv_connect(req, (struct sockaddr*) &connect_addr); + ASSERT(r == 0); } } -static void accept_cb(uv_handle_t* server) { +static void accept_cb(uv_handle_t* s) { uv_handle_t* handle; int r; - ASSERT(read_sockets < TARGET_CONNECTIONS); - handle = &read_handles[read_sockets]; + ASSERT(&server == s); - r = uv_accept(server, handle, close_cb, NULL); + handle = malloc(sizeof(uv_handle_t)); + + r = uv_accept(s, handle, read_sockets_close_cb, NULL); ASSERT(r == 0); r = uv_read_start(handle, read_cb); ASSERT(r == 0); read_sockets++; - - maybe_start_writing(); -} - - -BENCHMARK_IMPL(pump) { - uv_handle_t server; - int r; - - uv_init(buf_alloc); - - listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT); - connect_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); - - /* Server */ - r = uv_tcp_init(&server, close_cb, NULL); - ASSERT(r == 0); - r = uv_bind(&server, (struct sockaddr*) &listen_addr); - ASSERT(r == 0); - r = uv_listen(&server, TARGET_CONNECTIONS, accept_cb); - ASSERT(r == 0); - - uv_update_time(); - start_time = uv_now(); - - /* Start making connections */ - maybe_connect_some(); - - uv_run(); - - return 0; + max_read_sockets++; } @@ -331,3 +340,52 @@ static void buf_free(uv_buf_t uv_buf_t) { buf->next = buf_freelist; buf_freelist = buf; } + + +HELPER_IMPL(pump_server) { + int r; + + uv_init(buf_alloc); + listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT); + + /* Server */ + r = uv_tcp_init(&server, NULL, NULL); + ASSERT(r == 0); + r = uv_bind(&server, (struct sockaddr*) &listen_addr); + ASSERT(r == 0); + r = uv_listen(&server, MAX_WRITE_HANDLES, accept_cb); + ASSERT(r == 0); + + uv_run(); + + return 0; +} + + +void pump(int n) { + int r; + + ASSERT(n <= MAX_WRITE_HANDLES); + TARGET_CONNECTIONS = n; + + uv_init(buf_alloc); + + connect_addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + /* Start making connections */ + maybe_connect_some(); + + uv_run(); +} + + +BENCHMARK_IMPL(pump100_client) { + pump(100); + return 0; +} + + +BENCHMARK_IMPL(pump1_client) { + pump(1); + return 0; +}