From 52511b9ddca86afbb034b3db85ebcf4185c6e1eb Mon Sep 17 00:00:00 2001 From: Igor Zinkovsky Date: Wed, 11 Jan 2012 18:46:27 -0800 Subject: [PATCH] windows: implement uv_loop_new+uv_loop_delete --- include/uv-private/uv-win.h | 6 +++++- src/uv-common.c | 10 +--------- src/win/core.c | 22 +++++++++++++++++++--- src/win/tcp.c | 12 ++++-------- src/win/udp.c | 10 +++------- src/win/util.c | 16 +++++++++++++++- test/test-thread.c | 19 ++++++++++++++++--- 7 files changed, 63 insertions(+), 32 deletions(-) diff --git a/include/uv-private/uv-win.h b/include/uv-private/uv-win.h index 5515bd6c..5a6a949e 100644 --- a/include/uv-private/uv-win.h +++ b/include/uv-private/uv-win.h @@ -200,7 +200,11 @@ RB_HEAD(uv_timer_tree_s, uv_timer_s); uv_idle_t* next_idle_handle; \ ares_channel ares_chan; \ int ares_active_sockets; \ - uv_timer_t ares_polling_timer; + uv_timer_t ares_polling_timer; \ + /* Counter to keep track of active tcp streams */ \ + unsigned int active_tcp_streams; \ + /* Counter to keep track of active udp streams */ \ + unsigned int active_udp_streams; #define UV_REQ_TYPE_PRIVATE \ /* TODO: remove the req suffix */ \ diff --git a/src/uv-common.c b/src/uv-common.c index d5e2c2a9..0d1c3633 100644 --- a/src/uv-common.c +++ b/src/uv-common.c @@ -33,14 +33,6 @@ #include "ares/inet_ntop.h" -static uv_counters_t counters; - - -uv_counters_t* uv_counters() { - return &counters; -} - - size_t uv_strlcpy(char* dst, const char* src, size_t size) { size_t n; @@ -300,7 +292,7 @@ int uv_tcp_connect6(uv_connect_t* req, #ifdef _WIN32 -static DWORD __stdcall uv__thread_start(void *ctx_v) +static UINT __stdcall uv__thread_start(void *ctx_v) #else static void *uv__thread_start(void *ctx_v) #endif diff --git a/src/win/core.c b/src/win/core.c index 70da4273..c5b49fc0 100644 --- a/src/win/core.c +++ b/src/win/core.c @@ -86,6 +86,9 @@ static void uv_loop_init(uv_loop_t* loop) { loop->ares_active_sockets = 0; loop->ares_chan = NULL; + loop->active_tcp_streams = 0; + loop->active_udp_streams = 0; + loop->last_err = uv_ok_; } @@ -106,13 +109,26 @@ uv_loop_t* uv_default_loop(void) { uv_loop_t* uv_loop_new(void) { - assert(0 && "implement me"); - return NULL; + uv_loop_t* loop; + + /* Initialize libuv itself first */ + uv_once(&uv_init_guard_, uv_init); + + loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + + if (!loop) { + uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); + } + + uv_loop_init(loop); + return loop; } void uv_loop_delete(uv_loop_t* loop) { - assert(0 && "implement me"); + if (loop != &uv_default_loop_) { + free(loop); + } } diff --git a/src/win/tcp.c b/src/win/tcp.c index 7965f73a..f810913f 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -42,10 +42,6 @@ const unsigned int uv_simultaneous_server_accepts = 32; /* A zero-size buffer for use by uv_tcp_read */ static char uv_zero_[] = ""; -/* Counter to keep track of active tcp streams */ -static unsigned int active_tcp_streams = 0; - - static int uv__tcp_nodelay(uv_tcp_t* handle, SOCKET socket, int enable) { if (setsockopt(socket, IPPROTO_TCP, @@ -217,7 +213,7 @@ void uv_tcp_endgame(uv_loop_t* loop, uv_tcp_t* handle) { handle->close_cb((uv_handle_t*)handle); } - active_tcp_streams--; + loop->active_tcp_streams--; uv_unref(loop); } @@ -399,7 +395,7 @@ static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) { * Preallocate a read buffer if the number of active streams is below * the threshold. */ - if (active_tcp_streams < uv_active_tcp_streams_threshold) { + if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) { handle->flags &= ~UV_HANDLE_ZERO_READ; handle->read_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536); assert(handle->read_buffer.len > 0); @@ -559,7 +555,7 @@ int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { } } - active_tcp_streams++; + loop->active_tcp_streams++; return rv; } @@ -1007,7 +1003,7 @@ void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle, NULL, 0) == 0) { uv_connection_init((uv_stream_t*)handle); - active_tcp_streams++; + loop->active_tcp_streams++; ((uv_connect_cb)req->cb)(req, 0); } else { uv__set_sys_error(loop, WSAGetLastError()); diff --git a/src/win/udp.c b/src/win/udp.c index 0299e775..422b0b3c 100644 --- a/src/win/udp.c +++ b/src/win/udp.c @@ -34,10 +34,6 @@ const unsigned int uv_active_udp_streams_threshold = 0; /* A zero-size buffer for use by uv_udp_read */ static char uv_zero_[] = ""; -/* Counter to keep track of active udp streams */ -static unsigned int active_udp_streams = 0; - - int uv_udp_getsockname(uv_udp_t* handle, struct sockaddr* name, int* namelen) { uv_loop_t* loop = handle->loop; @@ -269,7 +265,7 @@ static void uv_udp_queue_recv(uv_loop_t* loop, uv_udp_t* handle) { * Preallocate a read buffer if the number of active streams is below * the threshold. */ - if (active_udp_streams < uv_active_udp_streams_threshold) { + if (loop->active_udp_streams < uv_active_udp_streams_threshold) { handle->flags &= ~UV_HANDLE_ZERO_READ; handle->recv_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536); @@ -357,7 +353,7 @@ int uv_udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb, } handle->flags |= UV_HANDLE_READING; - active_udp_streams++; + loop->active_udp_streams++; handle->recv_cb = recv_cb; handle->alloc_cb = alloc_cb; @@ -374,7 +370,7 @@ int uv_udp_recv_start(uv_udp_t* handle, uv_alloc_cb alloc_cb, int uv_udp_recv_stop(uv_udp_t* handle) { if (handle->flags & UV_HANDLE_READING) { handle->flags &= ~UV_HANDLE_READING; - active_udp_streams--; + handle->loop->active_udp_streams--; } return 0; diff --git a/src/win/util.c b/src/win/util.c index f50952b9..15618e92 100644 --- a/src/win/util.c +++ b/src/win/util.c @@ -47,7 +47,8 @@ static char *process_title; - +static uv_once_t uv_process_title_init_guard_ = UV_ONCE_INIT; +static CRITICAL_SECTION process_title_lock; int uv_utf16_to_utf8(const wchar_t* utf16Buffer, size_t utf16Size, char* utf8Buffer, size_t utf8Size) { @@ -263,11 +264,18 @@ char** uv_setup_args(int argc, char** argv) { } +static void uv_process_title_init(void) { + InitializeCriticalSection(&process_title_lock); +} + + uv_err_t uv_set_process_title(const char* title) { uv_err_t err; int length; wchar_t* title_w = NULL; + uv_once(&uv_process_title_init_guard_, uv_process_title_init); + /* Find out how big the buffer for the wide-char title must be */ length = uv_utf8_to_utf16(title, NULL, 0); if (!length) { @@ -297,8 +305,10 @@ uv_err_t uv_set_process_title(const char* title) { goto done; } + EnterCriticalSection(&process_title_lock); free(process_title); process_title = strdup(title); + LeaveCriticalSection(&process_title_lock); err = uv_ok_; @@ -339,6 +349,9 @@ static int uv__get_process_title() { uv_err_t uv_get_process_title(char* buffer, size_t size) { + uv_once(&uv_process_title_init_guard_, uv_process_title_init); + + EnterCriticalSection(&process_title_lock); /* * If the process_title was never read before nor explicitly set, * we must query it with getConsoleTitleW @@ -349,6 +362,7 @@ uv_err_t uv_get_process_title(char* buffer, size_t size) { assert(process_title); strncpy(buffer, process_title, size); + LeaveCriticalSection(&process_title_lock); return uv_ok_; } diff --git a/test/test-thread.c b/test/test-thread.c index fcdff7c1..72cf9668 100644 --- a/test/test-thread.c +++ b/test/test-thread.c @@ -46,6 +46,12 @@ struct fs_req { uv_fs_t handle; }; + +struct thread { + uv_thread_t thread_id; + volatile int thread_called; +}; + static void getaddrinfo_do(struct getaddrinfo_req* req); static void getaddrinfo_cb(uv_getaddrinfo_t* handle, int status, @@ -95,6 +101,8 @@ static void fs_do(struct fs_req* req) { static void fs_cb(uv_fs_t* handle) { struct fs_req* req = container_of(handle, struct fs_req, handle); + uv_fs_req_cleanup(handle); + if (--req->counter) fs_do(req); } @@ -106,6 +114,7 @@ static void do_work(void* arg) { uv_loop_t* loop; size_t i; int r; + struct thread* thread = arg; loop = uv_loop_new(); ASSERT(loop != NULL); @@ -128,6 +137,7 @@ static void do_work(void* arg) { ASSERT(r == 0); uv_loop_delete(loop); + thread->thread_called = 1; } @@ -157,18 +167,21 @@ TEST_IMPL(thread_create) { * that each "finished" callback is run in its originating thread. */ TEST_IMPL(threadpool_multiple_event_loops) { - uv_thread_t threads[8]; + struct thread threads[8]; size_t i; int r; + memset(threads, 0, sizeof(threads)); + for (i = 0; i < ARRAY_SIZE(threads); i++) { - r = uv_thread_create(threads + i, do_work, NULL); + r = uv_thread_create(&threads[i].thread_id, do_work, &threads[i]); ASSERT(r == 0); } for (i = 0; i < ARRAY_SIZE(threads); i++) { - r = uv_thread_join(threads + i); + r = uv_thread_join(&threads[i].thread_id); ASSERT(r == 0); + ASSERT(threads[i].thread_called); } return 0;