diff --git a/include/uv-unix.h b/include/uv-unix.h index 338092da..e918e4d4 100644 --- a/include/uv-unix.h +++ b/include/uv-unix.h @@ -55,6 +55,16 @@ typedef struct { #define UV_CONNECT_PRIVATE_FIELDS \ ngx_queue_t queue; +#define UV_UDP_SEND_PRIVATE_FIELDS \ + ngx_queue_t queue; \ + struct sockaddr_storage addr; \ + socklen_t addrlen; \ + uv_buf_t* bufs; \ + int bufcnt; \ + ssize_t status; \ + uv_udp_send_cb send_cb; \ + uv_buf_t bufsml[UV_REQ_BUFSML_SIZE]; \ + #define UV_PRIVATE_REQ_TYPES /* empty */ @@ -83,6 +93,16 @@ typedef struct { #define UV_TCP_PRIVATE_FIELDS +/* UV_UDP */ +#define UV_UDP_PRIVATE_FIELDS \ + uv_alloc_cb alloc_cb; \ + uv_udp_recv_cb recv_cb; \ + ev_io read_watcher; \ + ev_io write_watcher; \ + ngx_queue_t write_queue; \ + ngx_queue_t write_completed_queue; \ + + /* UV_NAMED_PIPE */ #define UV_PIPE_PRIVATE_TYPEDEF #define UV_PIPE_PRIVATE_FIELDS \ diff --git a/include/uv-win.h b/include/uv-win.h index 4a41e56d..07aeabb3 100644 --- a/include/uv-win.h +++ b/include/uv-win.h @@ -114,6 +114,8 @@ typedef struct uv_buf_t { struct { uv_tcp_connection_fields }; \ }; +#define UV_UDP_PRIVATE_FIELDS + #define uv_pipe_server_fields \ uv_pipe_accept_t accept_reqs[4]; \ uv_pipe_accept_t* pending_accepts; diff --git a/include/uv.h b/include/uv.h index 2e6016b8..040d423a 100644 --- a/include/uv.h +++ b/include/uv.h @@ -45,6 +45,7 @@ typedef struct uv_err_s uv_err_t; typedef struct uv_handle_s uv_handle_t; typedef struct uv_stream_s uv_stream_t; typedef struct uv_tcp_s uv_tcp_t; +typedef struct uv_udp_s uv_udp_t; typedef struct uv_pipe_s uv_pipe_t; typedef struct uv_timer_s uv_timer_t; typedef struct uv_prepare_s uv_prepare_t; @@ -58,6 +59,7 @@ typedef struct uv_req_s uv_req_t; typedef struct uv_shutdown_s uv_shutdown_t; typedef struct uv_write_s uv_write_t; typedef struct uv_connect_s uv_connect_t; +typedef struct uv_udp_send_s uv_udp_send_t; #if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__) # include "uv-unix.h" @@ -106,8 +108,8 @@ int64_t uv_now(); * In the case of uv_read_cb the uv_buf_t returned should be freed by the * user. */ -typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size); -typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf); +typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size); +typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); typedef void (*uv_write_cb)(uv_write_t* req, int status); typedef void (*uv_connect_cb)(uv_connect_t* req, int status); typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status); @@ -146,6 +148,7 @@ typedef enum { UV_EINVAL, UV_EISCONN, UV_EMFILE, + UV_EMSGSIZE, UV_ENETDOWN, UV_ENETUNREACH, UV_ENFILE, @@ -172,6 +175,7 @@ typedef enum { typedef enum { UV_UNKNOWN_HANDLE = 0, UV_TCP, + UV_UDP, UV_NAMED_PIPE, UV_TTY, UV_FILE, @@ -194,6 +198,7 @@ typedef enum { UV_WRITE, UV_SHUTDOWN, UV_WAKEUP, + UV_UDP_SEND, UV_REQ_TYPE_PRIVATE } uv_req_type; @@ -418,6 +423,188 @@ struct uv_connect_s { int uv_getsockname(uv_handle_t* handle, struct sockaddr* name, int* namelen); +/* + * UDP support. + */ + +/* + * Flag constants. Input flags are meant to be passed to `uv_udp_init()` + * and `uv_udp_init6()`. Output flags are passed to your `uv_udp_recv_cb` + * callback. + */ + +/* + * Input flag. Disables dual stack mode. Only valid for `uv_udp_init6()`. + */ +#define UV_UDP_IPV6ONLY (1 << 0) + +/* + * Output flag. The message has been truncated because + * the read buffer was too small. The remainder has been + * discarded by the operating system. + */ +#define UV_UDP_PARTIAL (1 << 10) + +/* + * Function prototype for the callback that is invoked + * when the UDP datagram has been sent. + * + * Arguments: + * req Request handle. + * status Status indicator. 0 on success, -1 on error. + */ +typedef void (*uv_udp_send_cb)(uv_udp_send_t* req, int status); + +/* + * Function prototype for the callback that is invoked when + * a new UDP datagram is received. + * + * Arguments: + * handle UDP handle. + * nread Number of bytes that have been received. + * 0 if there is no more data to read. You may + * discard or repurpose the read buffer. + * -1 if a transmission error was detected. + * buf uv_buf_t with the received data. + * addr struct sockaddr_in or struct sockaddr_in6. + * Valid for the duration of the callback only. + * flags One or more OR'ed UV_UDP_* constants. + * Right now only UV_UDP_PARTIAL is used. + */ +typedef void (*uv_udp_recv_cb)(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags); + +/* + * Subclass of uv_handle_t + */ +struct uv_udp_s { + UV_HANDLE_FIELDS + UV_UDP_PRIVATE_FIELDS +}; + +/* + * Subclass of uv_req_t + */ +struct uv_udp_send_s { + UV_REQ_FIELDS + uv_udp_t* handle; + UV_UDP_SEND_PRIVATE_FIELDS +}; + +/* + * Initialize a new UDP handle. The actual socket is created lazily. + * + * Arguments: + * handle UDP handle to initialize. + * + * Returns: + * 0 on success, -1 on error. + */ +int uv_udp_init(uv_udp_t* handle); + +/* + * Bind to a IPv4 address and port. + * + * Arguments: + * handle UDP handle. Should have been initialized with `uv_udp_init`. + * addr struct sockaddr_in with the address and port to bind to. + * flags Bind flags. One or more OR'ed UV_UDP_* constants. + * + * Returns: + * 0 on success, -1 on error. + */ +int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags); + +/* + * Bind to a IPv6 address and port. + * + * Arguments: + * handle UDP handle. Should have been initialized with `uv_udp_init`. + * addr struct sockaddr_in with the address and port to bind to. + * flags One or more OR'ed UV_UDP_* constants. + * + * Returns: + * 0 on success, -1 on error. + */ +int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags); + +/* + * Send data. If the socket has not previously been bound with `uv_udp_bind` + * or `uv_udp_bind6`, it is bound to 0.0.0.0 (the "all interfaces" address) + * and a random port number. + * + * Arguments: + * req UDP request handle. Need not be initialized. + * handle UDP handle. Should have been initialized with `uv_udp_init`. + * bufs List of buffers to send. + * bufcnt Number of buffers in `bufs`. + * addr Address of the remote peer. See `uv_ip4_addr`. + * send_cb Callback to invoke when the data has been sent out. + * + * Returns: + * 0 on success, -1 on error. + */ +int uv_udp_send(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr_in addr, + uv_udp_send_cb send_cb); + +/* + * Send data. If the socket has not previously been bound with `uv_udp_bind6`, + * it is bound to :::0 (the "all interfaces" address) and a random port number. + * + * Arguments: + * req UDP request handle. Need not be initialized. + * handle UDP handle. Should have been initialized with `uv_udp_init`. + * bufs List of buffers to send. + * bufcnt Number of buffers in `bufs`. + * addr Address of the remote peer. See `uv_ip6_addr`. + * send_cb Callback to invoke when the data has been sent out. + * + * Returns: + * 0 on success, -1 on error. + */ +int uv_udp_send6(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr_in6 addr, + uv_udp_send_cb send_cb); + +/* + * Send data. If the socket has not previously been bound with `uv_udp_bind` + * or `uv_udp_bind6`, it is bound to 0.0.0.0 (the "all interfaces" address) + * and a random port number. + * + * Arguments: + * handle UDP handle. Should have been initialized with `uv_udp_init`. + * alloc_cb Callback to invoke when temporary storage is needed. + * recv_cb Callback to invoke with received data. + * + * Returns: + * 0 on success, -1 on error. + */ +int uv_udp_recv_start(uv_udp_t* handle, + uv_alloc_cb alloc_cb, + uv_udp_recv_cb recv_cb); + +/* + * Stop listening for incoming datagrams. + * + * Arguments: + * handle UDP handle. Should have been initialized with `uv_udp_init`. + * + * Returns: + * 0 on success, -1 on error. + */ +int uv_udp_recv_stop(uv_udp_t* handle); + + /* * uv_pipe_t is a subclass of uv_stream_t * @@ -699,6 +886,7 @@ typedef struct { uint64_t handle_init; uint64_t stream_init; uint64_t tcp_init; + uint64_t udp_init; uint64_t pipe_init; uint64_t prepare_init; uint64_t check_init; diff --git a/src/uv-unix.c b/src/uv-unix.c index 32a13e56..3044363b 100644 --- a/src/uv-unix.c +++ b/src/uv-unix.c @@ -103,6 +103,28 @@ static void uv__stream_connect(uv_stream_t*); static void uv__stream_io(EV_P_ ev_io* watcher, int revents); static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents); +static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w); +static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w); +static void uv__udp_run_completed(uv_udp_t* handle); +static void uv__udp_run_pending(uv_udp_t* handle); +static void uv__udp_destroy(uv_udp_t* handle); +static void uv__udp_recvmsg(uv_udp_t* handle); +static void uv__udp_sendmsg(uv_udp_t* handle); +static void uv__udp_io(EV_P_ ev_io* w, int events); +static int uv__udp_bind(uv_udp_t* handle, + int domain, + struct sockaddr* addr, + socklen_t len, + unsigned flags); +static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain); +static int uv__udp_send(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr* addr, + socklen_t addrlen, + uv_udp_send_cb send_cb); + #ifndef __GNUC__ #define __attribute__(a) #endif @@ -173,6 +195,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) { case ECONNRESET: return UV_ECONNRESET; case EFAULT: return UV_EFAULT; case EMFILE: return UV_EMFILE; + case EMSGSIZE: return UV_EMSGSIZE; case EINVAL: return UV_EINVAL; case ECONNREFUSED: return UV_ECONNREFUSED; case EADDRINUSE: return UV_EADDRINUSE; @@ -201,6 +224,7 @@ static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error) { void uv_close(uv_handle_t* handle, uv_close_cb close_cb) { + uv_udp_t* udp; uv_async_t* async; uv_timer_t* timer; uv_stream_t* stream; @@ -231,6 +255,14 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) { assert(!ev_is_active(&stream->write_watcher)); break; + case UV_UDP: + udp = (uv_udp_t*)handle; + uv__udp_watcher_stop(udp, &udp->read_watcher); + uv__udp_watcher_stop(udp, &udp->write_watcher); + uv__close(udp->fd); + udp->fd = -1; + break; + case UV_PREPARE: uv_prepare_stop((uv_prepare_t*) handle); break; @@ -305,6 +337,489 @@ static void uv__handle_init(uv_handle_t* handle, uv_handle_type type) { } +static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) { + int flags; + + assert(w == &handle->read_watcher + || w == &handle->write_watcher); + + flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE); + + w->data = handle; + ev_set_cb(w, uv__udp_io); + ev_io_set(w, handle->fd, flags); + ev_io_start(EV_DEFAULT_UC_ w); +} + + +static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) { + int flags; + + assert(w == &handle->read_watcher + || w == &handle->write_watcher); + + flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE); + + ev_io_stop(EV_DEFAULT_UC_ w); + ev_io_set(w, -1, flags); + ev_set_cb(w, NULL); + w->data = (void*)0xDEADBABE; +} + + +static void uv__udp_destroy(uv_udp_t* handle) { + uv_udp_send_t* req; + ngx_queue_t* q; + + uv__udp_run_completed(handle); + + while (!ngx_queue_empty(&handle->write_queue)) { + q = ngx_queue_head(&handle->write_queue); + ngx_queue_remove(q); + + req = ngx_queue_data(q, uv_udp_send_t, queue); + if (req->send_cb) { + /* FIXME proper error code like UV_EABORTED */ + uv_err_new_artificial((uv_handle_t*)handle, UV_EINTR); + req->send_cb(req, -1); + } + } + + /* Now tear down the handle. */ + handle->flags = 0; + handle->recv_cb = NULL; + handle->alloc_cb = NULL; + /* but _do not_ touch close_cb */ + + if (handle->fd != -1) { + uv__close(handle->fd); + handle->fd = -1; + } + + uv__udp_watcher_stop(handle, &handle->read_watcher); + uv__udp_watcher_stop(handle, &handle->write_watcher); +} + + +static void uv__udp_run_pending(uv_udp_t* handle) { + uv_udp_send_t* req; + ngx_queue_t* q; + struct msghdr h; + ssize_t size; + + while (!ngx_queue_empty(&handle->write_queue)) { + q = ngx_queue_head(&handle->write_queue); + assert(q != NULL); + + req = ngx_queue_data(q, uv_udp_send_t, queue); + assert(req != NULL); + + memset(&h, 0, sizeof h); + h.msg_name = &req->addr; + h.msg_namelen = req->addrlen; + h.msg_iov = (struct iovec*)req->bufs; + h.msg_iovlen = req->bufcnt; + + do { + size = sendmsg(handle->fd, &h, 0); + } + while (size == -1 && errno == EINTR); + + /* TODO try to write once or twice more in the + * hope that the socket becomes readable again? + */ + if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) + break; + + req->status = (size == -1 ? -errno : size); + +#ifndef NDEBUG + /* Sanity check. */ + if (size != -1) { + ssize_t nbytes; + int i; + + for (nbytes = i = 0; i < req->bufcnt; i++) + nbytes += req->bufs[i].len; + + assert(size == nbytes); + } +#endif + + /* Sending a datagram is an atomic operation: either all data + * is written or nothing is (and EMSGSIZE is raised). That is + * why we don't handle partial writes. Just pop the request + * off the write queue and onto the completed queue, done. + */ + ngx_queue_remove(&req->queue); + ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue); + } +} + + +static void uv__udp_run_completed(uv_udp_t* handle) { + uv_udp_send_t* req; + ngx_queue_t* q; + + while (!ngx_queue_empty(&handle->write_completed_queue)) { + q = ngx_queue_head(&handle->write_completed_queue); + assert(q != NULL); + + ngx_queue_remove(q); + + req = ngx_queue_data(q, uv_udp_send_t, queue); + assert(req != NULL); + + if (req->bufs != req->bufsml) + free(req->bufs); + + if (req->send_cb == NULL) + continue; + + /* req->status >= 0 == bytes written + * req->status < 0 == errno + */ + if (req->status >= 0) { + req->send_cb(req, 0); + } + else { + uv_err_new((uv_handle_t*)handle, -req->status); + req->send_cb(req, -1); + } + } +} + + +static void uv__udp_recvmsg(uv_udp_t* handle) { + struct sockaddr_storage peer; + struct msghdr h; + ssize_t nread; + uv_buf_t buf; + int flags; + + assert(handle->recv_cb != NULL); + assert(handle->alloc_cb != NULL); + + do { + /* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */ + buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024); + assert(buf.len > 0); + assert(buf.base != NULL); + + memset(&h, 0, sizeof h); + h.msg_name = &peer; + h.msg_namelen = sizeof peer; + h.msg_iov = (struct iovec*)&buf; + h.msg_iovlen = 1; + + do { + nread = recvmsg(handle->fd, &h, 0); + } + while (nread == -1 && errno == EINTR); + + if (nread == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + uv_err_new((uv_handle_t*)handle, EAGAIN); + handle->recv_cb(handle, 0, buf, NULL, 0); + } + else { + uv_err_new((uv_handle_t*)handle, errno); + handle->recv_cb(handle, -1, buf, NULL, 0); + } + } + else { + flags = 0; + + if (h.msg_flags & MSG_TRUNC) + flags |= UV_UDP_PARTIAL; + + handle->recv_cb(handle, + nread, + buf, + (struct sockaddr*)&peer, + flags); + } + } + /* recv_cb callback may decide to pause or close the handle */ + while (nread != -1 + && handle->fd != -1 + && handle->recv_cb != NULL); +} + + +static void uv__udp_sendmsg(uv_udp_t* handle) { + assert(!ngx_queue_empty(&handle->write_queue) + || !ngx_queue_empty(&handle->write_completed_queue)); + + /* Write out pending data first. */ + uv__udp_run_pending(handle); + + /* Drain 'request completed' queue. */ + uv__udp_run_completed(handle); + + if (!ngx_queue_empty(&handle->write_completed_queue)) { + /* Schedule completion callbacks. */ + ev_feed_event(EV_DEFAULT_ &handle->write_watcher, EV_WRITE); + } + else if (ngx_queue_empty(&handle->write_queue)) { + /* Pending queue and completion queue empty, stop watcher. */ + uv__udp_watcher_stop(handle, &handle->write_watcher); + } +} + + +static void uv__udp_io(EV_P_ ev_io* w, int events) { + uv_udp_t* handle; + + handle = w->data; + assert(handle != NULL); + assert(handle->type == UV_UDP); + assert(handle->fd >= 0); + assert(!(events & ~(EV_READ|EV_WRITE))); + + if (events & EV_READ) + uv__udp_recvmsg(handle); + + if (events & EV_WRITE) + uv__udp_sendmsg(handle); +} + + +static int uv__udp_bind(uv_udp_t* handle, + int domain, + struct sockaddr* addr, + socklen_t len, + unsigned flags) { + int saved_errno; + int status; + int yes; + int fd; + + saved_errno = errno; + status = -1; + + /* Check for bad flags. */ + if (flags & ~UV_UDP_IPV6ONLY) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + /* Cannot set IPv6-only mode on non-IPv6 socket. */ + if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) { + uv_err_new((uv_handle_t*)handle, EINVAL); + goto out; + } + + /* Check for already active socket. */ + if (handle->fd != -1) { + uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY); + goto out; + } + + if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } + + if (flags & UV_UDP_IPV6ONLY) { +#ifdef IPV6_V6ONLY + yes = 1; + if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } +#else + uv_err_new((uv_handle_t*)handle, ENOTSUP); + goto out; +#endif + } + + if (bind(fd, addr, len) == -1) { + uv_err_new((uv_handle_t*)handle, errno); + goto out; + } + + handle->fd = fd; + status = 0; + +out: + if (status) + uv__close(fd); + + errno = saved_errno; + return status; +} + + +static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) { + struct sockaddr_storage taddr; + socklen_t addrlen; + + assert(domain == AF_INET || domain == AF_INET6); + + if (handle->fd != -1) + return 0; + + switch (domain) { + case AF_INET: + { + struct sockaddr_in* addr = (void*)&taddr; + memset(addr, 0, sizeof *addr); + addr->sin_family = AF_INET; + addr->sin_addr.s_addr = INADDR_ANY; + addrlen = sizeof *addr; + break; + } + case AF_INET6: + { + struct sockaddr_in6* addr = (void*)&taddr; + memset(addr, 0, sizeof *addr); + addr->sin6_family = AF_INET6; + addr->sin6_addr = in6addr_any; + addrlen = sizeof *addr; + break; + } + default: + assert(0 && "unsupported address family"); + abort(); + } + + return uv__udp_bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0); +} + + +static int uv__udp_send(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr* addr, + socklen_t addrlen, + uv_udp_send_cb send_cb) { + if (uv__udp_maybe_deferred_bind(handle, addr->sa_family)) + return -1; + + /* Don't use uv__req_init(), it zeroes the data field. */ + uv_counters()->req_init++; + + memcpy(&req->addr, addr, addrlen); + req->addrlen = addrlen; + req->send_cb = send_cb; + req->handle = handle; + req->bufcnt = bufcnt; + req->type = UV_UDP_SEND; + + if (bufcnt <= UV_REQ_BUFSML_SIZE) { + req->bufs = req->bufsml; + } + else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) { + uv_err_new((uv_handle_t*)handle, ENOMEM); + return -1; + } + memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0])); + + ngx_queue_insert_tail(&handle->write_queue, &req->queue); + uv__udp_watcher_start(handle, &handle->write_watcher); + + return 0; +} + + +int uv_udp_init(uv_udp_t* handle) { + memset(handle, 0, sizeof *handle); + + uv__handle_init((uv_handle_t*)handle, UV_UDP); + uv_counters()->udp_init++; + + handle->fd = -1; + ngx_queue_init(&handle->write_queue); + ngx_queue_init(&handle->write_completed_queue); + + return 0; +} + + +int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) { + return uv__udp_bind(handle, + AF_INET, + (struct sockaddr*)&addr, + sizeof addr, + flags); +} + + +int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) { + return uv__udp_bind(handle, + AF_INET6, + (struct sockaddr*)&addr, + sizeof addr, + flags); +} + + +int uv_udp_send(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr_in addr, + uv_udp_send_cb send_cb) { + return uv__udp_send(req, + handle, + bufs, + bufcnt, + (struct sockaddr*)&addr, + sizeof addr, + send_cb); +} + + +int uv_udp_send6(uv_udp_send_t* req, + uv_udp_t* handle, + uv_buf_t bufs[], + int bufcnt, + struct sockaddr_in6 addr, + uv_udp_send_cb send_cb) { + return uv__udp_send(req, + handle, + bufs, + bufcnt, + (struct sockaddr*)&addr, + sizeof addr, + send_cb); +} + + +int uv_udp_recv_start(uv_udp_t* handle, + uv_alloc_cb alloc_cb, + uv_udp_recv_cb recv_cb) { + if (alloc_cb == NULL || recv_cb == NULL) { + uv_err_new_artificial((uv_handle_t*)handle, UV_EINVAL); + return -1; + } + + if (ev_is_active(&handle->read_watcher)) { + uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY); + return -1; + } + + if (uv__udp_maybe_deferred_bind(handle, AF_INET)) + return -1; + + handle->alloc_cb = alloc_cb; + handle->recv_cb = recv_cb; + uv__udp_watcher_start(handle, &handle->read_watcher); + + return 0; +} + + +int uv_udp_recv_stop(uv_udp_t* handle) { + uv__udp_watcher_stop(handle, &handle->read_watcher); + handle->alloc_cb = NULL; + handle->recv_cb = NULL; + return 0; +} + + int uv_tcp_init(uv_tcp_t* tcp) { uv__handle_init((uv_handle_t*)tcp, UV_TCP); uv_counters()->tcp_init++; @@ -332,8 +847,10 @@ int uv_tcp_init(uv_tcp_t* tcp) { } -static int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr, - int addrsize) { +static int uv__tcp_bind(uv_tcp_t* tcp, + int domain, + struct sockaddr* addr, + int addrsize) { int saved_errno; int status; @@ -379,8 +896,10 @@ int uv_tcp_bind(uv_tcp_t* tcp, struct sockaddr_in addr) { return -1; } - return uv__bind(tcp, AF_INET, (struct sockaddr*)&addr, - sizeof(struct sockaddr_in)); + return uv__tcp_bind(tcp, + AF_INET, + (struct sockaddr*)&addr, + sizeof(struct sockaddr_in)); } @@ -390,8 +909,10 @@ int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) { return -1; } - return uv__bind(tcp, AF_INET6, (struct sockaddr*)&addr, - sizeof(struct sockaddr_in6)); + return uv__tcp_bind(tcp, + AF_INET6, + (struct sockaddr*)&addr, + sizeof(struct sockaddr_in6)); } @@ -590,6 +1111,13 @@ void uv__finish_close(uv_handle_t* handle) { assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher)); break; + case UV_UDP: + assert(!ev_is_active(&((uv_udp_t*)handle)->read_watcher)); + assert(!ev_is_active(&((uv_udp_t*)handle)->write_watcher)); + assert(((uv_udp_t*)handle)->fd == -1); + uv__udp_destroy((uv_udp_t*)handle); + break; + case UV_PROCESS: assert(!ev_is_active(&((uv_process_t*)handle)->child_watcher)); break; @@ -823,7 +1351,7 @@ static void uv__read(uv_stream_t* stream) { */ while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) { assert(stream->alloc_cb); - buf = stream->alloc_cb(stream, 64 * 1024); + buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); assert(buf.len > 0); assert(buf.base); diff --git a/src/win/pipe.c b/src/win/pipe.c index 8521663c..39110e66 100644 --- a/src/win/pipe.c +++ b/src/win/pipe.c @@ -773,7 +773,7 @@ void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) { break; } - buf = handle->alloc_cb((uv_stream_t*)handle, avail); + buf = handle->alloc_cb((uv_handle_t*) handle, avail); assert(buf.len > 0); if (ReadFile(handle->handle, diff --git a/src/win/tcp.c b/src/win/tcp.c index cdde45b3..45b8a1e8 100644 --- a/src/win/tcp.c +++ b/src/win/tcp.c @@ -291,7 +291,7 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) { */ if (active_tcp_streams < uv_active_tcp_streams_threshold) { handle->flags &= ~UV_HANDLE_ZERO_READ; - handle->read_buffer = handle->alloc_cb((uv_stream_t*)handle, 65536); + handle->read_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536); assert(handle->read_buffer.len > 0); buf = handle->read_buffer; } else { @@ -659,7 +659,7 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) { /* Do nonblocking reads until the buffer is empty */ while (handle->flags & UV_HANDLE_READING) { - buf = handle->alloc_cb((uv_stream_t*)handle, 65536); + buf = handle->alloc_cb((uv_handle_t*) handle, 65536); assert(buf.len > 0); flags = 0; if (WSARecv(handle->socket, diff --git a/test/benchmark-list.h b/test/benchmark-list.h index c05f37f7..0e5467c7 100644 --- a/test/benchmark-list.h +++ b/test/benchmark-list.h @@ -29,6 +29,16 @@ BENCHMARK_DECLARE (tcp_pump100_client) BENCHMARK_DECLARE (tcp_pump1_client) BENCHMARK_DECLARE (pipe_pump100_client) BENCHMARK_DECLARE (pipe_pump1_client) +BENCHMARK_DECLARE (udp_packet_storm_1v1) +BENCHMARK_DECLARE (udp_packet_storm_1v10) +BENCHMARK_DECLARE (udp_packet_storm_1v100) +BENCHMARK_DECLARE (udp_packet_storm_1v1000) +BENCHMARK_DECLARE (udp_packet_storm_10v10) +BENCHMARK_DECLARE (udp_packet_storm_10v100) +BENCHMARK_DECLARE (udp_packet_storm_10v1000) +BENCHMARK_DECLARE (udp_packet_storm_100v100) +BENCHMARK_DECLARE (udp_packet_storm_100v1000) +BENCHMARK_DECLARE (udp_packet_storm_1000v1000) BENCHMARK_DECLARE (gethostbyname) BENCHMARK_DECLARE (getaddrinfo) BENCHMARK_DECLARE (spawn) @@ -68,6 +78,17 @@ TASK_LIST_START BENCHMARK_ENTRY (pipe_pound_1000) BENCHMARK_HELPER (pipe_pound_1000, pipe_echo_server) + BENCHMARK_ENTRY (udp_packet_storm_1v1) + BENCHMARK_ENTRY (udp_packet_storm_1v10) + BENCHMARK_ENTRY (udp_packet_storm_1v100) + BENCHMARK_ENTRY (udp_packet_storm_1v1000) + BENCHMARK_ENTRY (udp_packet_storm_10v10) + BENCHMARK_ENTRY (udp_packet_storm_10v100) + BENCHMARK_ENTRY (udp_packet_storm_10v1000) + BENCHMARK_ENTRY (udp_packet_storm_100v100) + BENCHMARK_ENTRY (udp_packet_storm_100v1000) + BENCHMARK_ENTRY (udp_packet_storm_1000v1000) + BENCHMARK_ENTRY (gethostbyname) BENCHMARK_HELPER (gethostbyname, dns_server) diff --git a/test/benchmark-ping-pongs.c b/test/benchmark-ping-pongs.c index 53cea782..80567239 100644 --- a/test/benchmark-ping-pongs.c +++ b/test/benchmark-ping-pongs.c @@ -52,7 +52,7 @@ static int completed_pingers = 0; static int64_t start_time; -static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) { +static uv_buf_t buf_alloc(uv_handle_t* tcp, size_t size) { buf_t* ab; ab = buf_freelist; diff --git a/test/benchmark-pound.c b/test/benchmark-pound.c index a5e38ac3..bae5e46f 100644 --- a/test/benchmark-pound.c +++ b/test/benchmark-pound.c @@ -72,13 +72,13 @@ static uint64_t start; /* in ms */ static int closed_streams; static int conns_failed; -static uv_buf_t alloc_cb(uv_stream_t* stream, size_t suggested_size); +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size); static void connect_cb(uv_connect_t* conn_req, int status); static void read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf); static void close_cb(uv_handle_t* handle); -static uv_buf_t alloc_cb(uv_stream_t* stream, size_t suggested_size) { +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) { static char slab[65536]; uv_buf_t buf; buf.base = slab; diff --git a/test/benchmark-pump.c b/test/benchmark-pump.c index c6cd2d14..7713a585 100644 --- a/test/benchmark-pump.c +++ b/test/benchmark-pump.c @@ -41,7 +41,7 @@ static void maybe_connect_some(); static uv_req_t* req_alloc(); static void req_free(uv_req_t* uv_req); -static uv_buf_t buf_alloc(uv_stream_t*, size_t size); +static uv_buf_t buf_alloc(uv_handle_t*, size_t size); static void buf_free(uv_buf_t uv_buf_t); @@ -337,7 +337,7 @@ typedef struct buf_list_s { static buf_list_t* buf_freelist = NULL; -static uv_buf_t buf_alloc(uv_stream_t* stream, size_t size) { +static uv_buf_t buf_alloc(uv_handle_t* handle, size_t size) { buf_list_t* buf; buf = buf_freelist; diff --git a/test/benchmark-spawn.c b/test/benchmark-spawn.c index 0297c74f..46ddca0e 100644 --- a/test/benchmark-spawn.c +++ b/test/benchmark-spawn.c @@ -69,7 +69,7 @@ static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { } -uv_buf_t on_alloc(uv_stream_t* tcp, size_t suggested_size) { +uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { uv_buf_t buf; buf.base = output + output_used; buf.len = OUTPUT_SIZE - output_used; diff --git a/test/benchmark-udp-packet-storm.c b/test/benchmark-udp-packet-storm.c new file mode 100644 index 00000000..e2c7ad35 --- /dev/null +++ b/test/benchmark-udp-packet-storm.c @@ -0,0 +1,245 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "task.h" +#include "uv.h" + +#include +#include +#include + +#define EXPECTED "RANG TANG DING DONG I AM THE JAPANESE SANDMAN" /* "Take eight!" */ + +#define TEST_DURATION 5000 /* ms */ + +#define MAX_SENDERS 1000 +#define MAX_RECEIVERS 1000 + +#define BASE_PORT 12345 + +#define ARRAY_SIZE(a) (sizeof((a)) / sizeof((a)[0])) + +static int n_senders_; +static int n_receivers_; +static uv_udp_t senders[MAX_SENDERS]; +static uv_udp_t receivers[MAX_RECEIVERS]; +static uv_buf_t bufs[5]; + +static int send_cb_called; +static int recv_cb_called; +static int close_cb_called; + +typedef struct { + struct sockaddr_in addr; +} sender_state_t; + + +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) { + static char slab[65536]; + ASSERT(suggested_size <= sizeof slab); + return uv_buf_init(slab, sizeof slab); +} + + +static void send_cb(uv_udp_send_t* req, int status) { + sender_state_t* ss; + int r; + + if (status == -1) { + ASSERT(uv_last_error().code == UV_EINTR); /* FIXME change error code */ + return; + } + + ASSERT(req != NULL); + ASSERT(status == 0); + + ss = req->data; + + r = uv_udp_send(req, req->handle, bufs, ARRAY_SIZE(bufs), ss->addr, send_cb); + ASSERT(r == 0); + + req->data = ss; + + send_cb_called++; +} + + +static void recv_cb(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags) { + if (nread == 0) + return; + + if (nread == -1) { + ASSERT(uv_last_error().code == UV_EINTR); /* FIXME change error code */ + return; + } + + ASSERT(addr->sa_family == AF_INET); + ASSERT(!memcmp(buf.base, EXPECTED, nread)); + + recv_cb_called++; +} + + +static void close_cb(uv_handle_t* handle) { + ASSERT(handle != NULL); + close_cb_called++; +} + + +static void timeout_cb(uv_timer_t* timer, int status) { + int i; + + for (i = 0; i < n_senders_; i++) + uv_close((uv_handle_t*)&senders[i], close_cb); + + for (i = 0; i < n_receivers_; i++) + uv_close((uv_handle_t*)&receivers[i], close_cb); +} + + +static int do_packet_storm(int n_senders, int n_receivers) { + uv_timer_t timeout; + sender_state_t *ss; + uv_udp_send_t* req; + uv_udp_t* handle; + int i; + int r; + + ASSERT(n_senders <= MAX_SENDERS); + ASSERT(n_receivers <= MAX_RECEIVERS); + + uv_init(); + + n_senders_ = n_senders; + n_receivers_ = n_receivers; + + r = uv_timer_init(&timeout); + ASSERT(r == 0); + + r = uv_timer_start(&timeout, timeout_cb, TEST_DURATION, 0); + ASSERT(r == 0); + + /* Timer should not keep loop alive. */ + uv_unref(); + + for (i = 0; i < n_receivers; i++) { + struct sockaddr_in addr; + handle = &receivers[i]; + + r = uv_udp_init(handle); + ASSERT(r == 0); + + addr = uv_ip4_addr("0.0.0.0", BASE_PORT + i); + + r = uv_udp_bind(handle, addr, 0); + ASSERT(r == 0); + + r = uv_udp_recv_start(handle, alloc_cb, recv_cb); + ASSERT(r == 0); + } + + bufs[0] = uv_buf_init(EXPECTED + 0, 10); + bufs[1] = uv_buf_init(EXPECTED + 10, 10); + bufs[2] = uv_buf_init(EXPECTED + 20, 10); + bufs[3] = uv_buf_init(EXPECTED + 30, 10); + bufs[4] = uv_buf_init(EXPECTED + 40, 5); + + for (i = 0; i < n_senders; i++) { + handle = &senders[i]; + + r = uv_udp_init(handle); + ASSERT(r == 0); + + req = malloc(sizeof(*req) + sizeof(*ss)); + + ss = (void*)(req + 1); + ss->addr = uv_ip4_addr("0.0.0.0", BASE_PORT + (i % n_receivers)); + + r = uv_udp_send(req, handle, bufs, ARRAY_SIZE(bufs), ss->addr, send_cb); + ASSERT(r == 0); + + req->data = ss; + } + + uv_run(); + + printf("udp_packet_storm_%dv%d: %.0f/s received, %.0f/s sent\n", + n_receivers, + n_senders, + recv_cb_called / (TEST_DURATION / 1000.0), + send_cb_called / (TEST_DURATION / 1000.0)); + + return 0; +} + + +BENCHMARK_IMPL(udp_packet_storm_1v1) { + return do_packet_storm(1, 1); +} + + +BENCHMARK_IMPL(udp_packet_storm_1v10) { + return do_packet_storm(1, 10); +} + + +BENCHMARK_IMPL(udp_packet_storm_1v100) { + return do_packet_storm(1, 100); +} + + +BENCHMARK_IMPL(udp_packet_storm_1v1000) { + return do_packet_storm(1, 1000); +} + + +BENCHMARK_IMPL(udp_packet_storm_10v10) { + return do_packet_storm(10, 10); +} + + +BENCHMARK_IMPL(udp_packet_storm_10v100) { + return do_packet_storm(10, 100); +} + + +BENCHMARK_IMPL(udp_packet_storm_10v1000) { + return do_packet_storm(10, 1000); +} + + +BENCHMARK_IMPL(udp_packet_storm_100v100) { + return do_packet_storm(100, 100); +} + + +BENCHMARK_IMPL(udp_packet_storm_100v1000) { + return do_packet_storm(100, 1000); +} + + +BENCHMARK_IMPL(udp_packet_storm_1000v1000) { + return do_packet_storm(1000, 1000); +} diff --git a/test/dns-server.c b/test/dns-server.c index f1593528..2e4a8f39 100644 --- a/test/dns-server.c +++ b/test/dns-server.c @@ -247,7 +247,7 @@ static void on_close(uv_handle_t* peer) { } -static uv_buf_t buf_alloc(uv_stream_t* handle, size_t suggested_size) { +static uv_buf_t buf_alloc(uv_handle_t* handle, size_t suggested_size) { uv_buf_t buf; buf.base = (char*) malloc(suggested_size); buf.len = suggested_size; diff --git a/test/echo-server.c b/test/echo-server.c index 82be72a5..270af378 100644 --- a/test/echo-server.c +++ b/test/echo-server.c @@ -123,7 +123,7 @@ static void on_close(uv_handle_t* peer) { } -static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) { +static uv_buf_t echo_alloc(uv_handle_t* handle, size_t suggested_size) { return uv_buf_init(malloc(suggested_size), suggested_size); } diff --git a/test/test-callback-stack.c b/test/test-callback-stack.c index 6582010a..64cc4d9e 100644 --- a/test/test-callback-stack.c +++ b/test/test-callback-stack.c @@ -45,7 +45,7 @@ static int bytes_received = 0; static int shutdown_cb_called = 0; -static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) { +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { uv_buf_t buf; buf.len = size; buf.base = (char*) malloc(size); diff --git a/test/test-delayed-accept.c b/test/test-delayed-accept.c index 10f041b2..5a2704d4 100644 --- a/test/test-delayed-accept.c +++ b/test/test-delayed-accept.c @@ -30,7 +30,7 @@ static int close_cb_called = 0; static int connect_cb_called = 0; -static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) { +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { uv_buf_t buf; buf.base = (char*)malloc(size); buf.len = size; diff --git a/test/test-getsockname.c b/test/test-getsockname.c index 69dca502..c804ecc5 100644 --- a/test/test-getsockname.c +++ b/test/test-getsockname.c @@ -33,7 +33,7 @@ static uv_connect_t connect_req; static uv_tcp_t tcpServer; -static uv_buf_t alloc(uv_stream_t* handle, size_t suggested_size) { +static uv_buf_t alloc(uv_handle_t* handle, size_t suggested_size) { uv_buf_t buf; buf.base = (char*) malloc(suggested_size); buf.len = suggested_size; diff --git a/test/test-list.h b/test/test-list.h index 9d1f2e9b..8a9f8c7e 100644 --- a/test/test-list.h +++ b/test/test-list.h @@ -36,6 +36,10 @@ TEST_DECLARE (tcp_bind6_error_addrnotavail) TEST_DECLARE (tcp_bind6_error_fault) TEST_DECLARE (tcp_bind6_error_inval) TEST_DECLARE (tcp_bind6_localhost_ok) +TEST_DECLARE (udp_send_and_recv) +TEST_DECLARE (udp_dgram_too_big) +TEST_DECLARE (udp_dual_stack) +TEST_DECLARE (udp_ipv6_only) TEST_DECLARE (pipe_bind_error_addrinuse) TEST_DECLARE (pipe_bind_error_addrnotavail) TEST_DECLARE (pipe_bind_error_inval) @@ -106,6 +110,11 @@ TASK_LIST_START TEST_ENTRY (tcp_bind6_error_inval) TEST_ENTRY (tcp_bind6_localhost_ok) + TEST_ENTRY (udp_send_and_recv) + TEST_ENTRY (udp_dgram_too_big) + TEST_ENTRY (udp_dual_stack) + TEST_ENTRY (udp_ipv6_only) + TEST_ENTRY (pipe_bind_error_addrinuse) TEST_ENTRY (pipe_bind_error_addrnotavail) TEST_ENTRY (pipe_bind_error_inval) diff --git a/test/test-ping-pong.c b/test/test-ping-pong.c index 26718cd3..e9af29ae 100644 --- a/test/test-ping-pong.c +++ b/test/test-ping-pong.c @@ -51,7 +51,7 @@ typedef struct { void pinger_try_read(pinger_t* pinger); -static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) { +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { uv_buf_t buf; buf.base = (char*)malloc(size); buf.len = size; diff --git a/test/test-shutdown-eof.c b/test/test-shutdown-eof.c index 67af4953..cb57ad1e 100644 --- a/test/test-shutdown-eof.c +++ b/test/test-shutdown-eof.c @@ -39,7 +39,7 @@ static int called_timer_close_cb; static int called_timer_cb; -static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) { +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { uv_buf_t buf; buf.base = (char*)malloc(size); buf.len = size; diff --git a/test/test-spawn.c b/test/test-spawn.c index c0396e18..f2d84694 100644 --- a/test/test-spawn.c +++ b/test/test-spawn.c @@ -66,7 +66,7 @@ static void kill_cb(uv_process_t* process, int exit_status, int term_signal) { } -uv_buf_t on_alloc(uv_stream_t* tcp, size_t suggested_size) { +uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) { uv_buf_t buf; buf.base = output + output_used; buf.len = OUTPUT_SIZE - output_used; diff --git a/test/test-tcp-writealot.c b/test/test-tcp-writealot.c index 73cf45b1..6cb5e099 100644 --- a/test/test-tcp-writealot.c +++ b/test/test-tcp-writealot.c @@ -45,7 +45,7 @@ static int bytes_received = 0; static int bytes_received_done = 0; -static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) { +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) { uv_buf_t buf; buf.base = (char*)malloc(size); buf.len = size; diff --git a/test/test-udp-dgram-too-big.c b/test/test-udp-dgram-too-big.c new file mode 100644 index 00000000..7de9b49a --- /dev/null +++ b/test/test-udp-dgram-too-big.c @@ -0,0 +1,88 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include +#include + +#define CHECK_HANDLE(handle) \ + ASSERT((uv_udp_t*)(handle) == &handle_) + +#define CHECK_REQ(req) \ + ASSERT((req) == &req_); + +static uv_udp_t handle_; +static uv_udp_send_t req_; + +static int send_cb_called; +static int close_cb_called; + + +static void close_cb(uv_handle_t* handle) { + CHECK_HANDLE(handle); + close_cb_called++; +} + + +static void send_cb(uv_udp_send_t* req, int status) { + CHECK_REQ(req); + CHECK_HANDLE(req->handle); + + ASSERT(status == -1); + ASSERT(uv_last_error().code == UV_EMSGSIZE); + + uv_close((uv_handle_t*)req->handle, close_cb); + send_cb_called++; +} + + +TEST_IMPL(udp_dgram_too_big) { + char dgram[65536]; /* 64K MTU is unlikely, even on localhost */ + struct sockaddr_in addr; + uv_buf_t buf; + int r; + + memset(dgram, 42, sizeof dgram); /* silence valgrind */ + + uv_init(); + + r = uv_udp_init(&handle_); + ASSERT(r == 0); + + buf = uv_buf_init(dgram, sizeof dgram); + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_udp_send(&req_, &handle_, &buf, 1, addr, send_cb); + ASSERT(r == 0); + + ASSERT(close_cb_called == 0); + ASSERT(send_cb_called == 0); + + uv_run(); + + ASSERT(send_cb_called == 1); + ASSERT(close_cb_called == 1); + + return 0; +} diff --git a/test/test-udp-ipv6.c b/test/test-udp-ipv6.c new file mode 100644 index 00000000..489c17cc --- /dev/null +++ b/test/test-udp-ipv6.c @@ -0,0 +1,158 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include +#include + +#define CHECK_HANDLE(handle) \ + ASSERT((uv_udp_t*)(handle) == &server \ + || (uv_udp_t*)(handle) == &client \ + || (uv_timer_t*)(handle) == &timeout) + +#define CHECK_REQ(req) \ + ASSERT((req) == &req_); + +static uv_udp_t client; +static uv_udp_t server; +static uv_udp_send_t req_; +static uv_timer_t timeout; + +static int send_cb_called; +static int recv_cb_called; +static int close_cb_called; + + +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) { + static char slab[65536]; + CHECK_HANDLE(handle); + return uv_buf_init(slab, sizeof slab); +} + + +static void close_cb(uv_handle_t* handle) { + CHECK_HANDLE(handle); + close_cb_called++; +} + + +static void send_cb(uv_udp_send_t* req, int status) { + CHECK_REQ(req); + CHECK_HANDLE(req->handle); + ASSERT(status == 0); + send_cb_called++; +} + + +static void ipv6_recv_fail(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags) { + ASSERT(0 && "this function should not have been called"); +} + + +static void ipv6_recv_ok(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags) { + CHECK_HANDLE(handle); + ASSERT(nread >= 0); + + if (nread) + recv_cb_called++; +} + + +static void timeout_cb(uv_timer_t* timer, int status) { + uv_close((uv_handle_t*)&server, close_cb); + uv_close((uv_handle_t*)&client, close_cb); + uv_close((uv_handle_t*)&timeout, close_cb); +} + + +static void do_test(uv_udp_recv_cb recv_cb, int bind_flags) { + struct sockaddr_in6 addr6; + struct sockaddr_in addr; + uv_buf_t buf; + int r; + + uv_init(); + + addr6 = uv_ip6_addr("::0", TEST_PORT); + + r = uv_udp_init(&server); + ASSERT(r == 0); + + r = uv_udp_bind6(&server, addr6, bind_flags); + ASSERT(r == 0); + + r = uv_udp_recv_start(&server, alloc_cb, recv_cb); + ASSERT(r == 0); + + r = uv_udp_init(&client); + ASSERT(r == 0); + + buf = uv_buf_init("PING", 4); + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_udp_send(&req_, &client, &buf, 1, addr, send_cb); + ASSERT(r == 0); + + r = uv_timer_init(&timeout); + ASSERT(r == 0); + + r = uv_timer_start(&timeout, timeout_cb, 500, 0); + ASSERT(r == 0); + + ASSERT(close_cb_called == 0); + ASSERT(send_cb_called == 0); + ASSERT(recv_cb_called == 0); + + uv_run(); + + ASSERT(close_cb_called == 3); +} + + +TEST_IMPL(udp_dual_stack) { + do_test(ipv6_recv_ok, 0); + + ASSERT(recv_cb_called == 1); + ASSERT(send_cb_called == 1); + + return 0; +} + + +TEST_IMPL(udp_ipv6_only) { + do_test(ipv6_recv_fail, UV_UDP_IPV6ONLY); + + ASSERT(recv_cb_called == 0); + ASSERT(send_cb_called == 1); + + return 0; +} diff --git a/test/test-udp-send-and-recv.c b/test/test-udp-send-and-recv.c new file mode 100644 index 00000000..fa2b79df --- /dev/null +++ b/test/test-udp-send-and-recv.c @@ -0,0 +1,203 @@ +/* Copyright Joyent, Inc. and other Node contributors. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to + * deal in the Software without restriction, including without limitation the + * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or + * sell copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS + * IN THE SOFTWARE. + */ + +#include "uv.h" +#include "task.h" + +#include +#include +#include + +#define CHECK_HANDLE(handle) \ + ASSERT((uv_udp_t*)(handle) == &server || (uv_udp_t*)(handle) == &client) + +static uv_udp_t server; +static uv_udp_t client; + +static int cl_send_cb_called; +static int cl_recv_cb_called; + +static int sv_send_cb_called; +static int sv_recv_cb_called; + +static int close_cb_called; + + +static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) { + static char slab[65536]; + + CHECK_HANDLE(handle); + ASSERT(suggested_size <= sizeof slab); + + return uv_buf_init(slab, sizeof slab); +} + + +static void close_cb(uv_handle_t* handle) { + CHECK_HANDLE(handle); + close_cb_called++; +} + + +static void cl_recv_cb(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags) { + CHECK_HANDLE(handle); + ASSERT(flags == 0); + + if (nread == 0) { + ASSERT(addr == NULL); + uv_close((uv_handle_t*)handle, close_cb); + } + else if (nread > 0) { + ASSERT(addr != NULL); + ASSERT(nread == 4); + ASSERT(!memcmp("PONG", buf.base, nread)); + } + else { + ASSERT(0 && "unexpected error"); + } + + cl_recv_cb_called++; +} + + +static void cl_send_cb(uv_udp_send_t* req, int status) { + int r; + + ASSERT(req != NULL); + ASSERT(status == 0); + CHECK_HANDLE(req->handle); + + r = uv_udp_recv_start(req->handle, alloc_cb, cl_recv_cb); + ASSERT(r == 0); + + cl_send_cb_called++; +} + + +static void sv_send_cb(uv_udp_send_t* req, int status) { + ASSERT(req != NULL); + ASSERT(status == 0); + CHECK_HANDLE(req->handle); + + uv_close((uv_handle_t*)req->handle, close_cb); + free(req); + + sv_send_cb_called++; +} + + +static void sv_recv_cb(uv_udp_t* handle, + ssize_t nread, + uv_buf_t buf, + struct sockaddr* addr, + unsigned flags) { + uv_udp_send_t* req; + int r; + + CHECK_HANDLE(handle); + ASSERT(flags == 0); + + if (nread > 0) { + ASSERT(addr != NULL); + ASSERT(nread == 4); + ASSERT(!memcmp("PING", buf.base, nread)); + + /* FIXME? `uv_udp_recv_stop` does what it says: recv_cb is not called + * anymore. That's problematic because the read buffer won't be returned + * either... Not sure I like that but it's consistent with `uv_read_stop`. + */ + r = uv_udp_recv_stop(handle); + ASSERT(r == 0); + + req = malloc(sizeof *req); + ASSERT(req != NULL); + + buf = uv_buf_init("PONG", 4); + + r = uv_udp_send(req, + handle, + &buf, + 1, + *(struct sockaddr_in*)addr, + sv_send_cb); + ASSERT(r == 0); + } + else if (nread == 0) { + ASSERT(addr == NULL); + } + else { + ASSERT(0 && "unexpected error"); + } + + sv_recv_cb_called++; +} + + +TEST_IMPL(udp_send_and_recv) { + struct sockaddr_in addr; + uv_udp_send_t req; + uv_buf_t buf; + int r; + + addr = uv_ip4_addr("0.0.0.0", TEST_PORT); + + uv_init(); + + r = uv_udp_init(&server); + ASSERT(r == 0); + + r = uv_udp_bind(&server, addr, 0); + ASSERT(r == 0); + + r = uv_udp_recv_start(&server, alloc_cb, sv_recv_cb); + ASSERT(r == 0); + + addr = uv_ip4_addr("127.0.0.1", TEST_PORT); + + r = uv_udp_init(&client); + ASSERT(r == 0); + + /* client sends "PING", expects "PONG" */ + buf = uv_buf_init("PING", 4); + + r = uv_udp_send(&req, &client, &buf, 1, addr, cl_send_cb); + ASSERT(r == 0); + + ASSERT(close_cb_called == 0); + ASSERT(cl_send_cb_called == 0); + ASSERT(cl_recv_cb_called == 0); + ASSERT(sv_send_cb_called == 0); + ASSERT(sv_recv_cb_called == 0); + + uv_run(); + + ASSERT(cl_send_cb_called == 1); + ASSERT(cl_recv_cb_called == 2); /* dgram + EOF == 2 */ + ASSERT(sv_send_cb_called == 1); + ASSERT(sv_recv_cb_called == 1); /* dgram, no EOF == 1 */ + ASSERT(close_cb_called == 2); + + return 0; +}