c-ares integration on windows

This commit is contained in:
Henry Rawas 2011-06-20 09:41:57 -07:00 committed by Bert Belder
parent 31516fa150
commit 4aeee38484
15 changed files with 1016 additions and 2 deletions

View File

@ -5,3 +5,4 @@ Josh Roesslein <jroesslein@gmail.com>
Alan Gutierrez <alan@prettyrobots.com>
Vanilla Hsu <vanilla@fatpipi.com>
Ben Noordhuis <info@bnoordhuis.nl>
Henry Rawas <henryr@schakra.com>

View File

@ -27,7 +27,7 @@ include config-unix.mk
endif
TESTS=test/echo-server.c test/test-*.c
BENCHMARKS=test/echo-server.c test/benchmark-*.c
BENCHMARKS=test/echo-server.c test/dns-server.c test/benchmark-*.c
all: uv.a test/run-tests test/run-benchmarks

View File

@ -135,9 +135,11 @@
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="..\test\benchmark-ares.c" />
<ClCompile Include="..\test\benchmark-ping-pongs.c" />
<ClCompile Include="..\test\benchmark-pump.c" />
<ClCompile Include="..\test\benchmark-sizes.c" />
<ClCompile Include="..\test\dns-server.c" />
<ClCompile Include="..\test\echo-server.c" />
<ClCompile Include="..\test\run-benchmarks.c" />
<ClCompile Include="..\test\runner-unix.c">

View File

@ -149,6 +149,7 @@
<ClCompile Include="..\test\test-connection-fail.c" />
<ClCompile Include="..\test\test-get-currentexe.c" />
<ClCompile Include="..\test\test-fail-always.c" />
<ClCompile Include="..\test\test-gethostbyname.c" />
<ClCompile Include="..\test\test-hrtime.c" />
<ClCompile Include="..\test\test-loop-handles.c" />
<ClCompile Include="..\test\test-pass-always.c" />

120
test/benchmark-ares.c Normal file
View File

@ -0,0 +1,120 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "../uv.h"
#include "task.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h> /* strlen */
ares_channel channel;
struct ares_options options;
int optmask;
struct in_addr testsrv;
int ares_callbacks;
int ares_errors;
int argument;
#define NUM_CALLS_TO_START 1000
static int64_t start_time;
static int64_t end_time;
/* callback method. may issue another call */
static void aresbynamecallback( void *arg,
int status,
int timeouts,
struct hostent *hostent) {
ares_callbacks++;
if (status != 0) {
ares_errors++;
}
}
static void prep_tcploopback()
{
int rc = 0;
optmask = 0;
/* for test, use echo server - TCP port TEST_PORT on loopback */
testsrv.S_un.S_un_b.s_b1 = 127;
testsrv.S_un.S_un_b.s_b2 = 0;
testsrv.S_un.S_un_b.s_b3 = 0;
testsrv.S_un.S_un_b.s_b4 = 1;
optmask = ARES_OPT_SERVERS | ARES_OPT_TCP_PORT | ARES_OPT_FLAGS;
options.servers = &testsrv;
options.nservers = 1;
options.tcp_port = htons(TEST_PORT_2);
options.flags = ARES_FLAG_USEVC;
rc = uv_ares_init_options(&channel, &options, optmask);
ASSERT(rc == ARES_SUCCESS);
}
BENCHMARK_IMPL(gethostbyname) {
int rc = 0;
int ares_start;;
rc = ares_library_init(ARES_LIB_INIT_ALL);
if (rc != 0) {
printf("ares library init fails %d\n", rc);
return 1;
}
uv_init();
ares_callbacks = 0;
ares_errors = 0;
uv_update_time();
start_time = uv_now();
prep_tcploopback();
for (ares_start = 0; ares_start < NUM_CALLS_TO_START; ares_start++) {
ares_gethostbyname(channel,
"echos.srv",
AF_INET,
&aresbynamecallback,
&argument);
}
uv_run();
uv_ares_destroy(channel);
end_time = uv_now();
if (ares_errors > 0) {
printf("There were %d failures\n", ares_errors);
}
LOGF("ares_gethostbyname: %d calls in %ld ms \n", ares_callbacks, (end_time - start_time));
return 0;
}

View File

@ -23,8 +23,10 @@ BENCHMARK_DECLARE (sizes)
BENCHMARK_DECLARE (ping_pongs)
BENCHMARK_DECLARE (pump100_client)
BENCHMARK_DECLARE (pump1_client)
BENCHMARK_DECLARE (gethostbyname)
HELPER_DECLARE (pump_server)
HELPER_DECLARE (echo_server)
HELPER_DECLARE (dns_server)
TASK_LIST_START
BENCHMARK_ENTRY (sizes)
@ -37,4 +39,8 @@ TASK_LIST_START
BENCHMARK_ENTRY (pump1_client)
BENCHMARK_HELPER (pump1_client, pump_server)
BENCHMARK_ENTRY (gethostbyname)
BENCHMARK_HELPER (gethostbyname, dns_server)
TASK_LIST_END

View File

@ -31,5 +31,7 @@ BENCHMARK_IMPL(sizes) {
LOGF("uv_idle_t: %lu bytes\n", sizeof(uv_idle_t));
LOGF("uv_async_t: %lu bytes\n", sizeof(uv_async_t));
LOGF("uv_timer_t: %lu bytes\n", sizeof(uv_timer_t));
LOGF("uv_ares_task_t: %lu bytes\n", sizeof(uv_ares_task_t));
LOGF("uv_ares_action_t: %lu bytes\n", sizeof(uv_ares_action_t));
return 0;
}

324
test/dns-server.c Normal file
View File

@ -0,0 +1,324 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "../uv.h"
#include "task.h"
#include <stdio.h>
#include <stdlib.h>
typedef struct {
uv_req_t req;
uv_buf_t buf;
} write_req_t;
/* used to track multiple DNS requests received */
typedef struct {
char* prevbuf_ptr;
int prevbuf_pos;
int prevbuf_rem;
} dnsstate;
/* modify handle to append dnsstate */
typedef struct {
uv_tcp_t handle;
dnsstate state;
} dnshandle;
static int server_closed;
static uv_tcp_t server;
static void after_write(uv_req_t* req, int status);
static void after_read(uv_tcp_t*, ssize_t nread, uv_buf_t buf);
static void on_close(uv_handle_t* peer);
static void on_server_close(uv_handle_t* handle);
static void on_connection(uv_tcp_t*, int status);
#define WRITE_BUF_LEN (64*1024)
#define DNSREC_LEN (4)
#define LEN_OFFSET 0
#define QUERYID_OFFSET 2
unsigned char DNSRsp[] = {0, 43, 0, 0, 0x81, 0x80, 0, 1, 0, 1, 0, 0, 0, 0 };
unsigned char qrecord[] = {5, 'e', 'c', 'h', 'o', 's', 3, 's', 'r', 'v', 0, 0, 1, 0, 1};
unsigned char arecord[] = {0xc0, 0x0c, 0, 1, 0, 1, 0, 0, 5, 0xbd, 0, 4, 10, 0, 1, 1 };
static void after_write(uv_req_t* req, int status) {
write_req_t* wr;
if (status) {
uv_err_t err = uv_last_error();
fprintf(stderr, "uv_write error: %s\n", uv_strerror(err));
ASSERT(0);
}
wr = (write_req_t*) req;
/* Free the read/write buffer and the request */
free(wr->buf.base);
free(wr);
}
static void after_shutdown(uv_req_t* req, int status) {
uv_close(req->handle, on_close);
free(req);
}
static void addrsp(write_req_t* wr, char* hdr) {
char * dnsrsp;
short int rsplen;
short int* reclen;
rsplen = sizeof(DNSRsp) + sizeof(qrecord) + sizeof(arecord);
ASSERT (rsplen + wr->buf.len < WRITE_BUF_LEN);
dnsrsp = wr->buf.base + wr->buf.len;
/* copy stock response */
memcpy(dnsrsp, DNSRsp, sizeof(DNSRsp));
memcpy(dnsrsp + sizeof(DNSRsp), qrecord, sizeof(qrecord));
memcpy(dnsrsp + sizeof(DNSRsp) + sizeof(qrecord), arecord, sizeof(arecord));
/* overwrite with network order length and id from request header */
reclen = (short int*)dnsrsp;
*reclen = htons(rsplen-2);
dnsrsp[QUERYID_OFFSET] = hdr[QUERYID_OFFSET];
dnsrsp[QUERYID_OFFSET+1] = hdr[QUERYID_OFFSET+1];
wr->buf.len += rsplen;
}
static void process_req(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) {
write_req_t *wr;
dnshandle* dns = (dnshandle*)handle;
char hdrbuf[DNSREC_LEN];
int hdrbuf_remaining = DNSREC_LEN;
int rec_remaining = 0;
int readbuf_remaining;
char* dnsreq;
char* hdrstart;
int usingprev = 0;
wr = (write_req_t*) malloc(sizeof *wr);
uv_req_init(&wr->req, (uv_handle_t*)handle, after_write);
wr->buf.base = (char*)malloc(WRITE_BUF_LEN);
wr->buf.len = 0;
if (dns->state.prevbuf_ptr != NULL) {
dnsreq = dns->state.prevbuf_ptr + dns->state.prevbuf_pos;
readbuf_remaining = dns->state.prevbuf_rem;
usingprev = 1;
} else {
dnsreq = buf.base;
readbuf_remaining = nread;
}
hdrstart = dnsreq;
while (dnsreq != NULL) {
/* something to process */
while (readbuf_remaining > 0) {
/* something to process in current buffer */
if (hdrbuf_remaining > 0) {
/* process len and id */
if (readbuf_remaining < hdrbuf_remaining) {
/* too little to get request header. save for next buffer */
memcpy(&hdrbuf[DNSREC_LEN - hdrbuf_remaining], dnsreq, readbuf_remaining);
hdrbuf_remaining = DNSREC_LEN - readbuf_remaining;
break;
} else {
short int reclen_n;
/* save header */
memcpy(&hdrbuf[DNSREC_LEN - hdrbuf_remaining], dnsreq, hdrbuf_remaining);
dnsreq += hdrbuf_remaining;
readbuf_remaining -= hdrbuf_remaining;
hdrbuf_remaining = 0;
/* get record length */
reclen_n = *((short int*)hdrbuf);
rec_remaining = ntohs(reclen_n) - (DNSREC_LEN - 2);
}
}
if (rec_remaining <= readbuf_remaining) {
/* prepare reply */
addrsp(wr, hdrbuf);
/* move to next record */
dnsreq += rec_remaining;
hdrstart = dnsreq;
readbuf_remaining -= rec_remaining;
rec_remaining = 0;
hdrbuf_remaining = DNSREC_LEN;
} else {
/* otherwise this buffer is done. */
rec_remaining -= readbuf_remaining;
break;
}
}
/* if we had to use bytes from prev buffer, start processing the current one */
if (usingprev == 1) {
/* free previous buffer */
free(dns->state.prevbuf_ptr);
dnsreq = buf.base;
readbuf_remaining = nread;
usingprev = 0;
} else {
dnsreq = NULL;
}
}
/* send write buffer */
if (wr->buf.len > 0) {
if (uv_write(&wr->req, &wr->buf, 1)) {
FATAL("uv_write failed");
}
}
if (readbuf_remaining > 0) {
/* save start of record position, so we can continue on next read */
dns->state.prevbuf_ptr = buf.base;
dns->state.prevbuf_pos = hdrstart - buf.base;
dns->state.prevbuf_rem = nread - dns->state.prevbuf_pos;
} else {
/* nothing left in this buffer */
dns->state.prevbuf_ptr = NULL;
dns->state.prevbuf_pos = 0;
dns->state.prevbuf_rem = 0;
free(buf.base);
}
}
static void after_read(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) {
uv_req_t* req;
if (nread < 0) {
/* Error or EOF */
ASSERT (uv_last_error().code == UV_EOF);
if (buf.base) {
free(buf.base);
}
req = (uv_req_t*) malloc(sizeof *req);
uv_req_init(req, (uv_handle_t*)handle, after_shutdown);
uv_shutdown(req);
return;
}
if (nread == 0) {
/* Everything OK, but nothing read. */
free(buf.base);
return;
}
/* process requests and send responses */
process_req(handle, nread, buf);
}
static void on_close(uv_handle_t* peer) {
free(peer);
}
static uv_buf_t buf_alloc(uv_tcp_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = (char*) malloc(suggested_size);
buf.len = suggested_size;
return buf;
}
static void on_connection(uv_tcp_t* server, int status) {
dnshandle* handle;
int r;
ASSERT(status == 0);
handle = (dnshandle*) malloc(sizeof *handle);
ASSERT(handle != NULL);
/* initialize read buffer state */
handle->state.prevbuf_ptr = 0;
handle->state.prevbuf_pos = 0;
handle->state.prevbuf_rem = 0;
uv_tcp_init((uv_tcp_t*)handle);
r = uv_accept(server, (uv_tcp_t*)handle);
ASSERT(r == 0);
r = uv_read_start((uv_tcp_t*)handle, buf_alloc, after_read);
ASSERT(r == 0);
}
static void on_server_close(uv_handle_t* handle) {
ASSERT(handle == (uv_handle_t*)&server);
}
static int dns_start(int port) {
struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", port);
int r;
r = uv_tcp_init(&server);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Socket creation error\n");
return 1;
}
r = uv_bind(&server, addr);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Bind error\n");
return 1;
}
r = uv_listen(&server, 128, on_connection);
if (r) {
/* TODO: Error codes */
fprintf(stderr, "Listen error\n");
return 1;
}
return 0;
}
HELPER_IMPL(dns_server) {
uv_init();
if (dns_start(TEST_PORT_2))
return 1;
uv_run();
return 0;
}

203
test/test-gethostbyname.c Normal file
View File

@ -0,0 +1,203 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "../uv.h"
#include "task.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h> /* strlen */
ares_channel channel;
struct ares_options options;
int optmask;
struct in_addr testsrv;
int ares_bynamecallbacks;
int bynamecallbacksig;
int ares_byaddrcallbacks;
int byaddrcallbacksig;
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;
return buf;
}
static void aresbynamecallback( void *arg,
int status,
int timeouts,
struct hostent *hostent) {
int * iargs;
ASSERT(arg != NULL);
iargs = (int*)arg;
ASSERT(*iargs == bynamecallbacksig);
ASSERT(timeouts == 0);
ares_bynamecallbacks++;
}
static void aresbyaddrcallback( void *arg,
int status,
int timeouts,
struct hostent *hostent) {
int * iargs;
ASSERT(arg != NULL);
iargs = (int*)arg;
ASSERT(*iargs == byaddrcallbacksig);
ASSERT(timeouts == 0);
ares_byaddrcallbacks++;
}
static void prep_tcploopback()
{
int rc = 0;
/* for test, use echo server - TCP port TEST_PORT on loopback */
testsrv.S_un.S_un_b.s_b1 = 127;
testsrv.S_un.S_un_b.s_b2 = 0;
testsrv.S_un.S_un_b.s_b3 = 0;
testsrv.S_un.S_un_b.s_b4 = 1;
optmask = ARES_OPT_SERVERS | ARES_OPT_TCP_PORT | ARES_OPT_FLAGS;
options.servers = &testsrv;
options.nservers = 1;
options.tcp_port = htons(TEST_PORT);
options.flags = ARES_FLAG_USEVC;
rc = uv_ares_init_options(&channel, &options, optmask);
ASSERT(rc == ARES_SUCCESS);
}
TEST_IMPL(gethostbyname) {
int rc = 0;
char addr[4];
rc = ares_library_init(ARES_LIB_INIT_ALL);
if (rc != 0) {
printf("ares library init fails %d\n", rc);
return 1;
}
uv_init(alloc_cb);
printf("Start basic gethostbyname test\n");
prep_tcploopback();
ares_bynamecallbacks = 0;
bynamecallbacksig = 7;
ares_gethostbyname(channel,
"microsoft.com",
AF_INET,
&aresbynamecallback,
&bynamecallbacksig);
uv_run();
ASSERT(ares_bynamecallbacks == 1);
uv_ares_destroy(channel);
printf("Done basic gethostbyname test\n");
/* two sequential call on new channel */
printf("Start gethostbyname and gethostbyaddr sequential test\n");
prep_tcploopback();
ares_bynamecallbacks = 0;
bynamecallbacksig = 7;
ares_gethostbyname(channel,
"microsoft.com",
AF_INET,
&aresbynamecallback,
&bynamecallbacksig);
uv_run();
ASSERT(ares_bynamecallbacks == 1);
ares_byaddrcallbacks = 0;
byaddrcallbacksig = 8;
addr[0] = 10;
addr[1] = 0;
addr[2] = 1;
addr[3] = 99;
ares_gethostbyaddr(channel,
addr,
4,
AF_INET,
&aresbyaddrcallback,
&byaddrcallbacksig);
uv_run();
ASSERT(ares_byaddrcallbacks == 1);
uv_ares_destroy(channel);
printf("Done gethostbyname and gethostbyaddr sequential test\n");
/* two simultaneous calls on new channel */
printf("Start gethostbyname and gethostbyaddr concurrent test\n");
prep_tcploopback();
ares_bynamecallbacks = 0;
bynamecallbacksig = 7;
ares_gethostbyname(channel,
"microsoft.com",
AF_INET,
&aresbynamecallback,
&bynamecallbacksig);
ares_byaddrcallbacks = 0;
byaddrcallbacksig = 8;
addr[0] = 10;
addr[1] = 0;
addr[2] = 1;
addr[3] = 99;
ares_gethostbyaddr(channel,
addr,
4,
AF_INET,
&aresbyaddrcallback,
&byaddrcallbacksig);
uv_run();
ASSERT(ares_bynamecallbacks == 1);
ASSERT(ares_byaddrcallbacks == 1);
uv_ares_destroy(channel);
printf("Done gethostbyname and gethostbyaddr concurrent test\n");
return 0;
}

View File

@ -43,6 +43,7 @@ TEST_DECLARE (check_ref)
TEST_DECLARE (async)
TEST_DECLARE (get_currentexe)
TEST_DECLARE (hrtime)
TEST_DECLARE (gethostbyname)
TEST_DECLARE (fail_always)
TEST_DECLARE (pass_always)
HELPER_DECLARE (echo_server)
@ -95,6 +96,9 @@ TASK_LIST_START
TEST_ENTRY (hrtime)
TEST_ENTRY (gethostbyname)
TEST_HELPER (gethostbyname, echo_server)
#if 0
/* These are for testing the test runner. */
TEST_ENTRY (fail_always)

View File

@ -1232,4 +1232,16 @@ int64_t uv_timer_get_repeat(uv_timer_t* timer) {
return (int64_t)(1000 * timer->timer_watcher.repeat);
}
/* c-ares integration initialize and terminate */
int uv_ares_init_options(ares_channel *channelptr,
struct ares_options *options,
int optmask) {
rc = ares_init_options(channelptr, options, optmask);
return rc;
}
void uv_ares_destroy(ares_channel channel) {
ares_destroy(channel);
}

View File

@ -97,4 +97,8 @@ typedef struct {
uv_timer_cb timer_cb;
#define UV_ARES_ACTION_PRIVATE_FIELDS /* TODO */
#define UV_ARES_TASK_PRIVATE_FIELDS /* TODO */
#endif /* UV_UNIX_H */

290
uv-win.c
View File

@ -189,6 +189,27 @@ static struct sockaddr_in uv_addr_ip4_any_;
static char uv_zero_[] = "";
void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req);
void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req);
/* list used for ares task handles */
static uv_ares_task_t* uv_ares_handles_ = NULL;
/* memory used per ares_channel */
struct uv_ares_channel_s {
ares_channel channel;
};
typedef struct uv_ares_channel_s uv_ares_channel_t;
/* static data to hold single ares_channel */
static uv_ares_channel_t uv_ares_data = { NULL };
/* default timeout per socket request if ares does not specify value */
/* use 20 sec */
#define ARES_TIMEOUT_MS 20000
/* Atomic set operation on char */
#ifdef _MSC_VER /* MSVC */
@ -1517,6 +1538,14 @@ static void uv_process_reqs() {
uv_async_return_req((uv_async_t*)handle, req);
break;
case UV_ARES:
uv_ares_process((uv_ares_action_t*)handle, req);
break;
case UV_ARES_TASK:
uv_ares_task_cleanup((uv_ares_task_t*)handle, req);
break;
default:
assert(0);
}
@ -1710,3 +1739,264 @@ done:
uint64_t uv_get_hrtime(void) {
assert(0 && "implement me");
}
/* find matching ares handle in list */
void uv_add_ares_handle(uv_ares_task_t* handle) {
handle->ares_next = uv_ares_handles_;
handle->ares_prev = NULL;
if (uv_ares_handles_) {
uv_ares_handles_->ares_prev = handle;
}
uv_ares_handles_ = handle;
}
/* find matching ares handle in list */
/* TODO: faster lookup */
uv_ares_task_t* uv_find_ares_handle(ares_socket_t sock) {
uv_ares_task_t* handle = uv_ares_handles_;
while (handle != NULL) {
if (handle->sock == sock) {
break;
}
handle = handle->ares_next;
}
return handle;
}
/* remove ares handle in list */
void uv_remove_ares_handle(uv_ares_task_t* handle) {
if (handle == uv_ares_handles_) {
uv_ares_handles_ = handle->ares_next;
}
if (handle->ares_next) {
handle->ares_next->ares_prev = handle->ares_prev;
}
if (handle->ares_prev) {
handle->ares_prev->ares_next = handle->ares_next;
}
}
/* thread pool callback when socket is signalled */
VOID CALLBACK uv_ares_socksignal_tp(PVOID parameter,
BOOLEAN timerfired) {
WSANETWORKEVENTS network_events;
uv_ares_task_t* sockhandle;
uv_ares_action_t* selhandle;
uv_req_t* uv_ares_req;
assert(parameter != NULL);
if (parameter != NULL) {
sockhandle = (uv_ares_task_t*)parameter;
/* clear socket status for this event */
/* do not fail if error, thread may run after socket close */
/* The code assumes that c-ares will write all pending data in the callback,
unless the socket would block. We can clear the state here to avoid unecessary
signals. */
WSAEnumNetworkEvents(sockhandle->sock, sockhandle->h_event, &network_events);
/* setup new handle */
selhandle = (uv_ares_action_t*)malloc(sizeof(uv_ares_action_t));
if (selhandle == NULL) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
selhandle->type = UV_ARES;
selhandle->close_cb = NULL;
selhandle->data = sockhandle->data;
selhandle->sock = sockhandle->sock;
selhandle->read = (network_events.lNetworkEvents & (FD_READ | FD_CONNECT)) ? 1 : 0;
selhandle->write = (network_events.lNetworkEvents & (FD_WRITE | FD_CONNECT)) ? 1 : 0;
uv_ares_req = &selhandle->ares_req;
uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL);
uv_ares_req->type = UV_WAKEUP;
/* post ares needs to called */
if (!PostQueuedCompletionStatus(uv_iocp_,
0,
0,
&uv_ares_req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
}
/* callback from ares when socket operation is started */
void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) {
/* look to see if we have a handle for this socket in our list */
uv_ares_task_t* uv_handle_ares = uv_find_ares_handle(sock);
struct timeval tv;
struct timeval* tvptr;
int timeoutms = 0;
if (read == 0 && write == 0) {
/* if read and write are 0, cleanup existing data */
/* The code assumes that c-ares does a callback with read = 0 and write = 0
when the socket is closed. After we recieve this we stop monitoring the socket. */
if (uv_handle_ares != NULL) {
uv_req_t* uv_ares_req;
uv_handle_ares->h_close_event = CreateEvent(NULL, FALSE, FALSE, NULL);
/* remove Wait */
if (uv_handle_ares->h_wait) {
UnregisterWaitEx(uv_handle_ares->h_wait, uv_handle_ares->h_close_event);
uv_handle_ares->h_wait = NULL;
}
/* detach socket from the event */
WSAEventSelect(sock, NULL, 0);
if (uv_handle_ares->h_event != WSA_INVALID_EVENT) {
WSACloseEvent(uv_handle_ares->h_event);
uv_handle_ares->h_event = WSA_INVALID_EVENT;
}
/* remove handle from list */
uv_remove_ares_handle(uv_handle_ares);
/* Post request to cleanup the Task */
uv_ares_req = &uv_handle_ares->ares_req;
uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL);
uv_ares_req->type = UV_WAKEUP;
/* post ares done with socket - finish cleanup when all threads done. */
if (!PostQueuedCompletionStatus(uv_iocp_,
0,
0,
&uv_ares_req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
} else {
assert(0);
uv_fatal_error(ERROR_INVALID_DATA, "ares_SockStateCB");
}
} else {
if (uv_handle_ares == NULL) {
/* setup new handle */
/* The code assumes that c-ares will call us when it has an open socket.
We need to call into c-ares when there is something to read,
or when it becomes writable. */
uv_handle_ares = (uv_ares_task_t*)malloc(sizeof(uv_ares_task_t));
if (uv_handle_ares == NULL) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
}
uv_handle_ares->type = UV_ARES_TASK;
uv_handle_ares->close_cb = NULL;
uv_handle_ares->data = ((uv_ares_channel_t*)data)->channel;
uv_handle_ares->sock = sock;
uv_handle_ares->h_wait = NULL;
uv_handle_ares->flags = 0;
/* create an event to wait on socket signal */
uv_handle_ares->h_event = WSACreateEvent();
if (uv_handle_ares->h_event == WSA_INVALID_EVENT) {
uv_fatal_error(WSAGetLastError(), "WSACreateEvent");
}
/* tie event to socket */
if (SOCKET_ERROR == WSAEventSelect(sock, uv_handle_ares->h_event, FD_READ | FD_WRITE | FD_CONNECT)) {
uv_fatal_error(WSAGetLastError(), "WSAEventSelect");
}
/* add handle to list */
uv_add_ares_handle(uv_handle_ares);
uv_refs_++;
tv.tv_sec = 0;
tvptr = ares_timeout(((uv_ares_channel_t*)data)->channel, NULL, &tv);
if (tvptr) {
timeoutms = (tvptr->tv_sec * 1000) + (tvptr->tv_usec / 1000);
} else {
timeoutms = ARES_TIMEOUT_MS;
}
/* specify thread pool function to call when event is signaled */
if (RegisterWaitForSingleObject(&uv_handle_ares->h_wait,
uv_handle_ares->h_event,
uv_ares_socksignal_tp,
(void*)uv_handle_ares,
timeoutms,
WT_EXECUTEINWAITTHREAD) == 0) {
uv_fatal_error(GetLastError(), "RegisterWaitForSingleObject");
}
} else {
/* found existing handle. */
assert(uv_handle_ares->type == UV_ARES_TASK);
assert(uv_handle_ares->data != NULL);
assert(uv_handle_ares->h_event != WSA_INVALID_EVENT);
}
}
}
/* called via uv_poll when ares completion port signaled */
void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req) {
ares_process_fd( (ares_channel)handle->data,
handle->read ? handle->sock : INVALID_SOCKET,
handle->write ? handle->sock : INVALID_SOCKET);
/* release handle for select here */
free(handle);
}
/* called via uv_poll when ares is finished with socket */
void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req) {
/* check for event complete without waiting */
unsigned int signaled = WaitForSingleObject(handle->h_close_event, 0);
if (signaled != WAIT_TIMEOUT) {
uv_refs_--;
/* close event handle and free uv handle memory */
CloseHandle(handle->h_close_event);
free(handle);
} else {
/* stil busy - repost and try again */
if (!PostQueuedCompletionStatus(uv_iocp_,
0,
0,
&req->overlapped)) {
uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus");
}
}
}
/* set ares SOCK_STATE callback to our handler */
int uv_ares_init_options(ares_channel *channelptr,
struct ares_options *options,
int optmask) {
int rc;
/* only allow single init at a time */
if (uv_ares_data.channel != NULL) {
return UV_EALREADY;
}
/* set our callback as an option */
options->sock_state_cb = uv_ares_sockstate_cb;
options->sock_state_cb_data = &uv_ares_data;
optmask |= ARES_OPT_SOCK_STATE_CB;
/* We do the call to ares_init_option for caller. */
rc = ares_init_options(channelptr, options, optmask);
/* if success, save channel */
if (rc == ARES_SUCCESS) {
uv_ares_data.channel = *channelptr;
}
return rc;
}
/* release memory */
void uv_ares_destroy(ares_channel channel) {
/* only allow destroy if did init */
if (uv_ares_data.channel != NULL) {
ares_destroy(channel);
uv_ares_data.channel = NULL;
}
}

View File

@ -108,5 +108,19 @@ typedef struct uv_buf_t {
unsigned int flags; \
uv_err_t error;
#define UV_ARES_ACTION_PRIVATE_FIELDS \
struct uv_req_s ares_req; \
SOCKET sock; \
int read; \
int write;
#define UV_ARES_TASK_PRIVATE_FIELDS \
uv_ares_task_t* ares_prev; \
uv_ares_task_t* ares_next; \
struct uv_req_s ares_req; \
SOCKET sock; \
HANDLE h_wait; \
WSAEVENT h_event; \
HANDLE h_close_event;
int uv_utf16_to_utf8(wchar_t* utf16Buffer, size_t utf16Size, char* utf8Buffer, size_t utf8Size);

33
uv.h
View File

@ -48,6 +48,8 @@ typedef struct uv_check_s uv_check_t;
typedef struct uv_idle_s uv_idle_t;
typedef struct uv_req_s uv_req_t;
typedef struct uv_async_s uv_async_t;
typedef struct uv_ares_task_s uv_ares_task_t;
typedef struct uv_ares_action_s uv_ares_action_t;
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
@ -129,7 +131,9 @@ typedef enum {
UV_PREPARE,
UV_CHECK,
UV_IDLE,
UV_ASYNC
UV_ASYNC,
UV_ARES,
UV_ARES_TASK
} uv_handle_type;
typedef enum {
@ -364,6 +368,31 @@ void uv_timer_set_repeat(uv_timer_t* timer, int64_t repeat);
int64_t uv_timer_get_repeat(uv_timer_t* timer);
/*
* Subclass of uv_handle_t. Used for integration of c-ares.
*/
struct uv_ares_task_s {
UV_HANDLE_FIELDS
UV_ARES_TASK_PRIVATE_FIELDS
};
/*
* Subclass of uv_handle_t. Used for integration of c-ares.
*/
struct uv_ares_action_s {
UV_HANDLE_FIELDS
UV_ARES_ACTION_PRIVATE_FIELDS
};
/* c-ares integration initialize and terminate */
int uv_ares_init_options(ares_channel *channelptr,
struct ares_options *options,
int optmask);
void uv_ares_destroy(ares_channel channel);
/*
* Most functions return boolean: 0 for success and -1 for failure.
* On error the user should then call uv_last_error() to determine
@ -401,6 +430,8 @@ union uv_any_handle {
uv_idle_t idle;
uv_async_t async;
uv_timer_t timer;
uv_ares_task_t arest;
uv_ares_action_t aresa;
};
/* Diagnostic counters */