uvw  1.3.0
stream.hpp
1 #pragma once
2 
3 
4 #include <algorithm>
5 #include <iterator>
6 #include <cstddef>
7 #include <utility>
8 #include <memory>
9 #include <uv.h>
10 #include "request.hpp"
11 #include "handle.hpp"
12 #include "loop.hpp"
13 
14 
15 namespace uvw {
16 
17 
/**
 * @brief ConnectEvent event.
 *
 * Empty tag type. In this file it is the template argument of the default
 * callback that `details::ConnectReq::connect` hands over to `invoke`.
 */
struct ConnectEvent { };
24 
25 
/**
 * @brief EndEvent event.
 *
 * Empty tag type. `StreamHandle` publishes it from its read callback when the
 * number of bytes read equals `UV_EOF`.
 */
struct EndEvent { };
32 
33 
/**
 * @brief ListenEvent event.
 *
 * Empty tag type. `StreamHandle` publishes it from its listen callback when
 * the reported status is zero.
 */
struct ListenEvent { };
40 
41 
/**
 * @brief ShutdownEvent event.
 *
 * Empty tag type. It is the template argument of the default callback used by
 * `details::ShutdownReq::shutdown`; `StreamHandle::shutdown` republishes it on
 * the handle.
 */
struct ShutdownEvent { };
48 
49 
/**
 * @brief WriteEvent event.
 *
 * Empty tag type. It is the template argument of the default callback used by
 * `details::WriteReq::write`; the `StreamHandle::write` overloads republish it
 * on the handle.
 */
struct WriteEvent { };
56 
57 
/**
 * @brief DataEvent event.
 *
 * Couples an owned character buffer with a length. `StreamHandle` publishes it
 * from its read callback when a positive number of bytes has been read; in
 * that case `length` is the number of bytes read.
 */
struct DataEvent {
    /**
     * @brief Builds the event by taking ownership of a buffer.
     * @param buf Buffer whose ownership is moved into the event.
     * @param len Value stored in `length`.
     */
    explicit DataEvent(std::unique_ptr<char[]> buf, std::size_t len) noexcept
        : data(std::move(buf)),
          length(len)
    { }

    std::unique_ptr<char[]> data; /*!< Buffer owned by the event. */
    std::size_t length; /*!< Length supplied at construction. */
};
71 
72 
73 namespace details {
74 
75 
76 struct ConnectReq final: public Request<ConnectReq, uv_connect_t> {
77  using Request::Request;
78 
79  template<typename F, typename... Args>
80  void connect(F &&f, Args&&... args) {
81  invoke(std::forward<F>(f), get(), std::forward<Args>(args)..., &defaultCallback<ConnectEvent>);
82  }
83 };
84 
85 
86 struct ShutdownReq final: public Request<ShutdownReq, uv_shutdown_t> {
87  using Request::Request;
88 
89  void shutdown(uv_stream_t *handle) {
90  invoke(&uv_shutdown, get(), handle, &defaultCallback<ShutdownEvent>);
91  }
92 };
93 
94 
95 class WriteReq final: public Request<WriteReq, uv_write_t> {
96 public:
97  using Deleter = void(*)(char *);
98 
99  WriteReq(ConstructorAccess ca, std::shared_ptr<Loop> loop, std::unique_ptr<char[], Deleter> dt, unsigned int len)
100  : Request<WriteReq, uv_write_t>{ca, std::move(loop)},
101  data{std::move(dt)},
102  buf{uv_buf_init(data.get(), len)}
103  {}
104 
105  void write(uv_stream_t *handle) {
106  invoke(&uv_write, get(), handle, &buf, 1, &defaultCallback<WriteEvent>);
107  }
108 
109  void write(uv_stream_t *handle, uv_stream_t *send) {
110  invoke(&uv_write2, get(), handle, &buf, 1, send, &defaultCallback<WriteEvent>);
111  }
112 
113 private:
114  std::unique_ptr<char[], Deleter> data;
115  uv_buf_t buf;
116 };
117 
118 
119 }
120 
121 
129 template<typename T, typename U>
130 class StreamHandle: public Handle<T, U> {
131  static constexpr unsigned int DEFAULT_BACKLOG = 128;
132 
133  static void readCallback(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) {
134  T &ref = *(static_cast<T*>(handle->data));
135  // data will be destroyed no matter of what the value of nread is
136  std::unique_ptr<char[]> data{buf->base};
137 
138  // nread == 0 is ignored (see http://docs.libuv.org/en/v1.x/stream.html)
139  // equivalent to EAGAIN/EWOULDBLOCK, it shouldn't be treated as an error
140  // for we don't have data to emit though, it's fine to suppress it
141 
142  if(nread == UV_EOF) {
143  // end of stream
144  ref.publish(EndEvent{});
145  } else if(nread > 0) {
146  // data available
147  ref.publish(DataEvent{std::move(data), static_cast<std::size_t>(nread)});
148  } else if(nread < 0) {
149  // transmission error
150  ref.publish(ErrorEvent(nread));
151  }
152  }
153 
154  static void listenCallback(uv_stream_t *handle, int status) {
155  T &ref = *(static_cast<T*>(handle->data));
156  if(status) { ref.publish(ErrorEvent{status}); }
157  else { ref.publish(ListenEvent{}); }
158  }
159 
160 public:
161 #ifdef _MSC_VER
162  StreamHandle(ConstructorAccess ca, std::shared_ptr<Loop> ref)
163  : Handle{ca, std::move(ref)}
164  {}
165 #else
166  using Handle<T, U>::Handle;
167 #endif
168 
176  void shutdown() {
177  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
178  ptr->publish(event);
179  };
180 
181  auto shutdown = this->loop().template resource<details::ShutdownReq>();
182  shutdown->template once<ErrorEvent>(listener);
183  shutdown->template once<ShutdownEvent>(listener);
184  shutdown->shutdown(this->template get<uv_stream_t>());
185  }
186 
197  void listen(int backlog = DEFAULT_BACKLOG) {
198  this->invoke(&uv_listen, this->template get<uv_stream_t>(), backlog, &listenCallback);
199  }
200 
219  template<typename S>
220  void accept(S &ref) {
221  this->invoke(&uv_accept, this->template get<uv_stream_t>(), ref.template get<uv_stream_t>());
222  }
223 
231  void read() {
232  this->invoke(&uv_read_start, this->template get<uv_stream_t>(), &this->allocCallback, &readCallback);
233  }
234 
240  void stop() {
241  this->invoke(&uv_read_stop, this->template get<uv_stream_t>());
242  }
243 
256  void write(std::unique_ptr<char[]> data, unsigned int len) {
257  auto req = this->loop().template resource<details::WriteReq>(
258  std::unique_ptr<char[], details::WriteReq::Deleter>{
259  data.release(), [](char *ptr) { delete[] ptr; }
260  }, len);
261 
262  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
263  ptr->publish(event);
264  };
265 
266  req->template once<ErrorEvent>(listener);
267  req->template once<WriteEvent>(listener);
268  req->write(this->template get<uv_stream_t>());
269  }
270 
283  void write(char *data, unsigned int len) {
284  auto req = this->loop().template resource<details::WriteReq>(
285  std::unique_ptr<char[], details::WriteReq::Deleter>{
286  data, [](char *) {}
287  }, len);
288 
289  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
290  ptr->publish(event);
291  };
292 
293  req->template once<ErrorEvent>(listener);
294  req->template once<WriteEvent>(listener);
295  req->write(this->template get<uv_stream_t>());
296  }
297 
317  template<typename S>
318  void write(S &send, std::unique_ptr<char[]> data, unsigned int len) {
319  auto req = this->loop().template resource<details::WriteReq>(
320  std::unique_ptr<char[], details::WriteReq::Deleter>{
321  data.release(), [](char *ptr) { delete[] ptr; }
322  }, len);
323 
324  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
325  ptr->publish(event);
326  };
327 
328  req->template once<ErrorEvent>(listener);
329  req->template once<WriteEvent>(listener);
330  req->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>());
331  }
332 
352  template<typename S>
353  void write(S &send, char *data, unsigned int len) {
354  auto req = this->loop().template resource<details::WriteReq>(
355  std::unique_ptr<char[], details::WriteReq::Deleter>{
356  data, [](char *) {}
357  }, len);
358 
359  auto listener = [ptr = this->shared_from_this()](const auto &event, const auto &) {
360  ptr->publish(event);
361  };
362 
363  req->template once<ErrorEvent>(listener);
364  req->template once<WriteEvent>(listener);
365  req->write(this->template get<uv_stream_t>(), send.template get<uv_stream_t>());
366  }
367 
379  int tryWrite(std::unique_ptr<char[]> data, unsigned int len) {
380  uv_buf_t bufs[] = { uv_buf_init(data.get(), len) };
381  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
382 
383  if(bw < 0) {
384  this->publish(ErrorEvent{bw});
385  bw = 0;
386  }
387 
388  return bw;
389  }
390 
402  int tryWrite(char *data, unsigned int len) {
403  uv_buf_t bufs[] = { uv_buf_init(data, len) };
404  auto bw = uv_try_write(this->template get<uv_stream_t>(), bufs, 1);
405 
406  if(bw < 0) {
407  this->publish(ErrorEvent{bw});
408  bw = 0;
409  }
410 
411  return bw;
412  }
413 
418  bool readable() const noexcept {
419  return (uv_is_readable(this->template get<uv_stream_t>()) == 1);
420  }
421 
426  bool writable() const noexcept {
427  return (uv_is_writable(this->template get<uv_stream_t>()) == 1);
428  }
429 
445  bool blocking(bool enable = false) {
446  return (0 == uv_stream_set_blocking(this->template get<uv_stream_t>(), enable));
447  }
448 };
449 
450 
451 }
WriteEvent event.
Definition: stream.hpp:55
void write(S &send, char *data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.hpp:353
bool blocking(bool enable=false)
Enables or disables blocking mode for a stream.
Definition: stream.hpp:445
void read()
Starts reading data from an incoming stream.
Definition: stream.hpp:231
bool writable() const noexcept
Checks if the stream is writable.
Definition: stream.hpp:426
DataEvent event.
Definition: stream.hpp:63
std::size_t length
Definition: stream.hpp:69
void listen(int backlog=DEFAULT_BACKLOG)
Starts listening for incoming connections.
Definition: stream.hpp:197
void accept(S &ref)
Accepts incoming connections.
Definition: stream.hpp:220
Handle base class.
Definition: handle.hpp:29
ShutdownEvent event.
Definition: stream.hpp:47
void write(std::unique_ptr< char[]> data, unsigned int len)
Writes data to the stream.
Definition: stream.hpp:256
int tryWrite(char *data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.hpp:402
ConnectEvent event.
Definition: stream.hpp:23
void shutdown()
Shuts down the outgoing (write) side of a duplex stream.
Definition: stream.hpp:176
The StreamHandle handle.
Definition: stream.hpp:130
std::unique_ptr< char[]> data
Definition: stream.hpp:68
EndEvent event.
Definition: stream.hpp:31
int tryWrite(std::unique_ptr< char[]> data, unsigned int len)
Queues a write request if it can be completed immediately.
Definition: stream.hpp:379
The ErrorEvent event.
Definition: emitter.hpp:23
ListenEvent event.
Definition: stream.hpp:39
void write(char *data, unsigned int len)
Writes data to the stream.
Definition: stream.hpp:283
void stop()
Stops reading data from the stream.
Definition: stream.hpp:240
void write(S &send, std::unique_ptr< char[]> data, unsigned int len)
Extended write function for sending handles over a pipe handle.
Definition: stream.hpp:318
bool readable() const noexcept
Checks if the stream is readable.
Definition: stream.hpp:418
uvw default namespace.
Definition: async.hpp:11