uvw  2.12.1
stream.h
1 #ifndef UVW_STREAM_INCLUDE_H
2 #define UVW_STREAM_INCLUDE_H
3 
4 #include <algorithm>
5 #include <cstddef>
6 #include <iterator>
7 #include <memory>
8 #include <utility>
9 #include <uv.h>
10 #include "handle.hpp"
11 #include "loop.h"
12 #include "request.hpp"
13 
14 namespace uvw {
15 
/**
 * @brief ConnectEvent event.
 *
 * Tag type published when a connect request completes without error.
 */
struct ConnectEvent {};

/**
 * @brief EndEvent event.
 *
 * Tag type published by a stream when the read callback receives `UV_EOF`.
 */
struct EndEvent {};

/**
 * @brief ListenEvent event.
 *
 * Tag type published by a listening stream when the listen callback is
 * invoked with a zero status.
 */
struct ListenEvent {};

/**
 * @brief ShutdownEvent event.
 *
 * Tag type published when a shutdown request completes without error.
 */
struct ShutdownEvent {};

/**
 * @brief WriteEvent event.
 *
 * Tag type published when a write request completes without error.
 */
struct WriteEvent {};
50 
/**
 * @brief DataEvent event.
 *
 * Published by a stream when data has been read. The event carries the
 * buffer together with the number of bytes stored in it.
 */
struct DataEvent {
    /**
     * @brief Builds an event from a buffer and a byte count.
     * @param buf Buffer handed over to the event.
     * @param len Number of bytes in the buffer.
     */
    explicit DataEvent(std::unique_ptr<char[]> buf, std::size_t len) noexcept;

    /** @brief Buffer owned by the event. */
    std::unique_ptr<char[]> data;

    /** @brief Number of bytes in `data`. */
    std::size_t length;
};
62 
63 namespace details {
64 
65 struct ConnectReq final: public Request<ConnectReq, uv_connect_t> {
66  using Request::Request;
67 
68  template<typename F, typename... Args>
69  void connect(F &&f, Args &&...args) {
70  invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>);
71  }
72 };
73 
74 struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> {
75  using Request::Request;
76 
77  void shutdown(uv_stream_t *handle);
78 };
79 
80 template<typename Deleter>
81 class WriteReq final: public Request<WriteReq<Deleter>, uv_write_t> {
82  using ConstructorAccess = typename Request<WriteReq<Deleter>, uv_write_t>::ConstructorAccess;
83 
84 public:
85  WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> dt, unsigned int len)
86  : Request<WriteReq<Deleter>, uv_write_t>{ca, std::move(loop)},
87  data{std::move(dt)},
88  buf{uv_buf_init(data.get(), len)} {}
89 
90  void write(uv_stream_t *handle) {
91  this->invoke(&uv_write, this->get(), handle, &buf, 1, &this->template defaultCallback<WriteEvent>);
92  }
93 
94  void write(uv_stream_t *handle, uv_stream_t *send) {
95  this->invoke(&uv_write2, this->get(), handle, &buf, 1, send, &this->template defaultCallback<WriteEvent>);
96  }
97 
98 private:
99  std::unique_ptr<char[], Deleter> data;
100  uv_buf_t buf;
101 };
102 
103 } // namespace details
104 
112 template<typename T, typename U>
113 class StreamHandle: public Handle<T, U> {
114  static constexpr unsigned int DEFAULT_BACKLOG = 128;
115 
116  static void readCallback(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
117  T &ref = *(static_cast<T *>(handle->data));
118  // data will be destroyed no matter of what the value of nread is
119  std::unique_ptr<char[]> data{buf->base};
120 
121  // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html)
122  // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error
123  // for we don't have data to emit though, it's fine to suppress it
124 
125  if(nread == UV_EOF) {
126  // end of stream
127  ref.publish(EndEvent{});
128  } else if(nread > 0) {
129  // data available
130  ref.publish(DataEvent{std::move(data), static_cast<std::size_t>(nread)});
131  } else if(nread < 0) {
132  // transmission error
133  ref.publish(ErrorEvent(nread));
134  }
135  }
136 
137  static void listenCallback(uv_stream_t *handle, int status) {
138  if(T &ref = *(static_cast<T *>(handle->data)); status) {
139  ref.publish(ErrorEvent{status});
140  } else {
141  ref.publish(ListenEvent{});
142  }
143  }
144 
145 public:
146 #ifdef _MSC_VER
147  StreamHandle(typename Handle<T, U>::ConstructorAccess ca, std::shared_ptr<Loop> ref)
148  : Handle<T, U>{ca, std::move(ref)} {}
149 #else
150  using Handle<T, U>::Handle;
151 #endif
152 
160  void shutdown() {
161  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
162  ptr->publish(event);
163  };
164 
165  auto shutdown = this->loop().template resource<details::ShutdownReq>();
166  shutdown->template once<ErrorEvent>(listener);
167  shutdown->template once<ShutdownEvent>(listener);
168  shutdown->shutdown(this->template get<uv_stream_t>());
169  }
170 
181  void listen(int backlog = DEFAULT_BACKLOG) {
182  this->invoke(&uv_listen, this->template get<uv_stream_t>(), backlog, &listenCallback);
183  }
184 
204  template<typename S>
205  void accept(S &ref) {
206  this->invoke(&uv_accept, this->template get<uv_stream_t>(), this->template get<uv_stream_t>(ref));
207  }
208 
216  void read() {
217  this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback);
218  }
219 
225  void stop() {
226  this->invoke(&uv_read_stop, this->template get<uv_stream_t>());
227  }
228 
241  template<typename Deleter>
242  void write(std::unique_ptr<char[], Deleter> data, unsigned int len) {
243  auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
244  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
245  ptr->publish(event);
246  };
247 
248  req->template once<ErrorEvent>(listener);
249  req->template once<WriteEvent>(listener);
250  req->write(this->template get<uv_stream_t>());
251  }
252 
265  void write(char *data, unsigned int len) {
266  auto req = this->loop().template resource<details::WriteReq<void (*)(char *)>>(std::unique_ptr<char[], void (*)(char *)>{data, [](char *) {}}, len);
267  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
268  ptr->publish(event);
269  };
270 
271  req->template once<ErrorEvent>(listener);
272  req->template once<WriteEvent>(listener);
273  req->write(this->template get<uv_stream_t>());
274  }
275 
295  template<typename S, typename Deleter>
296  void write(S &send, std::unique_ptr<char[], Deleter> data, unsigned int len) {
297  auto req = this->loop().template resource<details::WriteReq<Deleter>>(std::move(data), len);
298  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
299  ptr->publish(event);
300  };
301 
302  req->template once<ErrorEvent>(listener);
303  req->template once<WriteEvent>(listener);
304  req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
305  }
306 
326  template<typename S>
327  void write(S &send, char *data, unsigned int len) {
328  auto req = this->loop().template resource<details::WriteReq<void (*)(char *)>>(std::unique_ptr<char[], void (*)(char *)>{data, [](char *) {}}, len);
329  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
330  ptr->publish(event);
331  };
332 
333  req->template once<ErrorEvent>(listener);
334  req->template once<WriteEvent>(listener);
335  req->write(this->template get<uv_stream_t>(), this->template get<uv_stream_t>(send));
336  }
337 
349  int tryWrite(std::unique_ptr<char[]> data, unsigned int len) {
350  uv_buf_t bufs[] = {uv_buf_init(data.get(), len)};
351  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
352 
353  if(bw < 0) {
354  this->publish(ErrorEvent{bw});
355  bw = 0;
356  }
357 
358  return bw;
359  }
360 
372  template<typename V, typename W>
373  int tryWrite(std::unique_ptr<char[]> data, unsigned int len, StreamHandle<V, W> &send) {
374  uv_buf_t bufs[] = {uv_buf_init(data.get(), len)};
375  auto bw = uv_try_write2(this->template get<uv_stream_t>(), bufs, 1, send.raw());
376 
377  if(bw < 0) {
378  this->publish(ErrorEvent{bw});
379  bw = 0;
380  }
381 
382  return bw;
383  }
384 
396  int tryWrite(char *data, unsigned int len) {
397  uv_buf_t bufs[] = {uv_buf_init(data, len)};
398  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
399 
400  if(bw < 0) {
401  this->publish(ErrorEvent{bw});
402  bw = 0;
403  }
404 
405  return bw;
406  }
407 
419  template<typename V, typename W>
420  int tryWrite(char *data, unsigned int len, StreamHandle<V, W> &send) {
421  uv_buf_t bufs[] = {uv_buf_init(data, len)};
422  auto bw = uv_try_write2(this->template get<uv_stream_t>(), bufs, 1, send.raw());
423 
424  if(bw < 0) {
425  this->publish(ErrorEvent{bw});
426  bw = 0;
427  }
428 
429  return bw;
430  }
431 
436  bool readable() const noexcept {
437  return (uv_is_readable(this->template get<uv_stream_t>()) == 1);
438  }
439 
444  bool writable() const noexcept {
445  return (uv_is_writable(this->template get<uv_stream_t>()) == 1);
446  }
447 
463  bool blocking(bool enable = false) {
464  return (0 == uv_stream_set_blocking(this->template get<uv_stream_t>(), enable));
465  }
466 
471  size_t writeQueueSize() const noexcept {
472  return uv_stream_get_write_queue_size(this->template get<uv_stream_t>());
473  }
474 };
475 
476 } // namespace uvw
477 
478 #ifndef UVW_AS_LIB
479 # include "stream.cpp"
480 #endif
481 
482 #endif // UVW_STREAM_INCLUDE_H
Handle base class.
Definition: handle.hpp:26
Request base class.
Definition: request.hpp:18
std::shared_ptr< R > data() const
Gets user-defined data. uvw won't use this field in any case.
Definition: resource.hpp:48
The StreamHandle handle.
Definition: stream.h:113
void write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:327
void write(S &send, std::unique_ptr< char[], Deleter > data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.h:296
void read()
Starts reading data from an incoming stream.
Definition: stream.h:216
void shutdown()
Shuts down the outgoing (write) side of a duplex stream.
Definition: stream.h:160
int tryWrite(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:349
bool writable() const noexcept
Checks if the stream is writable.
Definition: stream.h:444
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
Definition: stream.h:463
int tryWrite(char *data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition: stream.h:420
size_t writeQueueSize() const noexcept
Gets the amount of queued bytes waiting to be sent.
Definition: stream.h:471
int tryWrite(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.h:396
bool readable() const noexcept
Checks if the stream is readable.
Definition: stream.h:436
void stop()
Stops reading data from the stream.
Definition: stream.h:225
void write(char *data, unsigned int len)
Writes data to the stream.
Definition: stream.h:265
void listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
Definition: stream.h:181
void accept(S &ref)
Accepts incoming connections.
Definition: stream.h:205
int tryWrite(std::unique_ptr< char[]> data, unsigned int len, StreamHandle< V, W > &send)
Queues a write request if it can be completed immediately.
Definition: stream.h:373
void write(std::unique_ptr< char[], Deleter > data, unsigned int len)
Writes data to the stream.
Definition: stream.h:242
Loop & loop() const noexcept
Gets the loop from which the resource was originated.
const U * raw() const noexcept
Gets the underlying raw data structure.
uvw default namespace.
Definition: async.h:8
ConnectEvent event.
Definition: stream.h:21
DataEvent event.
Definition: stream.h:56
std::unique_ptr< char[]> data
Definition: stream.h:59
std::size_t length
Definition: stream.h:60
EndEvent event.
Definition: stream.h:28
The ErrorEvent event.
Definition: emitter.h:22
ListenEvent event.
Definition: stream.h:35
ShutdownEvent event.
Definition: stream.h:42
WriteEvent event.
Definition: stream.h:49