uv_stream_t

This commit is contained in:
Igor Zinkovsky 2011-06-30 17:39:27 -07:00 committed by Ryan Dahl
parent 9d84f92613
commit 320057d588
17 changed files with 189 additions and 162 deletions

View File

@ -52,7 +52,7 @@ static int completed_pingers = 0;
static int64_t start_time;
static uv_buf_t buf_alloc(uv_tcp_t* tcp, size_t size) {
static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) {
buf_t* ab;
ab = buf_freelist;
@ -125,7 +125,7 @@ static void pinger_shutdown_cb(uv_handle_t* handle, int status) {
}
static void pinger_read_cb(uv_tcp_t* tcp, ssize_t nread, uv_buf_t buf) {
static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
unsigned int i;
pinger_t* pinger;
@ -171,7 +171,7 @@ static void pinger_connect_cb(uv_req_t *req, int status) {
pinger_write_ping(pinger);
if (uv_read_start((uv_tcp_t*)(req->handle), buf_alloc, pinger_read_cb)) {
if (uv_read_start((uv_stream_t*)(req->handle), buf_alloc, pinger_read_cb)) {
FATAL("uv_read_start failed");
}
}
@ -198,8 +198,8 @@ static void pinger_new() {
uv_req_init(&pinger->connect_req, (uv_handle_t*)&pinger->tcp,
pinger_connect_cb);
uv_bind(&pinger->tcp, client_addr);
r = uv_connect(&pinger->connect_req, server_addr);
uv_tcp_bind(&pinger->tcp, client_addr);
r = uv_tcp_connect(&pinger->connect_req, server_addr);
ASSERT(!r);
}

View File

@ -35,13 +35,13 @@ static int TARGET_CONNECTIONS;
#define STATS_COUNT 5
static void do_write(uv_tcp_t*);
static void do_write(uv_stream_t*);
static void maybe_connect_some();
static uv_req_t* req_alloc();
static void req_free(uv_req_t* uv_req);
static uv_buf_t buf_alloc(uv_tcp_t*, size_t size);
static uv_buf_t buf_alloc(uv_stream_t*, size_t size);
static void buf_free(uv_buf_t uv_buf_t);
@ -154,7 +154,7 @@ static void start_stats_collection() {
}
static void read_cb(uv_tcp_t* tcp, ssize_t bytes, uv_buf_t buf) {
static void read_cb(uv_stream_t* tcp, ssize_t bytes, uv_buf_t buf) {
if (nrecv_total == 0) {
ASSERT(start_time == 0);
uv_update_time();
@ -183,11 +183,11 @@ static void write_cb(uv_req_t *req, int status) {
nsent += sizeof write_buffer;
nsent_total += sizeof write_buffer;
do_write((uv_tcp_t*)req->handle);
do_write((uv_stream_t*)req->handle);
}
static void do_write(uv_tcp_t* tcp) {
static void do_write(uv_stream_t* tcp) {
uv_req_t* req;
uv_buf_t buf;
int r;
@ -221,7 +221,7 @@ static void connect_cb(uv_req_t* req, int status) {
/* Yay! start writing */
for (i = 0; i < write_sockets; i++) {
do_write(&write_handles[i]);
do_write((uv_stream_t*)&write_handles[i]);
}
}
}
@ -241,27 +241,27 @@ static void maybe_connect_some() {
req = req_alloc();
uv_req_init(req, (uv_handle_t*)tcp, connect_cb);
r = uv_connect(req, connect_addr);
r = uv_tcp_connect(req, connect_addr);
ASSERT(r == 0);
}
}
static void connection_cb(uv_tcp_t* s, int status) {
static void connection_cb(uv_handle_t* s, int status) {
uv_tcp_t* tcp;
int r;
ASSERT(&server == s);
ASSERT(&server == (uv_tcp_t*)s);
ASSERT(status == 0);
tcp = malloc(sizeof(uv_tcp_t));
uv_tcp_init(tcp);
r = uv_accept(s, tcp);
r = uv_accept(s, (uv_stream_t*)tcp);
ASSERT(r == 0);
r = uv_read_start(tcp, buf_alloc, read_cb);
r = uv_read_start((uv_stream_t*)tcp, buf_alloc, read_cb);
ASSERT(r == 0);
read_sockets++;
@ -317,7 +317,7 @@ typedef struct buf_list_s {
static buf_list_t* buf_freelist = NULL;
static uv_buf_t buf_alloc(uv_tcp_t* tcp, size_t size) {
static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) {
buf_list_t* buf;
buf = buf_freelist;
@ -351,9 +351,9 @@ HELPER_IMPL(pump_server) {
/* Server */
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind(&server, listen_addr);
r = uv_tcp_bind(&server, listen_addr);
ASSERT(r == 0);
r = uv_listen(&server, MAX_WRITE_HANDLES, connection_cb);
r = uv_tcp_listen(&server, MAX_WRITE_HANDLES, connection_cb);
ASSERT(r == 0);
uv_run();

View File

@ -52,10 +52,10 @@ static uv_tcp_t server;
static void after_write(uv_req_t* req, int status);
static void after_read(uv_tcp_t*, ssize_t nread, uv_buf_t buf);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle);
static void on_connection(uv_tcp_t*, int status);
static void on_connection(uv_handle_t*, int status);
#define WRITE_BUF_LEN (64*1024)
#define DNSREC_LEN (4)
@ -115,7 +115,7 @@ static void addrsp(write_req_t* wr, char* hdr) {
wr->buf.len += rsplen;
}
static void process_req(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) {
static void process_req(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
write_req_t *wr;
dnshandle* dns = (dnshandle*)handle;
char hdrbuf[DNSREC_LEN];
@ -216,7 +216,7 @@ static void process_req(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) {
}
}
static void after_read(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) {
static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
uv_req_t* req;
if (nread < 0) {
@ -249,7 +249,7 @@ static void on_close(uv_handle_t* peer) {
}
static uv_buf_t buf_alloc(uv_tcp_t* handle, size_t suggested_size) {
static uv_buf_t buf_alloc(uv_stream_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = (char*) malloc(suggested_size);
buf.len = suggested_size;
@ -257,7 +257,7 @@ static uv_buf_t buf_alloc(uv_tcp_t* handle, size_t suggested_size) {
}
static void on_connection(uv_tcp_t* server, int status) {
static void on_connection(uv_handle_t* server, int status) {
dnshandle* handle;
int r;
@ -273,10 +273,10 @@ static void on_connection(uv_tcp_t* server, int status) {
uv_tcp_init((uv_tcp_t*)handle);
r = uv_accept(server, (uv_tcp_t*)handle);
r = uv_accept(server, (uv_stream_t*)handle);
ASSERT(r == 0);
r = uv_read_start((uv_tcp_t*)handle, buf_alloc, after_read);
r = uv_read_start((uv_stream_t*)handle, buf_alloc, after_read);
ASSERT(r == 0);
}
@ -297,14 +297,14 @@ static int dns_start(int port) {
return 1;
}
r = uv_bind(&server, addr);
r = uv_tcp_bind(&server, addr);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Bind error\n");
return 1;
}
r = uv_listen(&server, 128, on_connection);
r = uv_tcp_listen(&server, 128, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error\n");

View File

@ -36,10 +36,10 @@ static uv_tcp_t server;
static void after_write(uv_req_t* req, int status);
static void after_read(uv_tcp_t*, ssize_t nread, uv_buf_t buf);
static void after_read(uv_stream_t*, ssize_t nread, uv_buf_t buf);
static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle);
static void on_connection(uv_tcp_t*, int status);
static void on_connection(uv_handle_t*, int status);
static void after_write(uv_req_t* req, int status) {
@ -65,7 +65,7 @@ static void after_shutdown(uv_req_t* req, int status) {
}
static void after_read(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) {
static void after_read(uv_stream_t* handle, ssize_t nread, uv_buf_t buf) {
int i;
write_req_t *wr;
uv_req_t* req;
@ -117,7 +117,7 @@ static void on_close(uv_handle_t* peer) {
}
static uv_buf_t echo_alloc(uv_tcp_t* handle, size_t suggested_size) {
static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = (char*) malloc(suggested_size);
buf.len = suggested_size;
@ -125,7 +125,7 @@ static uv_buf_t echo_alloc(uv_tcp_t* handle, size_t suggested_size) {
}
static void on_connection(uv_tcp_t* server, int status) {
static void on_connection(uv_handle_t* server, int status) {
uv_tcp_t* handle;
int r;
@ -136,10 +136,10 @@ static void on_connection(uv_tcp_t* server, int status) {
uv_tcp_init(handle);
r = uv_accept(server, handle);
r = uv_accept(server, (uv_stream_t*)handle);
ASSERT(r == 0);
r = uv_read_start(handle, echo_alloc, after_read);
r = uv_read_start((uv_stream_t*)handle, echo_alloc, after_read);
ASSERT(r == 0);
}
@ -160,14 +160,14 @@ static int echo_start(int port) {
return 1;
}
r = uv_bind(&server, addr);
r = uv_tcp_bind(&server, addr);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Bind error\n");
return 1;
}
r = uv_listen(&server, 128, on_connection);
r = uv_tcp_listen(&server, 128, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error\n");

View File

@ -43,17 +43,17 @@ TEST_IMPL(bind_error_addrinuse) {
r = uv_tcp_init(&server1);
ASSERT(r == 0);
r = uv_bind(&server1, addr);
r = uv_tcp_bind(&server1, addr);
ASSERT(r == 0);
r = uv_tcp_init(&server2);
ASSERT(r == 0);
r = uv_bind(&server2, addr);
r = uv_tcp_bind(&server2, addr);
ASSERT(r == 0);
r = uv_listen(&server1, 128, NULL);
r = uv_tcp_listen(&server1, 128, NULL);
ASSERT(r == 0);
r = uv_listen(&server2, 128, NULL);
r = uv_tcp_listen(&server2, 128, NULL);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EADDRINUSE);
@ -78,7 +78,7 @@ TEST_IMPL(bind_error_addrnotavail_1) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind(&server, addr);
r = uv_tcp_bind(&server, addr);
/* It seems that Linux is broken here - bind succeeds. */
if (r == -1) {
@ -104,7 +104,7 @@ TEST_IMPL(bind_error_addrnotavail_2) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind(&server, addr);
r = uv_tcp_bind(&server, addr);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EADDRNOTAVAIL);
@ -130,7 +130,7 @@ TEST_IMPL(bind_error_fault) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind(&server, *garbage_addr);
r = uv_tcp_bind(&server, *garbage_addr);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EFAULT);
@ -156,9 +156,9 @@ TEST_IMPL(bind_error_inval) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind(&server, addr1);
r = uv_tcp_bind(&server, addr1);
ASSERT(r == 0);
r = uv_bind(&server, addr2);
r = uv_tcp_bind(&server, addr2);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EINVAL);
@ -183,7 +183,7 @@ TEST_IMPL(bind_localhost_ok) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind(&server, addr);
r = uv_tcp_bind(&server, addr);
ASSERT(r == 0);
return 0;

View File

@ -43,17 +43,17 @@ TEST_IMPL(bind6_error_addrinuse) {
r = uv_tcp_init(&server1);
ASSERT(r == 0);
r = uv_bind6(&server1, addr);
r = uv_tcp_bind6(&server1, addr);
ASSERT(r == 0);
r = uv_tcp_init(&server2);
ASSERT(r == 0);
r = uv_bind6(&server2, addr);
r = uv_tcp_bind6(&server2, addr);
ASSERT(r == 0);
r = uv_listen(&server1, 128, NULL);
r = uv_tcp_listen(&server1, 128, NULL);
ASSERT(r == 0);
r = uv_listen(&server2, 128, NULL);
r = uv_tcp_listen(&server2, 128, NULL);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EADDRINUSE);
@ -78,7 +78,7 @@ TEST_IMPL(bind6_error_addrnotavail) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind6(&server, addr);
r = uv_tcp_bind6(&server, addr);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EADDRNOTAVAIL);
@ -104,7 +104,7 @@ TEST_IMPL(bind6_error_fault) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind6(&server, *garbage_addr);
r = uv_tcp_bind6(&server, *garbage_addr);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EFAULT);
@ -130,9 +130,9 @@ TEST_IMPL(bind6_error_inval) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind6(&server, addr1);
r = uv_tcp_bind6(&server, addr1);
ASSERT(r == 0);
r = uv_bind6(&server, addr2);
r = uv_tcp_bind6(&server, addr2);
ASSERT(r == -1);
ASSERT(uv_last_error().code == UV_EINVAL);
@ -157,7 +157,7 @@ TEST_IMPL(bind6_localhost_ok) {
r = uv_tcp_init(&server);
ASSERT(r == 0);
r = uv_bind6(&server, addr);
r = uv_tcp_bind6(&server, addr);
ASSERT(r == 0);
return 0;

View File

@ -43,7 +43,7 @@ static int bytes_received = 0;
static int shutdown_cb_called = 0;
static uv_buf_t alloc_cb(uv_tcp_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
uv_buf_t buf;
buf.len = size;
buf.base = (char*) malloc(size);
@ -67,7 +67,7 @@ static void shutdown_cb(uv_req_t* req, int status) {
}
static void read_cb(uv_tcp_t* tcp, ssize_t nread, uv_buf_t buf) {
static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
ASSERT(nested == 0 && "read_cb must be called from a fresh stack");
printf("Read. nread == %d\n", nread);
@ -119,7 +119,7 @@ static void timer_cb(uv_timer_t* handle, int status) {
puts("Timeout complete. Now read data...");
nested++;
if (uv_read_start(&client, alloc_cb, read_cb)) {
if (uv_read_start((uv_stream_t*)&client, alloc_cb, read_cb)) {
FATAL("uv_read_start failed");
}
nested--;
@ -192,8 +192,8 @@ TEST_IMPL(callback_stack) {
nested++;
uv_req_init(&connect_req, (uv_handle_t*)&client, connect_cb);
if (uv_connect(&connect_req, addr)) {
FATAL("uv_connect failed");
if (uv_tcp_connect(&connect_req, addr)) {
FATAL("uv_tcp_connect failed");
}
nested--;

View File

@ -105,8 +105,8 @@ void connection_fail(uv_connect_cb connect_cb) {
/* so these handles can be pre-initialized. */
uv_req_init(&req, (uv_handle_t*)&tcp, connect_cb);
uv_bind(&tcp, client_addr);
r = uv_connect(&req, server_addr);
uv_tcp_bind(&tcp, client_addr);
r = uv_tcp_connect(&req, server_addr);
ASSERT(!r);
uv_run();

View File

@ -32,7 +32,7 @@ static int close_cb_called = 0;
static int connect_cb_called = 0;
static uv_buf_t alloc_cb(uv_tcp_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;
@ -65,7 +65,7 @@ static void do_accept(uv_timer_t* timer_handle, int status) {
tcpcnt = uv_counters()->tcp_init;
server = (uv_tcp_t*)timer_handle->data;
r = uv_accept(server, accepted_handle);
r = uv_accept((uv_handle_t*)server, (uv_stream_t*)accepted_handle);
ASSERT(r == 0);
ASSERT(uv_counters()->tcp_init == tcpcnt);
@ -88,7 +88,7 @@ static void do_accept(uv_timer_t* timer_handle, int status) {
}
static void connection_cb(uv_tcp_t* tcp, int status) {
static void connection_cb(uv_handle_t* tcp, int status) {
int r;
uv_timer_t* timer_handle;
@ -122,15 +122,15 @@ static void start_server() {
ASSERT(uv_counters()->tcp_init == 1);
ASSERT(uv_counters()->handle_init == 1);
r = uv_bind(server, addr);
r = uv_tcp_bind(server, addr);
ASSERT(r == 0);
r = uv_listen(server, 128, connection_cb);
r = uv_tcp_listen(server, 128, connection_cb);
ASSERT(r == 0);
}
static void read_cb(uv_tcp_t* tcp, ssize_t nread, uv_buf_t buf) {
static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
/* The server will not send anything, it should close gracefully. */
if (buf.base) {
@ -157,7 +157,7 @@ static void connect_cb(uv_req_t* req, int status) {
/* Not that the server will send anything, but otherwise we'll never know */
/* when te server closes the connection. */
r = uv_read_start((uv_tcp_t*)(req->handle), alloc_cb, read_cb);
r = uv_read_start((uv_stream_t*)(req->handle), alloc_cb, read_cb);
ASSERT(r == 0);
connect_cb_called++;
@ -179,7 +179,7 @@ static void client_connect() {
ASSERT(r == 0);
uv_req_init(connect_req, (uv_handle_t*)client, connect_cb);
r = uv_connect(connect_req, addr);
r = uv_tcp_connect(connect_req, addr);
ASSERT(r == 0);
}

View File

@ -48,7 +48,7 @@ typedef struct {
void pinger_try_read(pinger_t* pinger);
static uv_buf_t alloc_cb(uv_tcp_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;
@ -92,7 +92,7 @@ static void pinger_write_ping(pinger_t* pinger) {
}
static void pinger_read_cb(uv_tcp_t* tcp, ssize_t nread, uv_buf_t buf) {
static void pinger_read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
unsigned int i;
pinger_t* pinger;
@ -137,7 +137,7 @@ static void pinger_on_connect(uv_req_t *req, int status) {
pinger_write_ping(pinger);
uv_read_start((uv_tcp_t*)(req->handle), alloc_cb, pinger_read_cb);
uv_read_start((uv_stream_t*)(req->handle), alloc_cb, pinger_read_cb);
}
@ -160,7 +160,7 @@ static void pinger_new() {
uv_req_init(&pinger->connect_req, (uv_handle_t*)(&pinger->tcp),
pinger_on_connect);
r = uv_connect(&pinger->connect_req, server_addr);
r = uv_tcp_connect(&pinger->connect_req, server_addr);
ASSERT(!r);
}

View File

@ -37,7 +37,7 @@ static int called_timer_close_cb;
static int called_timer_cb;
static uv_buf_t alloc_cb(uv_tcp_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;
@ -45,10 +45,10 @@ static uv_buf_t alloc_cb(uv_tcp_t* tcp, size_t size) {
}
static void read_cb(uv_tcp_t* t, ssize_t nread, uv_buf_t buf) {
static void read_cb(uv_stream_t* t, ssize_t nread, uv_buf_t buf) {
uv_err_t err = uv_last_error();
ASSERT(t == &tcp);
ASSERT((uv_tcp_t*)t == &tcp);
if (nread == 0) {
ASSERT(err.code == UV_EAGAIN);
@ -92,7 +92,7 @@ static void connect_cb(uv_req_t *req, int status) {
ASSERT(req == &connect_req);
/* Start reading from our connection so we can receive the EOF. */
uv_read_start(&tcp, alloc_cb, read_cb);
uv_read_start((uv_stream_t*)&tcp, alloc_cb, read_cb);
/*
* Write the letter 'Q' to gracefully kill the echo-server. This will not
@ -166,7 +166,7 @@ TEST_IMPL(shutdown_eof) {
ASSERT(!r);
uv_req_init(&connect_req, (uv_handle_t*) &tcp, connect_cb);
r = uv_connect(&connect_req, server_addr);
r = uv_tcp_connect(&connect_req, server_addr);
ASSERT(!r);
uv_run();

View File

@ -45,7 +45,7 @@ static int bytes_received = 0;
static int bytes_received_done = 0;
static uv_buf_t alloc_cb(uv_tcp_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;
@ -83,7 +83,7 @@ static void shutdown_cb(uv_req_t* req, int status) {
}
static void read_cb(uv_tcp_t* tcp, ssize_t nread, uv_buf_t buf) {
static void read_cb(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf) {
ASSERT(tcp != NULL);
if (nread < 0) {
@ -161,7 +161,7 @@ static void connect_cb(uv_req_t* req, int status) {
ASSERT(req != NULL);
uv_req_init(req, (uv_handle_t*)tcp, read_cb);
r = uv_read_start(tcp, alloc_cb, read_cb);
r = uv_read_start((uv_stream_t*)tcp, alloc_cb, read_cb);
ASSERT(r == 0);
}
@ -185,7 +185,7 @@ TEST_IMPL(tcp_writealot) {
ASSERT(r == 0);
uv_req_init(connect_req, (uv_handle_t*)client, connect_cb);
r = uv_connect(connect_req, addr);
r = uv_tcp_connect(connect_req, addr);
ASSERT(r == 0);
uv_run();

View File

@ -310,7 +310,7 @@ int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, int addrsize) {
}
int uv_bind(uv_tcp_t* tcp, struct sockaddr_in addr) {
int uv_tcp_bind(uv_tcp_t* tcp, struct sockaddr_in addr) {
if (addr.sin_family != AF_INET) {
uv_err_new((uv_handle_t*)tcp, EFAULT);
return -1;
@ -320,7 +320,7 @@ int uv_bind(uv_tcp_t* tcp, struct sockaddr_in addr) {
}
int uv_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) {
int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) {
if (addr.sin6_family != AF_INET6) {
uv_err_new((uv_handle_t*)tcp, EFAULT);
return -1;
@ -391,12 +391,12 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
return;
} else {
uv_err_new((uv_handle_t*)tcp, errno);
tcp->connection_cb(tcp, -1);
tcp->connection_cb((uv_handle_t*)tcp, -1);
}
} else {
tcp->accepted_fd = fd;
tcp->connection_cb(tcp, 0);
tcp->connection_cb((uv_handle_t*)tcp, 0);
if (tcp->accepted_fd >= 0) {
/* The user hasn't yet accepted called uv_accept() */
ev_io_stop(EV_DEFAULT_ &tcp->read_watcher);
@ -407,26 +407,29 @@ void uv__server_io(EV_P_ ev_io* watcher, int revents) {
}
int uv_accept(uv_tcp_t* server, uv_tcp_t* client) {
if (server->accepted_fd < 0) {
uv_err_new((uv_handle_t*) server, EAGAIN);
int uv_accept(uv_handle_t* server, uv_stream_t* client) {
uv_tcp_t* tcpServer = (uv_tcp_t*)server;
uv_tcp_t* tcpClient = (uv_tcp_t*)client;
if (tcpServer->accepted_fd < 0) {
uv_err_new(server, EAGAIN);
return -1;
}
if (uv_tcp_open(client, server->accepted_fd)) {
if (uv_tcp_open(tcpClient, tcpServer->accepted_fd)) {
/* Ignore error for now */
server->accepted_fd = -1;
close(server->accepted_fd);
tcpServer->accepted_fd = -1;
close(tcpServer->accepted_fd);
return -1;
} else {
server->accepted_fd = -1;
ev_io_start(EV_DEFAULT_ &server->read_watcher);
tcpServer->accepted_fd = -1;
ev_io_start(EV_DEFAULT_ &tcpServer->read_watcher);
return 0;
}
}
int uv_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
int r;
assert(tcp->fd >= 0);
@ -723,7 +726,7 @@ void uv__read(uv_tcp_t* tcp) {
*/
while (tcp->read_cb && uv_flag_is_set((uv_handle_t*)tcp, UV_READING)) {
assert(tcp->alloc_cb);
buf = tcp->alloc_cb(tcp, 64 * 1024);
buf = tcp->alloc_cb((uv_stream_t*)tcp, 64 * 1024);
assert(buf.len > 0);
assert(buf.base);
@ -740,12 +743,12 @@ void uv__read(uv_tcp_t* tcp) {
ev_io_start(EV_DEFAULT_UC_ &tcp->read_watcher);
}
uv_err_new((uv_handle_t*)tcp, EAGAIN);
tcp->read_cb(tcp, 0, buf);
tcp->read_cb((uv_stream_t*)tcp, 0, buf);
return;
} else {
/* Error. User should call uv_close(). */
uv_err_new((uv_handle_t*)tcp, errno);
tcp->read_cb(tcp, -1, buf);
tcp->read_cb((uv_stream_t*)tcp, -1, buf);
assert(!ev_is_active(&tcp->read_watcher));
return;
}
@ -753,11 +756,11 @@ void uv__read(uv_tcp_t* tcp) {
/* EOF */
uv_err_new_artificial((uv_handle_t*)tcp, UV_EOF);
ev_io_stop(EV_DEFAULT_UC_ &tcp->read_watcher);
tcp->read_cb(tcp, -1, buf);
tcp->read_cb((uv_stream_t*)tcp, -1, buf);
return;
} else {
/* Successful read */
tcp->read_cb(tcp, nread, buf);
tcp->read_cb((uv_stream_t*)tcp, nread, buf);
}
}
}
@ -873,7 +876,7 @@ static void uv__tcp_connect(uv_tcp_t* tcp) {
}
int uv_connect(uv_req_t* req, struct sockaddr_in addr) {
int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
uv_tcp_t* tcp = (uv_tcp_t*)req->handle;
int addrsize;
int r;
@ -1027,7 +1030,9 @@ int64_t uv_now() {
}
int uv_read_start(uv_tcp_t* tcp, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
uv_tcp_t* tcp = (uv_tcp_t*)stream;
/* The UV_READING flag is irrelevant of the state of the tcp - it just
* expresses the desired state of the user.
*/
@ -1052,7 +1057,9 @@ int uv_read_start(uv_tcp_t* tcp, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
}
int uv_read_stop(uv_tcp_t* tcp) {
int uv_read_stop(uv_stream_t* stream) {
uv_tcp_t* tcp = (uv_tcp_t*)stream;
uv_flag_unset((uv_handle_t*)tcp, UV_READING);
ev_io_stop(EV_DEFAULT_UC_ &tcp->read_watcher);

View File

@ -55,11 +55,14 @@ typedef struct {
ev_idle next_watcher;
#define UV_STREAM_PRIVATE_FIELDS \
uv_read_cb read_cb; \
uv_alloc_cb alloc_cb;
/* UV_TCP */
#define UV_TCP_PRIVATE_FIELDS \
int delayed_error; \
uv_read_cb read_cb; \
uv_alloc_cb alloc_cb; \
uv_connection_cb connection_cb; \
int accepted_fd; \
uv_req_t *connect_req; \

View File

@ -800,7 +800,7 @@ int uv__bind(uv_tcp_t* handle, int domain, struct sockaddr* addr, int addrsize)
}
int uv_bind(uv_tcp_t* handle, struct sockaddr_in addr) {
int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in addr) {
if (addr.sin_family != AF_INET) {
uv_set_sys_error(WSAEFAULT);
return -1;
@ -810,7 +810,7 @@ int uv_bind(uv_tcp_t* handle, struct sockaddr_in addr) {
}
int uv_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6 addr) {
if (addr.sin6_family != AF_INET6) {
uv_set_sys_error(WSAEFAULT);
return -1;
@ -907,7 +907,7 @@ static void uv_queue_read(uv_tcp_t* handle) {
}
int uv_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
assert(backlog > 0);
if (handle->flags & UV_HANDLE_BIND_ERROR) {
@ -937,32 +937,34 @@ int uv_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
}
int uv_accept(uv_tcp_t* server, uv_tcp_t* client) {
int uv_accept(uv_handle_t* server, uv_stream_t* client) {
int rv = 0;
if (server->accept_socket == INVALID_SOCKET) {
uv_tcp_t* tcpServer = (uv_tcp_t*)server;
uv_tcp_t* tcpClient = (uv_tcp_t*)client;
if (tcpServer->accept_socket == INVALID_SOCKET) {
uv_set_sys_error(WSAENOTCONN);
return -1;
}
if (uv_tcp_set_socket(client, server->accept_socket) == -1) {
closesocket(server->accept_socket);
if (uv_tcp_set_socket(tcpClient, tcpServer->accept_socket) == -1) {
closesocket(tcpServer->accept_socket);
rv = -1;
} else {
uv_tcp_init_connection(client);
uv_tcp_init_connection(tcpClient);
}
server->accept_socket = INVALID_SOCKET;
tcpServer->accept_socket = INVALID_SOCKET;
if (!(server->flags & UV_HANDLE_CLOSING)) {
uv_queue_accept(server);
if (!(tcpServer->flags & UV_HANDLE_CLOSING)) {
uv_queue_accept(tcpServer);
}
return rv;
}
int uv_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
int uv_read_start(uv_stream_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
if (!(handle->flags & UV_HANDLE_CONNECTION)) {
uv_set_sys_error(WSAEINVAL);
return -1;
@ -985,20 +987,20 @@ int uv_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb, uv_read_cb read_cb) {
/* If reading was stopped and then started again, there could stell be a */
/* read request pending. */
if (!(handle->read_req.flags & UV_REQ_PENDING))
uv_queue_read(handle);
uv_queue_read((uv_tcp_t*)handle);
return 0;
}
int uv_read_stop(uv_tcp_t* handle) {
int uv_read_stop(uv_stream_t* handle) {
handle->flags &= ~UV_HANDLE_READING;
return 0;
}
int uv_connect(uv_req_t* req, struct sockaddr_in addr) {
int uv_tcp_connect(uv_req_t* req, struct sockaddr_in addr) {
int addrsize = sizeof(struct sockaddr_in);
BOOL success;
DWORD bytes;
@ -1017,7 +1019,7 @@ int uv_connect(uv_req_t* req, struct sockaddr_in addr) {
}
if (!(handle->flags & UV_HANDLE_BOUND) &&
uv_bind(handle, uv_addr_ip4_any_) < 0)
uv_tcp_bind(handle, uv_addr_ip4_any_) < 0)
return -1;
memset(&req->overlapped, 0, sizeof(req->overlapped));
@ -1170,13 +1172,13 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
uv_last_error_ = req->error;
buf.base = 0;
buf.len = 0;
handle->read_cb(handle, -1, buf);
handle->read_cb((uv_stream_t*)handle, -1, buf);
break;
}
/* Do nonblocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
buf = handle->alloc_cb(handle, 65536);
buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
assert(buf.len > 0);
flags = 0;
if (WSARecv(handle->socket,
@ -1188,7 +1190,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
NULL) != SOCKET_ERROR) {
if (bytes > 0) {
/* Successful read */
handle->read_cb(handle, bytes, buf);
handle->read_cb((uv_stream_t*)handle, bytes, buf);
/* Read again only if bytes == buf.len */
if (bytes < buf.len) {
break;
@ -1199,7 +1201,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
handle->flags |= UV_HANDLE_EOF;
uv_last_error_.code = UV_EOF;
uv_last_error_.sys_errno_ = ERROR_SUCCESS;
handle->read_cb(handle, -1, buf);
handle->read_cb((uv_stream_t*)handle, -1, buf);
break;
}
} else {
@ -1207,11 +1209,11 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
if (err == WSAEWOULDBLOCK) {
/* Read buffer was completely empty, report a 0-byte read. */
uv_set_sys_error(WSAEWOULDBLOCK);
handle->read_cb(handle, 0, buf);
handle->read_cb((uv_stream_t*)handle, 0, buf);
} else {
/* Ouch! serious error. */
uv_set_sys_error(err);
handle->read_cb(handle, -1, buf);
handle->read_cb((uv_stream_t*)handle, -1, buf);
}
break;
}
@ -1234,7 +1236,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
handle->flags &= ~UV_HANDLE_LISTENING;
if (handle->connection_cb) {
uv_last_error_ = req->error;
handle->connection_cb(handle, -1);
handle->connection_cb((uv_handle_t*)handle, -1);
}
break;
}
@ -1247,7 +1249,7 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
sizeof(handle->socket)) == 0) {
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->connection_cb) {
handle->connection_cb(handle, 0);
handle->connection_cb((uv_handle_t*)handle, 0);
}
} else {
/* Error related to accepted socket is ignored because the server */

View File

@ -52,10 +52,12 @@ typedef struct uv_buf_t {
uv_err_t error; \
struct uv_req_s* next_req;
#define uv_tcp_connection_fields \
#define UV_STREAM_PRIVATE_FIELDS \
uv_alloc_cb alloc_cb; \
uv_read_cb read_cb; \
struct uv_req_s read_req; \
#define uv_tcp_connection_fields \
unsigned int write_reqs_pending; \
uv_req_t* shutdown_req;
@ -107,7 +109,6 @@ typedef struct uv_buf_t {
unsigned int flags; \
uv_err_t error;
#define UV_ARES_TASK_PRIVATE_FIELDS \
struct uv_req_s ares_req; \
SOCKET sock; \

68
uv.h
View File

@ -41,6 +41,7 @@ typedef intptr_t ssize_t;
typedef struct uv_err_s uv_err_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
@ -66,12 +67,12 @@ typedef struct uv_getaddrinfo_s uv_getaddrinfo_t;
* In the case of uv_read_cb the uv_buf_t returned should be freed by the
* user.
*/
typedef uv_buf_t (*uv_alloc_cb)(uv_tcp_t* tcp, size_t suggested_size);
typedef void (*uv_read_cb)(uv_tcp_t* tcp, ssize_t nread, uv_buf_t buf);
typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size);
typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf);
typedef void (*uv_write_cb)(uv_req_t* req, int status);
typedef void (*uv_connect_cb)(uv_req_t* req, int status);
typedef void (*uv_shutdown_cb)(uv_req_t* req, int status);
typedef void (*uv_connection_cb)(uv_tcp_t* server, int status);
typedef void (*uv_connection_cb)(uv_handle_t* server, int status);
typedef void (*uv_close_cb)(uv_handle_t* handle);
typedef void (*uv_timer_cb)(uv_timer_t* handle, int status);
/* TODO: do these really need a status argument? */
@ -177,6 +178,8 @@ struct uv_req_s {
*/
void uv_req_init(uv_req_t* req, uv_handle_t* handle, void* cb);
int uv_shutdown(uv_req_t* req);
#define UV_HANDLE_FIELDS \
/* read-only */ \
@ -205,31 +208,21 @@ int uv_is_active(uv_handle_t* handle);
int uv_close(uv_handle_t* handle, uv_close_cb close_cb);
/*
* A subclass of uv_handle_t representing a TCP stream or TCP server. In the
* future this will probably be split into two classes - one a stream and
* the other a server.
*/
struct uv_tcp_s {
#define UV_STREAM_FIELDS \
/* number of bytes queued for writing */ \
size_t write_queue_size; \
/* private */ \
UV_STREAM_PRIVATE_FIELDS \
/* The abstract base class for all streams. */
struct uv_stream_s {
UV_HANDLE_FIELDS
size_t write_queue_size; /* number of bytes queued for writing */
UV_TCP_PRIVATE_FIELDS
UV_STREAM_FIELDS
};
int uv_tcp_init(uv_tcp_t* handle);
int uv_bind(uv_tcp_t* handle, struct sockaddr_in);
int uv_bind6(uv_tcp_t* handle, struct sockaddr_in6);
int uv_connect(uv_req_t* req, struct sockaddr_in);
int uv_shutdown(uv_req_t* req);
int uv_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
/* This call is used in conjunction with uv_listen() to accept incoming TCP
/* This call is used in conjunction with uv_listen() to accept incoming
* connections. Call uv_accept after receiving a uv_connection_cb to accept
* the connection. Before calling uv_accept use uv_tcp_init() must be
* the connection. Before calling uv_accept use uv_*_init() must be
* called on the client. Non-zero return value indicates an error.
*
* When the uv_connection_cb is called it is guaranteed that uv_accept will
@ -237,7 +230,7 @@ int uv_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
* once, it may fail. It is suggested to only call uv_accept once per
* uv_connection_cb call.
*/
int uv_accept(uv_tcp_t* server, uv_tcp_t* client);
int uv_accept(uv_handle_t* server, uv_stream_t* client);
/* Read data from an incoming stream. The callback will be made several
* several times until there is no more data to read or uv_read_stop is
@ -248,9 +241,9 @@ int uv_accept(uv_tcp_t* server, uv_tcp_t* client);
* eof; it happens when libuv requested a buffer through the alloc callback
* but then decided that it didn't need that buffer.
*/
int uv_read_start(uv_tcp_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
int uv_read_start(uv_stream_t*, uv_alloc_cb alloc_cb, uv_read_cb read_cb);
int uv_read_stop(uv_tcp_t*);
int uv_read_stop(uv_stream_t*);
/* Write data to stream. Buffers are written in order. Example:
*
@ -272,6 +265,27 @@ int uv_read_stop(uv_tcp_t*);
int uv_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt);
/*
* A subclass of uv_stream_t representing a TCP stream or TCP server. In the
* future this will probably be split into two classes - one a stream and
* the other a server.
*/
struct uv_tcp_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
UV_TCP_PRIVATE_FIELDS
};
int uv_tcp_init(uv_tcp_t* handle);
int uv_tcp_bind(uv_tcp_t* handle, struct sockaddr_in);
int uv_tcp_bind6(uv_tcp_t* handle, struct sockaddr_in6);
int uv_tcp_connect(uv_req_t* req, struct sockaddr_in);
int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb);
/*
* Subclass of uv_handle_t. libev wrapper. Every active prepare handle gets
* its callback called exactly once per loop iteration, just before the