Add handle->write_queue_size support for windows
This commit is contained in:
parent
4eb2c6b3f2
commit
a379649899
37
oio-win.c
37
oio-win.c
@ -137,7 +137,7 @@ static LPFN_TRANSMITFILE pTransmitFile;
|
|||||||
* Private oio_req flags.
|
* Private oio_req flags.
|
||||||
*/
|
*/
|
||||||
/* The request is currently queued. */
|
/* The request is currently queued. */
|
||||||
#define OIO_REQ_PENDING 0x01
|
#define OIO_REQ_PENDING 0x01
|
||||||
|
|
||||||
|
|
||||||
/* Binary tree used to keep the list of timers sorted. */
|
/* Binary tree used to keep the list of timers sorted. */
|
||||||
@ -428,6 +428,7 @@ static int oio_tcp_init_socket(oio_handle_t* handle, oio_close_cb close_cb,
|
|||||||
handle->socket = socket;
|
handle->socket = socket;
|
||||||
handle->close_cb = close_cb;
|
handle->close_cb = close_cb;
|
||||||
handle->data = data;
|
handle->data = data;
|
||||||
|
handle->write_queue_size = 0;
|
||||||
handle->type = OIO_TCP;
|
handle->type = OIO_TCP;
|
||||||
handle->flags = 0;
|
handle->flags = 0;
|
||||||
handle->reqs_pending = 0;
|
handle->reqs_pending = 0;
|
||||||
@ -925,9 +926,21 @@ int oio_connect(oio_req_t* req, struct sockaddr* addr) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static size_t oio_count_bufs(oio_buf bufs[], int count) {
|
||||||
|
size_t bytes = 0;
|
||||||
|
int i;
|
||||||
|
|
||||||
|
for (i = 0; i < count; i++) {
|
||||||
|
bytes += (size_t)bufs[i].len;
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int oio_write(oio_req_t* req, oio_buf bufs[], int bufcnt) {
|
int oio_write(oio_req_t* req, oio_buf bufs[], int bufcnt) {
|
||||||
int result;
|
int result;
|
||||||
DWORD bytes;
|
DWORD bytes, err;
|
||||||
oio_handle_t* handle = req->handle;
|
oio_handle_t* handle = req->handle;
|
||||||
|
|
||||||
assert(!(req->flags & OIO_REQ_PENDING));
|
assert(!(req->flags & OIO_REQ_PENDING));
|
||||||
@ -952,9 +965,22 @@ int oio_write(oio_req_t* req, oio_buf bufs[], int bufcnt) {
|
|||||||
0,
|
0,
|
||||||
&req->overlapped,
|
&req->overlapped,
|
||||||
NULL);
|
NULL);
|
||||||
if (result != 0 && WSAGetLastError() != ERROR_IO_PENDING) {
|
if (result != 0) {
|
||||||
oio_set_sys_error(WSAGetLastError());
|
err = WSAGetLastError();
|
||||||
return -1;
|
if (err != WSA_IO_PENDING) {
|
||||||
|
/* Send faild due to an error */
|
||||||
|
oio_set_sys_error(WSAGetLastError());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (result == 0) {
|
||||||
|
/* Request completed immediately */
|
||||||
|
req->queued_bytes = 0;
|
||||||
|
} else {
|
||||||
|
/* Request queued by the kernel */
|
||||||
|
req->queued_bytes = oio_count_bufs(bufs, bufcnt);
|
||||||
|
handle->write_queue_size += req->queued_bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
req->flags |= OIO_REQ_PENDING;
|
req->flags |= OIO_REQ_PENDING;
|
||||||
@ -1005,6 +1031,7 @@ static void oio_tcp_return_req(oio_handle_t* handle, oio_req_t* req) {
|
|||||||
switch (req->type) {
|
switch (req->type) {
|
||||||
case OIO_WRITE:
|
case OIO_WRITE:
|
||||||
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
|
success = GetOverlappedResult(handle->handle, &req->overlapped, &bytes, FALSE);
|
||||||
|
handle->write_queue_size -= req->queued_bytes;
|
||||||
if (!success) {
|
if (!success) {
|
||||||
oio_set_sys_error(GetLastError());
|
oio_set_sys_error(GetLastError());
|
||||||
oio_close_error(handle, oio_last_error_);
|
oio_close_error(handle, oio_last_error_);
|
||||||
|
|||||||
@ -44,7 +44,10 @@ typedef struct oio_buf {
|
|||||||
#define oio_req_private_fields \
|
#define oio_req_private_fields \
|
||||||
union { \
|
union { \
|
||||||
/* Used by I/O operations */ \
|
/* Used by I/O operations */ \
|
||||||
OVERLAPPED overlapped; \
|
struct { \
|
||||||
|
OVERLAPPED overlapped; \
|
||||||
|
size_t queued_bytes; \
|
||||||
|
}; \
|
||||||
/* Used by timers */ \
|
/* Used by timers */ \
|
||||||
struct { \
|
struct { \
|
||||||
RB_ENTRY(oio_req_s) tree_entry; \
|
RB_ENTRY(oio_req_s) tree_entry; \
|
||||||
|
|||||||
2
oio.h
2
oio.h
@ -150,6 +150,8 @@ struct oio_handle_s {
|
|||||||
/* public */
|
/* public */
|
||||||
oio_close_cb close_cb;
|
oio_close_cb close_cb;
|
||||||
void* data;
|
void* data;
|
||||||
|
/* number of bytes queued for writing */
|
||||||
|
size_t write_queue_size;
|
||||||
/* private */
|
/* private */
|
||||||
oio_handle_private_fields
|
oio_handle_private_fields
|
||||||
};
|
};
|
||||||
|
|||||||
@ -59,6 +59,9 @@ static void shutdown_cb(oio_req_t* req, int status) {
|
|||||||
ASSERT(req);
|
ASSERT(req);
|
||||||
ASSERT(status == 0);
|
ASSERT(status == 0);
|
||||||
|
|
||||||
|
/* The write buffer should be empty by now. */
|
||||||
|
ASSERT(req->handle->write_queue_size == 0);
|
||||||
|
|
||||||
/* Now we wait for the EOF */
|
/* Now we wait for the EOF */
|
||||||
shutdown_cb_called++;
|
shutdown_cb_called++;
|
||||||
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user