Benchmarks use the improved request api

This commit is contained in:
Bert Belder 2011-07-14 03:28:47 +02:00 committed by Ryan Dahl
parent 2f83928b6f
commit 902dd567b1
3 changed files with 36 additions and 49 deletions

View File

@ -34,8 +34,8 @@ typedef struct {
int pongs; int pongs;
int state; int state;
uv_tcp_t tcp; uv_tcp_t tcp;
uv_req_t connect_req; uv_connect_t connect_req;
uv_req_t shutdown_req; uv_shutdown_t shutdown_req;
} pinger_t; } pinger_t;
typedef struct buf_s { typedef struct buf_s {
@ -90,7 +90,7 @@ static void pinger_close_cb(uv_handle_t* handle) {
} }
static void pinger_write_cb(uv_req_t *req, int status) { static void pinger_write_cb(uv_write_t* req, int status) {
ASSERT(status == 0); ASSERT(status == 0);
free(req); free(req);
@ -98,22 +98,20 @@ static void pinger_write_cb(uv_req_t *req, int status) {
static void pinger_write_ping(pinger_t* pinger) { static void pinger_write_ping(pinger_t* pinger) {
uv_req_t *req; uv_write_t* req;
uv_buf_t buf; uv_buf_t buf;
buf.base = (char*)&PING; buf.base = (char*)&PING;
buf.len = strlen(PING); buf.len = strlen(PING);
req = (uv_req_t*)malloc(sizeof(*req)); req = malloc(sizeof *req);
uv_req_init(req, (uv_handle_t*)(&pinger->tcp), pinger_write_cb); if (uv_write(req, (uv_stream_t*) &pinger->tcp, &buf, 1, pinger_write_cb)) {
if (uv_write(req, &buf, 1)) {
FATAL("uv_write failed"); FATAL("uv_write failed");
} }
} }
static void pinger_shutdown_cb(uv_handle_t* handle, int status) { static void pinger_shutdown_cb(uv_shutdown_t* req, int status) {
ASSERT(status == 0); ASSERT(status == 0);
pinger_shutdown_cb_called++; pinger_shutdown_cb_called++;
@ -151,8 +149,7 @@ static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
if (pinger->state == 0) { if (pinger->state == 0) {
pinger->pongs++; pinger->pongs++;
if (uv_now() - start_time > TIME) { if (uv_now() - start_time > TIME) {
uv_req_init(&pinger->shutdown_req, (uv_handle_t*)tcp, pinger_shutdown_cb); uv_shutdown(&pinger->shutdown_req, (uv_stream_t*) tcp, pinger_shutdown_cb);
uv_shutdown(&pinger->shutdown_req);
break; break;
} else { } else {
pinger_write_ping(pinger); pinger_write_ping(pinger);
@ -164,14 +161,14 @@ static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
} }
static void pinger_connect_cb(uv_req_t *req, int status) { static void pinger_connect_cb(uv_connect_t* req, int status) {
pinger_t *pinger = (pinger_t*)req->handle->data; pinger_t *pinger = (pinger_t*)req->handle->data;
ASSERT(status == 0); ASSERT(status == 0);
pinger_write_ping(pinger); pinger_write_ping(pinger);
if (uv_read_start((uv_stream_t*)(req->handle), buf_alloc, pinger_read_cb)) { if (uv_read_start(req->handle, buf_alloc, pinger_read_cb)) {
FATAL("uv_read_start failed"); FATAL("uv_read_start failed");
} }
} }
@ -193,13 +190,9 @@ static void pinger_new() {
pinger->tcp.data = pinger; pinger->tcp.data = pinger;
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
uv_req_init(&pinger->connect_req, (uv_handle_t*)&pinger->tcp,
pinger_connect_cb);
uv_tcp_bind(&pinger->tcp, client_addr); uv_tcp_bind(&pinger->tcp, client_addr);
r = uv_tcp_connect(&pinger->connect_req, server_addr);
r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, pinger_connect_cb);
ASSERT(!r); ASSERT(!r);
} }

View File

@ -183,12 +183,12 @@ static void read_cb(uv_stream_t* stream, ssize_t bytes, uv_buf_t buf) {
} }
static void write_cb(uv_req_t *req, int status) { static void write_cb(uv_write_t* req, int status) {
uv_buf_t* buf = (uv_buf_t*) req->data; uv_buf_t* buf = (uv_buf_t*) req->data;
ASSERT(status == 0); ASSERT(status == 0);
req_free(req); req_free((uv_req_t*) req);
nsent += sizeof write_buffer; nsent += sizeof write_buffer;
nsent_total += sizeof write_buffer; nsent_total += sizeof write_buffer;
@ -198,7 +198,7 @@ static void write_cb(uv_req_t *req, int status) {
static void do_write(uv_stream_t* stream) { static void do_write(uv_stream_t* stream) {
uv_req_t* req; uv_write_t* req;
uv_buf_t buf; uv_buf_t buf;
int r; int r;
@ -206,23 +206,21 @@ static void do_write(uv_stream_t* stream) {
buf.len = sizeof write_buffer; buf.len = sizeof write_buffer;
while (stream->write_queue_size == 0) { while (stream->write_queue_size == 0) {
req = req_alloc(); req = (uv_write_t*) req_alloc();
uv_req_init(req, (uv_handle_t*)stream, write_cb); r = uv_write(req, stream, &buf, 1, write_cb);
r = uv_write(req, &buf, 1);
ASSERT(r == 0); ASSERT(r == 0);
} }
} }
static void connect_cb(uv_req_t* req, int status) { static void connect_cb(uv_connect_t* req, int status) {
int i; int i;
if (status) LOG(uv_strerror(uv_last_error())); if (status) LOG(uv_strerror(uv_last_error()));
ASSERT(status == 0); ASSERT(status == 0);
write_sockets++; write_sockets++;
req_free(req); req_free((uv_req_t*) req);
maybe_connect_some(); maybe_connect_some();
@ -238,7 +236,7 @@ static void connect_cb(uv_req_t* req, int status) {
static void maybe_connect_some() { static void maybe_connect_some() {
uv_req_t* req; uv_connect_t* req;
uv_tcp_t* tcp; uv_tcp_t* tcp;
uv_pipe_t* pipe; uv_pipe_t* pipe;
int r; int r;
@ -251,9 +249,8 @@ static void maybe_connect_some() {
r = uv_tcp_init(tcp); r = uv_tcp_init(tcp);
ASSERT(r == 0); ASSERT(r == 0);
req = req_alloc(); req = (uv_connect_t*) req_alloc();
uv_req_init(req, (uv_handle_t*)tcp, connect_cb); r = uv_tcp_connect(req, tcp, connect_addr, connect_cb);
r = uv_tcp_connect(req, connect_addr);
ASSERT(r == 0); ASSERT(r == 0);
} else { } else {
pipe = &pipe_write_handles[max_connect_socket++]; pipe = &pipe_write_handles[max_connect_socket++];
@ -261,9 +258,8 @@ static void maybe_connect_some() {
r = uv_pipe_init(pipe); r = uv_pipe_init(pipe);
ASSERT(r == 0); ASSERT(r == 0);
req = req_alloc(); req = (uv_connect_t*) req_alloc();
uv_req_init(req, (uv_handle_t*)pipe, connect_cb); r = uv_pipe_connect(req, pipe, TEST_PIPENAME, connect_cb);
r = uv_pipe_connect(req, TEST_PIPENAME);
ASSERT(r == 0); ASSERT(r == 0);
#ifdef _WIN32 #ifdef _WIN32
@ -308,7 +304,7 @@ static void connection_cb(uv_handle_t* s, int status) {
*/ */
typedef struct req_list_s { typedef struct req_list_s {
uv_req_t uv_req; union uv_any_req uv_req;
struct req_list_s* next; struct req_list_s* next;
} req_list_t; } req_list_t;

View File

@ -27,7 +27,7 @@
typedef struct { typedef struct {
uv_req_t req; uv_write_t req;
uv_buf_t buf; uv_buf_t buf;
} write_req_t; } write_req_t;
@ -51,7 +51,7 @@ static int server_closed;
static uv_tcp_t server; static uv_tcp_t server;
static void after_write(uv_req_t* req, int status); static void after_write(uv_write_t* req, int status);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf); static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
static void on_close(uv_handle_t* peer); static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle); static void on_server_close(uv_handle_t* handle);
@ -67,7 +67,7 @@ unsigned char qrecord[] = {5, 'e', 'c', 'h', 'o', 's', 3, 's', 'r', 'v', 0, 0, 1
unsigned char arecord[] = {0xc0, 0x0c, 0, 1, 0, 1, 0, 0, 5, 0xbd, 0, 4, 10, 0, 1, 1 }; unsigned char arecord[] = {0xc0, 0x0c, 0, 1, 0, 1, 0, 0, 5, 0xbd, 0, 4, 10, 0, 1, 1 };
static void after_write(uv_req_t* req, int status) { static void after_write(uv_write_t* req, int status) {
write_req_t* wr; write_req_t* wr;
if (status) { if (status) {
@ -84,8 +84,8 @@ static void after_write(uv_req_t* req, int status) {
} }
static void after_shutdown(uv_req_t* req, int status) { static void after_shutdown(uv_shutdown_t* req, int status) {
uv_close(req->handle, on_close); uv_close((uv_handle_t*) req->handle, on_close);
free(req); free(req);
} }
@ -127,7 +127,6 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
int usingprev = 0; int usingprev = 0;
wr = (write_req_t*) malloc(sizeof *wr); wr = (write_req_t*) malloc(sizeof *wr);
uv_req_init(&wr->req, (uv_handle_t*)handle, after_write);
wr->buf.base = (char*)malloc(WRITE_BUF_LEN); wr->buf.base = (char*)malloc(WRITE_BUF_LEN);
wr->buf.len = 0; wr->buf.len = 0;
@ -197,7 +196,7 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
/* send write buffer */ /* send write buffer */
if (wr->buf.len > 0) { if (wr->buf.len > 0) {
if (uv_write(&wr->req, &wr->buf, 1)) { if (uv_write((uv_write_t*) &wr->req, handle, &wr->buf, 1, after_write)) {
FATAL("uv_write failed"); FATAL("uv_write failed");
} }
} }
@ -217,7 +216,7 @@ static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
} }
static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
uv_req_t* req; uv_shutdown_t* req;
if (nread < 0) { if (nread < 0) {
/* Error or EOF */ /* Error or EOF */
@ -227,9 +226,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
free(buf.base); free(buf.base);
} }
req = (uv_req_t*) malloc(sizeof *req); req = malloc(sizeof *req);
uv_req_init(req, (uv_handle_t*)handle, after_shutdown); uv_shutdown(req, handle, after_shutdown);
uv_shutdown(req);
return; return;
} }