diff --git a/include/uv.h b/include/uv.h index 66a593d8..e5eada6c 100644 --- a/include/uv.h +++ b/include/uv.h @@ -140,8 +140,8 @@ typedef enum { UV_CHECK, UV_IDLE, UV_ASYNC, - UV_ARES, UV_ARES_TASK, + UV_ARES_EVENT, UV_GETADDRINFO } uv_handle_type; @@ -152,7 +152,11 @@ typedef enum { UV_READ, UV_WRITE, UV_SHUTDOWN, - UV_WAKEUP + UV_WAKEUP, + /* TODO: remove the req suffix */ + UV_ARES_EVENT_REQ, + UV_ARES_CLEANUP_REQ, + UV_GETADDRINFO_REQ } uv_req_type; diff --git a/src/uv-win.c b/src/uv-win.c index b9d60211..2b09faec 100644 --- a/src/uv-win.c +++ b/src/uv-win.c @@ -239,8 +239,8 @@ struct uv_ares_action_s { int write; }; -void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req); -void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req); +void uv_process_ares_event_req(uv_ares_action_t* handle, uv_req_t* req); +void uv_process_ares_cleanup_req(uv_ares_task_t* handle, uv_req_t* req); void uv_ares_poll(uv_timer_t* handle, int status); static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); @@ -263,7 +263,7 @@ static uv_ares_channel_t uv_ares_data = { NULL, 0 }; /* getaddrinfo integration */ -static void uv_getaddrinfo_done(uv_getaddrinfo_t* handle, uv_req_t* req); +static void uv_process_getaddrinfo_req(uv_getaddrinfo_t* handle, uv_req_t* req); /* adjust size value to be multiple of 4. Use to keep pointer aligned */ /* Do we need different versions of this for different architectures? */ #define ALIGNED_SIZE(X) ((((X) + 3) >> 2) << 2) @@ -766,7 +766,7 @@ static void uv_process_endgames() { case UV_TCP: uv_tcp_endgame((uv_tcp_t*)handle); break; - + case UV_NAMED_PIPE: uv_pipe_endgame((uv_pipe_t*)handle); break; @@ -830,7 +830,7 @@ static int uv_close_error(uv_handle_t* handle, uv_err_t e) { uv_want_endgame(handle); } return 0; - + case UV_NAMED_PIPE: pipe = (uv_pipe_t*)handle; pipe->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING); @@ -1031,7 +1031,7 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle) { /* Prepare the overlapped structure. */ memset(&(req->overlapped), 0, sizeof(req->overlapped)); - if (!ConnectNamedPipe(instance->handle, &req->overlapped) && + if (!ConnectNamedPipe(instance->handle, &req->overlapped) && GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { /* Make this req pending reporting an error. */ req->error = uv_new_sys_error(GetLastError()); @@ -1099,12 +1099,12 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) { req->type = UV_READ; /* Do 0-read */ - result = ReadFile(handle->connection->handle, - &uv_zero_, - 0, - NULL, - &req->overlapped); - + result = ReadFile(handle->connection->handle, + &uv_zero_, + 0, + NULL, + &req->overlapped); + if (!result && GetLastError() != ERROR_IO_PENDING) { /* Make this req pending reporting an error. */ req->error = uv_new_sys_error(WSAGetLastError()); @@ -1150,7 +1150,7 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) { static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { int rv = 0; - + if (server->accept_socket == INVALID_SOCKET) { uv_set_sys_error(WSAENOTCONN); return -1; @@ -1173,12 +1173,12 @@ static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { } -static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { +static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { assert(server->acceptConnection); /* Make the connection instance active */ server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE; - + /* Move the connection instance from server to client */ client->connection = server->acceptConnection; server->acceptConnection = NULL; @@ -1189,7 +1189,7 @@ static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { uv_init_connection((uv_stream_t*)client); client->flags |= UV_HANDLE_PIPESERVER; uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL); - + if (!(server->flags & UV_HANDLE_CLOSING)) { uv_pipe_queue_accept(server); } @@ -1206,7 +1206,7 @@ int uv_accept(uv_handle_t* server, uv_stream_t* client) { } else if (server->type == UV_NAMED_PIPE) { return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); } - + return -1; } @@ -1470,7 +1470,7 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) { uv_pipe_t* handle = (uv_pipe_t*) req->handle; assert(!(req->flags & UV_REQ_PENDING)); - + if (bufcnt != 1) { uv_set_sys_error(UV_ENOTSUP); return -1; @@ -1558,7 +1558,18 @@ int uv_shutdown(uv_req_t* req) { } -static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { +#define DECREASE_PENDING_REQ_COUNT(handle) \ + do { \ + handle->reqs_pending--; \ + \ + if (handle->flags & UV_HANDLE_CLOSING && \ + handle->reqs_pending == 0) { \ + uv_want_endgame((uv_handle_t*)handle); \ + } \ + } while (0) + + +static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { DWORD bytes, flags, err; uv_buf_t buf; @@ -1567,160 +1578,173 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { /* Mark the request non-pending */ req->flags &= ~UV_REQ_PENDING; - switch (req->type) { - case UV_WRITE: - handle->write_queue_size -= req->queued_bytes; - if (req->cb) { - uv_last_error_ = req->error; - ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); - } - handle->write_reqs_pending--; - if (handle->write_reqs_pending == 0 && - handle->flags & UV_HANDLE_SHUTTING) { - uv_want_endgame((uv_handle_t*)handle); - } - break; - - case UV_READ: - if (req->error.code != UV_OK) { - /* An error occurred doing the 0-read. */ - if (!(handle->flags & UV_HANDLE_READING)) { - break; - } - - /* Stop reading and report error. */ - handle->flags &= ~UV_HANDLE_READING; - uv_last_error_ = req->error; - buf.base = 0; - buf.len = 0; - handle->read_cb((uv_stream_t*)handle, -1, buf); - break; - } - - /* Do nonblocking reads until the buffer is empty */ - while (handle->flags & UV_HANDLE_READING) { - buf = handle->alloc_cb((uv_stream_t*)handle, 65536); - assert(buf.len > 0); - flags = 0; - if (WSARecv(handle->socket, - (WSABUF*)&buf, - 1, - &bytes, - &flags, - NULL, - NULL) != SOCKET_ERROR) { - if (bytes > 0) { - /* Successful read */ - handle->read_cb((uv_stream_t*)handle, bytes, buf); - /* Read again only if bytes == buf.len */ - if (bytes < buf.len) { - break; - } - } else { - /* Connection closed */ - handle->flags &= ~UV_HANDLE_READING; - handle->flags |= UV_HANDLE_EOF; - uv_last_error_.code = UV_EOF; - uv_last_error_.sys_errno_ = ERROR_SUCCESS; - handle->read_cb((uv_stream_t*)handle, -1, buf); + if (req->error.code != UV_OK) { + /* An error occurred doing the 0-read. */ + if ((handle->flags & UV_HANDLE_READING)) { + handle->flags &= ~UV_HANDLE_READING; + uv_last_error_ = req->error; + buf.base = 0; + buf.len = 0; + handle->read_cb((uv_stream_t*)handle, -1, buf); + } + } else { + /* Do nonblocking reads until the buffer is empty */ + while (handle->flags & UV_HANDLE_READING) { + buf = handle->alloc_cb((uv_stream_t*)handle, 65536); + assert(buf.len > 0); + flags = 0; + if (WSARecv(handle->socket, + (WSABUF*)&buf, + 1, + &bytes, + &flags, + NULL, + NULL) != SOCKET_ERROR) { + if (bytes > 0) { + /* Successful read */ + handle->read_cb((uv_stream_t*)handle, bytes, buf); + /* Read again only if bytes == buf.len */ + if (bytes < buf.len) { break; } } else { - err = WSAGetLastError(); - if (err == WSAEWOULDBLOCK) { - /* Read buffer was completely empty, report a 0-byte read. */ - uv_set_sys_error(WSAEWOULDBLOCK); - handle->read_cb((uv_stream_t*)handle, 0, buf); - } else { - /* Ouch! serious error. */ - uv_set_sys_error(err); - handle->read_cb((uv_stream_t*)handle, -1, buf); - } + /* Connection closed */ + handle->flags &= ~UV_HANDLE_READING; + handle->flags |= UV_HANDLE_EOF; + uv_last_error_.code = UV_EOF; + uv_last_error_.sys_errno_ = ERROR_SUCCESS; + handle->read_cb((uv_stream_t*)handle, -1, buf); break; } - } - /* Post another 0-read if still reading and not closing. */ - if (handle->flags & UV_HANDLE_READING) { - uv_tcp_queue_read(handle); - } - break; - - case UV_ACCEPT: - /* If handle->accepted_socket is not a valid socket, then */ - /* uv_queue_accept must have failed. This is a serious error. We stop */ - /* accepting connections and report this error to the connection */ - /* callback. */ - if (handle->accept_socket == INVALID_SOCKET) { - if (!(handle->flags & UV_HANDLE_LISTENING)) { - break; - } - handle->flags &= ~UV_HANDLE_LISTENING; - if (handle->connection_cb) { - uv_last_error_ = req->error; - handle->connection_cb((uv_handle_t*)handle, -1); + } else { + err = WSAGetLastError(); + if (err == WSAEWOULDBLOCK) { + /* Read buffer was completely empty, report a 0-byte read. */ + uv_set_sys_error(WSAEWOULDBLOCK); + handle->read_cb((uv_stream_t*)handle, 0, buf); + } else { + /* Ouch! serious error. */ + uv_set_sys_error(err); + handle->read_cb((uv_stream_t*)handle, -1, buf); } break; } + } - if (req->error.code == UV_OK && - setsockopt(handle->accept_socket, - SOL_SOCKET, - SO_UPDATE_ACCEPT_CONTEXT, - (char*)&handle->socket, - sizeof(handle->socket)) == 0) { - /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */ - if (handle->connection_cb) { - handle->connection_cb((uv_handle_t*)handle, 0); - } - } else { - /* Error related to accepted socket is ignored because the server */ - /* socket may still be healthy. If the server socket is broken - /* uv_queue_accept will detect it. */ - closesocket(handle->accept_socket); - if (handle->flags & UV_HANDLE_LISTENING) { - uv_tcp_queue_accept(handle); - } - } - break; - - case UV_CONNECT: - if (req->cb) { - if (req->error.code == UV_OK) { - if (setsockopt(handle->socket, - SOL_SOCKET, - SO_UPDATE_CONNECT_CONTEXT, - NULL, - 0) == 0) { - uv_init_connection((uv_stream_t*)handle); - ((uv_connect_cb)req->cb)(req, 0); - } else { - uv_set_sys_error(WSAGetLastError()); - ((uv_connect_cb)req->cb)(req, -1); - } - } else { - uv_last_error_ = req->error; - ((uv_connect_cb)req->cb)(req, -1); - } - } - break; - - default: - assert(0); + /* Post another 0-read if still reading and not closing. */ + if (handle->flags & UV_HANDLE_READING) { + uv_tcp_queue_read(handle); + } } - /* The number of pending requests is now down by one */ - handle->reqs_pending--; - - /* Queue the handle's close callback if it is closing and there are no */ - /* more pending requests. */ - if (handle->flags & UV_HANDLE_CLOSING && - handle->reqs_pending == 0) { - uv_want_endgame((uv_handle_t*)handle); - } + DECREASE_PENDING_REQ_COUNT(handle); } -static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) { +static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) { + DWORD bytes, flags, err; + uv_buf_t buf; + + assert(handle->type == UV_TCP); + + /* Mark the request non-pending */ + req->flags &= ~UV_REQ_PENDING; + + handle->write_queue_size -= req->queued_bytes; + + if (req->cb) { + uv_last_error_ = req->error; + ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); + } + + handle->write_reqs_pending--; + if (handle->flags & UV_HANDLE_SHUTTING && + handle->write_reqs_pending == 0) { + uv_want_endgame((uv_handle_t*)handle); + } + + DECREASE_PENDING_REQ_COUNT(handle); +} + + +static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) { + DWORD bytes, flags, err; + uv_buf_t buf; + + assert(handle->type == UV_TCP); + + /* Mark the request non-pending */ + req->flags &= ~UV_REQ_PENDING; + + /* If handle->accepted_socket is not a valid socket, then */ + /* uv_queue_accept must have failed. This is a serious error. We stop */ + /* accepting connections and report this error to the connection */ + /* callback. */ + if (handle->accept_socket == INVALID_SOCKET) { + if (handle->flags & UV_HANDLE_LISTENING) { + handle->flags &= ~UV_HANDLE_LISTENING; + if (handle->connection_cb) { + uv_last_error_ = req->error; + handle->connection_cb((uv_handle_t*)handle, -1); + } + } + } else if (req->error.code == UV_OK && + setsockopt(handle->accept_socket, + SOL_SOCKET, + SO_UPDATE_ACCEPT_CONTEXT, + (char*)&handle->socket, + sizeof(handle->socket)) == 0) { + /* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */ + if (handle->connection_cb) { + handle->connection_cb((uv_handle_t*)handle, 0); + } + } else { + /* Error related to accepted socket is ignored because the server */ + /* socket may still be healthy. If the server socket is broken + /* uv_queue_accept will detect it. */ + closesocket(handle->accept_socket); + if (handle->flags & UV_HANDLE_LISTENING) { + uv_tcp_queue_accept(handle); + } + } + + DECREASE_PENDING_REQ_COUNT(handle); +} + + +static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_req_t* req) { + DWORD bytes, flags, err; + uv_buf_t buf; + + assert(handle->type == UV_TCP); + + /* Mark the request non-pending */ + req->flags &= ~UV_REQ_PENDING; + + if (req->cb) { + if (req->error.code == UV_OK) { + if (setsockopt(handle->socket, + SOL_SOCKET, + SO_UPDATE_CONNECT_CONTEXT, + NULL, + 0) == 0) { + uv_init_connection((uv_stream_t*)handle); + ((uv_connect_cb)req->cb)(req, 0); + } else { + uv_set_sys_error(WSAGetLastError()); + ((uv_connect_cb)req->cb)(req, -1); + } + } else { + uv_last_error_ = req->error; + ((uv_connect_cb)req->cb)(req, -1); + } + } + + DECREASE_PENDING_REQ_COUNT(handle); +} + + +static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { DWORD bytes, err, mode; uv_buf_t buf; uv_pipe_instance_t* acceptingConn; @@ -1730,148 +1754,173 @@ static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) { /* Mark the request non-pending */ req->flags &= ~UV_REQ_PENDING; - switch (req->type) { - case UV_WRITE: - handle->write_queue_size -= req->queued_bytes; - if (req->cb) { - uv_last_error_ = req->error; - ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); - } - handle->write_reqs_pending--; - if (handle->write_reqs_pending == 0 && - handle->flags & UV_HANDLE_SHUTTING) { - uv_want_endgame((uv_handle_t*)handle); - } - break; + if (req->error.code != UV_OK) { + /* An error occurred doing the 0-read. */ + if (handle->flags & UV_HANDLE_READING) { + /* Stop reading and report error. */ + handle->flags &= ~UV_HANDLE_READING; + uv_last_error_ = req->error; + buf.base = 0; + buf.len = 0; + handle->read_cb((uv_stream_t*)handle, -1, buf); + } + } else { + /* + * Temporarily switch to non-blocking mode. + * This is so that ReadFile doesn't block if the read buffer is empty. + */ + mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT; + if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + /* We can't continue processing this read. */ + handle->flags &= ~UV_HANDLE_READING; + uv_set_sys_error(GetLastError()); + buf.base = 0; + buf.len = 0; + handle->read_cb((uv_stream_t*)handle, -1, buf); + } - case UV_READ: - if (req->error.code != UV_OK) { - /* An error occurred doing the 0-read. */ - if (!(handle->flags & UV_HANDLE_READING)) { - break; - } + /* Do non-blocking reads until the buffer is empty */ + while (handle->flags & UV_HANDLE_READING) { + buf = handle->alloc_cb((uv_stream_t*)handle, 65536); + assert(buf.len > 0); - /* Stop reading and report error. */ - handle->flags &= ~UV_HANDLE_READING; - uv_last_error_ = req->error; - buf.base = 0; - buf.len = 0; - handle->read_cb((uv_stream_t*)handle, -1, buf); - break; - } - - /* Temporarily switch to non-blocking mode. - * This is so that ReadFile doesn't block if the read buffer is empty. - */ - mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT; - if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { - /* We can't continue processing this read. */ - err = GetLastError(); - uv_set_sys_error(err); - handle->read_cb((uv_stream_t*)handle, -1, buf); - break; - } - - /* Do non-blocking reads until the buffer is empty */ - while (handle->flags & UV_HANDLE_READING) { - buf = handle->alloc_cb((uv_stream_t*)handle, 65536); - assert(buf.len > 0); - - if (ReadFile(handle->connection->handle, - buf.base, - buf.len, - &bytes, - NULL)) { - if (bytes > 0) { - /* Successful read */ - handle->read_cb((uv_stream_t*)handle, bytes, buf); - /* Read again only if bytes == buf.len */ - if (bytes < buf.len) { - break; - } - } else { - /* Connection closed */ - handle->flags &= ~UV_HANDLE_READING; - handle->flags |= UV_HANDLE_EOF; - uv_last_error_.code = UV_EOF; - uv_last_error_.sys_errno_ = ERROR_SUCCESS; - handle->read_cb((uv_stream_t*)handle, -1, buf); + if (ReadFile(handle->connection->handle, + buf.base, + buf.len, + &bytes, + NULL)) { + if (bytes > 0) { + /* Successful read */ + handle->read_cb((uv_stream_t*)handle, bytes, buf); + /* Read again only if bytes == buf.len */ + if (bytes < buf.len) { break; } } else { - err = GetLastError(); - if (err == ERROR_NO_DATA) { - /* Read buffer was completely empty, report a 0-byte read. */ - uv_set_sys_error(UV_EAGAIN); - handle->read_cb((uv_stream_t*)handle, 0, buf); - } else { - /* Ouch! serious error. */ - uv_set_sys_error(err); - handle->read_cb((uv_stream_t*)handle, -1, buf); - } - break; - } - } - - if (handle->flags & UV_HANDLE_READING) { - /* Switch back to blocking mode so that we can use IOCP for 0-reads */ - mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; - if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { - /* Report and continue. */ - err = GetLastError(); - uv_set_sys_error(err); + /* Connection closed */ + handle->flags &= ~UV_HANDLE_READING; + handle->flags |= UV_HANDLE_EOF; + uv_last_error_.code = UV_EOF; + uv_last_error_.sys_errno_ = ERROR_SUCCESS; handle->read_cb((uv_stream_t*)handle, -1, buf); break; } - - /* Post another 0-read if still reading and not closing. */ - uv_pipe_queue_read(handle); - } - - break; - - case UV_ACCEPT: - if (req->error.code == UV_OK) { - /* Put the connection instance into accept state */ - handle->acceptConnection = uv_req_to_pipeinstance(req); - handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED; - - if (handle->connection_cb) { - handle->connection_cb((uv_handle_t*)handle, 0); - } } else { - /* Ignore errors and continue listening */ - if (handle->flags & UV_HANDLE_LISTENING) { - uv_pipe_queue_accept(handle); - } - } - break; - - case UV_CONNECT: - if (req->cb) { - if (req->error.code == UV_OK) { - uv_init_connection((uv_stream_t*)handle); - ((uv_connect_cb)req->cb)(req, 0); + err = GetLastError(); + if (err == ERROR_NO_DATA) { + /* Read buffer was completely empty, report a 0-byte read. */ + uv_set_sys_error(UV_EAGAIN); + handle->read_cb((uv_stream_t*)handle, 0, buf); } else { - uv_last_error_ = req->error; - ((uv_connect_cb)req->cb)(req, -1); + /* Ouch! serious error. */ + uv_set_sys_error(err); + handle->read_cb((uv_stream_t*)handle, -1, buf); } + break; } - break; + } - default: - assert(0); + /* TODO: if the read callback stops reading we can't start reading again + because the pipe will still be in nowait mode. */ + if (handle->flags & UV_HANDLE_READING) { + /* Switch back to blocking mode so that we can use IOCP for 0-reads */ + mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; + if (SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) { + /* Post another 0-read */ + uv_pipe_queue_read(handle); + } else { + /* Report and continue. */ + /* We can't continue processing this read. */ + handle->flags &= ~UV_HANDLE_READING; + uv_set_sys_error(GetLastError()); + buf.base = 0; + buf.len = 0; + handle->read_cb((uv_stream_t*)handle, -1, buf); + } + } } - /* The number of pending requests is now down by one */ - handle->reqs_pending--; + DECREASE_PENDING_REQ_COUNT(handle); +} - /* Queue the handle's close callback if it is closing and there are no */ - /* more pending requests. */ - if (handle->flags & UV_HANDLE_CLOSING && - handle->reqs_pending == 0) { + +static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) { + DWORD bytes, err, mode; + uv_buf_t buf; + uv_pipe_instance_t* acceptingConn; + + assert(handle->type == UV_NAMED_PIPE); + + /* Mark the request non-pending */ + req->flags &= ~UV_REQ_PENDING; + + handle->write_queue_size -= req->queued_bytes; + + if (req->cb) { + uv_last_error_ = req->error; + ((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); + } + + handle->write_reqs_pending--; + if (handle->write_reqs_pending == 0 && + handle->flags & UV_HANDLE_SHUTTING) { uv_want_endgame((uv_handle_t*)handle); } + + DECREASE_PENDING_REQ_COUNT(handle); +} + + +static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) { + DWORD bytes, err, mode; + uv_buf_t buf; + uv_pipe_instance_t* acceptingConn; + + assert(handle->type == UV_NAMED_PIPE); + + /* Mark the request non-pending */ + req->flags &= ~UV_REQ_PENDING; + + if (req->error.code == UV_OK) { + /* Put the connection instance into accept state */ + handle->acceptConnection = uv_req_to_pipeinstance(req); + handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED; + + if (handle->connection_cb) { + handle->connection_cb((uv_handle_t*)handle, 0); + } + } else { + /* Ignore errors and continue listening */ + if (handle->flags & UV_HANDLE_LISTENING) { + uv_pipe_queue_accept(handle); + } + } + + DECREASE_PENDING_REQ_COUNT(handle); +} + + +static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_req_t* req) { + DWORD bytes, err, mode; + uv_buf_t buf; + uv_pipe_instance_t* acceptingConn; + + assert(handle->type == UV_NAMED_PIPE); + + /* Mark the request non-pending */ + req->flags &= ~UV_REQ_PENDING; + + if (req->cb) { + if (req->error.code == UV_OK) { + uv_init_connection((uv_stream_t*)handle); + ((uv_connect_cb)req->cb)(req, 0); + } else { + uv_last_error_ = req->error; + ((uv_connect_cb)req->cb)(req, -1); + } + } + + DECREASE_PENDING_REQ_COUNT(handle); } @@ -2147,7 +2196,7 @@ int uv_async_send(uv_async_t* handle) { } -static void uv_async_return_req(uv_async_t* handle, uv_req_t* req) { +static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) { assert(handle->type == UV_ASYNC); assert(req->type == UV_WAKEUP); @@ -2161,36 +2210,58 @@ static void uv_async_return_req(uv_async_t* handle, uv_req_t* req) { } +#define DELEGATE_STREAM_REQ(req, method) \ + do { \ + switch (req->handle->type) { \ + case UV_TCP: \ + uv_process_tcp_##method##_req((uv_tcp_t*) req->handle, req); \ + break; \ + \ + case UV_NAMED_PIPE: \ + uv_process_pipe_##method##_req((uv_pipe_t*) req->handle, req); \ + break; \ + \ + default: \ + assert(0); \ + } \ + } while (0) + + static void uv_process_reqs() { uv_req_t* req; - uv_handle_t* handle; while (req = uv_remove_pending_req()) { - handle = req->handle; - - switch (handle->type) { - case UV_TCP: - uv_tcp_return_req((uv_tcp_t*)handle, req); - break; - - case UV_NAMED_PIPE: - uv_pipe_return_req((uv_pipe_t*)handle, req); + switch (req->type) { + case UV_READ: + DELEGATE_STREAM_REQ(req, read); break; - case UV_ASYNC: - uv_async_return_req((uv_async_t*)handle, req); + case UV_WRITE: + DELEGATE_STREAM_REQ(req, write); break; - case UV_ARES: - uv_ares_process((uv_ares_action_t*)handle, req); + case UV_ACCEPT: + DELEGATE_STREAM_REQ(req, accept); break; - case UV_ARES_TASK: - uv_ares_task_cleanup((uv_ares_task_t*)handle, req); + case UV_CONNECT: + DELEGATE_STREAM_REQ(req, connect); break; - case UV_GETADDRINFO: - uv_getaddrinfo_done((uv_getaddrinfo_t*)handle, req); + case UV_WAKEUP: + uv_process_async_wakeup_req((uv_async_t*) req->handle, req); + break; + + case UV_ARES_EVENT_REQ: + uv_process_ares_event_req((uv_ares_action_t*) req->handle, req); + break; + + case UV_ARES_CLEANUP_REQ: + uv_process_ares_cleanup_req((uv_ares_task_t*) req->handle, req); + break; + + case UV_GETADDRINFO_REQ: + uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->handle, req); break; default: @@ -2416,7 +2487,7 @@ VOID CALLBACK uv_ares_socksignal_tp(void* parameter, BOOLEAN timerfired) { if (selhandle == NULL) { uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); } - selhandle->type = UV_ARES; + selhandle->type = UV_ARES_EVENT; selhandle->close_cb = NULL; selhandle->data = sockhandle->data; selhandle->sock = sockhandle->sock; @@ -2425,7 +2496,7 @@ VOID CALLBACK uv_ares_socksignal_tp(void* parameter, BOOLEAN timerfired) { uv_ares_req = &selhandle->ares_req; uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL); - uv_ares_req->type = UV_WAKEUP; + uv_ares_req->type = UV_ARES_EVENT_REQ; /* post ares needs to called */ if (!PostQueuedCompletionStatus(uv_iocp_, @@ -2473,7 +2544,7 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) { /* Post request to cleanup the Task */ uv_ares_req = &uv_handle_ares->ares_req; uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL); - uv_ares_req->type = UV_WAKEUP; + uv_ares_req->type = UV_ARES_CLEANUP_REQ; /* post ares done with socket - finish cleanup when all threads done. */ if (!PostQueuedCompletionStatus(uv_iocp_, @@ -2518,7 +2589,7 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) { uv_add_ares_handle(uv_handle_ares); uv_refs_++; - /* + /* * we have a single polling timer for all ares sockets. * This is preferred to using ares_timeout. See ares_timeout.c warning. * if timer is not running start it, and keep socket count @@ -2548,7 +2619,7 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) { } /* called via uv_poll when ares completion port signaled */ -void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req) { +void uv_process_ares_event_req(uv_ares_action_t* handle, uv_req_t* req) { uv_ares_channel_t* uv_ares_data_ptr = (uv_ares_channel_t*)handle->data; ares_process_fd(uv_ares_data_ptr->channel, @@ -2560,7 +2631,7 @@ void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req) { } /* called via uv_poll when ares is finished with socket */ -void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req) { +void uv_process_ares_cleanup_req(uv_ares_task_t* handle, uv_req_t* req) { /* check for event complete without waiting */ unsigned int signaled = WaitForSingleObject(handle->h_close_event, 0); @@ -2692,7 +2763,7 @@ static DWORD WINAPI getaddrinfo_thread_proc(void* parameter) { * and copy all structs and referenced strings into the one block. * Each size calculation is adjusted to avoid unaligned pointers. */ -static void uv_getaddrinfo_done(uv_getaddrinfo_t* handle, uv_req_t* req) { +static void uv_process_getaddrinfo_req(uv_getaddrinfo_t* handle, uv_req_t* req) { int addrinfo_len = 0; int name_len = 0; size_t addrinfo_struct_len = ALIGNED_SIZE(sizeof(struct addrinfo)); @@ -2903,7 +2974,7 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle, /* init request for Post handling */ uv_req_init(&handle->getadddrinfo_req, (uv_handle_t*)handle, NULL); - handle->getadddrinfo_req.type = UV_WAKEUP; + handle->getadddrinfo_req.type = UV_GETADDRINFO_REQ; /* Ask thread to run. Treat this as a long operation */ if (QueueUserWorkItem(&getaddrinfo_thread_proc, handle, WT_EXECUTELONGFUNCTION) == 0) { @@ -2987,7 +3058,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { } maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount; - + for (i = 0; i < instanceCount; i++) { pipeHandle = CreateNamedPipe(handle->name, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, @@ -3026,7 +3097,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) { uv_pipe_queue_accept(handle); return 0; - + error: close_pipe(handle, NULL, NULL); uv_set_sys_error(errno); @@ -3040,12 +3111,12 @@ int uv_pipe_connect(uv_req_t* req, const char* name) { uv_pipe_t* handle = (uv_pipe_t*)req->handle; assert(!(req->flags & UV_REQ_PENDING)); - + req->type = UV_CONNECT; handle->connection = &handle->clientConnection; handle->server = NULL; memset(&req->overlapped, 0, sizeof(req->overlapped)); - + handle->clientConnection.handle = CreateFile(name, GENERIC_READ | GENERIC_WRITE, 0, @@ -3053,27 +3124,27 @@ int uv_pipe_connect(uv_req_t* req, const char* name) { OPEN_EXISTING, FILE_FLAG_OVERLAPPED, NULL); - + if (handle->clientConnection.handle == INVALID_HANDLE_VALUE && GetLastError() != ERROR_IO_PENDING) { errno = GetLastError(); goto error; } - + mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) { errno = GetLastError(); goto error; - } - + } + if (CreateIoCompletionPort(handle->clientConnection.handle, uv_iocp_, (ULONG_PTR)handle, 0) == NULL) { errno = GetLastError(); goto error; - } + } req->error = uv_ok_; req->flags |= UV_REQ_PENDING; @@ -3081,7 +3152,7 @@ int uv_pipe_connect(uv_req_t* req, const char* name) { uv_insert_pending_req(req); handle->reqs_pending++; return 0; - + error: close_pipe(handle, NULL, NULL); req->error = uv_new_sys_error(errno); @@ -3125,7 +3196,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { * The handle is for the pipe server. * To clean-up we close every connection instance that was made in uv_pipe_listen. */ - + if (handle->name) { free(handle->name); handle->name = NULL; @@ -3169,6 +3240,6 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) { } } } - + handle->flags |= UV_HANDLE_SHUT; }