uvw  2.6.0
stream.h
1 #ifndef UVW_STREAM_INCLUDE_H
2 #define UVW_STREAM_INCLUDE_H
3 
4 
5 #include <algorithm>
6 #include <iterator>
7 #include <cstddef>
8 #include <utility>
9 #include <memory>
10 #include <uv.h>
11 #include "request.hpp"
12 #include "handle.hpp"
13 #include "loop.h"
14 
15 
16 namespace uvw {
17 
18 
24 struct ConnectEvent {};
25 
26 
32 struct EndEvent {};
33 
34 
40 struct ListenEvent {};
41 
42 
48 struct ShutdownEvent {};
49 
50 
56 struct WriteEvent {};
57 
58 
64 struct DataEvent {
65  explicit DataEvent(std::unique_ptr<char[]> buf, std::size_t len) noexcept;
66 
67  std::unique_ptr<char[]> data;
68  std::size_t length;
69 };
70 
71 
72 namespace details {
73 
74 
75 struct ConnectReq final: public Request<ConnectReq, uv_connect_t> {
76  using Request::Request;
77 
78  template<typename F, typename... Args>
79  void connect(F &&f, Args&&... args) {
80  invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>);
81  }
82 };
83 
84 
85 struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> {
86  using Request::Request;
87 
88  void shutdown(uv_stream_t *handle);
89 };
90 
91 
92 template<typename Deleter>
93 class WriteReq final: public Request<WriteReq<Deleter>, uv_write_t> {
94  using ConstructorAccess = typename Request<WriteReq<Deleter>, uv_write_t>::ConstructorAccess;
95 
96 public:
97  WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> dt, unsigned int len)
98  : Request<WriteReq<Deleter>, uv_write_t>{ca, std::move(loop)},
99  data{std::move(dt)},
100  buf{uv_buf_init(data.get(), len)}
101  {}
102 
103  void write(uv_stream_t *handle) {
104  this->invoke(&uv_write, this->get(), handle, &buf, 1, &this->template defaultCallback<WriteEvent>);
105  }
106 
107  void write(uv_stream_t *handle, uv_stream_t *send) {
108  this->invoke(&uv_write2, this->get(), handle, &buf, 1, send, &this->template defaultCallback<WriteEvent>);
109  }
110 
111 private:
112  std::unique_ptr<char[], Deleter> data;
113  uv_buf_t buf;
114 };
115 
116 
117 }
118 
119 
127 template<typename T, typename U>
128 class StreamHandle: public Handle<T, U> {
129  static constexpr unsigned int DEFAULT_BACKLOG = 128;
130 
131  static void readCallback(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
132  T &ref = *(static_cast<T*>(handle->data));
133  // data will be destroyed no matter of what the value of nread is
134  std::unique_ptr<char[]> data{buf->base};
135 
136  // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html)
137  // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error
138  // for we don't have data to emit though, it's fine to suppress it
139 
140  if(nread == UV_EOF) {
141  // end of stream
142  ref.publish(EndEvent{});
143  } else if(nread > 0) {
144  // data available
145  ref.publish(DataEvent{std::move(data), static_cast<std::size_t>(nread)});
146  } else if(nread < 0) {
147  // transmission error
148  ref.publish(ErrorEvent(nread));
149  }
150  }
151 
152  static void listenCallback(uv_stream_t *handle, int status) {
153  T &ref = *(static_cast<T*>(handle->data));
154  if(status) { ref.publish(ErrorEvent{status}); }
155  else { ref.publish(ListenEvent{}); }
156  }
157 
158 public:
159 #ifdef _MSC_VER
160  StreamHandle(typename Handle<T, U>::ConstructorAccess ca, std::shared_ptr<Loop> ref)
161  : Handle<T, U>{ca, std::move(ref)}
162  {}
163 #else
164  using Handle<T, U>::Handle;
165 #endif
166 
174  void shutdown() {
175  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
176  ptr->publish(event);
177  };
178 
179  auto shutdown = this->loop().template resource<details::ShutdownReq>();
180  shutdown->template once<ErrorEvent>(listener);
181  shutdown->template once<ShutdownEvent>(listener);
182  shutdown->shutdown(this->template get<uv_stream_t>());
183  }
184 
195  void listen(int backlog = DEFAULT_BACKLOG) {
196  this->invoke(&uv_listen, this->template get<uv_stream_t>(), backlog, &listenCallback);
197  }
198 
218  template<typename S>
219  void accept(S &ref) {
220  this->invoke(&uv_accept, this->template get<uv_stream_t>(), this->template get<uv_stream_t>(ref));
221  }
222 
230  void read() {
231  this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback);
232  }
233 
239  void stop() {
240  this->invoke(&uv_read_stop, this->template get<uv_stream_t>());
241  }
242 
255  template<typename Deleter>
256  void write(std::unique_ptr<char[], Deleter> data, unsigned int len) {
257  auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
258  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
259  ptr->publish(event);
260  };
261 
262  req->template once<ErrorEvent>(listener);
263  req->template once<WriteEvent>(listener);
264  req->write(this->template get<uv_stream_t>());
265  }
266 
279  void write(char *data, unsigned int len) {
280  auto req = this->loop().template resource<details::WriteReq<void(*)(char *)>>(std::unique_ptr<char[], void(*)(char *)>{data, [](char *) {}}, len);
281  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
282  ptr->publish(event);
283  };
284 
285  req->template once<ErrorEvent>(listener);
286  req->template once<WriteEvent>(listener);
287  req->write(this->template get<uv_stream_t>());
288  }
289 
309  template<typename S, typename Deleter>
310  void write(S &send, std::unique_ptr<char[], Deleter> data, unsigned int len) {
311  auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
312  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
313  ptr->publish(event);
314  };
315 
316  req->template once<ErrorEvent>(listener);
317  req->template once<WriteEvent>(listener);
318  req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
319  }
320 
340  template<typename S>
341  void write(S &send, char *data, unsigned int len) {
342  auto req = this->loop().template resource<details::WriteReq<void(*)(char *)>>(std::unique_ptr<char[], void(*)(char *)>{data, [](char *) {}}, len);
343  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
344  ptr->publish(event);
345  };
346 
347  req->template once<ErrorEvent>(listener);
348  req->template once<WriteEvent>(listener);
349  req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
350  }
351 
363  int tryWrite(std::unique_ptr<char[]> data, unsigned int len) {
364  uv_buf_t bufs[] = { uv_buf_init(data.get(), len) };
365  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
366 
367  if(bw < 0) {
368  this->publish(ErrorEvent{bw});
369  bw = 0;
370  }
371 
372  return bw;
373  }
374 
386  int tryWrite(char *data, unsigned int len) {
387  uv_buf_t bufs[] = { uv_buf_init(data, len) };
388  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
389 
390  if(bw < 0) {
391  this->publish(ErrorEvent{bw});
392  bw = 0;
393  }
394 
395  return bw;
396  }
397 
402  bool readable() const noexcept {
403  return (uv_is_readable(this->template get<uv_stream_t>()) == 1);
404  }
405 
410  bool writable() const noexcept {
411  return (uv_is_writable(this->template get<uv_stream_t>()) == 1);
412  }
413 
429  bool blocking(bool enable = false) {
430  return (0 == uv_stream_set_blocking(this->template get<uv_stream_t>(), enable));
431  }
432 
437  size_t writeQueueSize() const noexcept {
438  return uv_stream_get_write_queue_size(this->template get<uv_stream_t>());
439  }
440 };
441 
442 
443 }
444 
445 
446 #ifndef UVW_AS_LIB
447 #include "stream.cpp"
448 #endif
449 
450 #endif // UVW_STREAM_INCLUDE_H
uvw::Resource::data
std::shared_ptr< R > data() const
Gets user-defined data. uvw won't use this field in any case.
Definition: resource.hpp:54
uvw::StreamHandle
The StreamHandle handle.
Definition: stream.h:128
uvw::StreamHandle::listen
void listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
Definition: stream.h:195
uvw
uvw default namespace.
Definition: async.h:10
uvw::UnderlyingType::loop
Loop & loop() const noexcept
Gets the loop from which the resource was originated.
Definition: underlying_type.hpp:71
uvw::StreamHandle::accept
void accept(S &ref)
Accepts incoming connections.
Definition: stream.h:219
uvw::EndEvent
EndEvent event.
Definition: stream.h:32
uvw::StreamHandle::tryWrite
int tryWrite(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:386
uvw::DataEvent::length
std::size_t length
Definition: stream.h:68
uvw::StreamHandle::write
void write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:310
uvw::ErrorEvent
The ErrorEvent event.
Definition: emitter.h:24
uvw::WriteEvent
WriteEvent event.
Definition: stream.h:56
uvw::StreamHandle::read
void read()
Starts reading data from an incoming stream.
Definition: stream.h:230
uvw::StreamHandle::stop
void stop()
Stops reading data from the stream.
Definition: stream.h:239
uvw::StreamHandle::writable
bool writable() const noexcept
Checks if the stream is writable.
Definition: stream.h:410
uvw::StreamHandle::blocking
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
Definition: stream.h:429
uvw::DataEvent::data
std::unique_ptr< char[]> data
Definition: stream.h:67
uvw::ShutdownEvent
ShutdownEvent event.
Definition: stream.h:48
uvw::StreamHandle::readable
bool readable() const noexcept
Checks if the stream is readable.
Definition: stream.h:402
uvw::DataEvent
DataEvent event.
Definition: stream.h:64
uvw::Handle
Handle base class.
Definition: handle.hpp:30
uvw::ListenEvent
ListenEvent event.
Definition: stream.h:40
uvw::StreamHandle::tryWrite
int tryWrite(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:363
uvw::ConnectEvent
ConnectEvent event.
Definition: stream.h:24
uvw::StreamHandle::shutdown
void shutdown()
Shutdowns the outgoing (write) side of a duplex stream.
Definition: stream.h:174
uvw::StreamHandle::write
void write(char *data, unsigned int len)
Writes data to the stream.
Definition: stream.h:279
uvw::StreamHandle::write
void write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:341
uvw::StreamHandle::writeQueueSize
size_t writeQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition: stream.h:437
uvw::StreamHandle::write
void write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
Definition: stream.h:256