From e7497227bd0d51b881b476c569f5c8422ebaecca Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Fri, 22 Jul 2011 15:32:27 -0700 Subject: [PATCH] merge uv_tcp_listen and uv_pipe_listen into uv_listen --- include/uv.h | 10 +++---- src/uv-unix.c | 31 ++++++++++++++----- src/win/internal.h | 2 ++ src/win/pipe.c | 4 +-- src/win/stream.c | 59 ++++++++++++++++++++++++------------- src/win/tcp.c | 4 +-- test/benchmark-pump.c | 14 ++++----- test/dns-server.c | 6 ++-- test/echo-server.c | 10 +++---- test/test-delayed-accept.c | 6 ++-- test/test-getsockname.c | 4 +-- test/test-pipe-bind-error.c | 6 ++-- test/test-tcp-bind-error.c | 6 ++-- test/test-tcp-bind6-error.c | 4 +-- 14 files changed, 100 insertions(+), 66 deletions(-) diff --git a/include/uv.h b/include/uv.h index 6f302dcd..c5a721c0 100644 --- a/include/uv.h +++ b/include/uv.h @@ -76,7 +76,7 @@ typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf); typedef void (*uv_write_cb)(uv_write_t* req, int status); typedef void (*uv_connect_cb)(uv_connect_t* req, int status); typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); -typedef void (*uv_connection_cb)(uv_handle_t* server, int status); +typedef void (*uv_connection_cb)(uv_stream_t* server, int status); typedef void (*uv_close_cb)(uv_handle_t* handle); typedef void (*uv_timer_cb)(uv_timer_t* handle, int status); /* TODO: do these really need a status argument? */ @@ -244,6 +244,8 @@ struct uv_stream_s { UV_STREAM_FIELDS }; +int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb); + /* This call is used in conjunction with uv_listen() to accept incoming * connections. Call uv_accept after receiving a uv_connection_cb to accept * the connection. Before calling uv_accept use uv_*_init() must be @@ -254,7 +256,7 @@ struct uv_stream_s { * once, it may fail. It is suggested to only call uv_accept once per * uv_connection_cb call. */ -int uv_accept(uv_handle_t* server, uv_stream_t* client); +int uv_accept(uv_stream_t* server, uv_stream_t* client); /* Read data from an incoming stream. The callback will be made several * several times until there is no more data to read or uv_read_stop is @@ -332,8 +334,6 @@ int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, struct sockaddr_in6 address, uv_connect_cb cb); -int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb); - int uv_getsockname(uv_tcp_t* handle, struct sockaddr* name, int* namelen); @@ -350,8 +350,6 @@ int uv_pipe_init(uv_pipe_t* handle); int uv_pipe_bind(uv_pipe_t* handle, const char* name); -int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb); - int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, const char* name, uv_connect_cb cb); diff --git a/src/uv-unix.c b/src/uv-unix.c index c09eef70..72f77df5 100644 --- a/src/uv-unix.c +++ b/src/uv-unix.c @@ -72,6 +72,8 @@ static int uv__stream_open(uv_stream_t*, int fd); static void uv__finish_close(uv_handle_t* handle); static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error); +static int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb); +static int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); static uv_write_t* uv__write(uv_stream_t* stream); static void uv__read(uv_stream_t* stream); static void uv__stream_connect(uv_stream_t*); @@ -432,12 +434,12 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) { return; } else { uv_err_new((uv_handle_t*)stream, errno); - stream->connection_cb((uv_handle_t*)stream, -1); + stream->connection_cb((uv_stream_t*)stream, -1); } } else { stream->accepted_fd = fd; - stream->connection_cb((uv_handle_t*)stream, 0); + stream->connection_cb((uv_stream_t*)stream, 0); if (stream->accepted_fd >= 0) { /* The user hasn't yet accepted called uv_accept() */ ev_io_stop(EV_DEFAULT_ &stream->read_watcher); @@ -448,7 +450,7 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) { } -int uv_accept(uv_handle_t* server, uv_stream_t* client) { +int uv_accept(uv_stream_t* server, uv_stream_t* client) { uv_stream_t* streamServer; uv_stream_t* streamClient; int saved_errno; @@ -461,7 +463,7 @@ int uv_accept(uv_handle_t* server, uv_stream_t* client) { streamClient = (uv_stream_t*)client; if (streamServer->accepted_fd < 0) { - uv_err_new(server, EAGAIN); + uv_err_new((uv_handle_t*)server, EAGAIN); goto out; } @@ -482,7 +484,20 @@ out: } -int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { +int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { + switch (stream->type) { + case UV_TCP: + return uv_tcp_listen((uv_tcp_t*)stream, backlog, cb); + case UV_NAMED_PIPE: + return uv_pipe_listen((uv_pipe_t*)stream, backlog, cb); + default: + assert(0); + return -1; + } +} + + +static int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) { int r; int fd; @@ -1873,7 +1888,7 @@ out: } -int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { +static int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { int saved_errno; int status; @@ -1886,7 +1901,7 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { } assert(handle->fd >= 0); - if ((status = listen(handle->fd, SOMAXCONN)) == -1) { + if ((status = listen(handle->fd, backlog)) == -1) { uv_err_new((uv_handle_t*)handle, errno); } else { handle->connection_cb = cb; @@ -1987,7 +2002,7 @@ static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents) { } } else { pipe->accepted_fd = sockfd; - pipe->connection_cb((uv_handle_t*)pipe, 0); + pipe->connection_cb((uv_stream_t*)pipe, 0); if (pipe->accepted_fd == sockfd) { /* The user hasn't yet accepted called uv_accept() */ ev_io_stop(EV_DEFAULT_ &pipe->read_watcher); diff --git a/src/win/internal.h b/src/win/internal.h index 87b9c37b..85df9cb3 100644 --- a/src/win/internal.h +++ b/src/win/internal.h @@ -144,6 +144,7 @@ void uv_winsock_startup(); void uv_tcp_endgame(uv_tcp_t* handle); +int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb); int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client); int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); @@ -162,6 +163,7 @@ void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_connect_t* req); void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); void uv_pipe_endgame(uv_pipe_t* handle); +int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb); int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client); int uv_pipe_read_start(uv_pipe_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb); diff --git a/src/win/pipe.c b/src/win/pipe.c index fe47b393..0ed1fda0 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -416,7 +416,7 @@ int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { /* Starts listening for connections for the given pipe. */ -int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { +int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) { int i, errno; if (handle->flags & UV_HANDLE_BIND_ERROR) { @@ -678,7 +678,7 @@ void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* raw_req) { handle->pending_accepts = req; if (handle->connection_cb) { - handle->connection_cb((uv_handle_t*)handle, 0); + handle->connection_cb((uv_stream_t*)handle, 0); } } else { if (req->pipeHandle != INVALID_HANDLE_VALUE) { diff --git a/src/win/stream.c b/src/win/stream.c index acd67a0c..fcd88b8d 100644 --- a/src/win/stream.c +++ b/src/win/stream.c @@ -48,27 +48,44 @@ void uv_connection_init(uv_stream_t* handle) { } -int uv_accept(uv_handle_t* server, uv_stream_t* client) { +int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) { + switch (stream->type) { + case UV_TCP: + return uv_tcp_listen((uv_tcp_t*)stream, backlog, cb); + case UV_NAMED_PIPE: + return uv_pipe_listen((uv_pipe_t*)stream, backlog, cb); + default: + assert(0); + return -1; + } +} + + +int uv_accept(uv_stream_t* server, uv_stream_t* client) { assert(client->type == server->type); - if (server->type == UV_TCP) { - return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); - } else if (server->type == UV_NAMED_PIPE) { - return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); + switch (server->type) { + case UV_TCP: + return uv_tcp_accept((uv_tcp_t*)server, (uv_tcp_t*)client); + case UV_NAMED_PIPE: + return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); + default: + assert(0); + return -1; } - - return -1; } int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) { - if (handle->type == UV_TCP) { - return uv_tcp_read_start((uv_tcp_t*)handle, alloc_cb, read_cb); - } else if (handle->type == UV_NAMED_PIPE) { - return uv_pipe_read_start((uv_pipe_t*)handle, alloc_cb, read_cb); + switch (handle->type) { + case UV_TCP: + return uv_tcp_read_start((uv_tcp_t*)handle, alloc_cb, read_cb); + case UV_NAMED_PIPE: + return uv_pipe_read_start((uv_pipe_t*)handle, alloc_cb, read_cb); + default: + assert(0); + return -1; } - - return -1; } @@ -81,14 +98,16 @@ int uv_read_stop(uv_stream_t* handle) { int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, uv_write_cb cb) { - if (handle->type == UV_TCP) { - return uv_tcp_write(req, (uv_tcp_t*) handle, bufs, bufcnt, cb); - } else if (handle->type == UV_NAMED_PIPE) { - return uv_pipe_write(req, (uv_pipe_t*) handle, bufs, bufcnt, cb); + switch (handle->type) { + case UV_TCP: + return uv_tcp_write(req, (uv_tcp_t*) handle, bufs, bufcnt, cb); + case UV_NAMED_PIPE: + return uv_pipe_write(req, (uv_pipe_t*) handle, bufs, bufcnt, cb); + default: + assert(0); + uv_set_sys_error(WSAEINVAL); + return -1; } - - uv_set_sys_error(WSAEINVAL); - return -1; } diff --git a/src/win/tcp.c b/src/win/tcp.c index ace5353d..c8d252c1 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -842,7 +842,7 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { handle->flags &= ~UV_HANDLE_LISTENING; if (handle->connection_cb) { LOOP->last_error = req->error; - handle->connection_cb((uv_handle_t*)handle, -1); + handle->connection_cb((uv_stream_t*)handle, -1); } } } else if (req->error.code == UV_OK && @@ -853,7 +853,7 @@ void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { sizeof(handle->socket)) == 0) { /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */ if (handle->connection_cb) { - handle->connection_cb((uv_handle_t*)handle, 0); + handle->connection_cb((uv_stream_t*)handle, 0); } } else { /* Error related to accepted socket is ignored because the server */ diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index 1732e84f..4e7f1d7a 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -47,7 +47,7 @@ static void buf_free(uv_buf_t uv_buf_t); static uv_tcp_t tcpServer; static uv_pipe_t pipeServer; -static uv_handle_t* server; +static uv_stream_t* server; static struct sockaddr_in listen_addr; static struct sockaddr_in connect_addr; @@ -143,7 +143,7 @@ void read_sockets_close_cb(uv_handle_t* handle) { */ if (uv_now() - start_time > 1000 && read_sockets == 0) { read_show_stats(); - uv_close(server, NULL); + uv_close((uv_handle_t*)server, NULL); } } @@ -266,7 +266,7 @@ static void maybe_connect_some() { } -static void connection_cb(uv_handle_t* s, int status) { +static void connection_cb(uv_stream_t* s, int status) { uv_stream_t* stream; int r; @@ -373,12 +373,12 @@ HELPER_IMPL(tcp_pump_server) { listen_addr = uv_ip4_addr("0.0.0.0", TEST_PORT); /* Server */ - server = (uv_handle_t*)&tcpServer; + server = (uv_stream_t*)&tcpServer; r = uv_tcp_init(&tcpServer); ASSERT(r == 0); r = uv_tcp_bind(&tcpServer, listen_addr); ASSERT(r == 0); - r = uv_tcp_listen(&tcpServer, MAX_WRITE_HANDLES, connection_cb); + r = uv_listen((uv_stream_t*)&tcpServer, MAX_WRITE_HANDLES, connection_cb); ASSERT(r == 0); uv_run(); @@ -394,12 +394,12 @@ HELPER_IMPL(pipe_pump_server) { uv_init(); /* Server */ - server = (uv_handle_t*)&pipeServer; + server = (uv_stream_t*)&pipeServer; r = uv_pipe_init(&pipeServer); ASSERT(r == 0); r = uv_pipe_bind(&pipeServer, TEST_PIPENAME); ASSERT(r == 0); - r = uv_pipe_listen(&pipeServer, connection_cb); + r = uv_listen((uv_stream_t*)&pipeServer, MAX_WRITE_HANDLES, connection_cb); ASSERT(r == 0); uv_run(); diff --git a/test/dns-server.c b/test/dns-server.c index 1c6b78fd..f1593528 100644 --- a/test/dns-server.c +++ b/test/dns-server.c @@ -55,7 +55,7 @@ static void after_write(uv_write_t* req, int status); static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf); static void on_close(uv_handle_t* peer); static void on_server_close(uv_handle_t* handle); -static void on_connection(uv_handle_t*, int status); +static void on_connection(uv_stream_t*, int status); #define WRITE_BUF_LEN (64*1024) #define DNSREC_LEN (4) @@ -255,7 +255,7 @@ static uv_buf_t buf_alloc(uv_stream_t* handle, size_t suggested_size) { } -static void on_connection(uv_handle_t* server, int status) { +static void on_connection(uv_stream_t* server, int status) { dnshandle* handle; int r; @@ -302,7 +302,7 @@ static int dns_start(int port) { return 1; } - r = uv_tcp_listen(&server, 128, on_connection); + r = uv_listen((uv_stream_t*)&server, 128, on_connection); if (r) { /* TODO: Error codes */ fprintf(stderr, "Listen error\n"); diff --git a/test/echo-server.c b/test/echo-server.c index 992c88bb..add23839 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -39,7 +39,7 @@ static void after_write(uv_write_t* req, int status); static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf); static void on_close(uv_handle_t* peer); static void on_server_close(uv_handle_t* handle); -static void on_connection(uv_handle_t*, int status); +static void on_connection(uv_stream_t*, int status); static void after_write(uv_write_t* req, int status) { @@ -123,7 +123,7 @@ static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) { } -static void on_connection(uv_handle_t* server, int status) { +static void on_connection(uv_stream_t* server, int status) { uv_stream_t* stream; int r; @@ -187,7 +187,7 @@ static int tcp4_echo_start(int port) { return 1; } - r = uv_tcp_listen(&tcpServer, 128, on_connection); + r = uv_listen((uv_stream_t*)&tcpServer, 128, on_connection); if (r) { /* TODO: Error codes */ fprintf(stderr, "Listen error\n"); @@ -220,7 +220,7 @@ static int tcp6_echo_start(int port) { return 0; } - r = uv_tcp_listen(&tcpServer, 128, on_connection); + r = uv_listen((uv_stream_t*)&tcpServer, 128, on_connection); if (r) { /* TODO: Error codes */ fprintf(stderr, "Listen error\n"); @@ -249,7 +249,7 @@ static int pipe_echo_start(char* pipeName) { return 1; } - r = uv_pipe_listen(&pipeServer, on_connection); + r = uv_listen((uv_stream_t*)&pipeServer, SOMAXCONN, on_connection); if (r) { fprintf(stderr, "uv_pipe_listen: %s\n", uv_strerror(uv_last_error())); return 1; diff --git a/test/test-delayed-accept.c b/test/test-delayed-accept.c index 8e3dfc14..10f041b2 100644 --- a/test/test-delayed-accept.c +++ b/test/test-delayed-accept.c @@ -63,7 +63,7 @@ static void do_accept(uv_timer_t* timer_handle, int status) { tcpcnt = uv_counters()->tcp_init; server = (uv_tcp_t*)timer_handle->data; - r = uv_accept((uv_handle_t*)server, (uv_stream_t*)accepted_handle); + r = uv_accept((uv_stream_t*)server, (uv_stream_t*)accepted_handle); ASSERT(r == 0); ASSERT(uv_counters()->tcp_init == tcpcnt); @@ -83,7 +83,7 @@ static void do_accept(uv_timer_t* timer_handle, int status) { } -static void connection_cb(uv_handle_t* tcp, int status) { +static void connection_cb(uv_stream_t* tcp, int status) { int r; uv_timer_t* timer_handle; @@ -120,7 +120,7 @@ static void start_server() { r = uv_tcp_bind(server, addr); ASSERT(r == 0); - r = uv_tcp_listen(server, 128, connection_cb); + r = uv_listen((uv_stream_t*)server, 128, connection_cb); ASSERT(r == 0); } diff --git a/test/test-getsockname.c b/test/test-getsockname.c index 6e4f6436..8b783ec5 100644 --- a/test/test-getsockname.c +++ b/test/test-getsockname.c @@ -67,7 +67,7 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { } -static void on_connection(uv_handle_t* server, int status) { +static void on_connection(uv_stream_t* server, int status) { struct sockaddr sockname; int namelen = sizeof(sockname); uv_handle_t* handle; @@ -141,7 +141,7 @@ static int tcp_listener(int port) { return 1; } - r = uv_tcp_listen(&tcpServer, 128, on_connection); + r = uv_listen((uv_stream_t*)&tcpServer, 128, on_connection); if (r) { fprintf(stderr, "Listen error\n"); return 1; diff --git a/test/test-pipe-bind-error.c b/test/test-pipe-bind-error.c index 6faccbe2..69aaaa20 100644 --- a/test/test-pipe-bind-error.c +++ b/test/test-pipe-bind-error.c @@ -59,9 +59,9 @@ TEST_IMPL(pipe_bind_error_addrinuse) { ASSERT(uv_last_error().code == UV_EADDRINUSE); - r = uv_pipe_listen(&server1, NULL); + r = uv_listen((uv_stream_t*)&server1, SOMAXCONN, NULL); ASSERT(r == 0); - r = uv_pipe_listen(&server2, NULL); + r = uv_listen((uv_stream_t*)&server2, SOMAXCONN, NULL); ASSERT(r == -1); ASSERT(uv_last_error().code == UV_EADDRINUSE); @@ -133,7 +133,7 @@ TEST_IMPL(pipe_listen_without_bind) { r = uv_pipe_init(&server); ASSERT(r == 0); - r = uv_pipe_listen(&server, NULL); + r = uv_listen((uv_stream_t*)&server, SOMAXCONN, NULL); ASSERT(r == -1); ASSERT(uv_last_error().code == UV_ENOTCONN); diff --git a/test/test-tcp-bind-error.c b/test/test-tcp-bind-error.c index 9034438d..b466ef8b 100644 --- a/test/test-tcp-bind-error.c +++ b/test/test-tcp-bind-error.c @@ -51,9 +51,9 @@ TEST_IMPL(tcp_bind_error_addrinuse) { r = uv_tcp_bind(&server2, addr); ASSERT(r == 0); - r = uv_tcp_listen(&server1, 128, NULL); + r = uv_listen((uv_stream_t*)&server1, 128, NULL); ASSERT(r == 0); - r = uv_tcp_listen(&server2, 128, NULL); + r = uv_listen((uv_stream_t*)&server2, 128, NULL); ASSERT(r == -1); ASSERT(uv_last_error().code == UV_EADDRINUSE); @@ -197,7 +197,7 @@ TEST_IMPL(tcp_listen_without_bind) { uv_init(); r = uv_tcp_init(&server); ASSERT(r == 0); - r = uv_tcp_listen(&server, 128, NULL); + r = uv_listen((uv_stream_t*)&server, 128, NULL); ASSERT(r == 0); return 0; diff --git a/test/test-tcp-bind6-error.c b/test/test-tcp-bind6-error.c index d2127858..cf283fa0 100644 --- a/test/test-tcp-bind6-error.c +++ b/test/test-tcp-bind6-error.c @@ -51,9 +51,9 @@ TEST_IMPL(tcp_bind6_error_addrinuse) { r = uv_tcp_bind6(&server2, addr); ASSERT(r == 0); - r = uv_tcp_listen(&server1, 128, NULL); + r = uv_listen((uv_stream_t*)&server1, 128, NULL); ASSERT(r == 0); - r = uv_tcp_listen(&server2, 128, NULL); + r = uv_listen((uv_stream_t*)&server2, 128, NULL); ASSERT(r == -1); ASSERT(uv_last_error().code == UV_EADDRINUSE);