unix: allow nbufs > IOV_MAX in uv_fs_{read,write}

This allows writing and reading any amount of buffers,
regardless of what IOV_MAX may be defined as.

It also moves the IOV_MAX test from stream to core.

This is based on the excellent work of @bwijen in #269.

Refs: https://github.com/libuv/libuv/pull/269
PR-URL: https://github.com/libuv/libuv/pull/448
Reviewed-By: Saúl Ibarra Corretgé <saghul@gmail.com>
This commit is contained in:
ronkorving 2015-08-04 17:07:54 +09:00 committed by Saúl Ibarra Corretgé
parent 0b4f4889f1
commit 2bf782777f
6 changed files with 248 additions and 21 deletions

View File

@ -35,7 +35,7 @@
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <limits.h> /* INT_MAX, PATH_MAX */
#include <limits.h> /* INT_MAX, PATH_MAX, IOV_MAX */
#include <sys/uio.h> /* writev */
#include <sys/resource.h> /* getrusage */
#include <pwd.h>
@ -199,6 +199,19 @@ void uv__make_close_pending(uv_handle_t* handle) {
handle->loop->closing_handles = handle;
}
int uv__getiovmax(void) {
#if defined(IOV_MAX)
return IOV_MAX;
#elif defined(_SC_IOV_MAX)
static int iovmax = -1;
if (iovmax == -1)
iovmax = sysconf(_SC_IOV_MAX);
return iovmax;
#else
return 1024;
#endif
}
static void uv__finish_close(uv_handle_t* handle) {
/* Note: while the handle is in the UV_CLOSING state now, it's still possible

View File

@ -309,8 +309,6 @@ static ssize_t uv__fs_read(uv_fs_t* req) {
}
done:
if (req->bufs != req->bufsml)
uv__free(req->bufs);
return result;
}
@ -670,9 +668,6 @@ done:
pthread_mutex_unlock(&lock);
#endif
if (req->bufs != req->bufsml)
uv__free(req->bufs);
return r;
}
@ -777,6 +772,47 @@ static int uv__fs_fstat(int fd, uv_stat_t *buf) {
}
typedef ssize_t (*uv__fs_buf_iter_processor)(uv_fs_t* req);
static ssize_t uv__fs_buf_iter(uv_fs_t* req, uv__fs_buf_iter_processor process) {
unsigned int iovmax;
unsigned int nbufs;
uv_buf_t* bufs;
ssize_t total;
ssize_t result;
iovmax = uv__getiovmax();
nbufs = req->nbufs;
bufs = req->bufs;
total = 0;
while (nbufs > 0) {
req->nbufs = nbufs;
if (req->nbufs > iovmax)
req->nbufs = iovmax;
result = process(req);
if (result <= 0) {
if (total == 0)
total = result;
break;
}
if (req->off >= 0)
req->off += result;
req->bufs += req->nbufs;
nbufs -= req->nbufs;
total += result;
}
if (bufs != req->bufsml)
uv__free(bufs);
req->bufs = NULL;
return total;
}
static void uv__fs_work(struct uv__work* w) {
int retry_on_eintr;
uv_fs_t* req;
@ -810,7 +846,7 @@ static void uv__fs_work(struct uv__work* w) {
X(MKDIR, mkdir(req->path, req->mode));
X(MKDTEMP, uv__fs_mkdtemp(req));
X(OPEN, uv__fs_open(req));
X(READ, uv__fs_read(req));
X(READ, uv__fs_buf_iter(req, uv__fs_read));
X(SCANDIR, uv__fs_scandir(req));
X(READLINK, uv__fs_readlink(req));
X(RENAME, rename(req->path, req->new_path));
@ -820,7 +856,7 @@ static void uv__fs_work(struct uv__work* w) {
X(SYMLINK, symlink(req->path, req->new_path));
X(UNLINK, unlink(req->path));
X(UTIME, uv__fs_utime(req));
X(WRITE, uv__fs_write(req));
X(WRITE, uv__fs_buf_iter(req, uv__fs_write));
default: abort();
}
#undef X

View File

@ -178,6 +178,7 @@ int uv__socket(int domain, int type, int protocol);
int uv__dup(int fd);
ssize_t uv__recvmsg(int fd, struct msghdr *msg, int flags);
void uv__make_close_pending(uv_handle_t* handle);
int uv__getiovmax(void);
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events);

View File

@ -739,19 +739,6 @@ static int uv__handle_fd(uv_handle_t* handle) {
}
}
static int uv__getiovmax() {
#if defined(IOV_MAX)
return IOV_MAX;
#elif defined(_SC_IOV_MAX)
static int iovmax = -1;
if (iovmax == -1)
iovmax = sysconf(_SC_IOV_MAX);
return iovmax;
#else
return 1024;
#endif
}
static void uv__write(uv_stream_t* stream) {
struct iovec* iov;
QUEUE* q;

View File

@ -2328,3 +2328,189 @@ TEST_IMPL(fs_write_multiple_bufs) {
MAKE_VALGRIND_HAPPY();
return 0;
}
TEST_IMPL(fs_write_alotof_bufs) {
const size_t iovcount = 54321;
uv_buf_t* iovs;
char* buffer;
size_t index;
int r;
/* Setup. */
unlink("test_file");
loop = uv_default_loop();
iovs = malloc(sizeof(*iovs) * iovcount);
ASSERT(iovs != NULL);
r = uv_fs_open(loop,
&open_req1,
"test_file",
O_RDWR | O_CREAT,
S_IWUSR | S_IRUSR,
NULL);
ASSERT(r >= 0);
ASSERT(open_req1.result >= 0);
uv_fs_req_cleanup(&open_req1);
for (index = 0; index < iovcount; ++index)
iovs[index] = uv_buf_init(test_buf, sizeof(test_buf));
r = uv_fs_write(loop,
&write_req,
open_req1.result,
iovs,
iovcount,
-1,
NULL);
ASSERT(r >= 0);
ASSERT((size_t)write_req.result == sizeof(test_buf) * iovcount);
uv_fs_req_cleanup(&write_req);
/* Read the strings back to separate buffers. */
buffer = malloc(sizeof(test_buf) * iovcount);
ASSERT(buffer != NULL);
for (index = 0; index < iovcount; ++index)
iovs[index] = uv_buf_init(buffer + index * sizeof(test_buf),
sizeof(test_buf));
r = uv_fs_read(loop, &read_req, open_req1.result, iovs, iovcount, 0, NULL);
ASSERT(r >= 0);
ASSERT((size_t)read_req.result == sizeof(test_buf) * iovcount);
for (index = 0; index < iovcount; ++index)
ASSERT(strncmp(buffer + index * sizeof(test_buf),
test_buf,
sizeof(test_buf)) == 0);
uv_fs_req_cleanup(&read_req);
free(buffer);
iov = uv_buf_init(buf, sizeof(buf));
r = uv_fs_read(loop,
&read_req,
open_req1.result,
&iov,
1,
read_req.result,
NULL);
ASSERT(r == 0);
ASSERT(read_req.result == 0);
uv_fs_req_cleanup(&read_req);
r = uv_fs_close(loop, &close_req, open_req1.result, NULL);
ASSERT(r == 0);
ASSERT(close_req.result == 0);
uv_fs_req_cleanup(&close_req);
/* Cleanup */
unlink("test_file");
free(iovs);
MAKE_VALGRIND_HAPPY();
return 0;
}
TEST_IMPL(fs_write_alotof_bufs_with_offset) {
const size_t iovcount = 54321;
uv_buf_t* iovs;
char* buffer;
size_t index;
int r;
int64_t offset;
char* filler = "0123456789";
int filler_len = strlen(filler);
/* Setup. */
unlink("test_file");
loop = uv_default_loop();
iovs = malloc(sizeof(*iovs) * iovcount);
ASSERT(iovs != NULL);
r = uv_fs_open(loop,
&open_req1,
"test_file",
O_RDWR | O_CREAT,
S_IWUSR | S_IRUSR,
NULL);
ASSERT(r >= 0);
ASSERT(open_req1.result >= 0);
uv_fs_req_cleanup(&open_req1);
iov = uv_buf_init(filler, filler_len);
r = uv_fs_write(loop, &write_req, open_req1.result, &iov, 1, -1, NULL);
ASSERT(r == filler_len);
ASSERT(write_req.result == filler_len);
uv_fs_req_cleanup(&write_req);
offset = (int64_t)r;
for (index = 0; index < iovcount; ++index)
iovs[index] = uv_buf_init(test_buf, sizeof(test_buf));
r = uv_fs_write(loop,
&write_req,
open_req1.result,
iovs,
iovcount,
offset,
NULL);
ASSERT(r >= 0);
ASSERT((size_t)write_req.result == sizeof(test_buf) * iovcount);
uv_fs_req_cleanup(&write_req);
/* Read the strings back to separate buffers. */
buffer = malloc(sizeof(test_buf) * iovcount);
ASSERT(buffer != NULL);
for (index = 0; index < iovcount; ++index)
iovs[index] = uv_buf_init(buffer + index * sizeof(test_buf),
sizeof(test_buf));
r = uv_fs_read(loop, &read_req, open_req1.result, iovs, iovcount, offset, NULL);
ASSERT(r >= 0);
ASSERT(read_req.result == sizeof(test_buf) * iovcount);
for (index = 0; index < iovcount; ++index)
ASSERT(strncmp(buffer + index * sizeof(test_buf),
test_buf,
sizeof(test_buf)) == 0);
uv_fs_req_cleanup(&read_req);
free(buffer);
r = uv_fs_stat(loop, &stat_req, "test_file", NULL);
ASSERT(r == 0);
ASSERT((int64_t)((uv_stat_t*)stat_req.ptr)->st_size ==
offset + (int64_t)(iovcount * sizeof(test_buf)));
uv_fs_req_cleanup(&stat_req);
iov = uv_buf_init(buf, sizeof(buf));
r = uv_fs_read(loop,
&read_req,
open_req1.result,
&iov,
1,
read_req.result + offset,
NULL);
ASSERT(r == 0);
ASSERT(read_req.result == 0);
uv_fs_req_cleanup(&read_req);
r = uv_fs_close(loop, &close_req, open_req1.result, NULL);
ASSERT(r == 0);
ASSERT(close_req.result == 0);
uv_fs_req_cleanup(&close_req);
/* Cleanup */
unlink("test_file");
free(iovs);
MAKE_VALGRIND_HAPPY();
return 0;
}

View File

@ -278,6 +278,8 @@ TEST_DECLARE (fs_scandir_file)
TEST_DECLARE (fs_open_dir)
TEST_DECLARE (fs_rename_to_existing_file)
TEST_DECLARE (fs_write_multiple_bufs)
TEST_DECLARE (fs_write_alotof_bufs)
TEST_DECLARE (fs_write_alotof_bufs_with_offset)
TEST_DECLARE (threadpool_queue_work_simple)
TEST_DECLARE (threadpool_queue_work_einval)
TEST_DECLARE (threadpool_multiple_event_loops)
@ -692,6 +694,8 @@ TASK_LIST_START
TEST_ENTRY (fs_open_dir)
TEST_ENTRY (fs_rename_to_existing_file)
TEST_ENTRY (fs_write_multiple_bufs)
TEST_ENTRY (fs_write_alotof_bufs)
TEST_ENTRY (fs_write_alotof_bufs_with_offset)
TEST_ENTRY (threadpool_queue_work_simple)
TEST_ENTRY (threadpool_queue_work_einval)
TEST_ENTRY (threadpool_multiple_event_loops)