Better request API

Instead of uv_shutdown, uv_write, uv_connect taking raw uv_req_t we subclass
uv_req_t into uv_shutdown_t, uv_write_t, and uv_connect_t.

uv_req_init is removed.
This commit is contained in:
Ryan Dahl 2011-07-13 22:04:51 +02:00
parent f5ff869488
commit abe0b1ea61
10 changed files with 204 additions and 164 deletions

View File

@ -41,14 +41,20 @@ typedef struct {
#define UV_REQ_BUFSML_SIZE (4)
#define UV_REQ_PRIVATE_FIELDS \
int write_index; \
ev_timer timer; \
#define UV_REQ_PRIVATE_FIELDS /* empty */
#define UV_WRITE_PRIVATE_FIELDS \
ngx_queue_t queue; \
int write_index; \
uv_buf_t* bufs; \
int bufcnt; \
uv_buf_t bufsml[UV_REQ_BUFSML_SIZE];
#define UV_SHUTDOWN_PRIVATE_FIELDS /* empty */
#define UV_CONNECT_PRIVATE_FIELDS \
ngx_queue_t queue;
/* TODO: union or classes please! */
#define UV_HANDLE_PRIVATE_FIELDS \
@ -67,8 +73,8 @@ typedef struct {
int delayed_error; \
uv_connection_cb connection_cb; \
int accepted_fd; \
uv_req_t *connect_req; \
uv_req_t *shutdown_req; \
uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \
ev_io read_watcher; \
ev_io write_watcher; \
ngx_queue_t write_queue; \

View File

@ -48,10 +48,13 @@ typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
typedef struct uv_req_s uv_req_t;
typedef struct uv_async_s uv_async_t;
typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
/* Request types */
typedef struct uv_req_s uv_req_t;
typedef struct uv_shutdown_s uv_shutdown_t;
typedef struct uv_write_s uv_write_t;
typedef struct uv_connect_s uv_connect_t;
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
# include "uv-unix.h"
@ -70,9 +73,9 @@ typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
*/
typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size);
typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf);
typedef void (*uv_write_cb)(uv_req_t* req, int status);
typedef void (*uv_connect_cb)(uv_req_t* req, int status);
typedef void (*uv_shutdown_cb)(uv_req_t* req, int status);
typedef void (*uv_write_cb)(uv_write_t* req, int status);
typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
typedef void (*uv_connection_cb)(uv_handle_t* server, int status);
typedef void (*uv_close_cb)(uv_handle_t* handle);
typedef void (*uv_timer_cb)(uv_timer_t* handle, int status);
@ -168,23 +171,34 @@ struct uv_err_s {
};
struct uv_req_s {
/* read-only */
uv_req_type type;
/* public */
uv_handle_t* handle;
void *(*cb)(void *);
void* data;
/* private */
#define UV_REQ_FIELDS \
/* read-only */ \
uv_req_type type; \
/* public */ \
void* data; \
/* private */ \
UV_REQ_PRIVATE_FIELDS
/* Abstract base class of all requests. */
struct uv_req_s {
UV_REQ_FIELDS
};
/*
* Initialize a request for use with uv_write, uv_shutdown, or uv_connect.
*/
void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *));
int uv_shutdown(uv_req_t* req);
/*
* Shutdown the outgoing (write) side of a duplex stream. It waits for
* pending write requests to complete. The handle should refer to a
* initialized stream. req should be an uninitalized shutdown request
* struct. The cb is a called after shutdown is complete.
*/
struct uv_shutdown_s {
UV_REQ_FIELDS
uv_stream_t* handle;
uv_shutdown_cb cb;
UV_SHUTDOWN_PRIVATE_FIELDS
};
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb);
#define UV_HANDLE_FIELDS \
@ -251,7 +265,8 @@ int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
int uv_read_stop(uv_stream_t*);
/* Write data to stream. Buffers are written in order. Example:
/*
* Write data to stream. Buffers are written in order. Example:
*
* uv_buf_t a[] = {
* { .base = "1", .len = 1 },
@ -268,7 +283,15 @@ int uv_read_stop(uv_stream_t*);
* uv_write(req, b, 2);
*
*/
int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt);
struct uv_write_s {
UV_REQ_FIELDS
uv_write_cb cb;
uv_stream_t* handle;
UV_WRITE_PRIVATE_FIELDS
};
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
uv_write_cb);
/*
@ -287,8 +310,23 @@ int uv_tcp_init(uv_tcp_t* handle);
int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);
int uv_tcp_connect(uv_req_t* req, struct sockaddr_in);
int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6);
/*
* uv_tcp_connect, uv_tcp_connect6
* These functions establish IPv4 and IPv6 TCP connections. Provide an
* initialized TCP handle and an uninitialized uv_connect_t*. The callback
* will be made when the connection is estabished.
*/
struct uv_connect_s {
UV_REQ_FIELDS
uv_connect_cb cb;
uv_tcp_t* handle;
UV_CONNECT_PRIVATE_FIELDS
};
int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
struct sockaddr_in address, uv_connect_cb cb);
int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
struct sockaddr_in6 address, uv_connect_cb cb);
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);

View File

@ -59,6 +59,7 @@ struct uv_ares_data_s {
static struct uv_ares_data_s ares_data;
void uv__req_init(uv_req_t*);
void uv__tcp_io(EV_P_ ev_io* watcher, int revents);
void uv__next(EV_P_ ev_idle* watcher, int revents);
static void uv__tcp_connect(uv_tcp_t*);
@ -518,9 +519,9 @@ void uv__finish_close(uv_handle_t* handle) {
}
uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) {
uv_write_t* uv_write_queue_head(uv_tcp_t* tcp) {
ngx_queue_t* q;
uv_req_t* req;
uv_write_t* req;
if (ngx_queue_empty(&tcp->write_queue)) {
return NULL;
@ -531,7 +532,7 @@ uv_req_t* uv_write_queue_head(uv_tcp_t* tcp) {
return NULL;
}
req = ngx_queue_data(q, struct uv_req_s, queue);
req = ngx_queue_data(q, struct uv_write_s, queue);
assert(req);
return req;
@ -552,8 +553,7 @@ void uv__next(EV_P_ ev_idle* watcher, int revents) {
static void uv__drain(uv_tcp_t* tcp) {
uv_req_t* req;
uv_shutdown_cb cb;
uv_shutdown_t* req;
assert(!uv_write_queue_head(tcp));
assert(tcp->write_queue_size == 0);
@ -567,16 +567,19 @@ static void uv__drain(uv_tcp_t* tcp) {
assert(tcp->shutdown_req);
req = tcp->shutdown_req;
cb = (uv_shutdown_cb)req->cb;
if (shutdown(tcp->fd, SHUT_WR)) {
/* Error. Report it. User should call uv_close(). */
uv_err_new((uv_handle_t*)tcp, errno);
if (cb) cb(req, -1);
if (req->cb) {
req->cb(req, -1);
}
} else {
uv_err_new((uv_handle_t*)tcp, 0);
uv_flag_set((uv_handle_t*)tcp, UV_SHUT);
if (cb) cb(req, 0);
if (req->cb) {
req->cb(req, 0);
}
}
}
}
@ -585,8 +588,8 @@ static void uv__drain(uv_tcp_t* tcp) {
/* On success returns NULL. On error returns a pointer to the write request
* which had the error.
*/
static uv_req_t* uv__write(uv_tcp_t* tcp) {
uv_req_t* req;
static uv_write_t* uv__write(uv_tcp_t* tcp) {
uv_write_t* req;
struct iovec* iov;
int iovcnt;
ssize_t n;
@ -602,7 +605,7 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) {
return NULL;
}
assert(req->handle == (uv_handle_t*)tcp);
assert(req->handle == (uv_stream_t*)tcp);
/* Cast to iovec. We had to have our own uv_buf_t instead of iovec
* because Windows's WSABUF is not an iovec.
@ -691,23 +694,20 @@ static uv_req_t* uv__write(uv_tcp_t* tcp) {
static void uv__write_callbacks(uv_tcp_t* tcp) {
uv_write_cb cb;
int callbacks_made = 0;
ngx_queue_t* q;
uv_req_t* req;
uv_write_t* req;
while (!ngx_queue_empty(&tcp->write_completed_queue)) {
/* Pop a req off write_completed_queue. */
q = ngx_queue_head(&tcp->write_completed_queue);
assert(q);
req = ngx_queue_data(q, struct uv_req_s, queue);
req = ngx_queue_data(q, struct uv_write_s, queue);
ngx_queue_remove(q);
cb = (uv_write_cb) req->cb;
/* NOTE: call callback AFTER freeing the request data. */
if (cb) {
cb(req, 0);
if (req->cb) {
req->cb(req, 0);
}
callbacks_made++;
@ -772,10 +772,16 @@ void uv__read(uv_tcp_t* tcp) {
}
int uv_shutdown(uv_req_t* req) {
uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* handle, uv_shutdown_cb cb) {
uv_tcp_t* tcp = (uv_tcp_t*)handle;
assert(handle->type == UV_TCP &&
"uv_shutdown (unix) only supports uv_tcp_t right now");
assert(tcp->fd >= 0);
assert(tcp->type == UV_TCP);
/* Initialize request */
uv__req_init((uv_req_t*)req);
req->handle = handle;
req->cb = cb;
if (uv_flag_is_set((uv_handle_t*)tcp, UV_SHUT) ||
uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSED) ||
@ -796,11 +802,11 @@ int uv_shutdown(uv_req_t* req) {
void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
uv_tcp_t* tcp = watcher->data;
assert(tcp->type == UV_TCP);
assert(watcher == &tcp->read_watcher ||
watcher == &tcp->write_watcher);
assert(tcp->fd >= 0);
assert(!uv_flag_is_set((uv_handle_t*)tcp, UV_CLOSING));
if (tcp->connect_req) {
@ -811,13 +817,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
}
if (revents & EV_WRITE) {
uv_req_t* req = uv__write(tcp);
uv_write_t* req = uv__write(tcp);
if (req) {
/* Error. Notify the user. */
uv_write_cb cb = (uv_write_cb) req->cb;
if (cb) {
cb(req, -1);
if (req->cb) {
req->cb(req, -1);
}
} else {
uv__write_callbacks(tcp);
@ -834,13 +838,11 @@ void uv__tcp_io(EV_P_ ev_io* watcher, int revents) {
*/
static void uv__tcp_connect(uv_tcp_t* tcp) {
int error;
uv_req_t* req;
uv_connect_cb connect_cb;
uv_connect_t* req = tcp->connect_req;
socklen_t errorsize = sizeof(int);
assert(tcp->type == UV_TCP);
assert(tcp->fd >= 0);
req = tcp->connect_req;
assert(req);
if (tcp->delayed_error) {
@ -860,9 +862,8 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
/* Successful connection */
tcp->connect_req = NULL;
connect_cb = (uv_connect_cb) req->cb;
if (connect_cb) {
connect_cb(req, 0);
if (req->cb) {
req->cb(req, 0);
}
} else if (error == EINPROGRESS) {
@ -873,18 +874,15 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
uv_err_new((uv_handle_t*)tcp, error);
tcp->connect_req = NULL;
connect_cb = (uv_connect_cb) req->cb;
if (connect_cb) {
connect_cb(req, -1);
if (req->cb) {
req->cb(req, -1);
}
}
}
static int uv__connect(uv_req_t* req, struct sockaddr* addr,
socklen_t addrlen) {
uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
static int uv__connect(uv_connect_t* req, uv_tcp_t* tcp, struct sockaddr* addr,
socklen_t addrlen, uv_connect_cb cb) {
int r;
if (tcp->fd <= 0) {
@ -901,6 +899,9 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr,
}
}
uv__req_init((uv_req_t*)req);
req->cb = cb;
req->handle = (uv_stream_t*)tcp;
req->type = UV_CONNECT;
ngx_queue_init(&req->queue);
@ -947,17 +948,21 @@ static int uv__connect(uv_req_t* req, struct sockaddr* addr,
}
int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
assert(addr.sin_family == AF_INET);
return uv__connect(req, (struct sockaddr*) &addr,
sizeof(struct sockaddr_in));
int uv_tcp_connect(uv_connect_t* req, uv_tcp_t* handle,
struct sockaddr_in address, uv_connect_cb cb) {
assert(handle->type == UV_TCP);
assert(address.sin_family == AF_INET);
return uv__connect(req, handle, (struct sockaddr*) &address,
sizeof(struct sockaddr_in), cb);
}
int uv_tcp_connect6(uv_req_t* req, struct sockaddr_in6 addr) {
assert(addr.sin6_family == AF_INET6);
return uv__connect(req, (struct sockaddr*) &addr,
sizeof(struct sockaddr_in6));
int uv_tcp_connect6(uv_connect_t* req, uv_tcp_t* handle,
struct sockaddr_in6 address, uv_connect_cb cb) {
assert(handle->type == UV_TCP);
assert(address.sin6_family == AF_INET6);
return uv__connect(req, handle, (struct sockaddr*) &address,
sizeof(struct sockaddr_in6), cb);
}
@ -997,9 +1002,21 @@ static size_t uv__buf_count(uv_buf_t bufs[], int bufcnt) {
/* The buffers to be written must remain valid until the callback is called.
* This is not required for the uv_buf_t array.
*/
int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
int empty_queue = (tcp->write_queue_size == 0);
int uv_write(uv_write_t* req, uv_stream_t* handle, uv_buf_t bufs[], int bufcnt,
uv_write_cb cb) {
int empty_queue;
uv_tcp_t* tcp = (uv_tcp_t*)handle;
/* Initialize the req */
uv__req_init((uv_req_t*) req);
req->cb = cb;
req->handle = handle;
ngx_queue_init(&req->queue);
assert(handle->type == UV_TCP &&
"uv_write (unix) does not yet support other types of streams");
empty_queue = (tcp->write_queue_size == 0);
assert(tcp->fd >= 0);
ngx_queue_init(&req->queue);
@ -1118,12 +1135,10 @@ int uv_read_stop(uv_stream_t* stream) {
}
void uv_req_init(uv_req_t* req, uv_handle_t* handle, void *(*cb)(void *)) {
void uv__req_init(uv_req_t* req) {
uv_counters()->req_init++;
req->type = UV_UNKNOWN_REQ;
req->cb = cb;
req->handle = handle;
ngx_queue_init(&req->queue);
req->data = NULL;
}
@ -1634,6 +1649,7 @@ int uv_pipe_listen(uv_pipe_t* handle, uv_connection_cb cb) {
}
int uv_pipe_connect(uv_req_t* req, const char* name) {
int uv_pipe_connect(uv_connect_t* req, uv_pipe_t* handle,
const char* name, uv_connect_cb cb) {
assert(0 && "implement me");
}

View File

@ -25,7 +25,7 @@
#include <stdlib.h>
typedef struct {
uv_req_t req;
uv_write_t req;
uv_buf_t buf;
} write_req_t;
@ -35,14 +35,14 @@ static uv_tcp_t tcpServer;
static uv_pipe_t pipeServer;
static uv_handle_t* server;
static void after_write(uv_req_t* req, int status);
static void after_write(uv_write_t* req, int status);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle);
static void on_connection(uv_handle_t*, int status);
static void after_write(uv_req_t* req, int status) {
static void after_write(uv_write_t* req, int status) {
write_req_t* wr;
if (status) {
@ -59,8 +59,8 @@ static void after_write(uv_req_t* req, int status) {
}
static void after_shutdown(uv_req_t* req, int status) {
uv_close(req->handle, on_close);
static void after_shutdown(uv_shutdown_t* req, int status) {
uv_close((uv_handle_t*)req->handle, on_close);
free(req);
}
@ -68,7 +68,7 @@ static void after_shutdown(uv_req_t* req, int status) {
static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
int i;
write_req_t *wr;
uv_req_t* req;
uv_shutdown_t* req;
if (nread < 0) {
/* Error or EOF */
@ -78,9 +78,8 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
free(buf.base);
}
req = (uv_req_t*) malloc(sizeof *req);
uv_req_init(req, (uv_handle_t*)handle, (void *(*)(void *))after_shutdown);
uv_shutdown(req);
req = (uv_shutdown_t*) malloc(sizeof *req);
uv_shutdown(req, handle, after_shutdown);
return;
}
@ -103,10 +102,9 @@ static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
wr = (write_req_t*) malloc(sizeof *wr);
uv_req_init(&wr->req, (uv_handle_t*)handle, (void *(*)(void *))after_write);
wr->buf.base = buf.base;
wr->buf.len = nread;
if (uv_write(&wr->req, &wr->buf, 1)) {
if (uv_write(&wr->req, handle, &wr->buf, 1, after_write)) {
FATAL("uv_write failed");
}
}

View File

@ -32,7 +32,9 @@ static const char MESSAGE[] = "Failure is for the weak. Everyone dies alone.";
static uv_tcp_t client;
static uv_timer_t timer;
static uv_req_t connect_req, write_req, shutdown_req;
static uv_connect_t connect_req;
static uv_write_t write_req;
static uv_shutdown_t shutdown_req;
static int nested = 0;
static int close_cb_called = 0;
@ -59,7 +61,7 @@ static void close_cb(uv_handle_t* handle) {
}
static void shutdown_cb(uv_req_t* req, int status) {
static void shutdown_cb(uv_shutdown_t* req, int status) {
ASSERT(status == 0);
ASSERT(nested == 0 && "shutdown_cb must be called from a fresh stack");
@ -97,11 +99,10 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
/* from a fresh stack. */
if (bytes_received == sizeof MESSAGE) {
nested++;
uv_req_init(&shutdown_req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
puts("Shutdown");
if (uv_shutdown(&shutdown_req)) {
if (uv_shutdown(&shutdown_req, (uv_stream_t*)tcp, shutdown_cb)) {
FATAL("uv_shutdown failed");
}
nested--;
@ -131,7 +132,7 @@ static void timer_cb(uv_timer_t* handle, int status) {
}
static void write_cb(uv_req_t* req, int status) {
static void write_cb(uv_write_t* req, int status) {
int r;
ASSERT(status == 0);
@ -154,7 +155,7 @@ static void write_cb(uv_req_t* req, int status) {
}
static void connect_cb(uv_req_t* req, int status) {
static void connect_cb(uv_connect_t* req, int status) {
uv_buf_t buf;
puts("Connected. Write some data to echo server...");
@ -167,9 +168,7 @@ static void connect_cb(uv_req_t* req, int status) {
buf.base = (char*) &MESSAGE;
buf.len = sizeof MESSAGE;
uv_req_init(&write_req, req->handle, (void *(*)(void *))write_cb);
if (uv_write(&write_req, &buf, 1)) {
if (uv_write(&write_req, (uv_stream_t*)req->handle, &buf, 1, write_cb)) {
FATAL("uv_write failed");
}
@ -191,10 +190,8 @@ TEST_IMPL(callback_stack) {
puts("Connecting...");
nested++;
uv_req_init(&connect_req, (uv_handle_t*)&client,
(void *(*)(void *))connect_cb);
if (uv_tcp_connect(&connect_req, addr)) {
if (uv_tcp_connect(&connect_req, &client, addr, connect_cb)) {
FATAL("uv_tcp_connect failed");
}
nested--;

View File

@ -27,7 +27,7 @@
static uv_tcp_t tcp;
static uv_req_t req;
static uv_connect_t req;
static int connect_cb_calls;
static int close_cb_calls;
@ -66,18 +66,18 @@ static void timer_cb(uv_timer_t* handle, int status) {
}
static void on_connect_with_close(uv_req_t *req, int status) {
ASSERT(&tcp == (uv_tcp_t*) req->handle);
static void on_connect_with_close(uv_connect_t *req, int status) {
ASSERT(&tcp == req->handle);
ASSERT(status == -1);
ASSERT(uv_last_error().code == UV_ECONNREFUSED);
connect_cb_calls++;
ASSERT(close_cb_calls == 0);
uv_close(req->handle, on_close);
uv_close((uv_handle_t*)req->handle, on_close);
}
static void on_connect_without_close(uv_req_t *req, int status) {
static void on_connect_without_close(uv_connect_t *req, int status) {
ASSERT(status == -1);
ASSERT(uv_last_error().code == UV_ECONNREFUSED);
connect_cb_calls++;
@ -103,10 +103,8 @@ void connection_fail(uv_connect_cb connect_cb) {
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
uv_req_init(&req, (uv_handle_t*)&tcp, (void *(*)(void *))connect_cb);
uv_tcp_bind(&tcp, client_addr);
r = uv_tcp_connect(&req, server_addr);
r = uv_tcp_connect(&req, &tcp, server_addr, connect_cb);
ASSERT(!r);
uv_run();

View File

@ -147,7 +147,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
}
static void connect_cb(uv_req_t* req, int status) {
static void connect_cb(uv_connect_t* req, int status) {
int r;
ASSERT(req != NULL);
@ -167,7 +167,7 @@ static void connect_cb(uv_req_t* req, int status) {
static void client_connect() {
struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
uv_connect_t* connect_req = malloc(sizeof *connect_req);
int r;
ASSERT(client != NULL);
@ -176,8 +176,7 @@ static void client_connect() {
r = uv_tcp_init(client);
ASSERT(r == 0);
uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
r = uv_tcp_connect(connect_req, addr);
r = uv_tcp_connect(connect_req, client, addr, connect_cb);
ASSERT(r == 0);
}

View File

@ -43,8 +43,7 @@ typedef struct {
uv_tcp_t tcp;
uv_pipe_t pipe;
};
uv_req_t connect_req;
uv_req_t read_req;
uv_connect_t connect_req;
char read_buffer[BUFSIZE];
} pinger_t;
@ -70,25 +69,22 @@ static void pinger_on_close(uv_handle_t* handle) {
}
static void pinger_after_write(uv_req_t *req, int status) {
static void pinger_after_write(uv_write_t *req, int status) {
ASSERT(status == 0);
free(req);
}
static void pinger_write_ping(pinger_t* pinger) {
uv_req_t *req;
uv_write_t *req;
uv_buf_t buf;
buf.base = (char*)&PING;
buf.len = strlen(PING);
req = (uv_req_t*)malloc(sizeof(*req));
uv_req_init(req, (uv_handle_t*)(&pinger->tcp),
(void *(*)(void *))pinger_after_write);
req = malloc(sizeof(uv_write_t));
if (uv_write(req, &buf, 1)) {
if (uv_write(req, (uv_stream_t*)&pinger->tcp, &buf, 1, pinger_after_write)) {
FATAL("uv_write failed");
}
@ -134,7 +130,7 @@ static void pinger_read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf) {
}
static void pinger_on_connect(uv_req_t *req, int status) {
static void pinger_on_connect(uv_connect_t *req, int status) {
pinger_t *pinger = (pinger_t*)req->handle->data;
ASSERT(status == 0);
@ -162,10 +158,8 @@ static void tcp_pinger_v6_new() {
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
(void *(*)(void *))pinger_on_connect);
r = uv_tcp_connect6(&pinger->connect_req, server_addr);
r = uv_tcp_connect(&pinger->connect_req, &pinger->tcp, server_addr,
pinger_on_connect);
ASSERT(!r);
}
@ -209,10 +203,9 @@ static void pipe_pinger_new() {
/* We are never doing multiple reads/connects at a time anyway. */
/* so these handles can be pre-initialized. */
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->pipe),
(void *(*)(void *))pinger_on_connect);
r = uv_pipe_connect(&pinger->connect_req, TEST_PIPENAME);
r = uv_tcp_connect6(&pinger->connect_req, &pinger->tcp, server_addr,
pinger_on_connect);
ASSERT(!r);
}

View File

@ -26,7 +26,9 @@
static uv_timer_t timer;
static uv_tcp_t tcp;
static uv_req_t connect_req, write_req, shutdown_req;
static uv_connect_t connect_req;
static uv_write_t write_req;
static uv_shutdown_t shutdown_req;
static uv_buf_t qbuf;
static int got_q;
static int got_eof;
@ -74,7 +76,7 @@ static void read_cb(uv_stream_t* t, ssize_t nread, uv_buf_t buf) {
}
static void shutdown_cb(uv_req_t *req, int status) {
static void shutdown_cb(uv_shutdown_t *req, int status) {
ASSERT(req == &shutdown_req);
ASSERT(called_connect_cb == 1);
@ -87,7 +89,7 @@ static void shutdown_cb(uv_req_t *req, int status) {
}
static void connect_cb(uv_req_t *req, int status) {
static void connect_cb(uv_connect_t *req, int status) {
ASSERT(status == 0);
ASSERT(req == &connect_req);
@ -98,12 +100,10 @@ static void connect_cb(uv_req_t *req, int status) {
* Write the letter 'Q' to gracefully kill the echo-server. This will not
* effect our connection.
*/
uv_req_init(&write_req, (uv_handle_t*)&tcp, NULL);
uv_write(&write_req, &qbuf, 1);
uv_write(&write_req, (uv_stream_t*) &tcp, &qbuf, 1, NULL);
/* Shutdown our end of the connection. */
uv_req_init(&shutdown_req, (uv_handle_t*)&tcp, (void *(*)(void *))shutdown_cb);
uv_shutdown(&shutdown_req);
uv_shutdown(&shutdown_req, (uv_stream_t*) &tcp, shutdown_cb);
called_connect_cb++;
ASSERT(called_shutdown_cb == 0);
@ -165,8 +165,7 @@ TEST_IMPL(shutdown_eof) {
r = uv_tcp_init(&tcp);
ASSERT(!r);
uv_req_init(&connect_req, (uv_handle_t*) &tcp, (void *(*)(void *))connect_cb);
r = uv_tcp_connect(&connect_req, server_addr);
r = uv_tcp_connect(&connect_req, &tcp, server_addr, connect_cb);
ASSERT(!r);
uv_run();

View File

@ -62,7 +62,7 @@ static void close_cb(uv_handle_t* handle) {
}
static void shutdown_cb(uv_req_t* req, int status) {
static void shutdown_cb(uv_shutdown_t* req, int status) {
uv_tcp_t* tcp;
ASSERT(req);
@ -104,7 +104,7 @@ static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
}
static void write_cb(uv_req_t* req, int status) {
static void write_cb(uv_write_t* req, int status) {
ASSERT(req != NULL);
if (status) {
@ -120,9 +120,11 @@ static void write_cb(uv_req_t* req, int status) {
}
static void connect_cb(uv_req_t* req, int status) {
static void connect_cb(uv_connect_t* req, int status) {
uv_buf_t send_bufs[CHUNKS_PER_WRITE];
uv_tcp_t* tcp;
uv_write_t* write_req;
uv_shutdown_t* shutdown_req;
int i, j, r;
ASSERT(req != NULL);
@ -141,26 +143,21 @@ static void connect_cb(uv_req_t* req, int status) {
bytes_sent += CHUNK_SIZE;
}
req = (uv_req_t*)malloc(sizeof *req);
ASSERT(req != NULL);
write_req = malloc(sizeof(uv_write_t));
ASSERT(write_req != NULL);
uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))write_cb);
r = uv_write(req, (uv_buf_t*)&send_bufs, CHUNKS_PER_WRITE);
r = uv_write(write_req, (uv_stream_t*) tcp, (uv_buf_t*)&send_bufs,
CHUNKS_PER_WRITE, write_cb);
ASSERT(r == 0);
}
/* Shutdown on drain. FIXME: dealloc req? */
req = (uv_req_t*) malloc(sizeof(uv_req_t));
ASSERT(req != NULL);
uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))shutdown_cb);
r = uv_shutdown(req);
shutdown_req = malloc(sizeof(uv_shutdown_t));
ASSERT(shutdown_req != NULL);
r = uv_shutdown(shutdown_req, (uv_stream_t*)tcp, shutdown_cb);
ASSERT(r == 0);
/* Start reading */
req = (uv_req_t*)malloc(sizeof *req);
ASSERT(req != NULL);
uv_req_init(req, (uv_handle_t*)tcp, (void *(*)(void *))read_cb);
r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb);
ASSERT(r == 0);
}
@ -169,7 +166,7 @@ static void connect_cb(uv_req_t* req, int status) {
TEST_IMPL(tcp_writealot) {
struct sockaddr_in addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
uv_tcp_t* client = (uv_tcp_t*)malloc(sizeof *client);
uv_req_t* connect_req = (uv_req_t*)malloc(sizeof *connect_req);
uv_connect_t* connect_req = malloc(sizeof(uv_connect_t));
int r;
ASSERT(client != NULL);
@ -184,8 +181,7 @@ TEST_IMPL(tcp_writealot) {
r = uv_tcp_init(client);
ASSERT(r == 0);
uv_req_init(connect_req, (uv_handle_t*)client, (void *(*)(void *))connect_cb);
r = uv_tcp_connect(connect_req, addr);
r = uv_tcp_connect(connect_req, client, addr, connect_cb);
ASSERT(r == 0);
uv_run();