added unchecked writes

This commit is contained in:
Michele Caini 2016-12-01 12:54:14 +01:00
parent c478680a1d
commit 7080ce0923
5 changed files with 219 additions and 37 deletions

View File

@ -464,6 +464,9 @@ public:
/**
* @brief Async [write](http://linux.die.net/man/2/pwritev).
*
* The request takes the ownership of the data and it is in charge of delete
* them.
*
* Emit a `FsEvent<FileReq::Type::WRITE>` event when completed.<br/>
* Emit an ErrorEvent event in case of errors.
*
@ -477,6 +480,24 @@ public:
cleanupAndInvoke(&uv_fs_write, parent(), get(), file, bufs, 1, offset, &fsResultCallback<Type::WRITE>);
}
/**
* @brief Async [write](http://linux.die.net/man/2/pwritev).
*
* The request doesn't take the ownership of the data. Be sure that their
* lifetime overcome the one of the request.
*
* Emit a `FsEvent<FileReq::Type::WRITE>` event when completed.<br/>
* Emit an ErrorEvent event in case of errors.
*
* @param data The data to be written.
* @param len The lenght of the submitted data.
* @param offset Offset, as described in the official documentation.
*/
void write(char *data, std::size_t len, int64_t offset) {
uv_buf_t bufs[] = { uv_buf_init(data, len) };
cleanupAndInvoke(&uv_fs_write, parent(), get(), file, bufs, 1, offset, &fsResultCallback<Type::WRITE>);
}
/**
* @brief Sync [write](http://linux.die.net/man/2/pwritev).
*

View File

@ -167,7 +167,7 @@ public:
auto def = uv_default_loop();
if(def) {
auto ptr = std::unique_ptr<uv_loop_t, Deleter>(def, [](uv_loop_t *){ });
auto ptr = std::unique_ptr<uv_loop_t, Deleter>(def, [](uv_loop_t *){});
loop = std::shared_ptr<Loop>(new Loop{std::move(ptr)});
}

View File

@ -94,23 +94,14 @@ struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> {
class WriteReq final: public Request<WriteReq, uv_write_t> {
public:
using Deleter = void(*)(uv_buf_t *);
template<std::size_t N>
static void deleter(uv_buf_t *bufs) {
std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; });
delete[] bufs;
}
public:
template<std::size_t N>
WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, const uv_buf_t (&arr)[N])
WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<uv_buf_t[], Deleter> bufs, std::size_t nbufs)
: Request<WriteReq, uv_write_t>{std::move(ca), std::move(loop)},
bufs{new uv_buf_t[N], &deleter<N>},
nbufs{N}
{
std::copy_n(std::begin(arr), N, bufs.get());
}
bufs{std::move(bufs)},
nbufs{nbufs}
{ }
void write(uv_stream_t *handle) {
invoke(&uv_write, get(), handle, bufs.get(), nbufs, &defaultCallback<WriteEvent>);
@ -250,7 +241,9 @@ public:
/**
* @brief Writes data to the stream.
*
* Data are written in order.<br/>
* Data are written in order. The handle takes the ownership of the data and
* it is in charge of delete them.
*
* A WriteEvent event will be emitted when the data have been written.<br/>
* An ErrorEvent event will be emitted in case of errors.
*
@ -258,18 +251,55 @@ public:
* @param len The lenght of the submitted data.
*/
void write(std::unique_ptr<char[]> data, std::size_t len) {
const uv_buf_t bufs[] = { uv_buf_init(data.release(), len) };
constexpr std::size_t N = 1;
auto write = this->loop().template resource<details::WriteReq>(
std::unique_ptr<uv_buf_t[], details::WriteReq::Deleter>{
new uv_buf_t[N]{ uv_buf_init(data.release(), len) },
[](uv_buf_t *bufs) {
std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; });
delete[] bufs;
}
}, N);
auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) {
ptr->publish(event);
};
auto write = this->loop().template resource<details::WriteReq>(bufs);
write->template once<ErrorEvent>(listener);
write->template once<WriteEvent>(listener);
write->write(this->template get<uv_stream_t>());
}
/**
* @brief Writes data to the stream.
*
* Data are written in order. The handle doesn't take the ownership of the
* data. Be sure that their lifetime overcome the one of the request.
*
* A WriteEvent event will be emitted when the data have been written.<br/>
* An ErrorEvent event will be emitted in case of errors.
*
* @param data The data to be written to the stream.
* @param len The lenght of the submitted data.
*/
void write(char *data, std::size_t len) {
constexpr std::size_t N = 1;
auto write = this->loop().template resource<details::WriteReq>(
std::unique_ptr<uv_buf_t[], details::WriteReq::Deleter>{
new uv_buf_t[N]{ uv_buf_init(data, len) },
[](uv_buf_t *bufs) { delete[] bufs; }
}, N);
auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) {
ptr->publish(event);
};
write->template once<ErrorEvent>(listener);
write->template once<WriteEvent>(listener);
write->write(this->template get<uv_stream_t>());
}
/**
* @brief Extended write function for sending handles over a pipe handle.
@ -280,6 +310,9 @@ public:
* connection (listening or connected state). Bound sockets or pipes will be
* assumed to be servers.
*
* The handle takes the ownership of the data and it is in charge of delete
* them.
*
* A WriteEvent event will be emitted when the data have been written.<br/>
* An ErrorEvent wvent will be emitted in case of errors.
*
@ -289,13 +322,59 @@ public:
*/
template<typename S>
void write(S &send, std::unique_ptr<char[]> data, std::size_t len) {
const uv_buf_t bufs[] = { uv_buf_init(data.release(), len) };
constexpr std::size_t N = 1;
auto write = this->loop().template resource<details::WriteReq>(
std::unique_ptr<uv_buf_t[], details::WriteReq::Deleter>{
new uv_buf_t[N]{ uv_buf_init(data.release(), len) },
[](uv_buf_t *bufs) {
std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; });
delete[] bufs;
}
}, N);
auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) {
ptr->publish(event);
};
write->template once<ErrorEvent>(listener);
write->template once<WriteEvent>(listener);
write->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>());
}
/**
* @brief Extended write function for sending handles over a pipe handle.
*
* The pipe must be initialized with `ipc == true`.
*
* `send` must be a TcpHandle or PipeHandle handle, which is a server or a
* connection (listening or connected state). Bound sockets or pipes will be
* assumed to be servers.
*
* The handle doesn't take the ownership of the data. Be sure that their
* lifetime overcome the one of the request.
*
* A WriteEvent event will be emitted when the data have been written.<br/>
* An ErrorEvent wvent will be emitted in case of errors.
*
* @param send The handle over which to write data.
* @param data The data to be written to the stream.
* @param len The lenght of the submitted data.
*/
template<typename S>
void write(S &send, char *data, std::size_t len) {
constexpr std::size_t N = 1;
auto write = this->loop().template resource<details::WriteReq>(
std::unique_ptr<uv_buf_t[], details::WriteReq::Deleter>{
new uv_buf_t[N]{ uv_buf_init(data, len) },
[](uv_buf_t *bufs) { delete[] bufs; }
}, N);
auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) {
ptr->publish(event);
};
auto write = this->loop().template resource<details::WriteReq>(bufs);
write->template once<ErrorEvent>(listener);
write->template once<WriteEvent>(listener);
write->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>());

View File

@ -59,23 +59,14 @@ enum class UVMembership: std::underlying_type_t<uv_membership> {
class SendReq final: public Request<SendReq, uv_udp_send_t> {
public:
using Deleter = void(*)(uv_buf_t *);
template<std::size_t N>
static void deleter(uv_buf_t *bufs) {
std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; });
delete[] bufs;
}
public:
template<std::size_t N>
SendReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, const uv_buf_t (&arr)[N])
SendReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<uv_buf_t[], Deleter> bufs, std::size_t nbufs)
: Request<SendReq, uv_udp_send_t>{std::move(ca), std::move(loop)},
bufs{new uv_buf_t[N], &deleter<N>},
nbufs{N}
{
std::copy_n(std::begin(arr), N, bufs.get());
}
bufs{std::move(bufs)},
nbufs{nbufs}
{ }
void send(uv_udp_t *handle, const struct sockaddr* addr) {
invoke(&uv_udp_send, get(), handle, bufs.get(), nbufs, addr, &defaultCallback<SendEvent>);
@ -293,6 +284,9 @@ public:
* will be bound to `0.0.0.0` (the _all interfaces_ IPv4 address) and a
* random port number.
*
* The handle takes the ownership of the data and it is in charge of delete
* them.
*
* A SendEvent event will be emitted when the data have been sent.<br/>
* An ErrorEvent event will be emitted in case of errors.
*
@ -303,16 +297,64 @@ public:
*/
template<typename I = IPv4>
void send(std::string ip, unsigned int port, std::unique_ptr<char[]> data, std::size_t len) {
constexpr std::size_t N = 1;
typename details::IpTraits<I>::Type addr;
details::IpTraits<I>::addrFunc(ip.data(), port, &addr);
const uv_buf_t bufs[] = { uv_buf_init(data.release(), len) };
auto send = loop().resource<details::SendReq>(
std::unique_ptr<uv_buf_t[], details::SendReq::Deleter>{
new uv_buf_t[N]{ uv_buf_init(data.release(), len) },
[](uv_buf_t *bufs) {
std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; });
delete[] bufs;
}
}, N);
auto listener = [ptr = shared_from_this()](const auto &event, details::SendReq &) {
ptr->publish(event);
};
send->once<ErrorEvent>(listener);
send->once<SendEvent>(listener);
send->send(get(), reinterpret_cast<const sockaddr *>(&addr));
}
/**
* @brief Sends data over the UDP socket.
*
* Note that if the socket has not previously been bound with `bind()`, it
* will be bound to `0.0.0.0` (the _all interfaces_ IPv4 address) and a
* random port number.
*
* The handle doesn't take the ownership of the data. Be sure that their
* lifetime overcome the one of the request.
*
* A SendEvent event will be emitted when the data have been sent.<br/>
* An ErrorEvent event will be emitted in case of errors.
*
* @param ip The address to which to send data.
* @param port The port to which to send data.
* @param data The data to be sent.
* @param len The lenght of the submitted data.
*/
template<typename I = IPv4>
void send(std::string ip, unsigned int port, char *data, std::size_t len) {
constexpr std::size_t N = 1;
typename details::IpTraits<I>::Type addr;
details::IpTraits<I>::addrFunc(ip.data(), port, &addr);
auto send = loop().resource<details::SendReq>(
std::unique_ptr<uv_buf_t[], details::SendReq::Deleter>{
new uv_buf_t[N]{ uv_buf_init(data, len) },
[](uv_buf_t *bufs) { delete[] bufs; }
}, N);
auto listener = [ptr = shared_from_this()](const auto &event, details::SendReq &) {
ptr->publish(event);
};
auto send = loop().resource<details::SendReq>(bufs);
send->once<ErrorEvent>(listener);
send->once<SendEvent>(listener);
send->send(get(), reinterpret_cast<const sockaddr *>(&addr));

View File

@ -54,7 +54,7 @@ TEST(FileReq, OpenAndCloseSync) {
}
TEST(FileReq, RW) {
TEST(FileReq, RWChecked) {
const std::string filename = std::string{TARGET_FILE_REQ_DIR} + std::string{"/test.file"};
auto loop = uvw::Loop::getDefault();
@ -93,6 +93,46 @@ TEST(FileReq, RW) {
}
TEST(FileReq, RWUnchecked) {
const std::string filename = std::string{TARGET_FILE_REQ_DIR} + std::string{"/test.file"};
std::unique_ptr<char[]> data{new char[1]{ 42 }};
auto loop = uvw::Loop::getDefault();
auto request = loop->resource<uvw::FileReq>();
bool checkFileWriteEvent = false;
bool checkFileReadEvent = false;
request->on<uvw::ErrorEvent>([](const auto &, auto &) {
FAIL();
});
request->on<uvw::FsEvent<uvw::FileReq::Type::READ>>([&checkFileReadEvent](const auto &event, auto &request) {
ASSERT_FALSE(checkFileReadEvent);
ASSERT_EQ(event.data[0], 42);
checkFileReadEvent = true;
request.close();
});
request->on<uvw::FsEvent<uvw::FileReq::Type::WRITE>>([&checkFileWriteEvent](const auto &, auto &request) {
ASSERT_FALSE(checkFileWriteEvent);
checkFileWriteEvent = true;
request.read(0, 1);
});
request->on<uvw::FsEvent<uvw::FileReq::Type::OPEN>>([&data](const auto &, auto &request) {
request.write(data.get(), 1, 0);
});
request->open(filename, O_CREAT | O_RDWR | O_TRUNC, 0644);
loop->run();
ASSERT_TRUE(checkFileWriteEvent);
ASSERT_TRUE(checkFileReadEvent);
}
TEST(FileReq, RWSync) {
const std::string filename = std::string{TARGET_FILE_REQ_DIR} + std::string{"/test.file"};