From e177cc23bbeef75fa0f11c840bb2d31d88a34fe2 Mon Sep 17 00:00:00 2001 From: Michele Caini Date: Tue, 21 Jun 2016 12:25:32 +0200 Subject: [PATCH] WIP: Stream/Connection --- src/uvw/check.hpp | 15 +++++---- src/uvw/loop.hpp | 59 ++++++++++++++++++++++++--------- src/uvw/resource.hpp | 60 +++++++++++++++++++++++----------- src/uvw/stream.hpp | 77 ++++++++++++++++++++++++++++---------------- src/uvw/tcp.hpp | 46 ++++++++++++++------------ src/uvw/timer.hpp | 19 +++++------ test/main.cpp | 8 ++--- 7 files changed, 184 insertions(+), 100 deletions(-) diff --git a/src/uvw/check.hpp b/src/uvw/check.hpp index 83ab02e1..3dc80e1a 100644 --- a/src/uvw/check.hpp +++ b/src/uvw/check.hpp @@ -1,6 +1,8 @@ #pragma once +#include +#include #include #include "resource.hpp" #include "error.hpp" @@ -9,7 +11,7 @@ namespace uvw { -class Check final: public Resource { +class Check final: public Resource { static void proto(uv_check_t* h) { static_cast(h->data)->callback(UVWError{}); } @@ -17,23 +19,24 @@ class Check final: public Resource { public: using Callback = std::function; - explicit Check(uv_loop_t *loop): Resource{&handle} { - uv_check_init(loop, &handle); + explicit Check(std::shared_ptr ref) + : Resource{HandleType{}, ref} + { + uv_check_init(parent(), get()); } void start(Callback cb) noexcept { callback = cb; - auto err = uv_check_start(&handle, &proto); + auto err = uv_check_start(get(), &proto); if(err) { callback(UVWError{err}); } } - bool stop() noexcept { return (uv_check_stop(&handle) == 0); } + bool stop() noexcept { return (uv_check_stop(get()) == 0); } private: - uv_check_t handle; Callback callback; }; diff --git a/src/uvw/loop.hpp b/src/uvw/loop.hpp index 51c0f856..3bc7f16c 100644 --- a/src/uvw/loop.hpp +++ b/src/uvw/loop.hpp @@ -11,6 +11,7 @@ namespace uvw { +class Resource; class Loop; @@ -19,17 +20,21 @@ class Handle { friend class Loop; template - explicit constexpr Handle(uv_loop_t *loop, Args&&... args) - : res{std::make_shared(loop, std::forward(args)...)} + explicit constexpr Handle(std::shared_ptr&& l, Args&&... args) + : res{std::make_shared(std::move(l), std::forward(args)...)} { } public: + explicit constexpr Handle(): res{} { } + constexpr Handle(const Handle &other): res{other.res} { } constexpr Handle(Handle &&other): res{std::move(other.res)} { } constexpr void operator=(const Handle &other) { res = other.res; } constexpr void operator=(Handle &&other) { res = std::move(other.res); } + constexpr explicit operator bool() { return static_cast(res); } + constexpr operator R&() noexcept { return *res; } operator const R&() const noexcept { return *res; } @@ -38,20 +43,43 @@ private: }; -class Loop final { -public: - Loop(bool def = true) - : loop{def ? uv_default_loop() : new uv_loop_t, [def](uv_loop_t *l){ if(!def) delete l; }} - { - if(!def) { - auto err = uv_loop_init(loop.get()); +class Loop final: public std::enable_shared_from_this { + friend class Resource; - if(err) { - throw UVWException{err}; - } - } else if(!loop) { - throw std::bad_alloc{}; + using Deleter = std::function; + + Loop(std::unique_ptr ptr): loop{std::move(ptr)} { } + +public: + static std::shared_ptr create(bool def = true) { + auto ptr = std::unique_ptr{new uv_loop_t, [](uv_loop_t *l){ delete l; }}; + auto loop = std::shared_ptr(new Loop{std::move(ptr)}); + + if(!uv_loop_init(loop->loop.get())) { + loop = nullptr; } + + return loop; + } + + static std::shared_ptr getDefault() { + static std::weak_ptr ref; + std::shared_ptr loop; + + if(ref.expired()) { + auto def = uv_default_loop(); + + if(def) { + auto ptr = std::unique_ptr(def, [](uv_loop_t *){ }); + loop = std::shared_ptr(new Loop{std::move(ptr)}); + } + + ref = loop; + } else { + loop = ref.lock(); + } + + return loop; } Loop(const Loop &) = delete; @@ -67,7 +95,7 @@ public: template Handle handle(Args&&... args) { - return Handle{loop.get(), std::forward(args)...}; + return Handle{shared_from_this(), std::forward(args)...}; } bool close() noexcept { @@ -95,7 +123,6 @@ public: } private: - using Deleter = std::function; std::unique_ptr loop; }; diff --git a/src/uvw/resource.hpp b/src/uvw/resource.hpp index fb8c6544..63d8471a 100644 --- a/src/uvw/resource.hpp +++ b/src/uvw/resource.hpp @@ -11,43 +11,67 @@ namespace uvw { -template -class Resource { -protected: - template - explicit Resource(U *u): handle{reinterpret_cast(u)} { - handle->data = this; - } +template +struct HandleType { }; + +class Resource { static void proto(uv_handle_t* h) { static_cast(h->data)->callback(UVWError{}); } +protected: + template + explicit Resource(HandleType, std::shared_ptr r) + : ref{std::move(r)}, res{std::make_shared()} + { + get()->data = this; + } + + template + T* get() const noexcept { return reinterpret_cast(res.get()); } + + uv_loop_t* parent() const noexcept { return ref->loop.get(); } + + template + Handle spawn(Args&&... args) { + auto h = ref->handle(std::forward(args)...); + static_cast(h).res = res; + return h; + } + + template + void reset() { + res = std::make_shared(); + get()->data = this; + } + public: using Callback = std::function; - virtual ~Resource() { static_assert(std::is_base_of, T>::value, "!"); } - - Resource(const Resource &) = delete; - Resource(Resource &&) = delete; + explicit Resource(const Resource &) = delete; + explicit Resource(Resource &&) = delete; void operator=(const Resource &) = delete; void operator=(Resource &&) = delete; - bool active() const noexcept { return !(uv_is_active(handle) == 0); } - bool closing() const noexcept { return !(uv_is_closing(handle) == 0); } + std::shared_ptr loop() const noexcept { return ref; } + + bool active() const noexcept { return !(uv_is_active(get()) == 0); } + bool closing() const noexcept { return !(uv_is_closing(get()) == 0); } void close(Callback cb) noexcept { callback = cb; - uv_close(handle, &proto); + uv_close(get(), &proto); } - void reference() noexcept { uv_ref(handle); } - void unreference() noexcept { uv_ref(handle); } - bool referenced() const noexcept { return !(uv_has_ref(handle) == 0); } + void reference() noexcept { uv_ref(get()); } + void unreference() noexcept { uv_ref(get()); } + bool referenced() const noexcept { return !(uv_has_ref(get()) == 0); } private: - uv_handle_t *handle; + std::shared_ptr ref; + std::shared_ptr res; Callback callback; }; diff --git a/src/uvw/stream.hpp b/src/uvw/stream.hpp index d889dd3b..605fb4ff 100644 --- a/src/uvw/stream.hpp +++ b/src/uvw/stream.hpp @@ -1,50 +1,73 @@ #pragma once +#include +#include "resource.hpp" +#include "loop.hpp" + + namespace uvw { -template -class Connection: public Resource { - using Resource::Resource; +class Connection final: public Resource { + // TODO proxy on a stream client public: + explicit Connection(std::shared_ptr ref, uv_stream_t *srv) + : Resource{HandleType{}, ref} + { + // TODO initialized... HOW???? + uv_tcp_init(parent(), get());get() + auto err = uv_accept(srv, get()); + } +}; + + +class Stream: public Resource { + static void protoListen(uv_stream_t* srv, int status) { + // TODO invoke accept + + Stream &stream = *(static_cast(srv->data)); + + if(status) { + stream.listenCallback(UVWError{status}, Handle{}); + } else { + stream.listenCallback(UVWError{}, loop()->handle(get())); + } + } + +protected: + using Resource::Resource; + +public: + using CallbackListen = std::function)>; + + // TODO shutdown + + void listen(int backlog, CallbackListen cb) noexcept { + listenCallback = cb; + auto err = uv_listen(get(), backlog, &protoListen); + + if(err) { + listenCallback(UVWError{err}, Handle{}); + } + } + // TODO read // TODO stop // TODO write // TODO tryWrite -}; - - -template -class Stream: public Connection { - using Connection::Connection; - - static void protoListen(uv_stream_t* srv, int status) { - // TODO - } - -protected: - template - explicit Stream(U *u) - : Connection{u}, - handle{reinterpret_cast(u)} - { } - -public: - // TODO shutdown - // TODO listen bool readable() const noexcept { - return (uv_is_readable(handle) == 1); + return (uv_is_readable(get()) == 1); } bool writable() const noexcept { - return (uv_is_writable(handle) == 1); + return (uv_is_writable(get()) == 1); } private: - uv_stream_t *handle; + CallbackListen listenCallback; }; diff --git a/src/uvw/tcp.hpp b/src/uvw/tcp.hpp index e19d7488..e932bc94 100644 --- a/src/uvw/tcp.hpp +++ b/src/uvw/tcp.hpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -12,67 +13,72 @@ namespace uvw { -class Tcp final: public Stream { +class Tcp final: public Stream { static void protoConnect(uv_connect_t* req, int status) { - auto handle = req->handle; - delete req; - static_cast(handle->data)->connCb(UVWError{status}); + Tcp &tcp = *(static_cast(req->handle->data)); + + if(status) { + tcp.connCb(UVWError{status}, Handle{}); + } else { + auto h = tcp.spawn(); + tcp.reset(); + uv_tcp_init(tcp.parent(), tcp.get()); + tcp.connCb(UVWError{}, h); + } } public: using Time = std::chrono::duration; - using CallbackConnect = std::function; + using CallbackConnect = std::function)>; enum { IPv4, IPv6 }; - explicit Tcp(uv_loop_t *loop): Stream{&handle} { - uv_tcp_init(loop, &handle); - } - - ~Tcp() { - close([](UVWError){}); + explicit Tcp(std::shared_ptr ref) + : Stream{HandleType{}, ref}, + conn{std::make_unique()} + { + uv_tcp_init(parent(), get()); } bool noDelay(bool value = false) noexcept { - return (uv_tcp_nodelay(&handle, value ? 1 : 0) == 0); + return (uv_tcp_nodelay(get(), value ? 1 : 0) == 0); } bool keepAlive(bool enable = false, Time time = Time{0}) noexcept { - return (uv_tcp_keepalive(&handle, enable ? 1 : 0, time.count()) == 0); + return (uv_tcp_keepalive(get(), enable ? 1 : 0, time.count()) == 0); } template void connect(std::string, int, CallbackConnect) noexcept; private: + std::unique_ptr conn; CallbackConnect connCb; - uv_tcp_t handle; }; template<> void Tcp::connect(std::string ip, int port, CallbackConnect cb) noexcept { - uv_connect_t *conn = new uv_connect_t; sockaddr_in addr; uv_ip4_addr(ip.c_str(), port, &addr); connCb = cb; - auto err = uv_tcp_connect(conn, &handle, reinterpret_cast(&addr), &protoConnect); + auto err = uv_tcp_connect(conn.get(), get(), reinterpret_cast(&addr), &protoConnect); if(err) { - connCb(UVWError{err}); + connCb(UVWError{err}, Handle{}); } } + template<> void Tcp::connect(std::string ip, int port, CallbackConnect cb) noexcept { - uv_connect_t *conn = new uv_connect_t; sockaddr_in6 addr; uv_ip6_addr(ip.c_str(), port, &addr); connCb = cb; - auto err = uv_tcp_connect(conn, &handle, reinterpret_cast(&addr), &protoConnect); + auto err = uv_tcp_connect(conn.get(), get(), reinterpret_cast(&addr), &protoConnect); if(err) { - connCb(UVWError{err}); + connCb(UVWError{err}, Handle{}); } } diff --git a/src/uvw/timer.hpp b/src/uvw/timer.hpp index b476b9af..d97ca38c 100644 --- a/src/uvw/timer.hpp +++ b/src/uvw/timer.hpp @@ -12,7 +12,7 @@ namespace uvw { -class Timer final: public Resource { +class Timer final: public Resource { static void proto(uv_timer_t* h) { static_cast(h->data)->callback(UVWError{}); } @@ -21,26 +21,27 @@ public: using Time = std::chrono::duration; using Callback = std::function; - explicit Timer(uv_loop_t *loop): Resource{&handle} { - uv_timer_init(loop, &handle); + explicit Timer(std::shared_ptr ref) + : Resource{HandleType{}, ref} + { + uv_timer_init(parent(), get()); } void start(const Time &timeout, const Time &rep, Callback cb) noexcept { callback = cb; - auto err = uv_timer_start(&handle, &proto, timeout.count(), rep.count()); + auto err = uv_timer_start(get(), &proto, timeout.count(), rep.count()); if(err) { callback(UVWError{err}); } } - void stop() noexcept { uv_timer_stop(&handle); } - void again() noexcept { uv_timer_again(&handle); } - void repeat(const Time &rep) noexcept { uv_timer_set_repeat(&handle, rep.count()); } - Time repeat() const noexcept { return Time{uv_timer_get_repeat(&handle)}; } + void stop() noexcept { uv_timer_stop(get()); } + void again() noexcept { uv_timer_again(get()); } + void repeat(const Time &rep) noexcept { uv_timer_set_repeat(get(), rep.count()); } + Time repeat() const noexcept { return Time{uv_timer_get_repeat(get())}; } private: - uv_timer_t handle; Callback callback; }; diff --git a/test/main.cpp b/test/main.cpp index be5928c2..4822ffcd 100644 --- a/test/main.cpp +++ b/test/main.cpp @@ -4,14 +4,14 @@ void f(uvw::Loop &loop) { uvw::Handle handle = loop.handle(); - auto cb = [h = handle](uvw::UVWError err){ std::cout << "---" << ((bool)err) << std::endl; }; + auto cb = [h = handle](uvw::UVWError err, uvw::Handle conn){ std::cout << "---" << ((bool)err) << std::endl; }; uvw::Tcp &tcp = handle; tcp.connect(std::string{"127.0.0.1"}, 80, cb); } int main() { - uvw::Loop loop; - f(loop); - loop.runWait(); + auto loop = uvw::Loop::getDefault(); + f(*loop); + loop->runWait(); }