uv-win: refactor request processing

This commit is contained in:
Bert Belder 2011-07-13 21:49:15 +02:00
parent 9c19391536
commit 28650425e2
2 changed files with 399 additions and 324 deletions

View File

@ -140,8 +140,8 @@ typedef enum {
UV_CHECK, UV_CHECK,
UV_IDLE, UV_IDLE,
UV_ASYNC, UV_ASYNC,
UV_ARES,
UV_ARES_TASK, UV_ARES_TASK,
UV_ARES_EVENT,
UV_GETADDRINFO UV_GETADDRINFO
} uv_handle_type; } uv_handle_type;
@ -152,7 +152,11 @@ typedef enum {
UV_READ, UV_READ,
UV_WRITE, UV_WRITE,
UV_SHUTDOWN, UV_SHUTDOWN,
UV_WAKEUP UV_WAKEUP,
/* TODO: remove the req suffix */
UV_ARES_EVENT_REQ,
UV_ARES_CLEANUP_REQ,
UV_GETADDRINFO_REQ
} uv_req_type; } uv_req_type;

View File

@ -239,8 +239,8 @@ struct uv_ares_action_s {
int write; int write;
}; };
void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req); void uv_process_ares_event_req(uv_ares_action_t* handle, uv_req_t* req);
void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req); void uv_process_ares_cleanup_req(uv_ares_task_t* handle, uv_req_t* req);
void uv_ares_poll(uv_timer_t* handle, int status); void uv_ares_poll(uv_timer_t* handle, int status);
static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err); static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err);
@ -263,7 +263,7 @@ static uv_ares_channel_t uv_ares_data = { NULL, 0 };
/* getaddrinfo integration */ /* getaddrinfo integration */
static void uv_getaddrinfo_done(uv_getaddrinfo_t* handle, uv_req_t* req); static void uv_process_getaddrinfo_req(uv_getaddrinfo_t* handle, uv_req_t* req);
/* adjust size value to be multiple of 4. Used to keep the pointer aligned */ /* adjust size value to be multiple of 4. Used to keep the pointer aligned */
/* Do we need different versions of this for different architectures? */ /* Do we need different versions of this for different architectures? */
#define ALIGNED_SIZE(X) ((((X) + 3) >> 2) << 2) #define ALIGNED_SIZE(X) ((((X) + 3) >> 2) << 2)
@ -766,7 +766,7 @@ static void uv_process_endgames() {
case UV_TCP: case UV_TCP:
uv_tcp_endgame((uv_tcp_t*)handle); uv_tcp_endgame((uv_tcp_t*)handle);
break; break;
case UV_NAMED_PIPE: case UV_NAMED_PIPE:
uv_pipe_endgame((uv_pipe_t*)handle); uv_pipe_endgame((uv_pipe_t*)handle);
break; break;
@ -830,7 +830,7 @@ static int uv_close_error(uv_handle_t* handle, uv_err_t e) {
uv_want_endgame(handle); uv_want_endgame(handle);
} }
return 0; return 0;
case UV_NAMED_PIPE: case UV_NAMED_PIPE:
pipe = (uv_pipe_t*)handle; pipe = (uv_pipe_t*)handle;
pipe->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING); pipe->flags &= ~(UV_HANDLE_READING | UV_HANDLE_LISTENING);
@ -1031,7 +1031,7 @@ static void uv_pipe_queue_accept(uv_pipe_t* handle) {
/* Prepare the overlapped structure. */ /* Prepare the overlapped structure. */
memset(&(req->overlapped), 0, sizeof(req->overlapped)); memset(&(req->overlapped), 0, sizeof(req->overlapped));
if (!ConnectNamedPipe(instance->handle, &req->overlapped) && if (!ConnectNamedPipe(instance->handle, &req->overlapped) &&
GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) { GetLastError() != ERROR_IO_PENDING && GetLastError() != ERROR_PIPE_CONNECTED) {
/* Make this req pending reporting an error. */ /* Make this req pending reporting an error. */
req->error = uv_new_sys_error(GetLastError()); req->error = uv_new_sys_error(GetLastError());
@ -1099,12 +1099,12 @@ static void uv_pipe_queue_read(uv_pipe_t* handle) {
req->type = UV_READ; req->type = UV_READ;
/* Do 0-read */ /* Do 0-read */
result = ReadFile(handle->connection->handle, result = ReadFile(handle->connection->handle,
&uv_zero_, &uv_zero_,
0, 0,
NULL, NULL,
&req->overlapped); &req->overlapped);
if (!result && GetLastError() != ERROR_IO_PENDING) { if (!result && GetLastError() != ERROR_IO_PENDING) {
/* Make this req pending reporting an error. */ /* Make this req pending reporting an error. */
req->error = uv_new_sys_error(WSAGetLastError()); req->error = uv_new_sys_error(WSAGetLastError());
@ -1150,7 +1150,7 @@ int uv_tcp_listen(uv_tcp_t* handle, int backlog, uv_connection_cb cb) {
static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) { static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
int rv = 0; int rv = 0;
if (server->accept_socket == INVALID_SOCKET) { if (server->accept_socket == INVALID_SOCKET) {
uv_set_sys_error(WSAENOTCONN); uv_set_sys_error(WSAENOTCONN);
return -1; return -1;
@ -1173,12 +1173,12 @@ static int uv_tcp_accept(uv_tcp_t* server, uv_tcp_t* client) {
} }
static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) { static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
assert(server->acceptConnection); assert(server->acceptConnection);
/* Make the connection instance active */ /* Make the connection instance active */
server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE; server->acceptConnection->state = UV_PIPEINSTANCE_ACTIVE;
/* Move the connection instance from server to client */ /* Move the connection instance from server to client */
client->connection = server->acceptConnection; client->connection = server->acceptConnection;
server->acceptConnection = NULL; server->acceptConnection = NULL;
@ -1189,7 +1189,7 @@ static int uv_pipe_accept(uv_pipe_t* server, uv_pipe_t* client) {
uv_init_connection((uv_stream_t*)client); uv_init_connection((uv_stream_t*)client);
client->flags |= UV_HANDLE_PIPESERVER; client->flags |= UV_HANDLE_PIPESERVER;
uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL); uv_req_init(&(client->read_req), (uv_handle_t*)client, NULL);
if (!(server->flags & UV_HANDLE_CLOSING)) { if (!(server->flags & UV_HANDLE_CLOSING)) {
uv_pipe_queue_accept(server); uv_pipe_queue_accept(server);
} }
@ -1206,7 +1206,7 @@ int uv_accept(uv_handle_t* server, uv_stream_t* client) {
} else if (server->type == UV_NAMED_PIPE) { } else if (server->type == UV_NAMED_PIPE) {
return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client); return uv_pipe_accept((uv_pipe_t*)server, (uv_pipe_t*)client);
} }
return -1; return -1;
} }
@ -1470,7 +1470,7 @@ int uv_pipe_write(uv_req_t* req, uv_buf_t bufs[], int bufcnt) {
uv_pipe_t* handle = (uv_pipe_t*) req->handle; uv_pipe_t* handle = (uv_pipe_t*) req->handle;
assert(!(req->flags & UV_REQ_PENDING)); assert(!(req->flags & UV_REQ_PENDING));
if (bufcnt != 1) { if (bufcnt != 1) {
uv_set_sys_error(UV_ENOTSUP); uv_set_sys_error(UV_ENOTSUP);
return -1; return -1;
@ -1558,7 +1558,18 @@ int uv_shutdown(uv_req_t* req) {
} }
static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) { #define DECREASE_PENDING_REQ_COUNT(handle) \
do { \
handle->reqs_pending--; \
\
if (handle->flags & UV_HANDLE_CLOSING && \
handle->reqs_pending == 0) { \
uv_want_endgame((uv_handle_t*)handle); \
} \
} while (0)
static void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
DWORD bytes, flags, err; DWORD bytes, flags, err;
uv_buf_t buf; uv_buf_t buf;
@ -1567,160 +1578,173 @@ static void uv_tcp_return_req(uv_tcp_t* handle, uv_req_t* req) {
/* Mark the request non-pending */ /* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING; req->flags &= ~UV_REQ_PENDING;
switch (req->type) { if (req->error.code != UV_OK) {
case UV_WRITE: /* An error occurred doing the 0-read. */
handle->write_queue_size -= req->queued_bytes; if ((handle->flags & UV_HANDLE_READING)) {
if (req->cb) { handle->flags &= ~UV_HANDLE_READING;
uv_last_error_ = req->error; uv_last_error_ = req->error;
((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); buf.base = 0;
} buf.len = 0;
handle->write_reqs_pending--; handle->read_cb((uv_stream_t*)handle, -1, buf);
if (handle->write_reqs_pending == 0 && }
handle->flags & UV_HANDLE_SHUTTING) { } else {
uv_want_endgame((uv_handle_t*)handle); /* Do nonblocking reads until the buffer is empty */
} while (handle->flags & UV_HANDLE_READING) {
break; buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
assert(buf.len > 0);
case UV_READ: flags = 0;
if (req->error.code != UV_OK) { if (WSARecv(handle->socket,
/* An error occurred doing the 0-read. */ (WSABUF*)&buf,
if (!(handle->flags & UV_HANDLE_READING)) { 1,
break; &bytes,
} &flags,
NULL,
/* Stop reading and report error. */ NULL) != SOCKET_ERROR) {
handle->flags &= ~UV_HANDLE_READING; if (bytes > 0) {
uv_last_error_ = req->error; /* Successful read */
buf.base = 0; handle->read_cb((uv_stream_t*)handle, bytes, buf);
buf.len = 0; /* Read again only if bytes == buf.len */
handle->read_cb((uv_stream_t*)handle, -1, buf); if (bytes < buf.len) {
break;
}
/* Do nonblocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
assert(buf.len > 0);
flags = 0;
if (WSARecv(handle->socket,
(WSABUF*)&buf,
1,
&bytes,
&flags,
NULL,
NULL) != SOCKET_ERROR) {
if (bytes > 0) {
/* Successful read */
handle->read_cb((uv_stream_t*)handle, bytes, buf);
/* Read again only if bytes == buf.len */
if (bytes < buf.len) {
break;
}
} else {
/* Connection closed */
handle->flags &= ~UV_HANDLE_READING;
handle->flags |= UV_HANDLE_EOF;
uv_last_error_.code = UV_EOF;
uv_last_error_.sys_errno_ = ERROR_SUCCESS;
handle->read_cb((uv_stream_t*)handle, -1, buf);
break; break;
} }
} else { } else {
err = WSAGetLastError(); /* Connection closed */
if (err == WSAEWOULDBLOCK) { handle->flags &= ~UV_HANDLE_READING;
/* Read buffer was completely empty, report a 0-byte read. */ handle->flags |= UV_HANDLE_EOF;
uv_set_sys_error(WSAEWOULDBLOCK); uv_last_error_.code = UV_EOF;
handle->read_cb((uv_stream_t*)handle, 0, buf); uv_last_error_.sys_errno_ = ERROR_SUCCESS;
} else { handle->read_cb((uv_stream_t*)handle, -1, buf);
/* Ouch! serious error. */
uv_set_sys_error(err);
handle->read_cb((uv_stream_t*)handle, -1, buf);
}
break; break;
} }
} } else {
/* Post another 0-read if still reading and not closing. */ err = WSAGetLastError();
if (handle->flags & UV_HANDLE_READING) { if (err == WSAEWOULDBLOCK) {
uv_tcp_queue_read(handle); /* Read buffer was completely empty, report a 0-byte read. */
} uv_set_sys_error(WSAEWOULDBLOCK);
break; handle->read_cb((uv_stream_t*)handle, 0, buf);
} else {
case UV_ACCEPT: /* Ouch! serious error. */
/* If handle->accepted_socket is not a valid socket, then */ uv_set_sys_error(err);
/* uv_queue_accept must have failed. This is a serious error. We stop */ handle->read_cb((uv_stream_t*)handle, -1, buf);
/* accepting connections and report this error to the connection */
/* callback. */
if (handle->accept_socket == INVALID_SOCKET) {
if (!(handle->flags & UV_HANDLE_LISTENING)) {
break;
}
handle->flags &= ~UV_HANDLE_LISTENING;
if (handle->connection_cb) {
uv_last_error_ = req->error;
handle->connection_cb((uv_handle_t*)handle, -1);
} }
break; break;
} }
}
if (req->error.code == UV_OK && /* Post another 0-read if still reading and not closing. */
setsockopt(handle->accept_socket, if (handle->flags & UV_HANDLE_READING) {
SOL_SOCKET, uv_tcp_queue_read(handle);
SO_UPDATE_ACCEPT_CONTEXT, }
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->connection_cb) {
handle->connection_cb((uv_handle_t*)handle, 0);
}
} else {
/* Error related to accepted socket is ignored because the server */
/* socket may still be healthy. If the server socket is broken, */
/* uv_queue_accept will detect it. */
closesocket(handle->accept_socket);
if (handle->flags & UV_HANDLE_LISTENING) {
uv_tcp_queue_accept(handle);
}
}
break;
case UV_CONNECT:
if (req->cb) {
if (req->error.code == UV_OK) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
uv_init_connection((uv_stream_t*)handle);
((uv_connect_cb)req->cb)(req, 0);
} else {
uv_set_sys_error(WSAGetLastError());
((uv_connect_cb)req->cb)(req, -1);
}
} else {
uv_last_error_ = req->error;
((uv_connect_cb)req->cb)(req, -1);
}
}
break;
default:
assert(0);
} }
/* The number of pending requests is now down by one */ DECREASE_PENDING_REQ_COUNT(handle);
handle->reqs_pending--;
/* Queue the handle's close callback if it is closing and there are no */
/* more pending requests. */
if (handle->flags & UV_HANDLE_CLOSING &&
handle->reqs_pending == 0) {
uv_want_endgame((uv_handle_t*)handle);
}
} }
static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) { static void uv_process_tcp_write_req(uv_tcp_t* handle, uv_req_t* req) {
DWORD bytes, flags, err;
uv_buf_t buf;
assert(handle->type == UV_TCP);
/* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING;
handle->write_queue_size -= req->queued_bytes;
if (req->cb) {
uv_last_error_ = req->error;
((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1);
}
handle->write_reqs_pending--;
if (handle->flags & UV_HANDLE_SHUTTING &&
handle->write_reqs_pending == 0) {
uv_want_endgame((uv_handle_t*)handle);
}
DECREASE_PENDING_REQ_COUNT(handle);
}
static void uv_process_tcp_accept_req(uv_tcp_t* handle, uv_req_t* req) {
DWORD bytes, flags, err;
uv_buf_t buf;
assert(handle->type == UV_TCP);
/* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING;
/* If handle->accepted_socket is not a valid socket, then */
/* uv_queue_accept must have failed. This is a serious error. We stop */
/* accepting connections and report this error to the connection */
/* callback. */
if (handle->accept_socket == INVALID_SOCKET) {
if (handle->flags & UV_HANDLE_LISTENING) {
handle->flags &= ~UV_HANDLE_LISTENING;
if (handle->connection_cb) {
uv_last_error_ = req->error;
handle->connection_cb((uv_handle_t*)handle, -1);
}
}
} else if (req->error.code == UV_OK &&
setsockopt(handle->accept_socket,
SOL_SOCKET,
SO_UPDATE_ACCEPT_CONTEXT,
(char*)&handle->socket,
sizeof(handle->socket)) == 0) {
/* Accept and SO_UPDATE_ACCEPT_CONTEXT were successful. */
if (handle->connection_cb) {
handle->connection_cb((uv_handle_t*)handle, 0);
}
} else {
/* Error related to accepted socket is ignored because the server */
/* socket may still be healthy. If the server socket is broken
/* uv_queue_accept will detect it. */
closesocket(handle->accept_socket);
if (handle->flags & UV_HANDLE_LISTENING) {
uv_tcp_queue_accept(handle);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
static void uv_process_tcp_connect_req(uv_tcp_t* handle, uv_req_t* req) {
DWORD bytes, flags, err;
uv_buf_t buf;
assert(handle->type == UV_TCP);
/* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING;
if (req->cb) {
if (req->error.code == UV_OK) {
if (setsockopt(handle->socket,
SOL_SOCKET,
SO_UPDATE_CONNECT_CONTEXT,
NULL,
0) == 0) {
uv_init_connection((uv_stream_t*)handle);
((uv_connect_cb)req->cb)(req, 0);
} else {
uv_set_sys_error(WSAGetLastError());
((uv_connect_cb)req->cb)(req, -1);
}
} else {
uv_last_error_ = req->error;
((uv_connect_cb)req->cb)(req, -1);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
static void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
DWORD bytes, err, mode; DWORD bytes, err, mode;
uv_buf_t buf; uv_buf_t buf;
uv_pipe_instance_t* acceptingConn; uv_pipe_instance_t* acceptingConn;
@ -1730,148 +1754,173 @@ static void uv_pipe_return_req(uv_pipe_t* handle, uv_req_t* req) {
/* Mark the request non-pending */ /* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING; req->flags &= ~UV_REQ_PENDING;
switch (req->type) { if (req->error.code != UV_OK) {
case UV_WRITE: /* An error occurred doing the 0-read. */
handle->write_queue_size -= req->queued_bytes; if (handle->flags & UV_HANDLE_READING) {
if (req->cb) { /* Stop reading and report error. */
uv_last_error_ = req->error; handle->flags &= ~UV_HANDLE_READING;
((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1); uv_last_error_ = req->error;
} buf.base = 0;
handle->write_reqs_pending--; buf.len = 0;
if (handle->write_reqs_pending == 0 && handle->read_cb((uv_stream_t*)handle, -1, buf);
handle->flags & UV_HANDLE_SHUTTING) { }
uv_want_endgame((uv_handle_t*)handle); } else {
} /*
break; * Temporarily switch to non-blocking mode.
* This is so that ReadFile doesn't block if the read buffer is empty.
*/
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT;
if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
/* We can't continue processing this read. */
handle->flags &= ~UV_HANDLE_READING;
uv_set_sys_error(GetLastError());
buf.base = 0;
buf.len = 0;
handle->read_cb((uv_stream_t*)handle, -1, buf);
}
case UV_READ: /* Do non-blocking reads until the buffer is empty */
if (req->error.code != UV_OK) { while (handle->flags & UV_HANDLE_READING) {
/* An error occurred doing the 0-read. */ buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
if (!(handle->flags & UV_HANDLE_READING)) { assert(buf.len > 0);
break;
}
/* Stop reading and report error. */ if (ReadFile(handle->connection->handle,
handle->flags &= ~UV_HANDLE_READING; buf.base,
uv_last_error_ = req->error; buf.len,
buf.base = 0; &bytes,
buf.len = 0; NULL)) {
handle->read_cb((uv_stream_t*)handle, -1, buf); if (bytes > 0) {
break; /* Successful read */
} handle->read_cb((uv_stream_t*)handle, bytes, buf);
/* Read again only if bytes == buf.len */
/* Temporarily switch to non-blocking mode. if (bytes < buf.len) {
* This is so that ReadFile doesn't block if the read buffer is empty.
*/
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_NOWAIT;
if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
/* We can't continue processing this read. */
err = GetLastError();
uv_set_sys_error(err);
handle->read_cb((uv_stream_t*)handle, -1, buf);
break;
}
/* Do non-blocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
assert(buf.len > 0);
if (ReadFile(handle->connection->handle,
buf.base,
buf.len,
&bytes,
NULL)) {
if (bytes > 0) {
/* Successful read */
handle->read_cb((uv_stream_t*)handle, bytes, buf);
/* Read again only if bytes == buf.len */
if (bytes < buf.len) {
break;
}
} else {
/* Connection closed */
handle->flags &= ~UV_HANDLE_READING;
handle->flags |= UV_HANDLE_EOF;
uv_last_error_.code = UV_EOF;
uv_last_error_.sys_errno_ = ERROR_SUCCESS;
handle->read_cb((uv_stream_t*)handle, -1, buf);
break; break;
} }
} else { } else {
err = GetLastError(); /* Connection closed */
if (err == ERROR_NO_DATA) { handle->flags &= ~UV_HANDLE_READING;
/* Read buffer was completely empty, report a 0-byte read. */ handle->flags |= UV_HANDLE_EOF;
uv_set_sys_error(UV_EAGAIN); uv_last_error_.code = UV_EOF;
handle->read_cb((uv_stream_t*)handle, 0, buf); uv_last_error_.sys_errno_ = ERROR_SUCCESS;
} else {
/* Ouch! serious error. */
uv_set_sys_error(err);
handle->read_cb((uv_stream_t*)handle, -1, buf);
}
break;
}
}
if (handle->flags & UV_HANDLE_READING) {
/* Switch back to blocking mode so that we can use IOCP for 0-reads */
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
if (!SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
/* Report and continue. */
err = GetLastError();
uv_set_sys_error(err);
handle->read_cb((uv_stream_t*)handle, -1, buf); handle->read_cb((uv_stream_t*)handle, -1, buf);
break; break;
} }
/* Post another 0-read if still reading and not closing. */
uv_pipe_queue_read(handle);
}
break;
case UV_ACCEPT:
if (req->error.code == UV_OK) {
/* Put the connection instance into accept state */
handle->acceptConnection = uv_req_to_pipeinstance(req);
handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED;
if (handle->connection_cb) {
handle->connection_cb((uv_handle_t*)handle, 0);
}
} else { } else {
/* Ignore errors and continue listening */ err = GetLastError();
if (handle->flags & UV_HANDLE_LISTENING) { if (err == ERROR_NO_DATA) {
uv_pipe_queue_accept(handle); /* Read buffer was completely empty, report a 0-byte read. */
} uv_set_sys_error(UV_EAGAIN);
} handle->read_cb((uv_stream_t*)handle, 0, buf);
break;
case UV_CONNECT:
if (req->cb) {
if (req->error.code == UV_OK) {
uv_init_connection((uv_stream_t*)handle);
((uv_connect_cb)req->cb)(req, 0);
} else { } else {
uv_last_error_ = req->error; /* Ouch! serious error. */
((uv_connect_cb)req->cb)(req, -1); uv_set_sys_error(err);
handle->read_cb((uv_stream_t*)handle, -1, buf);
} }
break;
} }
break; }
default: /* TODO: if the read callback stops reading we can't start reading again
assert(0); because the pipe will still be in nowait mode. */
if (handle->flags & UV_HANDLE_READING) {
/* Switch back to blocking mode so that we can use IOCP for 0-reads */
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
if (SetNamedPipeHandleState(handle->connection->handle, &mode, NULL, NULL)) {
/* Post another 0-read */
uv_pipe_queue_read(handle);
} else {
/* Report and continue. */
/* We can't continue processing this read. */
handle->flags &= ~UV_HANDLE_READING;
uv_set_sys_error(GetLastError());
buf.base = 0;
buf.len = 0;
handle->read_cb((uv_stream_t*)handle, -1, buf);
}
}
} }
/* The number of pending requests is now down by one */ DECREASE_PENDING_REQ_COUNT(handle);
handle->reqs_pending--; }
/* Queue the handle's close callback if it is closing and there are no */
/* more pending requests. */ static void uv_process_pipe_write_req(uv_pipe_t* handle, uv_req_t* req) {
if (handle->flags & UV_HANDLE_CLOSING && DWORD bytes, err, mode;
handle->reqs_pending == 0) { uv_buf_t buf;
uv_pipe_instance_t* acceptingConn;
assert(handle->type == UV_NAMED_PIPE);
/* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING;
handle->write_queue_size -= req->queued_bytes;
if (req->cb) {
uv_last_error_ = req->error;
((uv_write_cb)req->cb)(req, uv_last_error_.code == UV_OK ? 0 : -1);
}
handle->write_reqs_pending--;
if (handle->write_reqs_pending == 0 &&
handle->flags & UV_HANDLE_SHUTTING) {
uv_want_endgame((uv_handle_t*)handle); uv_want_endgame((uv_handle_t*)handle);
} }
DECREASE_PENDING_REQ_COUNT(handle);
}
static void uv_process_pipe_accept_req(uv_pipe_t* handle, uv_req_t* req) {
DWORD bytes, err, mode;
uv_buf_t buf;
uv_pipe_instance_t* acceptingConn;
assert(handle->type == UV_NAMED_PIPE);
/* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING;
if (req->error.code == UV_OK) {
/* Put the connection instance into accept state */
handle->acceptConnection = uv_req_to_pipeinstance(req);
handle->acceptConnection->state = UV_PIPEINSTANCE_ACCEPTED;
if (handle->connection_cb) {
handle->connection_cb((uv_handle_t*)handle, 0);
}
} else {
/* Ignore errors and continue listening */
if (handle->flags & UV_HANDLE_LISTENING) {
uv_pipe_queue_accept(handle);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
}
static void uv_process_pipe_connect_req(uv_pipe_t* handle, uv_req_t* req) {
DWORD bytes, err, mode;
uv_buf_t buf;
uv_pipe_instance_t* acceptingConn;
assert(handle->type == UV_NAMED_PIPE);
/* Mark the request non-pending */
req->flags &= ~UV_REQ_PENDING;
if (req->cb) {
if (req->error.code == UV_OK) {
uv_init_connection((uv_stream_t*)handle);
((uv_connect_cb)req->cb)(req, 0);
} else {
uv_last_error_ = req->error;
((uv_connect_cb)req->cb)(req, -1);
}
}
DECREASE_PENDING_REQ_COUNT(handle);
} }
@ -2147,7 +2196,7 @@ int uv_async_send(uv_async_t* handle) {
} }
static void uv_async_return_req(uv_async_t* handle, uv_req_t* req) { static void uv_process_async_wakeup_req(uv_async_t* handle, uv_req_t* req) {
assert(handle->type == UV_ASYNC); assert(handle->type == UV_ASYNC);
assert(req->type == UV_WAKEUP); assert(req->type == UV_WAKEUP);
@ -2161,36 +2210,58 @@ static void uv_async_return_req(uv_async_t* handle, uv_req_t* req) {
} }
#define DELEGATE_STREAM_REQ(req, method) \
do { \
switch (req->handle->type) { \
case UV_TCP: \
uv_process_tcp_##method##_req((uv_tcp_t*) req->handle, req); \
break; \
\
case UV_NAMED_PIPE: \
uv_process_pipe_##method##_req((uv_pipe_t*) req->handle, req); \
break; \
\
default: \
assert(0); \
} \
} while (0)
static void uv_process_reqs() { static void uv_process_reqs() {
uv_req_t* req; uv_req_t* req;
uv_handle_t* handle;
while (req = uv_remove_pending_req()) { while (req = uv_remove_pending_req()) {
handle = req->handle; switch (req->type) {
case UV_READ:
switch (handle->type) { DELEGATE_STREAM_REQ(req, read);
case UV_TCP:
uv_tcp_return_req((uv_tcp_t*)handle, req);
break;
case UV_NAMED_PIPE:
uv_pipe_return_req((uv_pipe_t*)handle, req);
break; break;
case UV_ASYNC: case UV_WRITE:
uv_async_return_req((uv_async_t*)handle, req); DELEGATE_STREAM_REQ(req, write);
break; break;
case UV_ARES: case UV_ACCEPT:
uv_ares_process((uv_ares_action_t*)handle, req); DELEGATE_STREAM_REQ(req, accept);
break; break;
case UV_ARES_TASK: case UV_CONNECT:
uv_ares_task_cleanup((uv_ares_task_t*)handle, req); DELEGATE_STREAM_REQ(req, connect);
break; break;
case UV_GETADDRINFO: case UV_WAKEUP:
uv_getaddrinfo_done((uv_getaddrinfo_t*)handle, req); uv_process_async_wakeup_req((uv_async_t*) req->handle, req);
break;
case UV_ARES_EVENT_REQ:
uv_process_ares_event_req((uv_ares_action_t*) req->handle, req);
break;
case UV_ARES_CLEANUP_REQ:
uv_process_ares_cleanup_req((uv_ares_task_t*) req->handle, req);
break;
case UV_GETADDRINFO_REQ:
uv_process_getaddrinfo_req((uv_getaddrinfo_t*) req->handle, req);
break; break;
default: default:
@ -2416,7 +2487,7 @@ VOID CALLBACK uv_ares_socksignal_tp(void* parameter, BOOLEAN timerfired) {
if (selhandle == NULL) { if (selhandle == NULL) {
uv_fatal_error(ERROR_OUTOFMEMORY, "malloc"); uv_fatal_error(ERROR_OUTOFMEMORY, "malloc");
} }
selhandle->type = UV_ARES; selhandle->type = UV_ARES_EVENT;
selhandle->close_cb = NULL; selhandle->close_cb = NULL;
selhandle->data = sockhandle->data; selhandle->data = sockhandle->data;
selhandle->sock = sockhandle->sock; selhandle->sock = sockhandle->sock;
@ -2425,7 +2496,7 @@ VOID CALLBACK uv_ares_socksignal_tp(void* parameter, BOOLEAN timerfired) {
uv_ares_req = &selhandle->ares_req; uv_ares_req = &selhandle->ares_req;
uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL); uv_req_init(uv_ares_req, (uv_handle_t*)selhandle, NULL);
uv_ares_req->type = UV_WAKEUP; uv_ares_req->type = UV_ARES_EVENT_REQ;
/* post that ares needs to be called */ /* post that ares needs to be called */
if (!PostQueuedCompletionStatus(uv_iocp_, if (!PostQueuedCompletionStatus(uv_iocp_,
@ -2473,7 +2544,7 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) {
/* Post request to cleanup the Task */ /* Post request to cleanup the Task */
uv_ares_req = &uv_handle_ares->ares_req; uv_ares_req = &uv_handle_ares->ares_req;
uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL); uv_req_init(uv_ares_req, (uv_handle_t*)uv_handle_ares, NULL);
uv_ares_req->type = UV_WAKEUP; uv_ares_req->type = UV_ARES_CLEANUP_REQ;
/* post ares done with socket - finish cleanup when all threads done. */ /* post ares done with socket - finish cleanup when all threads done. */
if (!PostQueuedCompletionStatus(uv_iocp_, if (!PostQueuedCompletionStatus(uv_iocp_,
@ -2518,7 +2589,7 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) {
uv_add_ares_handle(uv_handle_ares); uv_add_ares_handle(uv_handle_ares);
uv_refs_++; uv_refs_++;
/* /*
* we have a single polling timer for all ares sockets. * we have a single polling timer for all ares sockets.
* This is preferred to using ares_timeout. See ares_timeout.c warning. * This is preferred to using ares_timeout. See ares_timeout.c warning.
* if timer is not running start it, and keep socket count * if timer is not running start it, and keep socket count
@ -2548,7 +2619,7 @@ void uv_ares_sockstate_cb(void *data, ares_socket_t sock, int read, int write) {
} }
/* called via uv_poll when ares completion port signaled */ /* called via uv_poll when ares completion port signaled */
void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req) { void uv_process_ares_event_req(uv_ares_action_t* handle, uv_req_t* req) {
uv_ares_channel_t* uv_ares_data_ptr = (uv_ares_channel_t*)handle->data; uv_ares_channel_t* uv_ares_data_ptr = (uv_ares_channel_t*)handle->data;
ares_process_fd(uv_ares_data_ptr->channel, ares_process_fd(uv_ares_data_ptr->channel,
@ -2560,7 +2631,7 @@ void uv_ares_process(uv_ares_action_t* handle, uv_req_t* req) {
} }
/* called via uv_poll when ares is finished with socket */ /* called via uv_poll when ares is finished with socket */
void uv_ares_task_cleanup(uv_ares_task_t* handle, uv_req_t* req) { void uv_process_ares_cleanup_req(uv_ares_task_t* handle, uv_req_t* req) {
/* check for event complete without waiting */ /* check for event complete without waiting */
unsigned int signaled = WaitForSingleObject(handle->h_close_event, 0); unsigned int signaled = WaitForSingleObject(handle->h_close_event, 0);
@ -2692,7 +2763,7 @@ static DWORD WINAPI getaddrinfo_thread_proc(void* parameter) {
* and copy all structs and referenced strings into the one block. * and copy all structs and referenced strings into the one block.
* Each size calculation is adjusted to avoid unaligned pointers. * Each size calculation is adjusted to avoid unaligned pointers.
*/ */
static void uv_getaddrinfo_done(uv_getaddrinfo_t* handle, uv_req_t* req) { static void uv_process_getaddrinfo_req(uv_getaddrinfo_t* handle, uv_req_t* req) {
int addrinfo_len = 0; int addrinfo_len = 0;
int name_len = 0; int name_len = 0;
size_t addrinfo_struct_len = ALIGNED_SIZE(sizeof(struct addrinfo)); size_t addrinfo_struct_len = ALIGNED_SIZE(sizeof(struct addrinfo));
@ -2903,7 +2974,7 @@ int uv_getaddrinfo(uv_getaddrinfo_t* handle,
/* init request for Post handling */ /* init request for Post handling */
uv_req_init(&handle->getadddrinfo_req, (uv_handle_t*)handle, NULL); uv_req_init(&handle->getadddrinfo_req, (uv_handle_t*)handle, NULL);
handle->getadddrinfo_req.type = UV_WAKEUP; handle->getadddrinfo_req.type = UV_GETADDRINFO_REQ;
/* Ask thread to run. Treat this as a long operation */ /* Ask thread to run. Treat this as a long operation */
if (QueueUserWorkItem(&getaddrinfo_thread_proc, handle, WT_EXECUTELONGFUNCTION) == 0) { if (QueueUserWorkItem(&getaddrinfo_thread_proc, handle, WT_EXECUTELONGFUNCTION) == 0) {
@ -2987,7 +3058,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
} }
maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount; maxInstances = instanceCount >= PIPE_UNLIMITED_INSTANCES ? PIPE_UNLIMITED_INSTANCES : instanceCount;
for (i = 0; i < instanceCount; i++) { for (i = 0; i < instanceCount; i++) {
pipeHandle = CreateNamedPipe(handle->name, pipeHandle = CreateNamedPipe(handle->name,
PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX | FILE_FLAG_OVERLAPPED,
@ -3026,7 +3097,7 @@ int uv_pipe_listen(uv_pipe_t* handle, int instanceCount, uv_connection_cb cb) {
uv_pipe_queue_accept(handle); uv_pipe_queue_accept(handle);
return 0; return 0;
error: error:
close_pipe(handle, NULL, NULL); close_pipe(handle, NULL, NULL);
uv_set_sys_error(errno); uv_set_sys_error(errno);
@ -3040,12 +3111,12 @@ int uv_pipe_connect(uv_req_t* req, const char* name) {
uv_pipe_t* handle = (uv_pipe_t*)req->handle; uv_pipe_t* handle = (uv_pipe_t*)req->handle;
assert(!(req->flags & UV_REQ_PENDING)); assert(!(req->flags & UV_REQ_PENDING));
req->type = UV_CONNECT; req->type = UV_CONNECT;
handle->connection = &handle->clientConnection; handle->connection = &handle->clientConnection;
handle->server = NULL; handle->server = NULL;
memset(&req->overlapped, 0, sizeof(req->overlapped)); memset(&req->overlapped, 0, sizeof(req->overlapped));
handle->clientConnection.handle = CreateFile(name, handle->clientConnection.handle = CreateFile(name,
GENERIC_READ | GENERIC_WRITE, GENERIC_READ | GENERIC_WRITE,
0, 0,
@ -3053,27 +3124,27 @@ int uv_pipe_connect(uv_req_t* req, const char* name) {
OPEN_EXISTING, OPEN_EXISTING,
FILE_FLAG_OVERLAPPED, FILE_FLAG_OVERLAPPED,
NULL); NULL);
if (handle->clientConnection.handle == INVALID_HANDLE_VALUE && if (handle->clientConnection.handle == INVALID_HANDLE_VALUE &&
GetLastError() != ERROR_IO_PENDING) { GetLastError() != ERROR_IO_PENDING) {
errno = GetLastError(); errno = GetLastError();
goto error; goto error;
} }
mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT; mode = PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT;
if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) { if (!SetNamedPipeHandleState(handle->clientConnection.handle, &mode, NULL, NULL)) {
errno = GetLastError(); errno = GetLastError();
goto error; goto error;
} }
if (CreateIoCompletionPort(handle->clientConnection.handle, if (CreateIoCompletionPort(handle->clientConnection.handle,
uv_iocp_, uv_iocp_,
(ULONG_PTR)handle, (ULONG_PTR)handle,
0) == NULL) { 0) == NULL) {
errno = GetLastError(); errno = GetLastError();
goto error; goto error;
} }
req->error = uv_ok_; req->error = uv_ok_;
req->flags |= UV_REQ_PENDING; req->flags |= UV_REQ_PENDING;
@ -3081,7 +3152,7 @@ int uv_pipe_connect(uv_req_t* req, const char* name) {
uv_insert_pending_req(req); uv_insert_pending_req(req);
handle->reqs_pending++; handle->reqs_pending++;
return 0; return 0;
error: error:
close_pipe(handle, NULL, NULL); close_pipe(handle, NULL, NULL);
req->error = uv_new_sys_error(errno); req->error = uv_new_sys_error(errno);
@ -3125,7 +3196,7 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
* The handle is for the pipe server. * The handle is for the pipe server.
* To clean-up we close every connection instance that was made in uv_pipe_listen. * To clean-up we close every connection instance that was made in uv_pipe_listen.
*/ */
if (handle->name) { if (handle->name) {
free(handle->name); free(handle->name);
handle->name = NULL; handle->name = NULL;
@ -3169,6 +3240,6 @@ static void close_pipe(uv_pipe_t* handle, int* status, uv_err_t* err) {
} }
} }
} }
handle->flags |= UV_HANDLE_SHUT; handle->flags |= UV_HANDLE_SHUT;
} }