From abe0b1ea614cc2eead503d5ca88d43aeb561563b Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 13 Jul 2011 22:04:51 +0200 Subject: [PATCH] Better request API Instead of uv_shutdown, uv_write, uv_connect taking raw uv_req_t we subclass uv_req_t into uv_shutdown_t, uv_write_t, and uv_connect_t. uv_req_init is removed. --- include/uv-unix.h | 16 +++-- include/uv.h | 82 ++++++++++++++++------ src/uv-unix.c | 132 ++++++++++++++++++++---------------- test/echo-server.c | 20 +++--- test/test-callback-stack.c | 21 +++--- test/test-connection-fail.c | 14 ++-- test/test-delayed-accept.c | 7 +- test/test-ping-pong.c | 27 +++----- test/test-shutdown-eof.c | 17 +++-- test/test-tcp-writealot.c | 32 ++++----- 10 files changed, 204 insertions(+), 164 deletions(-) diff --git a/include/uv-unix.h b/include/uv-unix.h index d17ebc3b..50b29c7e 100644 --- a/include/uv-unix.h +++ b/include/uv-unix.h @@ -41,14 +41,20 @@ typedef struct { #define UV_REQ_BUFSML_SIZE (4) -#define UV_REQ_PRIVATE_FIELDS \ - int write_index; \ - ev_timer timer; \ +#define UV_REQ_PRIVATE_FIELDS /* empty */ + +#define UV_WRITE_PRIVATE_FIELDS \ ngx_queue_t queue; \ + int write_index; \ uv_buf_t* bufs; \ int bufcnt; \ uv_buf_t bufsml[UV_REQ_BUFSML_SIZE]; +#define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */ + +#define UV_CONNECT_PRIVATE_FIELDS \ + ngx_queue_t queue; + /* TODO: union or classes please! */ #define UV_HANDLE_PRIVATE_FIELDS \ @@ -67,8 +73,8 @@ typedef struct { int delayed_error; \ uv_connection_cb connection_cb; \ int accepted_fd; \ - uv_req_t *connect_req; \ - uv_req_t *shutdown_req; \ + uv_connect_t *connect_req; \ + uv_shutdown_t *shutdown_req; \ ev_io read_watcher; \ ev_io write_watcher; \ ngx_queue_t write_queue; \ diff --git a/include/uv.h b/include/uv.h index d470a259..f3cd256a 100644 --- a/include/uv.h +++ b/include/uv.h @@ -48,10 +48,13 @@ typedef struct uv_timer_s uv_timer_t; typedef struct uv_prepare_s uv_prepare_t; typedef struct uv_check_s uv_check_t; typedef struct uv_idle_s uv_idle_t; -typedef struct uv_req_s uv_req_t; typedef struct uv_async_s uv_async_t; typedef struct uv_getaddrinfo_s uv_getaddrinfo_t; - +/* Request types */ +typedef struct uv_req_s uv_req_t; +typedef struct uv_shutdown_s uv_shutdown_t; +typedef struct uv_write_s uv_write_t; +typedef struct uv_connect_s uv_connect_t; #if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) # include "uv-unix.h" @@ -70,9 +73,9 @@ typedef struct uv_getaddrinfo_s uv_getaddrinfo_t; */ typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size); typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf); -typedef void (*uv_write_cb)(uv_req_t* req, int status); -typedef void (*uv_connect_cb)(uv_req_t* req, int status); -typedef void (*uv_shutdown_cb)(uv_req_t* req, int status); +typedef void (*uv_write_cb)(uv_write_t* req, int status); +typedef void (*uv_connect_cb)(uv_connect_t* req, int status); +typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); typedef void (*uv_connection_cb)(uv_handle_t* server, int status); typedef void (*uv_close_cb)(uv_handle_t* handle); typedef void (*uv_timer_cb)(uv_timer_t* handle, int status); @@ -168,23 +171,34 @@ struct uv_err_s { }; -struct uv_req_s { - /* read-only */ - uv_req_type type; - /* public */ - uv_handle_t* handle; - void *(*cb)(void *); - void* data; - /* private */ +#define UV_REQ_FIELDS \ + /* read-only */ \ + uv_req_type type; \ + /* public */ \ + void* data; \ + /* private */ \ UV_REQ_PRIVATE_FIELDS + +/* Abstract base class of all requests. */ +struct uv_req_s { + UV_REQ_FIELDS }; -/* - * Initialize a request for use with uv_write, uv_shutdown, or uv_connect. - */ -void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)); -int uv_shutdown(uv_req_t* req); +/* + * Shutdown the outgoing (write) side of a duplex stream. It waits for + * pending write requests to complete. The handle should refer to a + * initialized stream. req should be an uninitalized shutdown request + * struct. The cb is a called after shutdown is complete. + */ +struct uv_shutdown_s { + UV_REQ_FIELDS + uv_stream_t* handle; + uv_shutdown_cb cb; + UV_SHUTDOWN_PRIVATE_FIELDS +}; + +int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb); #define UV_HANDLE_FIELDS \ @@ -251,7 +265,8 @@ int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb); int uv_read_stop(uv_stream_t*); -/* Write data to stream. Buffers are written in order. Example: +/* + * Write data to stream. Buffers are written in order. Example: * * uv_buf_t a[] = { * { .base = "1", .len = 1 }, @@ -268,7 +283,15 @@ int uv_read_stop(uv_stream_t*); * uv_write(req, b, 2); * */ -int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt); +struct uv_write_s { + UV_REQ_FIELDS + uv_write_cb cb; + uv_stream_t* handle; + UV_WRITE_PRIVATE_FIELDS +}; + +int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb); /* @@ -287,8 +310,23 @@ int uv_tcp_init(uv_tcp_t* handle); int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in); int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6); -int uv_tcp_connect(uv_req_t* req, struct sockaddr_in); -int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6); +/* + * uv_tcp_connect, uv_tcp_connect6 + * These functions establish IPv4 and IPv6 TCP connections. Provide an + * initialized TCP handle and an uninitialized uv_connect_t*. The callback + * will be made when the connection is estabished. + */ +struct uv_connect_s { + UV_REQ_FIELDS + uv_connect_cb cb; + uv_tcp_t* handle; + UV_CONNECT_PRIVATE_FIELDS +}; + +int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in address, uv_connect_cb cb); +int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in6 address, uv_connect_cb cb); int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb); diff --git a/src/uv-unix.c b/src/uv-unix.c index ab8a5e6a..1dc20d84 100644 --- a/src/uv-unix.c +++ b/src/uv-unix.c @@ -59,6 +59,7 @@ struct uv_ares_data_s { static struct uv_ares_data_s ares_data; +void uv__req_init(uv_req_t*); void uv__tcp_io(EV_P_ ev_io* watcher, int revents); void uv__next(EV_P_ ev_idle* watcher, int revents); static void uv__tcp_connect(uv_tcp_t*); @@ -518,9 +519,9 @@ void uv__finish_close(uv_handle_t* handle) { } -uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) { +uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) { ngx_queue_t* q; - uv_req_t* req; + uv_write_t* req; if (ngx_queue_empty(&tcp->write_queue)) { return NULL; @@ -531,7 +532,7 @@ uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) { return NULL; } - req = ngx_queue_data(q, struct uv_req_s, queue); + req = ngx_queue_data(q, struct uv_write_s, queue); assert(req); return req; @@ -552,8 +553,7 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) { static void uv__drain(uv_tcp_t* tcp) { - uv_req_t* req; - uv_shutdown_cb cb; + uv_shutdown_t* req; assert(!uv_write_queue_head(tcp)); assert(tcp->write_queue_size == 0); @@ -567,16 +567,19 @@ static void uv__drain(uv_tcp_t* tcp) { assert(tcp->shutdown_req); req = tcp->shutdown_req; - cb = (uv_shutdown_cb)req->cb; if (shutdown(tcp->fd, SHUT_WR)) { /* Error. Report it. User should call uv_close(). */ uv_err_new((uv_handle_t*)tcp, errno); - if (cb) cb(req, -1); + if (req->cb) { + req->cb(req, -1); + } } else { uv_err_new((uv_handle_t*)tcp, 0); uv_flag_set((uv_handle_t*)tcp, UV_SHUT); - if (cb) cb(req, 0); + if (req->cb) { + req->cb(req, 0); + } } } } @@ -585,8 +588,8 @@ static void uv__drain(uv_tcp_t* tcp) { /* On success returns NULL. On error returns a pointer to the write request * which had the error. */ -static uv_req_t* uv__write(uv_tcp_t* tcp) { - uv_req_t* req; +static uv_write_t* uv__write(uv_tcp_t* tcp) { + uv_write_t* req; struct iovec* iov; int iovcnt; ssize_t n; @@ -602,7 +605,7 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) { return NULL; } - assert(req->handle == (uv_handle_t*)tcp); + assert(req->handle == (uv_stream_t*)tcp); /* Cast to iovec. We had to have our own uv_buf_t instead of iovec * because Windows's WSABUF is not an iovec. @@ -691,23 +694,20 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) { static void uv__write_callbacks(uv_tcp_t* tcp) { - uv_write_cb cb; int callbacks_made = 0; ngx_queue_t* q; - uv_req_t* req; + uv_write_t* req; while (!ngx_queue_empty(&tcp->write_completed_queue)) { /* Pop a req off write_completed_queue. */ q = ngx_queue_head(&tcp->write_completed_queue); assert(q); - req = ngx_queue_data(q, struct uv_req_s, queue); + req = ngx_queue_data(q, struct uv_write_s, queue); ngx_queue_remove(q); - cb = (uv_write_cb) req->cb; - /* NOTE: call callback AFTER freeing the request data. */ - if (cb) { - cb(req, 0); + if (req->cb) { + req->cb(req, 0); } callbacks_made++; @@ -772,10 +772,16 @@ void uv__read(uv_tcp_t* tcp) { } -int uv_shutdown(uv_req_t* req) { - uv_tcp_t* tcp = (uv_tcp_t*)req->handle; +int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) { + uv_tcp_t* tcp = (uv_tcp_t*)handle; + assert(handle->type == UV_TCP && + "uv_shutdown (unix) only supports uv_tcp_t right now"); assert(tcp->fd >= 0); - assert(tcp->type == UV_TCP); + + /* Initialize request */ + uv__req_init((uv_req_t*)req); + req->handle = handle; + req->cb = cb; if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT) || uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSED) || @@ -796,11 +802,11 @@ int uv_shutdown(uv_req_t* req) { void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { uv_tcp_t* tcp = watcher->data; + + assert(tcp->type == UV_TCP); assert(watcher == &tcp->read_watcher || watcher == &tcp->write_watcher); - assert(tcp->fd >= 0); - assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING)); if (tcp->connect_req) { @@ -811,13 +817,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { } if (revents & EV_WRITE) { - uv_req_t* req = uv__write(tcp); + uv_write_t* req = uv__write(tcp); if (req) { /* Error. Notify the user. */ - uv_write_cb cb = (uv_write_cb) req->cb; - - if (cb) { - cb(req, -1); + if (req->cb) { + req->cb(req, -1); } } else { uv__write_callbacks(tcp); @@ -834,13 +838,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) { */ static void uv__tcp_connect(uv_tcp_t* tcp) { int error; - uv_req_t* req; - uv_connect_cb connect_cb; + uv_connect_t* req = tcp->connect_req; socklen_t errorsize = sizeof(int); + assert(tcp->type == UV_TCP); assert(tcp->fd >= 0); - - req = tcp->connect_req; assert(req); if (tcp->delayed_error) { @@ -860,9 +862,8 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { /* Successful connection */ tcp->connect_req = NULL; - connect_cb = (uv_connect_cb) req->cb; - if (connect_cb) { - connect_cb(req, 0); + if (req->cb) { + req->cb(req, 0); } } else if (error == EINPROGRESS) { @@ -873,18 +874,15 @@ static void uv__tcp_connect(uv_tcp_t* tcp) { uv_err_new((uv_handle_t*)tcp, error); tcp->connect_req = NULL; - - connect_cb = (uv_connect_cb) req->cb; - if (connect_cb) { - connect_cb(req, -1); + if (req->cb) { + req->cb(req, -1); } } } -static int uv__connect(uv_req_t* req, struct sockaddr* addr, - socklen_t addrlen) { - uv_tcp_t* tcp = (uv_tcp_t*)req->handle; +static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr, + socklen_t addrlen, uv_connect_cb cb) { int r; if (tcp->fd <= 0) { @@ -901,6 +899,9 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr, } } + uv__req_init((uv_req_t*)req); + req->cb = cb; + req->handle = (uv_stream_t*)tcp; req->type = UV_CONNECT; ngx_queue_init(&req->queue); @@ -947,17 +948,21 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr, } -int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) { - assert(addr.sin_family == AF_INET); - return uv__connect(req, (struct sockaddr*) &addr, - sizeof(struct sockaddr_in)); +int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in address, uv_connect_cb cb) { + assert(handle->type == UV_TCP); + assert(address.sin_family == AF_INET); + return uv__connect(req, handle, (struct sockaddr*) &address, + sizeof(struct sockaddr_in), cb); } -int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) { - assert(addr.sin6_family == AF_INET6); - return uv__connect(req, (struct sockaddr*) &addr, - sizeof(struct sockaddr_in6)); +int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle, + struct sockaddr_in6 address, uv_connect_cb cb) { + assert(handle->type == UV_TCP); + assert(address.sin6_family == AF_INET6); + return uv__connect(req, handle, (struct sockaddr*) &address, + sizeof(struct sockaddr_in6), cb); } @@ -997,9 +1002,21 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) { /* The buffers to be written must remain valid until the callback is called. * This is not required for the uv_buf_t array. */ -int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { - uv_tcp_t* tcp = (uv_tcp_t*)req->handle; - int empty_queue = (tcp->write_queue_size == 0); +int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt, + uv_write_cb cb) { + int empty_queue; + uv_tcp_t* tcp = (uv_tcp_t*)handle; + + /* Initialize the req */ + uv__req_init((uv_req_t*) req); + req->cb = cb; + req->handle = handle; + ngx_queue_init(&req->queue); + + assert(handle->type == UV_TCP && + "uv_write (unix) does not yet support other types of streams"); + + empty_queue = (tcp->write_queue_size == 0); assert(tcp->fd >= 0); ngx_queue_init(&req->queue); @@ -1118,12 +1135,10 @@ int uv_read_stop(uv_stream_t* stream) { } -void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) { +void uv__req_init(uv_req_t* req) { uv_counters()->req_init++; req->type = UV_UNKNOWN_REQ; - req->cb = cb; - req->handle = handle; - ngx_queue_init(&req->queue); + req->data = NULL; } @@ -1634,6 +1649,7 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) { } -int uv_pipe_connect(uv_req_t* req, const char* name) { +int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle, + const char* name, uv_connect_cb cb) { assert(0 && "implement me"); } diff --git a/test/echo-server.c b/test/echo-server.c index 4dc0e20c..e107dc5b 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -25,7 +25,7 @@ #include typedef struct { - uv_req_t req; + uv_write_t req; uv_buf_t buf; } write_req_t; @@ -35,14 +35,14 @@ static uv_tcp_t tcpServer; static uv_pipe_t pipeServer; static uv_handle_t* server; -static void after_write(uv_req_t* req, int status); +static void after_write(uv_write_t* req, int status); static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf); static void on_close(uv_handle_t* peer); static void on_server_close(uv_handle_t* handle); static void on_connection(uv_handle_t*, int status); -static void after_write(uv_req_t* req, int status) { +static void after_write(uv_write_t* req, int status) { write_req_t* wr; if (status) { @@ -59,8 +59,8 @@ static void after_write(uv_req_t* req, int status) { } -static void after_shutdown(uv_req_t* req, int status) { - uv_close(req->handle, on_close); +static void after_shutdown(uv_shutdown_t* req, int status) { + uv_close((uv_handle_t*)req->handle, on_close); free(req); } @@ -68,7 +68,7 @@ static void after_shutdown(uv_req_t* req, int status) { static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { int i; write_req_t *wr; - uv_req_t* req; + uv_shutdown_t* req; if (nread < 0) { /* Error or EOF */ @@ -78,9 +78,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { free(buf.base); } - req = (uv_req_t*) malloc(sizeof *req); - uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown); - uv_shutdown(req); + req = (uv_shutdown_t*) malloc(sizeof *req); + uv_shutdown(req, handle, after_shutdown); return; } @@ -103,10 +102,9 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) { wr = (write_req_t*) malloc(sizeof *wr); - uv_req_init(&wr->req, (uv_handle_t*)handle, (void *(*)(void *))after_write); wr->buf.base = buf.base; wr->buf.len = nread; - if (uv_write(&wr->req, &wr->buf, 1)) { + if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) { FATAL("uv_write failed"); } } diff --git a/test/test-callback-stack.c b/test/test-callback-stack.c index 5b12c8b9..4162b221 100644 --- a/test/test-callback-stack.c +++ b/test/test-callback-stack.c @@ -32,7 +32,9 @@ static const char MESSAGE[] = "Failure is for the weak. Everyone dies alone."; static uv_tcp_t client; static uv_timer_t timer; -static uv_req_t connect_req, write_req, shutdown_req; +static uv_connect_t connect_req; +static uv_write_t write_req; +static uv_shutdown_t shutdown_req; static int nested = 0; static int close_cb_called = 0; @@ -59,7 +61,7 @@ static void close_cb(uv_handle_t* handle) { } -static void shutdown_cb(uv_req_t* req, int status) { +static void shutdown_cb(uv_shutdown_t* req, int status) { ASSERT(status == 0); ASSERT(nested == 0 && "shutdown_cb must be called from a fresh stack"); @@ -97,11 +99,10 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { /* from a fresh stack. */ if (bytes_received == sizeof MESSAGE) { nested++; - uv_req_init(&shutdown_req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb); puts("Shutdown"); - if (uv_shutdown(&shutdown_req)) { + if (uv_shutdown(&shutdown_req, (uv_stream_t*)tcp, shutdown_cb)) { FATAL("uv_shutdown failed"); } nested--; @@ -131,7 +132,7 @@ static void timer_cb(uv_timer_t* handle, int status) { } -static void write_cb(uv_req_t* req, int status) { +static void write_cb(uv_write_t* req, int status) { int r; ASSERT(status == 0); @@ -154,7 +155,7 @@ static void write_cb(uv_req_t* req, int status) { } -static void connect_cb(uv_req_t* req, int status) { +static void connect_cb(uv_connect_t* req, int status) { uv_buf_t buf; puts("Connected. Write some data to echo server..."); @@ -167,9 +168,7 @@ static void connect_cb(uv_req_t* req, int status) { buf.base = (char*) &MESSAGE; buf.len = sizeof MESSAGE; - uv_req_init(&write_req, req->handle, (void *(*)(void *))write_cb); - - if (uv_write(&write_req, &buf, 1)) { + if (uv_write(&write_req, (uv_stream_t*)req->handle, &buf, 1, write_cb)) { FATAL("uv_write failed"); } @@ -191,10 +190,8 @@ TEST_IMPL(callback_stack) { puts("Connecting..."); nested++; - uv_req_init(&connect_req, (uv_handle_t*)&client, - (void *(*)(void *))connect_cb); - if (uv_tcp_connect(&connect_req, addr)) { + if (uv_tcp_connect(&connect_req, &client, addr, connect_cb)) { FATAL("uv_tcp_connect failed"); } nested--; diff --git a/test/test-connection-fail.c b/test/test-connection-fail.c index 9fc3f0ba..6c748d8d 100644 --- a/test/test-connection-fail.c +++ b/test/test-connection-fail.c @@ -27,7 +27,7 @@ static uv_tcp_t tcp; -static uv_req_t req; +static uv_connect_t req; static int connect_cb_calls; static int close_cb_calls; @@ -66,18 +66,18 @@ static void timer_cb(uv_timer_t* handle, int status) { } -static void on_connect_with_close(uv_req_t *req, int status) { - ASSERT(&tcp == (uv_tcp_t*) req->handle); +static void on_connect_with_close(uv_connect_t *req, int status) { + ASSERT(&tcp == req->handle); ASSERT(status == -1); ASSERT(uv_last_error().code == UV_ECONNREFUSED); connect_cb_calls++; ASSERT(close_cb_calls == 0); - uv_close(req->handle, on_close); + uv_close((uv_handle_t*)req->handle, on_close); } -static void on_connect_without_close(uv_req_t *req, int status) { +static void on_connect_without_close(uv_connect_t *req, int status) { ASSERT(status == -1); ASSERT(uv_last_error().code == UV_ECONNREFUSED); connect_cb_calls++; @@ -103,10 +103,8 @@ void connection_fail(uv_connect_cb connect_cb) { /* We are never doing multiple reads/connects at a time anyway. */ /* so these handles can be pre-initialized. */ - uv_req_init(&req, (uv_handle_t*)&tcp, (void *(*)(void *))connect_cb); - uv_tcp_bind(&tcp, client_addr); - r = uv_tcp_connect(&req, server_addr); + r = uv_tcp_connect(&req, &tcp, server_addr, connect_cb); ASSERT(!r); uv_run(); diff --git a/test/test-delayed-accept.c b/test/test-delayed-accept.c index 6f565184..30b63b9b 100644 --- a/test/test-delayed-accept.c +++ b/test/test-delayed-accept.c @@ -147,7 +147,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { } -static void connect_cb(uv_req_t* req, int status) { +static void connect_cb(uv_connect_t* req, int status) { int r; ASSERT(req != NULL); @@ -167,7 +167,7 @@ static void connect_cb(uv_req_t* req, int status) { static void client_connect() { struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT); uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client); - uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req); + uv_connect_t* connect_req = malloc(sizeof *connect_req); int r; ASSERT(client != NULL); @@ -176,8 +176,7 @@ static void client_connect() { r = uv_tcp_init(client); ASSERT(r == 0); - uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb); - r = uv_tcp_connect(connect_req, addr); + r = uv_tcp_connect(connect_req, client, addr, connect_cb); ASSERT(r == 0); } diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index 81cd93e5..be3d97bd 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -43,8 +43,7 @@ typedef struct { uv_tcp_t tcp; uv_pipe_t pipe; }; - uv_req_t connect_req; - uv_req_t read_req; + uv_connect_t connect_req; char read_buffer[BUFSIZE]; } pinger_t; @@ -70,25 +69,22 @@ static void pinger_on_close(uv_handle_t* handle) { } -static void pinger_after_write(uv_req_t *req, int status) { +static void pinger_after_write(uv_write_t *req, int status) { ASSERT(status == 0); - free(req); } static void pinger_write_ping(pinger_t* pinger) { - uv_req_t *req; + uv_write_t *req; uv_buf_t buf; buf.base = (char*)&PING; buf.len = strlen(PING); - req = (uv_req_t*)malloc(sizeof(*req)); - uv_req_init(req, (uv_handle_t*)(&pinger->tcp), - (void *(*)(void *))pinger_after_write); + req = malloc(sizeof(uv_write_t)); - if (uv_write(req, &buf, 1)) { + if (uv_write(req, (uv_stream_t*)&pinger->tcp, &buf, 1, pinger_after_write)) { FATAL("uv_write failed"); } @@ -134,7 +130,7 @@ static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) { } -static void pinger_on_connect(uv_req_t *req, int status) { +static void pinger_on_connect(uv_connect_t *req, int status) { pinger_t *pinger = (pinger_t*)req->handle->data; ASSERT(status == 0); @@ -162,10 +158,8 @@ static void tcp_pinger_v6_new() { /* We are never doing multiple reads/connects at a time anyway. */ /* so these handles can be pre-initialized. */ - uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp), - (void *(*)(void *))pinger_on_connect); - - r = uv_tcp_connect6(&pinger->connect_req, server_addr); + r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr, + pinger_on_connect); ASSERT(!r); } @@ -209,10 +203,9 @@ static void pipe_pinger_new() { /* We are never doing multiple reads/connects at a time anyway. */ /* so these handles can be pre-initialized. */ - uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe), - (void *(*)(void *))pinger_on_connect); - r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME); + r = uv_tcp_connect6(&pinger->connect_req, &pinger->tcp, server_addr, + pinger_on_connect); ASSERT(!r); } diff --git a/test/test-shutdown-eof.c b/test/test-shutdown-eof.c index 8a960c9e..67af4953 100644 --- a/test/test-shutdown-eof.c +++ b/test/test-shutdown-eof.c @@ -26,7 +26,9 @@ static uv_timer_t timer; static uv_tcp_t tcp; -static uv_req_t connect_req, write_req, shutdown_req; +static uv_connect_t connect_req; +static uv_write_t write_req; +static uv_shutdown_t shutdown_req; static uv_buf_t qbuf; static int got_q; static int got_eof; @@ -74,7 +76,7 @@ static void read_cb(uv_stream_t* t, ssize_t nread, uv_buf_t buf) { } -static void shutdown_cb(uv_req_t *req, int status) { +static void shutdown_cb(uv_shutdown_t *req, int status) { ASSERT(req == &shutdown_req); ASSERT(called_connect_cb == 1); @@ -87,7 +89,7 @@ static void shutdown_cb(uv_req_t *req, int status) { } -static void connect_cb(uv_req_t *req, int status) { +static void connect_cb(uv_connect_t *req, int status) { ASSERT(status == 0); ASSERT(req == &connect_req); @@ -98,12 +100,10 @@ static void connect_cb(uv_req_t *req, int status) { * Write the letter 'Q' to gracefully kill the echo-server. This will not * effect our connection. */ - uv_req_init(&write_req, (uv_handle_t*)&tcp, NULL); - uv_write(&write_req, &qbuf, 1); + uv_write(&write_req, (uv_stream_t*) &tcp, &qbuf, 1, NULL); /* Shutdown our end of the connection. */ - uv_req_init(&shutdown_req, (uv_handle_t*)&tcp, (void *(*)(void *))shutdown_cb); - uv_shutdown(&shutdown_req); + uv_shutdown(&shutdown_req, (uv_stream_t*) &tcp, shutdown_cb); called_connect_cb++; ASSERT(called_shutdown_cb == 0); @@ -165,8 +165,7 @@ TEST_IMPL(shutdown_eof) { r = uv_tcp_init(&tcp); ASSERT(!r); - uv_req_init(&connect_req, (uv_handle_t*) &tcp, (void *(*)(void *))connect_cb); - r = uv_tcp_connect(&connect_req, server_addr); + r = uv_tcp_connect(&connect_req, &tcp, server_addr, connect_cb); ASSERT(!r); uv_run(); diff --git a/test/test-tcp-writealot.c b/test/test-tcp-writealot.c index 4e305a9f..73cf45b1 100644 --- a/test/test-tcp-writealot.c +++ b/test/test-tcp-writealot.c @@ -62,7 +62,7 @@ static void close_cb(uv_handle_t* handle) { } -static void shutdown_cb(uv_req_t* req, int status) { +static void shutdown_cb(uv_shutdown_t* req, int status) { uv_tcp_t* tcp; ASSERT(req); @@ -104,7 +104,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) { } -static void write_cb(uv_req_t* req, int status) { +static void write_cb(uv_write_t* req, int status) { ASSERT(req != NULL); if (status) { @@ -120,9 +120,11 @@ static void write_cb(uv_req_t* req, int status) { } -static void connect_cb(uv_req_t* req, int status) { +static void connect_cb(uv_connect_t* req, int status) { uv_buf_t send_bufs[CHUNKS_PER_WRITE]; uv_tcp_t* tcp; + uv_write_t* write_req; + uv_shutdown_t* shutdown_req; int i, j, r; ASSERT(req != NULL); @@ -141,26 +143,21 @@ static void connect_cb(uv_req_t* req, int status) { bytes_sent += CHUNK_SIZE; } - req = (uv_req_t*)malloc(sizeof *req); - ASSERT(req != NULL); + write_req = malloc(sizeof(uv_write_t)); + ASSERT(write_req != NULL); - uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))write_cb); - r = uv_write(req, (uv_buf_t*)&send_bufs, CHUNKS_PER_WRITE); + r = uv_write(write_req, (uv_stream_t*) tcp, (uv_buf_t*)&send_bufs, + CHUNKS_PER_WRITE, write_cb); ASSERT(r == 0); } /* Shutdown on drain. FIXME: dealloc req? */ - req = (uv_req_t*) malloc(sizeof(uv_req_t)); - ASSERT(req != NULL); - uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb); - r = uv_shutdown(req); + shutdown_req = malloc(sizeof(uv_shutdown_t)); + ASSERT(shutdown_req != NULL); + r = uv_shutdown(shutdown_req, (uv_stream_t*)tcp, shutdown_cb); ASSERT(r == 0); /* Start reading */ - req = (uv_req_t*)malloc(sizeof *req); - ASSERT(req != NULL); - - uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))read_cb); r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb); ASSERT(r == 0); } @@ -169,7 +166,7 @@ static void connect_cb(uv_req_t* req, int status) { TEST_IMPL(tcp_writealot) { struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT); uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client); - uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req); + uv_connect_t* connect_req = malloc(sizeof(uv_connect_t)); int r; ASSERT(client != NULL); @@ -184,8 +181,7 @@ TEST_IMPL(tcp_writealot) { r = uv_tcp_init(client); ASSERT(r == 0); - uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb); - r = uv_tcp_connect(connect_req, addr); + r = uv_tcp_connect(connect_req, client, addr, connect_cb); ASSERT(r == 0); uv_run();