WIP: Pipe

This commit is contained in:
Michele Caini 2016-07-21 11:52:33 +02:00
parent af2b27e809
commit 399878f94e
6 changed files with 122 additions and 66 deletions

View File

@ -18,6 +18,7 @@ template<> struct HandleType<uv_async_t> { };
template<> struct HandleType<uv_check_t> { }; template<> struct HandleType<uv_check_t> { };
template<> struct HandleType<uv_fs_poll_t> { }; template<> struct HandleType<uv_fs_poll_t> { };
template<> struct HandleType<uv_idle_t> { }; template<> struct HandleType<uv_idle_t> { };
template<> struct HandleType<uv_pipe_t> { };
template<> struct HandleType<uv_poll_t> { }; template<> struct HandleType<uv_poll_t> { };
template<> struct HandleType<uv_prepare_t> { }; template<> struct HandleType<uv_prepare_t> { };
template<> struct HandleType<uv_signal_t> { }; template<> struct HandleType<uv_signal_t> { };

58
src/uvw/pipe.hpp Normal file
View File

@ -0,0 +1,58 @@
#pragma once
#include <type_traits>
#include <utility>
#include <memory>
#include <string>
#include <uv.h>
#include "event.hpp"
#include "request.hpp"
#include "stream.hpp"
#include "util.hpp"
namespace uvw {
class Pipe final: public Stream<Pipe> {
explicit Pipe(std::shared_ptr<Loop> ref)
: Stream{HandleType<uv_pipe_t>{}, std::move(ref)}
{ }
public:
template<typename... Args>
static std::shared_ptr<Pipe> create(Args&&... args) {
return std::shared_ptr<Pipe>{new Pipe{std::forward<Args>(args)...}};
}
bool init(bool ipc = false) { return initialize<uv_pipe_t>(&uv_ipc_init, ipc); }
void bind(std::string name) {
invoke(&uv_pipe_bind, get<uv_pipe_t>(), name.data());
}
void connect(std::string name) {
std::weak_ptr<Pipe> weak = this->shared_from_this();
auto listener = [weak](const auto &event, details::Connect &) {
auto ptr = weak.lock();
if(ptr) { ptr->publish(event); }
};
auto connect = loop().resource<details::Connect>();
connect->once<ErrorEvent>(listener);
connect->once<ConnectEvent>(listener);
connect->connect(&uv_pipe_connect, get<uv_pipe_t>(), name.data());
}
std::string sock() const noexcept { return details::path(&uv_pipe_getsockname, get<uv_pipe_t>()); }
std::string peer() const noexcept { return details::path(&uv_pipe_getpeername, get<uv_pipe_t>()); }
// TODO uv_pipe_pending_instances
// TODO uv_pipe_pending_count
// TODO uv_pipe_pending_type
};
}

View File

@ -18,6 +18,24 @@ namespace uvw {
namespace details { namespace details {
class Connect final: public Request<Connect> {
explicit Connect(std::shared_ptr<Loop> ref)
: Request{RequestType<uv_connect_t>{}, std::move(ref)}
{ }
public:
template<typename... Args>
static std::shared_ptr<Connect> create(Args&&... args) {
return std::shared_ptr<Connect>{new Connect{std::forward<Args>(args)...}};
}
template<typename F, typename... A>
void connect(F &&f, A... args) {
exec<uv_connect_t, ConnectEvent>(std::forward<F>(f), get<uv_connect_t>(), std::forward<A>(args)...);
}
};
class Shutdown final: public Request<Shutdown> { class Shutdown final: public Request<Shutdown> {
explicit Shutdown(std::shared_ptr<Loop> ref) explicit Shutdown(std::shared_ptr<Loop> ref)
: Request{RequestType<uv_shutdown_t>{}, std::move(ref)} : Request{RequestType<uv_shutdown_t>{}, std::move(ref)}
@ -111,6 +129,8 @@ public:
listen(DEFAULT_BACKLOG); listen(DEFAULT_BACKLOG);
} }
virtual void accept(T &) = 0;
void read() { void read() {
this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback); this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback);
} }

View File

@ -16,47 +16,24 @@
namespace uvw { namespace uvw {
namespace details {
class Connect final: public Request<Connect> {
explicit Connect(std::shared_ptr<Loop> ref)
: Request{RequestType<uv_connect_t>{}, std::move(ref)}
{ }
public:
template<typename... Args>
static std::shared_ptr<Connect> create(Args&&... args) {
return std::shared_ptr<Connect>{new Connect{std::forward<Args>(args)...}};
}
void connect(uv_tcp_t *handle, const sockaddr *addr) {
exec<uv_connect_t, ConnectEvent>(&uv_tcp_connect, get<uv_connect_t>(), handle, addr);
}
};
}
class Tcp final: public Stream<Tcp> { class Tcp final: public Stream<Tcp> {
using AddressFunctionType = Addr(*)(const Tcp &); using SockFunctionType = Addr(*)(const Tcp &);
using RemoteFunctionType = AddressFunctionType; using PeerFunctionType = SockFunctionType;
template<typename I> template<typename I>
static Addr tAddress(const Tcp &tcp) noexcept { static Addr tSock(const Tcp &tcp) noexcept {
return details::address<I>(uv_tcp_getsockname, tcp.get<uv_tcp_t>()); return details::address<I>(uv_tcp_getsockname, tcp.get<uv_tcp_t>());
} }
template<typename I> template<typename I>
static Addr tRemote(const Tcp &tcp) noexcept { static Addr tPeer(const Tcp &tcp) noexcept {
return details::address<I>(uv_tcp_getpeername, tcp.get<uv_tcp_t>()); return details::address<I>(uv_tcp_getpeername, tcp.get<uv_tcp_t>());
} }
explicit Tcp(std::shared_ptr<Loop> ref) explicit Tcp(std::shared_ptr<Loop> ref)
: Stream{HandleType<uv_tcp_t>{}, std::move(ref)}, : Stream{HandleType<uv_tcp_t>{}, std::move(ref)},
addressF{&tAddress<details::IPv4>}, sockF{&tSock<details::IPv4>},
remoteF{&tRemote<details::IPv4>} peerF{&tPeer<details::IPv4>}
{ } { }
public: public:
@ -90,8 +67,8 @@ public:
Traits::AddrFunc(ip.data(), port, &addr); Traits::AddrFunc(ip.data(), port, &addr);
if(0 == invoke(&uv_tcp_bind, get<uv_tcp_t>(), reinterpret_cast<const sockaddr *>(&addr), flags)) { if(0 == invoke(&uv_tcp_bind, get<uv_tcp_t>(), reinterpret_cast<const sockaddr *>(&addr), flags)) {
addressF = &tAddress<I>; sockF = &tSock<I>;
remoteF = &tRemote<I>; peerF = &tPeer<I>;
} }
} }
@ -100,8 +77,8 @@ public:
bind<I>(addr.ip, addr.port, flags); bind<I>(addr.ip, addr.port, flags);
} }
Addr address() const noexcept { return addressF(*this); } Addr sock() const noexcept { return sockF(*this); }
Addr remote() const noexcept { return remoteF(*this); } Addr peer() const noexcept { return peerF(*this); }
template<typename I, typename..., typename Traits = details::IpTraits<I>> template<typename I, typename..., typename Traits = details::IpTraits<I>>
void connect(std::string ip, unsigned int port) { void connect(std::string ip, unsigned int port) {
@ -114,8 +91,8 @@ public:
auto ptr = weak.lock(); auto ptr = weak.lock();
if(ptr) { if(ptr) {
ptr->addressF = &tAddress<I>; ptr->sockF = &tSock<I>;
ptr->remoteF = &tRemote<I>; ptr->peerF = &tPeer<I>;
ptr->publish(event); ptr->publish(event);
} }
}; };
@ -123,22 +100,22 @@ public:
auto connect = loop().resource<details::Connect>(); auto connect = loop().resource<details::Connect>();
connect->once<ErrorEvent>(listener); connect->once<ErrorEvent>(listener);
connect->once<ConnectEvent>(listener); connect->once<ConnectEvent>(listener);
connect->connect(get<uv_tcp_t>(), reinterpret_cast<const sockaddr *>(&addr)); connect->connect(&uv_tcp_connect, get<uv_tcp_t>(), reinterpret_cast<const sockaddr *>(&addr));
} }
template<typename I, typename..., typename Traits = details::IpTraits<I>> template<typename I, typename..., typename Traits = details::IpTraits<I>>
void connect(Addr addr) { connect<I>(addr.ip, addr.port); } void connect(Addr addr) { connect<I>(addr.ip, addr.port); }
void accept(Tcp &tcp) { void accept(Tcp &tcp) override {
if(0 == invoke(&uv_accept, get<uv_stream_t>(), tcp.get<uv_stream_t>())) { if(0 == invoke(&uv_accept, get<uv_stream_t>(), tcp.get<uv_stream_t>())) {
tcp.addressF = addressF; tcp.sockF = sockF;
tcp.remoteF = remoteF; tcp.peerF = peerF;
} }
} }
private: private:
AddressFunctionType addressF; SockFunctionType sockF;
RemoteFunctionType remoteF; PeerFunctionType peerF;
}; };

View File

@ -39,16 +39,16 @@ public:
class Udp final: public Handle<Udp> { class Udp final: public Handle<Udp> {
using AddressFunctionType = Addr(*)(const Udp &); using SockFunctionType = Addr(*)(const Udp &);
using RemoteFunctionType = Addr(*)(const sockaddr *); using PeerFunctionType = Addr(*)(const sockaddr *);
template<typename I> template<typename I>
static Addr tAddress(const Udp &udp) noexcept { static Addr tSock(const Udp &udp) noexcept {
return details::address<I>(uv_udp_getsockname, udp.get<uv_udp_t>()); return details::address<I>(uv_udp_getsockname, udp.get<uv_udp_t>());
} }
template<typename I, typename..., typename Traits = details::IpTraits<I>> template<typename I, typename..., typename Traits = details::IpTraits<I>>
static Addr tRemote(const sockaddr *addr) noexcept { static Addr tPeer(const sockaddr *addr) noexcept {
const typename Traits::Type *aptr = reinterpret_cast<const typename Traits::Type *>(addr); const typename Traits::Type *aptr = reinterpret_cast<const typename Traits::Type *>(addr);
int len = sizeof(*addr); int len = sizeof(*addr);
return details::address<I>(aptr, len); return details::address<I>(aptr, len);
@ -56,8 +56,8 @@ class Udp final: public Handle<Udp> {
explicit Udp(std::shared_ptr<Loop> ref) explicit Udp(std::shared_ptr<Loop> ref)
: Handle{HandleType<uv_udp_t>{}, std::move(ref)}, : Handle{HandleType<uv_udp_t>{}, std::move(ref)},
addressF{&tAddress<details::IPv4>}, sockF{&tSock<details::IPv4>},
remoteF{&tRemote<details::IPv4>} peerF{&tPeer<details::IPv4>}
{ } { }
static void recvCallback(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const sockaddr *addr, unsigned flags) { static void recvCallback(uv_udp_t *handle, ssize_t nread, const uv_buf_t *buf, const sockaddr *addr, unsigned flags) {
@ -67,12 +67,12 @@ class Udp final: public Handle<Udp> {
if(nread > 0) { if(nread > 0) {
// data available (can be truncated) // data available (can be truncated)
udp.publish(UDPDataEvent{udp.remoteF(addr), std::move(data), nread, flags & UV_UDP_PARTIAL}); udp.publish(UDPDataEvent{udp.peerF(addr), std::move(data), nread, flags & UV_UDP_PARTIAL});
} else if(nread == 0 && addr == nullptr) { } else if(nread == 0 && addr == nullptr) {
// no more data to be read, doing nothing is fine // no more data to be read, doing nothing is fine
} else if(nread == 0 && addr != nullptr) { } else if(nread == 0 && addr != nullptr) {
// empty udp packet // empty udp packet
udp.publish(UDPDataEvent{udp.remoteF(addr), std::move(data), nread, false}); udp.publish(UDPDataEvent{udp.peerF(addr), std::move(data), nread, false});
} else { } else {
// transmission error // transmission error
udp.publish(ErrorEvent(nread)); udp.publish(ErrorEvent(nread));
@ -106,8 +106,8 @@ public:
Traits::AddrFunc(ip.data(), port, &addr); Traits::AddrFunc(ip.data(), port, &addr);
if(0 == invoke(&uv_udp_bind, get<uv_udp_t>(), reinterpret_cast<const sockaddr *>(&addr), flags)) { if(0 == invoke(&uv_udp_bind, get<uv_udp_t>(), reinterpret_cast<const sockaddr *>(&addr), flags)) {
addressF = &tAddress<I>; sockF = &tSock<I>;
remoteF = &tRemote<I>; peerF = &tPeer<I>;
} }
} }
@ -116,13 +116,13 @@ public:
bind<I>(addr.ip, addr.port, flags); bind<I>(addr.ip, addr.port, flags);
} }
Addr address() const noexcept { return addressF(*this); } Addr sock() const noexcept { return sockF(*this); }
template<typename I> template<typename I>
void multicastMembership(std::string multicast, std::string interface, Membership membership) { void multicastMembership(std::string multicast, std::string interface, Membership membership) {
if(0 == invoke(&uv_udp_set_membership, get<uv_udp_t>(), multicast.data(), interface.data(), static_cast<uv_membership>(membership))) { if(0 == invoke(&uv_udp_set_membership, get<uv_udp_t>(), multicast.data(), interface.data(), static_cast<uv_membership>(membership))) {
addressF = &tAddress<I>; sockF = &tSock<I>;
remoteF = &tRemote<I>; peerF = &tPeer<I>;
} }
} }
@ -137,8 +137,8 @@ public:
template<typename I> template<typename I>
void multicastInterface(std::string interface) { void multicastInterface(std::string interface) {
if(0 == invoke(&uv_udp_set_multicast_interface, get<uv_udp_t>(), interface.data())) { if(0 == invoke(&uv_udp_set_multicast_interface, get<uv_udp_t>(), interface.data())) {
addressF = &tAddress<I>; sockF = &tSock<I>;
remoteF = &tRemote<I>; peerF = &tPeer<I>;
} }
} }
@ -157,8 +157,8 @@ public:
auto ptr = weak.lock(); auto ptr = weak.lock();
if(ptr) { if(ptr) {
ptr->addressF = &tAddress<I>; ptr->sockF = &tSock<I>;
ptr->remoteF = &tRemote<I>; ptr->peerF = &tPeer<I>;
ptr->publish(event); ptr->publish(event);
} }
}; };
@ -168,8 +168,8 @@ public:
send->once<SendEvent>(listener); send->once<SendEvent>(listener);
send->send(get<uv_udp_t>(), bufs, 1, reinterpret_cast<const sockaddr *>(&addr)); send->send(get<uv_udp_t>(), bufs, 1, reinterpret_cast<const sockaddr *>(&addr));
addressF = &tAddress<I>; sockF = &tSock<I>;
remoteF = &tRemote<I>; peerF = &tPeer<I>;
} }
template<typename I, typename..., typename Traits = details::IpTraits<I>> template<typename I, typename..., typename Traits = details::IpTraits<I>>
@ -189,8 +189,8 @@ public:
this->publish(ErrorEvent{bw}); this->publish(ErrorEvent{bw});
bw = 0; bw = 0;
} else { } else {
addressF = &tAddress<I>; sockF = &tSock<I>;
remoteF = &tRemote<I>; peerF = &tPeer<I>;
} }
return bw; return bw;
@ -205,8 +205,8 @@ public:
void stop() { invoke(&uv_udp_recv_stop, get<uv_udp_t>()); } void stop() { invoke(&uv_udp_recv_stop, get<uv_udp_t>()); }
private: private:
AddressFunctionType addressF; SockFunctionType sockF;
RemoteFunctionType remoteF; PeerFunctionType peerF;
}; };

View File

@ -30,10 +30,10 @@ void listen(uvw::Loop &loop) {
srv.accept(*client); srv.accept(*client);
uvw::Addr local = srv.address(); uvw::Addr local = srv.sock();
std::cout << "local: " << local.ip << " " << local.port << std::endl; std::cout << "local: " << local.ip << " " << local.port << std::endl;
uvw::Addr remote = client->remote(); uvw::Addr remote = client->peer();
std::cout << "remote: " << remote.ip << " " << remote.port << std::endl; std::cout << "remote: " << remote.ip << " " << remote.port << std::endl;
client->on<uvw::DataEvent>([](const uvw::DataEvent &event, uvw::Tcp &) { client->on<uvw::DataEvent>([](const uvw::DataEvent &event, uvw::Tcp &) {