diff --git a/AUTHORS b/AUTHORS index 2a4976be..e3b6ae15 100644 --- a/AUTHORS +++ b/AUTHORS @@ -5,3 +5,4 @@ Josh Roesslein Alan Gutierrez Vanilla Hsu Ben Noordhuis +Henry Rawas diff --git a/Makefile b/Makefile index fa97bfa0..60505cdb 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ include config-unix.mk endif TESTS=test/echo-server.c test/test-*.c -BENCHMARKS=test/echo-server.c test/benchmark-*.c +BENCHMARKS=test/echo-server.c test/dns-server.c test/benchmark-*.c all: uv.a test/run-tests test/run-benchmarks diff --git a/msvs/libuv-benchmark.vcxproj b/msvs/libuv-benchmark.vcxproj index 4a24462b..53d5b526 100644 --- a/msvs/libuv-benchmark.vcxproj +++ b/msvs/libuv-benchmark.vcxproj @@ -135,9 +135,11 @@ + + diff --git a/msvs/libuv-test.vcxproj b/msvs/libuv-test.vcxproj index 5c1cf406..a32cbe46 100644 --- a/msvs/libuv-test.vcxproj +++ b/msvs/libuv-test.vcxproj @@ -149,6 +149,7 @@ + diff --git a/test/benchmark-ares.c b/test/benchmark-ares.c new file mode 100644 index 00000000..ff384805 --- /dev/null +++ b/test/benchmark-ares.c @@ -0,0 +1,120 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "../uv.h" +#include "task.h" + +#include +#include +#include /* strlen */ + +ares_channel channel; +struct ares_options options; +int optmask; + +struct in_addr testsrv; + +int ares_callbacks; +int ares_errors; +int argument; + +#define NUM_CALLS_TO_START 1000 + +static int64_t start_time; +static int64_t end_time; + +/* callback method. may issue another call */ +static void aresbynamecallback( void *arg, + int status, + int timeouts, + struct hostent *hostent) { + ares_callbacks++; + if (status != 0) { + ares_errors++; + } + +} + + +static void prep_tcploopback() +{ + int rc = 0; + optmask = 0; + + /* for test, use echo server - TCP port TEST_PORT on loopback */ + testsrv.S_un.S_un_b.s_b1 = 127; + testsrv.S_un.S_un_b.s_b2 = 0; + testsrv.S_un.S_un_b.s_b3 = 0; + testsrv.S_un.S_un_b.s_b4 = 1; + + optmask = ARES_OPT_SERVERS | ARES_OPT_TCP_PORT | ARES_OPT_FLAGS; + options.servers = &testsrv; + options.nservers = 1; + options.tcp_port = htons(TEST_PORT_2); + options.flags = ARES_FLAG_USEVC; + + rc = uv_ares_init_options(&channel, &options, optmask); + + ASSERT(rc == ARES_SUCCESS); +} + + +BENCHMARK_IMPL(gethostbyname) { + + int rc = 0; + int ares_start;; + + rc = ares_library_init(ARES_LIB_INIT_ALL); + if (rc != 0) { + printf("ares library init fails %d\n", rc); + return 1; + } + + uv_init(); + ares_callbacks = 0; + ares_errors = 0; + + uv_update_time(); + start_time = uv_now(); + + prep_tcploopback(); + + for (ares_start = 0; ares_start < NUM_CALLS_TO_START; ares_start++) { + ares_gethostbyname(channel, + "echos.srv", + AF_INET, + &aresbynamecallback, + &argument); + } + + uv_run(); + + uv_ares_destroy(channel); + + end_time = uv_now(); + + if (ares_errors > 0) { + printf("There were %d failures\n", ares_errors); + } + LOGF("ares_gethostbyname: %d calls in %ld ms \n", ares_callbacks, (end_time - start_time)); + + return 0; +} diff --git a/test/benchmark-list.h b/test/benchmark-list.h index 33ef97e2..ff3421d8 100644 --- a/test/benchmark-list.h +++ b/test/benchmark-list.h @@ -23,8 +23,10 @@ BENCHMARK_DECLARE (sizes) BENCHMARK_DECLARE (ping_pongs) BENCHMARK_DECLARE (pump100_client) BENCHMARK_DECLARE (pump1_client) +BENCHMARK_DECLARE (gethostbyname) HELPER_DECLARE (pump_server) HELPER_DECLARE (echo_server) +HELPER_DECLARE (dns_server) TASK_LIST_START BENCHMARK_ENTRY (sizes) @@ -37,4 +39,8 @@ TASK_LIST_START BENCHMARK_ENTRY (pump1_client) BENCHMARK_HELPER (pump1_client, pump_server) + + BENCHMARK_ENTRY (gethostbyname) + BENCHMARK_HELPER (gethostbyname, dns_server) + TASK_LIST_END diff --git a/test/benchmark-sizes.c b/test/benchmark-sizes.c index a5f573fe..c6044f45 100644 --- a/test/benchmark-sizes.c +++ b/test/benchmark-sizes.c @@ -31,5 +31,7 @@ BENCHMARK_IMPL(sizes) { LOGF("uv_idle_t: %lu bytes\n", sizeof(uv_idle_t)); LOGF("uv_async_t: %lu bytes\n", sizeof(uv_async_t)); LOGF("uv_timer_t: %lu bytes\n", sizeof(uv_timer_t)); + LOGF("uv_ares_task_t: %lu bytes\n", sizeof(uv_ares_task_t)); + LOGF("uv_ares_action_t: %lu bytes\n", sizeof(uv_ares_action_t)); return 0; } diff --git a/test/dns-server.c b/test/dns-server.c new file mode 100644 index 00000000..c0e486b9 --- /dev/null +++ b/test/dns-server.c @@ -0,0 +1,324 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "../uv.h" +#include "task.h" +#include +#include + + +typedef struct { + uv_req_t req; + uv_buf_t buf; +} write_req_t; + + +/* used to track multiple DNS requests received */ +typedef struct { + char* prevbuf_ptr; + int prevbuf_pos; + int prevbuf_rem; +} dnsstate; + + +/* modify handle to append dnsstate */ +typedef struct { + uv_tcp_t handle; + dnsstate state; +} dnshandle; + + +static int server_closed; +static uv_tcp_t server; + + +static void after_write(uv_req_t* req, int status); +static void after_read(uv_tcp_t*, ssize_t nread, uv_buf_t buf); +static void on_close(uv_handle_t* peer); +static void on_server_close(uv_handle_t* handle); +static void on_connection(uv_tcp_t*, int status); + +#define WRITE_BUF_LEN (64*1024) +#define DNSREC_LEN (4) + +#define LEN_OFFSET 0 +#define QUERYID_OFFSET 2 +unsigned char DNSRsp[] = {0, 43, 0, 0, 0x81, 0x80, 0, 1, 0, 1, 0, 0, 0, 0 }; +unsigned char qrecord[] = {5, 'e', 'c', 'h', 'o', 's', 3, 's', 'r', 'v', 0, 0, 1, 0, 1}; +unsigned char arecord[] = {0xc0, 0x0c, 0, 1, 0, 1, 0, 0, 5, 0xbd, 0, 4, 10, 0, 1, 1 }; + + +static void after_write(uv_req_t* req, int status) { + write_req_t* wr; + + if (status) { + uv_err_t err = uv_last_error(); + fprintf(stderr, "uv_write error: %s\n", uv_strerror(err)); + ASSERT(0); + } + + wr = (write_req_t*) req; + + /* Free the read/write buffer and the request */ + free(wr->buf.base); + free(wr); +} + + +static void after_shutdown(uv_req_t* req, int status) { + uv_close(req->handle, on_close); + free(req); +} + + +static void addrsp(write_req_t* wr, char* hdr) { + char * dnsrsp; + short int rsplen; + short int* reclen; + + rsplen = sizeof(DNSRsp) + sizeof(qrecord) + sizeof(arecord); + + ASSERT (rsplen + wr->buf.len < WRITE_BUF_LEN); + + dnsrsp = wr->buf.base + wr->buf.len; + + /* copy stock response */ + memcpy(dnsrsp, DNSRsp, sizeof(DNSRsp)); + memcpy(dnsrsp + sizeof(DNSRsp), qrecord, sizeof(qrecord)); + memcpy(dnsrsp + sizeof(DNSRsp) + sizeof(qrecord), arecord, sizeof(arecord)); + + /* overwrite with network order length and id from request header */ + reclen = (short int*)dnsrsp; + *reclen = htons(rsplen-2); + dnsrsp[QUERYID_OFFSET] = hdr[QUERYID_OFFSET]; + dnsrsp[QUERYID_OFFSET+1] = hdr[QUERYID_OFFSET+1]; + + wr->buf.len += rsplen; +} + +static void process_req(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) { + write_req_t *wr; + dnshandle* dns = (dnshandle*)handle; + char hdrbuf[DNSREC_LEN]; + int hdrbuf_remaining = DNSREC_LEN; + int rec_remaining = 0; + int readbuf_remaining; + char* dnsreq; + char* hdrstart; + int usingprev = 0; + + wr = (write_req_t*) malloc(sizeof *wr); + uv_req_init(&wr->req, (uv_handle_t*)handle, after_write); + wr->buf.base = (char*)malloc(WRITE_BUF_LEN); + wr->buf.len = 0; + + if (dns->state.prevbuf_ptr != NULL) { + dnsreq = dns->state.prevbuf_ptr + dns->state.prevbuf_pos; + readbuf_remaining = dns->state.prevbuf_rem; + usingprev = 1; + } else { + dnsreq = buf.base; + readbuf_remaining = nread; + } + hdrstart = dnsreq; + + while (dnsreq != NULL) { + /* something to process */ + while (readbuf_remaining > 0) { + /* something to process in current buffer */ + if (hdrbuf_remaining > 0) { + /* process len and id */ + if (readbuf_remaining < hdrbuf_remaining) { + /* too little to get request header. save for next buffer */ + memcpy(&hdrbuf[DNSREC_LEN - hdrbuf_remaining], dnsreq, readbuf_remaining); + hdrbuf_remaining = DNSREC_LEN - readbuf_remaining; + break; + } else { + short int reclen_n; + /* save header */ + memcpy(&hdrbuf[DNSREC_LEN - hdrbuf_remaining], dnsreq, hdrbuf_remaining); + dnsreq += hdrbuf_remaining; + readbuf_remaining -= hdrbuf_remaining; + hdrbuf_remaining = 0; + + /* get record length */ + reclen_n = *((short int*)hdrbuf); + rec_remaining = ntohs(reclen_n) - (DNSREC_LEN - 2); + } + } + + if (rec_remaining <= readbuf_remaining) { + /* prepare reply */ + addrsp(wr, hdrbuf); + + /* move to next record */ + dnsreq += rec_remaining; + hdrstart = dnsreq; + readbuf_remaining -= rec_remaining; + rec_remaining = 0; + hdrbuf_remaining = DNSREC_LEN; + } else { + /* otherwise this buffer is done. */ + rec_remaining -= readbuf_remaining; + break; + } + } + + /* if we had to use bytes from prev buffer, start processing the current one */ + if (usingprev == 1) { + /* free previous buffer */ + free(dns->state.prevbuf_ptr); + dnsreq = buf.base; + readbuf_remaining = nread; + usingprev = 0; + } else { + dnsreq = NULL; + } + } + + /* send write buffer */ + if (wr->buf.len > 0) { + if (uv_write(&wr->req, &wr->buf, 1)) { + FATAL("uv_write failed"); + } + } + + if (readbuf_remaining > 0) { + /* save start of record position, so we can continue on next read */ + dns->state.prevbuf_ptr = buf.base; + dns->state.prevbuf_pos = hdrstart - buf.base; + dns->state.prevbuf_rem = nread - dns->state.prevbuf_pos; + } else { + /* nothing left in this buffer */ + dns->state.prevbuf_ptr = NULL; + dns->state.prevbuf_pos = 0; + dns->state.prevbuf_rem = 0; + free(buf.base); + } +} + +static void after_read(uv_tcp_t* handle, ssize_t nread, uv_buf_t buf) { + uv_req_t* req; + + if (nread < 0) { + /* Error or EOF */ + ASSERT (uv_last_error().code == UV_EOF); + + if (buf.base) { + free(buf.base); + } + + req = (uv_req_t*) malloc(sizeof *req); + uv_req_init(req, (uv_handle_t*)handle, after_shutdown); + uv_shutdown(req); + + return; + } + + if (nread == 0) { + /* Everything OK, but nothing read. */ + free(buf.base); + return; + } + /* process requests and send responses */ + process_req(handle, nread, buf); +} + + +static void on_close(uv_handle_t* peer) { + free(peer); +} + + +static uv_buf_t buf_alloc(uv_tcp_t* handle, size_t suggested_size) { + uv_buf_t buf; + buf.base = (char*) malloc(suggested_size); + buf.len = suggested_size; + return buf; +} + + +static void on_connection(uv_tcp_t* server, int status) { + dnshandle* handle; + int r; + + ASSERT(status == 0); + + handle = (dnshandle*) malloc(sizeof *handle); + ASSERT(handle != NULL); + + /* initialize read buffer state */ + handle->state.prevbuf_ptr = 0; + handle->state.prevbuf_pos = 0; + handle->state.prevbuf_rem = 0; + + uv_tcp_init((uv_tcp_t*)handle); + + r = uv_accept(server, (uv_tcp_t*)handle); + ASSERT(r == 0); + + r = uv_read_start((uv_tcp_t*)handle, buf_alloc, after_read); + ASSERT(r == 0); +} + + +static void on_server_close(uv_handle_t* handle) { + ASSERT(handle == (uv_handle_t*)&server); +} + + +static int dns_start(int port) { + struct sockaddr_in addr = uv_ip4_addr("0.0.0.0", port); + int r; + + r = uv_tcp_init(&server); + if (r) { + /* TODO: Error codes */ + fprintf(stderr, "Socket creation error\n"); + return 1; + } + + r = uv_bind(&server, addr); + if (r) { + /* TODO: Error codes */ + fprintf(stderr, "Bind error\n"); + return 1; + } + + r = uv_listen(&server, 128, on_connection); + if (r) { + /* TODO: Error codes */ + fprintf(stderr, "Listen error\n"); + return 1; + } + + return 0; +} + + +HELPER_IMPL(dns_server) { + uv_init(); + if (dns_start(TEST_PORT_2)) + return 1; + + uv_run(); + return 0; +} diff --git a/test/test-gethostbyname.c b/test/test-gethostbyname.c new file mode 100644 index 00000000..1b963dd5 --- /dev/null +++ b/test/test-gethostbyname.c @@ -0,0 +1,203 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "../uv.h" +#include "task.h" + +#include +#include +#include /* strlen */ + +ares_channel channel; +struct ares_options options; +int optmask; + +struct in_addr testsrv; + +int ares_bynamecallbacks; +int bynamecallbacksig; +int ares_byaddrcallbacks; +int byaddrcallbacksig; + +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { + uv_buf_t buf; + buf.base = (char*)malloc(size); + buf.len = size; + return buf; +} + +static void aresbynamecallback( void *arg, + int status, + int timeouts, + struct hostent *hostent) { + int * iargs; + ASSERT(arg != NULL); + iargs = (int*)arg; + ASSERT(*iargs == bynamecallbacksig); + ASSERT(timeouts == 0); + + ares_bynamecallbacks++; +} + +static void aresbyaddrcallback( void *arg, + int status, + int timeouts, + struct hostent *hostent) { + int * iargs; + ASSERT(arg != NULL); + iargs = (int*)arg; + ASSERT(*iargs == byaddrcallbacksig); + ASSERT(timeouts == 0); + + ares_byaddrcallbacks++; +} + +static void prep_tcploopback() +{ + int rc = 0; + /* for test, use echo server - TCP port TEST_PORT on loopback */ + testsrv.S_un.S_un_b.s_b1 = 127; + testsrv.S_un.S_un_b.s_b2 = 0; + testsrv.S_un.S_un_b.s_b3 = 0; + testsrv.S_un.S_un_b.s_b4 = 1; + + optmask = ARES_OPT_SERVERS | ARES_OPT_TCP_PORT | ARES_OPT_FLAGS; + options.servers = &testsrv; + options.nservers = 1; + options.tcp_port = htons(TEST_PORT); + options.flags = ARES_FLAG_USEVC; + + rc = uv_ares_init_options(&channel, &options, optmask); + + ASSERT(rc == ARES_SUCCESS); +} + + +TEST_IMPL(gethostbyname) { + + int rc = 0; + char addr[4]; + + rc = ares_library_init(ARES_LIB_INIT_ALL); + if (rc != 0) { + printf("ares library init fails %d\n", rc); + return 1; + } + + uv_init(alloc_cb); + + printf("Start basic gethostbyname test\n"); + prep_tcploopback(); + + ares_bynamecallbacks = 0; + bynamecallbacksig = 7; + + ares_gethostbyname(channel, + "microsoft.com", + AF_INET, + &aresbynamecallback, + &bynamecallbacksig); + uv_run(); + + ASSERT(ares_bynamecallbacks == 1); + + uv_ares_destroy(channel); + printf("Done basic gethostbyname test\n"); + + + /* two sequential call on new channel */ + + printf("Start gethostbyname and gethostbyaddr sequential test\n"); + prep_tcploopback(); + + ares_bynamecallbacks = 0; + bynamecallbacksig = 7; + + ares_gethostbyname(channel, + "microsoft.com", + AF_INET, + &aresbynamecallback, + &bynamecallbacksig); + uv_run(); + + ASSERT(ares_bynamecallbacks == 1); + + ares_byaddrcallbacks = 0; + byaddrcallbacksig = 8; + addr[0] = 10; + addr[1] = 0; + addr[2] = 1; + addr[3] = 99; + + ares_gethostbyaddr(channel, + addr, + 4, + AF_INET, + &aresbyaddrcallback, + &byaddrcallbacksig); + + uv_run(); + + ASSERT(ares_byaddrcallbacks == 1); + + uv_ares_destroy(channel); + printf("Done gethostbyname and gethostbyaddr sequential test\n"); + + + /* two simultaneous calls on new channel */ + + printf("Start gethostbyname and gethostbyaddr concurrent test\n"); + prep_tcploopback(); + + ares_bynamecallbacks = 0; + bynamecallbacksig = 7; + + ares_gethostbyname(channel, + "microsoft.com", + AF_INET, + &aresbynamecallback, + &bynamecallbacksig); + + ares_byaddrcallbacks = 0; + byaddrcallbacksig = 8; + addr[0] = 10; + addr[1] = 0; + addr[2] = 1; + addr[3] = 99; + + ares_gethostbyaddr(channel, + addr, + 4, + AF_INET, + &aresbyaddrcallback, + &byaddrcallbacksig); + + uv_run(); + + ASSERT(ares_bynamecallbacks == 1); + ASSERT(ares_byaddrcallbacks == 1); + + + uv_ares_destroy(channel); + printf("Done gethostbyname and gethostbyaddr concurrent test\n"); + + return 0; +} diff --git a/test/test-list.h b/test/test-list.h index 7b4dc8f7..1c692fbf 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -43,6 +43,7 @@ TEST_DECLARE (check_ref) TEST_DECLARE (async) TEST_DECLARE (get_currentexe) TEST_DECLARE (hrtime) +TEST_DECLARE (gethostbyname) TEST_DECLARE (fail_always) TEST_DECLARE (pass_always) HELPER_DECLARE (echo_server) @@ -95,6 +96,9 @@ TASK_LIST_START TEST_ENTRY (hrtime) + TEST_ENTRY (gethostbyname) + TEST_HELPER (gethostbyname, echo_server) + #if 0 /* These are for testing the test runner. */ TEST_ENTRY (fail_always) diff --git a/uv-unix.c b/uv-unix.c index d9f63287..c1322db2 100644 --- a/uv-unix.c +++ b/uv-unix.c @@ -1232,4 +1232,16 @@ int64_t uv_timer_get_repeat(uv_timer_t* timer) { return (int64_t)(1000 * timer->timer_watcher.repeat); } +/* c-ares integration initialize and terminate */ +int uv_ares_init_options(ares_channel *channelptr, + struct ares_options *options, + int optmask) { + rc = ares_init_options(channelptr, options, optmask); + return rc; +} + +void uv_ares_destroy(ares_channel channel) { + ares_destroy(channel); +} + diff --git a/uv-unix.h b/uv-unix.h index 236208b6..e898626e 100644 --- a/uv-unix.h +++ b/uv-unix.h @@ -97,4 +97,8 @@ typedef struct { uv_timer_cb timer_cb; +#define UV_ARES_ACTION_PRIVATE_FIELDS /* TODO */ + +#define UV_ARES_TASK_PRIVATE_FIELDS /* TODO */ + #endif /* UV_UNIX_H */ diff --git a/uv-win.c b/uv-win.c index e941c243..cb0eead2 100644 --- a/uv-win.c +++ b/uv-win.c @@ -189,6 +189,27 @@ static struct sockaddr_in uv_addr_ip4_any_; static char uv_zero_[] = ""; +void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req); +void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req); + +/* list used for ares task handles */ +static uv_ares_task_t* uv_ares_handles_ = NULL; + +/* memory used per ares_channel */ +struct uv_ares_channel_s { + ares_channel channel; +}; + +typedef struct uv_ares_channel_s uv_ares_channel_t; + +/* static data to hold single ares_channel */ +static uv_ares_channel_t uv_ares_data = { NULL }; + +/* default timeout per socket request if ares does not specify value */ +/* use 20 sec */ +#define ARES_TIMEOUT_MS 20000 + + /* Atomic set operation on char */ #ifdef _MSC_VER /* MSVC */ @@ -1517,6 +1538,14 @@ static void uv_process_reqs() { uv_async_return_req((uv_async_t*)handle, req); break; + case UV_ARES: + uv_ares_process((uv_ares_action_t*)handle, req); + break; + + case UV_ARES_TASK: + uv_ares_task_cleanup((uv_ares_task_t*)handle, req); + break; + default: assert(0); } @@ -1710,3 +1739,264 @@ done: uint64_t uv_get_hrtime(void) { assert(0 && "implement me"); } + +/* find matching ares handle in list */ +void uv_add_ares_handle(uv_ares_task_t* handle) { + handle->ares_next = uv_ares_handles_; + handle->ares_prev = NULL; + + if (uv_ares_handles_) { + uv_ares_handles_->ares_prev = handle; + } + uv_ares_handles_ = handle; +} + +/* find matching ares handle in list */ +/* TODO: faster lookup */ +uv_ares_task_t* uv_find_ares_handle(ares_socket_t sock) { + uv_ares_task_t* handle = uv_ares_handles_; + while (handle != NULL) { + if (handle->sock == sock) { + break; + } + handle = handle->ares_next; + } + + return handle; +} + +/* remove ares handle in list */ +void uv_remove_ares_handle(uv_ares_task_t* handle) { + if (handle == uv_ares_handles_) { + uv_ares_handles_ = handle->ares_next; + } + + if (handle->ares_next) { + handle->ares_next->ares_prev = handle->ares_prev; + } + + if (handle->ares_prev) { + handle->ares_prev->ares_next = handle->ares_next; + } +} + +/* thread pool callback when socket is signalled */ +VOID CALLBACK uv_ares_socksignal_tp(PVOID parameter, + BOOLEAN timerfired) { + WSANETWORKEVENTS network_events; + uv_ares_task_t* sockhandle; + uv_ares_action_t* selhandle; + uv_req_t* uv_ares_req; + + assert(parameter != NULL); + + if (parameter != NULL) { + sockhandle = (uv_ares_task_t*)parameter; + + /* clear socket status for this event */ + /* do not fail if error, thread may run after socket close */ + /* The code assumes that c-ares will write all pending data in the callback, + unless the socket would block. We can clear the state here to avoid unecessary + signals. */ + WSAEnumNetworkEvents(sockhandle->sock, sockhandle->h_event, &network_events); + + /* setup new handle */ + selhandle = (uv_ares_action_t*)malloc(sizeof(uv_ares_action_t)); + if (selhandle == NULL) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + selhandle->type = UV_ARES; + selhandle->close_cb = NULL; + selhandle->data = sockhandle->data; + selhandle->sock = sockhandle->sock; + selhandle->read = (network_events.lNetworkEvents & (FD_READ | FD_CONNECT)) ? 1 : 0; + selhandle->write = (network_events.lNetworkEvents & (FD_WRITE | FD_CONNECT)) ? 1 : 0; + + uv_ares_req = &selhandle->ares_req; + uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL); + uv_ares_req->type = UV_WAKEUP; + + /* post ares needs to called */ + if (!PostQueuedCompletionStatus(uv_iocp_, + 0, + 0, + &uv_ares_req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } + } +} + +/* callback from ares when socket operation is started */ +void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) { + /* look to see if we have a handle for this socket in our list */ + uv_ares_task_t* uv_handle_ares = uv_find_ares_handle(sock); + struct timeval tv; + struct timeval* tvptr; + int timeoutms = 0; + + if (read == 0 && write == 0) { + /* if read and write are 0, cleanup existing data */ + /* The code assumes that c-ares does a callback with read = 0 and write = 0 + when the socket is closed. After we recieve this we stop monitoring the socket. */ + if (uv_handle_ares != NULL) { + uv_req_t* uv_ares_req; + + uv_handle_ares->h_close_event = CreateEvent(NULL, FALSE, FALSE, NULL); + /* remove Wait */ + if (uv_handle_ares->h_wait) { + UnregisterWaitEx(uv_handle_ares->h_wait, uv_handle_ares->h_close_event); + uv_handle_ares->h_wait = NULL; + } + + /* detach socket from the event */ + WSAEventSelect(sock, NULL, 0); + if (uv_handle_ares->h_event != WSA_INVALID_EVENT) { + WSACloseEvent(uv_handle_ares->h_event); + uv_handle_ares->h_event = WSA_INVALID_EVENT; + } + /* remove handle from list */ + uv_remove_ares_handle(uv_handle_ares); + + /* Post request to cleanup the Task */ + uv_ares_req = &uv_handle_ares->ares_req; + uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL); + uv_ares_req->type = UV_WAKEUP; + + /* post ares done with socket - finish cleanup when all threads done. */ + if (!PostQueuedCompletionStatus(uv_iocp_, + 0, + 0, + &uv_ares_req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } + } else { + assert(0); + uv_fatal_error(ERROR_INVALID_DATA, "ares_SockStateCB"); + } + } else { + if (uv_handle_ares == NULL) { + /* setup new handle */ + /* The code assumes that c-ares will call us when it has an open socket. + We need to call into c-ares when there is something to read, + or when it becomes writable. */ + uv_handle_ares = (uv_ares_task_t*)malloc(sizeof(uv_ares_task_t)); + if (uv_handle_ares == NULL) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + uv_handle_ares->type = UV_ARES_TASK; + uv_handle_ares->close_cb = NULL; + uv_handle_ares->data = ((uv_ares_channel_t*)data)->channel; + uv_handle_ares->sock = sock; + uv_handle_ares->h_wait = NULL; + uv_handle_ares->flags = 0; + + /* create an event to wait on socket signal */ + uv_handle_ares->h_event = WSACreateEvent(); + if (uv_handle_ares->h_event == WSA_INVALID_EVENT) { + uv_fatal_error(WSAGetLastError(), "WSACreateEvent"); + } + + /* tie event to socket */ + if (SOCKET_ERROR == WSAEventSelect(sock, uv_handle_ares->h_event, FD_READ | FD_WRITE | FD_CONNECT)) { + uv_fatal_error(WSAGetLastError(), "WSAEventSelect"); + } + + /* add handle to list */ + uv_add_ares_handle(uv_handle_ares); + uv_refs_++; + tv.tv_sec = 0; + tvptr = ares_timeout(((uv_ares_channel_t*)data)->channel, NULL, &tv); + if (tvptr) { + timeoutms = (tvptr->tv_sec * 1000) + (tvptr->tv_usec / 1000); + } else { + timeoutms = ARES_TIMEOUT_MS; + } + + /* specify thread pool function to call when event is signaled */ + if (RegisterWaitForSingleObject(&uv_handle_ares->h_wait, + uv_handle_ares->h_event, + uv_ares_socksignal_tp, + (void*)uv_handle_ares, + timeoutms, + WT_EXECUTEINWAITTHREAD) == 0) { + uv_fatal_error(GetLastError(), "RegisterWaitForSingleObject"); + } + } else { + /* found existing handle. */ + assert(uv_handle_ares->type == UV_ARES_TASK); + assert(uv_handle_ares->data != NULL); + assert(uv_handle_ares->h_event != WSA_INVALID_EVENT); + } + } +} + +/* called via uv_poll when ares completion port signaled */ +void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req) { + + ares_process_fd( (ares_channel)handle->data, + handle->read ? handle->sock : INVALID_SOCKET, + handle->write ? handle->sock : INVALID_SOCKET); + + /* release handle for select here */ + free(handle); +} + +/* called via uv_poll when ares is finished with socket */ +void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req) { + /* check for event complete without waiting */ + unsigned int signaled = WaitForSingleObject(handle->h_close_event, 0); + + if (signaled != WAIT_TIMEOUT) { + + uv_refs_--; + + /* close event handle and free uv handle memory */ + CloseHandle(handle->h_close_event); + free(handle); + } else { + /* stil busy - repost and try again */ + if (!PostQueuedCompletionStatus(uv_iocp_, + 0, + 0, + &req->overlapped)) { + uv_fatal_error(GetLastError(), "PostQueuedCompletionStatus"); + } + } +} +/* set ares SOCK_STATE callback to our handler */ +int uv_ares_init_options(ares_channel *channelptr, + struct ares_options *options, + int optmask) { + int rc; + + /* only allow single init at a time */ + if (uv_ares_data.channel != NULL) { + return UV_EALREADY; + } + + /* set our callback as an option */ + options->sock_state_cb = uv_ares_sockstate_cb; + options->sock_state_cb_data = &uv_ares_data; + optmask |= ARES_OPT_SOCK_STATE_CB; + + /* We do the call to ares_init_option for caller. */ + rc = ares_init_options(channelptr, options, optmask); + + /* if success, save channel */ + if (rc == ARES_SUCCESS) { + uv_ares_data.channel = *channelptr; + } + + return rc; +} + +/* release memory */ +void uv_ares_destroy(ares_channel channel) { + /* only allow destroy if did init */ + if (uv_ares_data.channel != NULL) { + ares_destroy(channel); + uv_ares_data.channel = NULL; + } +} + + diff --git a/uv-win.h b/uv-win.h index 9a002aa2..fe317faa 100644 --- a/uv-win.h +++ b/uv-win.h @@ -108,5 +108,19 @@ typedef struct uv_buf_t { unsigned int flags; \ uv_err_t error; +#define UV_ARES_ACTION_PRIVATE_FIELDS \ + struct uv_req_s ares_req; \ + SOCKET sock; \ + int read; \ + int write; + +#define UV_ARES_TASK_PRIVATE_FIELDS \ + uv_ares_task_t* ares_prev; \ + uv_ares_task_t* ares_next; \ + struct uv_req_s ares_req; \ + SOCKET sock; \ + HANDLE h_wait; \ + WSAEVENT h_event; \ + HANDLE h_close_event; int uv_utf16_to_utf8(wchar_t* utf16Buffer, size_t utf16Size, char* utf8Buffer, size_t utf8Size); diff --git a/uv.h b/uv.h index fcda4544..f383d57b 100644 --- a/uv.h +++ b/uv.h @@ -48,6 +48,8 @@ typedef struct uv_check_s uv_check_t; typedef struct uv_idle_s uv_idle_t; typedef struct uv_req_s uv_req_t; typedef struct uv_async_s uv_async_t; +typedef struct uv_ares_task_s uv_ares_task_t; +typedef struct uv_ares_action_s uv_ares_action_t; #if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) @@ -129,7 +131,9 @@ typedef enum { UV_PREPARE, UV_CHECK, UV_IDLE, - UV_ASYNC + UV_ASYNC, + UV_ARES, + UV_ARES_TASK } uv_handle_type; typedef enum { @@ -364,6 +368,31 @@ void uv_timer_set_repeat(uv_timer_t* timer, int64_t repeat); int64_t uv_timer_get_repeat(uv_timer_t* timer); +/* + * Subclass of uv_handle_t. Used for integration of c-ares. + */ +struct uv_ares_task_s { + UV_HANDLE_FIELDS + UV_ARES_TASK_PRIVATE_FIELDS +}; + + +/* + * Subclass of uv_handle_t. Used for integration of c-ares. + */ +struct uv_ares_action_s { + UV_HANDLE_FIELDS + UV_ARES_ACTION_PRIVATE_FIELDS +}; + +/* c-ares integration initialize and terminate */ +int uv_ares_init_options(ares_channel *channelptr, + struct ares_options *options, + int optmask); + +void uv_ares_destroy(ares_channel channel); + + /* * Most functions return boolean: 0 for success and -1 for failure. * On error the user should then call uv_last_error() to determine @@ -401,6 +430,8 @@ union uv_any_handle { uv_idle_t idle; uv_async_t async; uv_timer_t timer; + uv_ares_task_t arest; + uv_ares_action_t aresa; }; /* Diagnostic counters */