From 7080ce09238d2da4b9953bdf20f9c528dd371351 Mon Sep 17 00:00:00 2001 From: Michele Caini Date: Thu, 1 Dec 2016 12:54:14 +0100 Subject: [PATCH] added unchecked writes --- src/uvw/fs.hpp | 21 ++++++++ src/uvw/loop.hpp | 2 +- src/uvw/stream.hpp | 117 +++++++++++++++++++++++++++++++++++------- src/uvw/udp.hpp | 74 ++++++++++++++++++++------ test/uvw/file_req.cpp | 42 ++++++++++++++- 5 files changed, 219 insertions(+), 37 deletions(-) diff --git a/src/uvw/fs.hpp b/src/uvw/fs.hpp index b118a5a2..6028289d 100644 --- a/src/uvw/fs.hpp +++ b/src/uvw/fs.hpp @@ -464,6 +464,9 @@ public: /** * @brief Async [write](http://linux.die.net/man/2/pwritev). * + * The request takes the ownership of the data and it is in charge of delete + * them. + * * Emit a `FsEvent` event when completed.
* Emit an ErrorEvent event in case of errors. * @@ -477,6 +480,24 @@ public: cleanupAndInvoke(&uv_fs_write, parent(), get(), file, bufs, 1, offset, &fsResultCallback); } + /** + * @brief Async [write](http://linux.die.net/man/2/pwritev). + * + * The request doesn't take the ownership of the data. Be sure that their + * lifetime overcome the one of the request. + * + * Emit a `FsEvent` event when completed.
+ * Emit an ErrorEvent event in case of errors. + * + * @param data The data to be written. + * @param len The lenght of the submitted data. + * @param offset Offset, as described in the official documentation. + */ + void write(char *data, std::size_t len, int64_t offset) { + uv_buf_t bufs[] = { uv_buf_init(data, len) }; + cleanupAndInvoke(&uv_fs_write, parent(), get(), file, bufs, 1, offset, &fsResultCallback); + } + /** * @brief Sync [write](http://linux.die.net/man/2/pwritev). * diff --git a/src/uvw/loop.hpp b/src/uvw/loop.hpp index 11c5cb68..321c82f9 100644 --- a/src/uvw/loop.hpp +++ b/src/uvw/loop.hpp @@ -167,7 +167,7 @@ public: auto def = uv_default_loop(); if(def) { - auto ptr = std::unique_ptr(def, [](uv_loop_t *){ }); + auto ptr = std::unique_ptr(def, [](uv_loop_t *){}); loop = std::shared_ptr(new Loop{std::move(ptr)}); } diff --git a/src/uvw/stream.hpp b/src/uvw/stream.hpp index f1eb8dda..19d0981a 100644 --- a/src/uvw/stream.hpp +++ b/src/uvw/stream.hpp @@ -94,23 +94,14 @@ struct ShutdownReq final: public Request { class WriteReq final: public Request { +public: using Deleter = void(*)(uv_buf_t *); - template - static void deleter(uv_buf_t *bufs) { - std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; }); - delete[] bufs; - } - -public: - template - WriteReq(ConstructorAccess ca, std::shared_ptr loop, const uv_buf_t (&arr)[N]) + WriteReq(ConstructorAccess ca, std::shared_ptr loop, std::unique_ptr bufs, std::size_t nbufs) : Request{std::move(ca), std::move(loop)}, - bufs{new uv_buf_t[N], &deleter}, - nbufs{N} - { - std::copy_n(std::begin(arr), N, bufs.get()); - } + bufs{std::move(bufs)}, + nbufs{nbufs} + { } void write(uv_stream_t *handle) { invoke(&uv_write, get(), handle, bufs.get(), nbufs, &defaultCallback); @@ -250,7 +241,9 @@ public: /** * @brief Writes data to the stream. * - * Data are written in order.
+ * Data are written in order. The handle takes the ownership of the data and + * it is in charge of delete them. + * * A WriteEvent event will be emitted when the data have been written.
* An ErrorEvent event will be emitted in case of errors. * @@ -258,18 +251,55 @@ public: * @param len The lenght of the submitted data. */ void write(std::unique_ptr data, std::size_t len) { - const uv_buf_t bufs[] = { uv_buf_init(data.release(), len) }; + constexpr std::size_t N = 1; + + auto write = this->loop().template resource( + std::unique_ptr{ + new uv_buf_t[N]{ uv_buf_init(data.release(), len) }, + [](uv_buf_t *bufs) { + std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; }); + delete[] bufs; + } + }, N); auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) { ptr->publish(event); }; - auto write = this->loop().template resource(bufs); write->template once(listener); write->template once(listener); write->write(this->template get()); } + /** + * @brief Writes data to the stream. + * + * Data are written in order. The handle doesn't take the ownership of the + * data. Be sure that their lifetime overcome the one of the request. + * + * A WriteEvent event will be emitted when the data have been written.
+ * An ErrorEvent event will be emitted in case of errors. + * + * @param data The data to be written to the stream. + * @param len The lenght of the submitted data. + */ + void write(char *data, std::size_t len) { + constexpr std::size_t N = 1; + + auto write = this->loop().template resource( + std::unique_ptr{ + new uv_buf_t[N]{ uv_buf_init(data, len) }, + [](uv_buf_t *bufs) { delete[] bufs; } + }, N); + + auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) { + ptr->publish(event); + }; + + write->template once(listener); + write->template once(listener); + write->write(this->template get()); + } /** * @brief Extended write function for sending handles over a pipe handle. @@ -280,6 +310,9 @@ public: * connection (listening or connected state). Bound sockets or pipes will be * assumed to be servers. * + * The handle takes the ownership of the data and it is in charge of delete + * them. + * * A WriteEvent event will be emitted when the data have been written.
* An ErrorEvent wvent will be emitted in case of errors. * @@ -289,13 +322,59 @@ public: */ template void write(S &send, std::unique_ptr data, std::size_t len) { - const uv_buf_t bufs[] = { uv_buf_init(data.release(), len) }; + constexpr std::size_t N = 1; + + auto write = this->loop().template resource( + std::unique_ptr{ + new uv_buf_t[N]{ uv_buf_init(data.release(), len) }, + [](uv_buf_t *bufs) { + std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; }); + delete[] bufs; + } + }, N); + + auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) { + ptr->publish(event); + }; + + write->template once(listener); + write->template once(listener); + write->write(this->template get(), send.template get()); + } + + /** + * @brief Extended write function for sending handles over a pipe handle. + * + * The pipe must be initialized with `ipc == true`. + * + * `send` must be a TcpHandle or PipeHandle handle, which is a server or a + * connection (listening or connected state). Bound sockets or pipes will be + * assumed to be servers. + * + * The handle doesn't take the ownership of the data. Be sure that their + * lifetime overcome the one of the request. + * + * A WriteEvent event will be emitted when the data have been written.
+ * An ErrorEvent wvent will be emitted in case of errors. + * + * @param send The handle over which to write data. + * @param data The data to be written to the stream. + * @param len The lenght of the submitted data. + */ + template + void write(S &send, char *data, std::size_t len) { + constexpr std::size_t N = 1; + + auto write = this->loop().template resource( + std::unique_ptr{ + new uv_buf_t[N]{ uv_buf_init(data, len) }, + [](uv_buf_t *bufs) { delete[] bufs; } + }, N); auto listener = [ptr = this->shared_from_this()](const auto &event, details::WriteReq &) { ptr->publish(event); }; - auto write = this->loop().template resource(bufs); write->template once(listener); write->template once(listener); write->write(this->template get(), send.template get()); diff --git a/src/uvw/udp.hpp b/src/uvw/udp.hpp index 5777af8e..e71c88ad 100644 --- a/src/uvw/udp.hpp +++ b/src/uvw/udp.hpp @@ -59,23 +59,14 @@ enum class UVMembership: std::underlying_type_t { class SendReq final: public Request { +public: using Deleter = void(*)(uv_buf_t *); - template - static void deleter(uv_buf_t *bufs) { - std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; }); - delete[] bufs; - } - -public: - template - SendReq(ConstructorAccess ca, std::shared_ptr loop, const uv_buf_t (&arr)[N]) + SendReq(ConstructorAccess ca, std::shared_ptr loop, std::unique_ptr bufs, std::size_t nbufs) : Request{std::move(ca), std::move(loop)}, - bufs{new uv_buf_t[N], &deleter}, - nbufs{N} - { - std::copy_n(std::begin(arr), N, bufs.get()); - } + bufs{std::move(bufs)}, + nbufs{nbufs} + { } void send(uv_udp_t *handle, const struct sockaddr* addr) { invoke(&uv_udp_send, get(), handle, bufs.get(), nbufs, addr, &defaultCallback); @@ -293,6 +284,9 @@ public: * will be bound to `0.0.0.0` (the _all interfaces_ IPv4 address) and a * random port number. * + * The handle takes the ownership of the data and it is in charge of delete + * them. + * * A SendEvent event will be emitted when the data have been sent.
* An ErrorEvent event will be emitted in case of errors. * @@ -303,16 +297,64 @@ public: */ template void send(std::string ip, unsigned int port, std::unique_ptr data, std::size_t len) { + constexpr std::size_t N = 1; + typename details::IpTraits::Type addr; details::IpTraits::addrFunc(ip.data(), port, &addr); - const uv_buf_t bufs[] = { uv_buf_init(data.release(), len) }; + auto send = loop().resource( + std::unique_ptr{ + new uv_buf_t[N]{ uv_buf_init(data.release(), len) }, + [](uv_buf_t *bufs) { + std::for_each(bufs, bufs+N, [](uv_buf_t &buf){ delete[] buf.base; }); + delete[] bufs; + } + }, N); + + auto listener = [ptr = shared_from_this()](const auto &event, details::SendReq &) { + ptr->publish(event); + }; + + send->once(listener); + send->once(listener); + send->send(get(), reinterpret_cast(&addr)); + } + + /** + * @brief Sends data over the UDP socket. + * + * Note that if the socket has not previously been bound with `bind()`, it + * will be bound to `0.0.0.0` (the _all interfaces_ IPv4 address) and a + * random port number. + * + * The handle doesn't take the ownership of the data. Be sure that their + * lifetime overcome the one of the request. + * + * A SendEvent event will be emitted when the data have been sent.
+ * An ErrorEvent event will be emitted in case of errors. + * + * @param ip The address to which to send data. + * @param port The port to which to send data. + * @param data The data to be sent. + * @param len The lenght of the submitted data. + */ + template + void send(std::string ip, unsigned int port, char *data, std::size_t len) { + constexpr std::size_t N = 1; + + typename details::IpTraits::Type addr; + details::IpTraits::addrFunc(ip.data(), port, &addr); + + auto send = loop().resource( + std::unique_ptr{ + new uv_buf_t[N]{ uv_buf_init(data, len) }, + [](uv_buf_t *bufs) { delete[] bufs; } + }, N); auto listener = [ptr = shared_from_this()](const auto &event, details::SendReq &) { ptr->publish(event); }; - auto send = loop().resource(bufs); send->once(listener); send->once(listener); send->send(get(), reinterpret_cast(&addr)); diff --git a/test/uvw/file_req.cpp b/test/uvw/file_req.cpp index e021e68f..850eaac6 100644 --- a/test/uvw/file_req.cpp +++ b/test/uvw/file_req.cpp @@ -54,7 +54,7 @@ TEST(FileReq, OpenAndCloseSync) { } -TEST(FileReq, RW) { +TEST(FileReq, RWChecked) { const std::string filename = std::string{TARGET_FILE_REQ_DIR} + std::string{"/test.file"}; auto loop = uvw::Loop::getDefault(); @@ -93,6 +93,46 @@ TEST(FileReq, RW) { } +TEST(FileReq, RWUnchecked) { + const std::string filename = std::string{TARGET_FILE_REQ_DIR} + std::string{"/test.file"}; + std::unique_ptr data{new char[1]{ 42 }}; + + auto loop = uvw::Loop::getDefault(); + auto request = loop->resource(); + + bool checkFileWriteEvent = false; + bool checkFileReadEvent = false; + + request->on([](const auto &, auto &) { + FAIL(); + }); + + request->on>([&checkFileReadEvent](const auto &event, auto &request) { + ASSERT_FALSE(checkFileReadEvent); + ASSERT_EQ(event.data[0], 42); + checkFileReadEvent = true; + request.close(); + }); + + request->on>([&checkFileWriteEvent](const auto &, auto &request) { + ASSERT_FALSE(checkFileWriteEvent); + checkFileWriteEvent = true; + request.read(0, 1); + }); + + request->on>([&data](const auto &, auto &request) { + request.write(data.get(), 1, 0); + }); + + request->open(filename, O_CREAT | O_RDWR | O_TRUNC, 0644); + + loop->run(); + + ASSERT_TRUE(checkFileWriteEvent); + ASSERT_TRUE(checkFileReadEvent); +} + + TEST(FileReq, RWSync) { const std::string filename = std::string{TARGET_FILE_REQ_DIR} + std::string{"/test.file"};