Add UDP support to libuv.

This commit is contained in:
Ben Noordhuis 2011-08-19 23:22:30 +02:00
parent 52024061e4
commit 36ce74f2ca
25 changed files with 1489 additions and 27 deletions

View File

@ -55,6 +55,16 @@ typedef struct {
#define UV_CONNECT_PRIVATE_FIELDS \
ngx_queue_t queue;
#define UV_UDP_SEND_PRIVATE_FIELDS \
ngx_queue_t queue; \
struct sockaddr_storage addr; \
socklen_t addrlen; \
uv_buf_t* bufs; \
int bufcnt; \
ssize_t status; \
uv_udp_send_cb send_cb; \
uv_buf_t bufsml[UV_REQ_BUFSML_SIZE]; \
#define UV_PRIVATE_REQ_TYPES /* empty */
@ -83,6 +93,16 @@ typedef struct {
#define UV_TCP_PRIVATE_FIELDS
/* UV_UDP */
#define UV_UDP_PRIVATE_FIELDS \
uv_alloc_cb alloc_cb; \
uv_udp_recv_cb recv_cb; \
ev_io read_watcher; \
ev_io write_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \
/* UV_NAMED_PIPE */
#define UV_PIPE_PRIVATE_TYPEDEF
#define UV_PIPE_PRIVATE_FIELDS \

View File

@ -114,6 +114,8 @@ typedef struct uv_buf_t {
struct { uv_tcp_connection_fields }; \
};
#define UV_UDP_PRIVATE_FIELDS
#define uv_pipe_server_fields \
uv_pipe_accept_t accept_reqs[4]; \
uv_pipe_accept_t* pending_accepts;

View File

@ -45,6 +45,7 @@ typedef struct uv_err_s uv_err_t;
typedef struct uv_handle_s uv_handle_t;
typedef struct uv_stream_s uv_stream_t;
typedef struct uv_tcp_s uv_tcp_t;
typedef struct uv_udp_s uv_udp_t;
typedef struct uv_pipe_s uv_pipe_t;
typedef struct uv_timer_s uv_timer_t;
typedef struct uv_prepare_s uv_prepare_t;
@ -58,6 +59,7 @@ typedef struct uv_req_s uv_req_t;
typedef struct uv_shutdown_s uv_shutdown_t;
typedef struct uv_write_s uv_write_t;
typedef struct uv_connect_s uv_connect_t;
typedef struct uv_udp_send_s uv_udp_send_t;
#if defined(__unix__) || defined(__POSIX__) || defined(__APPLE__)
# include "uv-unix.h"
@ -106,8 +108,8 @@ int64_t uv_now();
* In the case of uv_read_cb the uv_buf_t returned should be freed by the
* user.
*/
typedef uv_buf_t (*uv_alloc_cb)(uv_stream_t* tcp, size_t suggested_size);
typedef void (*uv_read_cb)(uv_stream_t* tcp, ssize_t nread, uv_buf_t buf);
typedef uv_buf_t (*uv_alloc_cb)(uv_handle_t* handle, size_t suggested_size);
typedef void (*uv_read_cb)(uv_stream_t* stream, ssize_t nread, uv_buf_t buf);
typedef void (*uv_write_cb)(uv_write_t* req, int status);
typedef void (*uv_connect_cb)(uv_connect_t* req, int status);
typedef void (*uv_shutdown_cb)(uv_shutdown_t* req, int status);
@ -146,6 +148,7 @@ typedef enum {
UV_EINVAL,
UV_EISCONN,
UV_EMFILE,
UV_EMSGSIZE,
UV_ENETDOWN,
UV_ENETUNREACH,
UV_ENFILE,
@ -172,6 +175,7 @@ typedef enum {
typedef enum {
UV_UNKNOWN_HANDLE = 0,
UV_TCP,
UV_UDP,
UV_NAMED_PIPE,
UV_TTY,
UV_FILE,
@ -194,6 +198,7 @@ typedef enum {
UV_WRITE,
UV_SHUTDOWN,
UV_WAKEUP,
UV_UDP_SEND,
UV_REQ_TYPE_PRIVATE
} uv_req_type;
@ -418,6 +423,188 @@ struct uv_connect_s {
int uv_getsockname(uv_handle_t* handle, struct sockaddr* name, int* namelen);
/*
* UDP support.
*/
/*
* Flag constants. Input flags are meant to be passed to `uv_udp_init()`
* and `uv_udp_init6()`. Output flags are passed to your `uv_udp_recv_cb`
* callback.
*/
/*
* Input flag. Disables dual stack mode. Only valid for `uv_udp_init6()`.
*/
#define UV_UDP_IPV6ONLY (1 << 0)
/*
* Output flag. The message has been truncated because
* the read buffer was too small. The remainder has been
* discarded by the operating system.
*/
#define UV_UDP_PARTIAL (1 << 10)
/*
* Function prototype for the callback that is invoked
* when the UDP datagram has been sent.
*
* Arguments:
* req Request handle.
* status Status indicator. 0 on success, -1 on error.
*/
typedef void (*uv_udp_send_cb)(uv_udp_send_t* req, int status);
/*
* Function prototype for the callback that is invoked when
* a new UDP datagram is received.
*
* Arguments:
* handle UDP handle.
* nread Number of bytes that have been received.
* 0 if there is no more data to read. You may
* discard or repurpose the read buffer.
* -1 if a transmission error was detected.
* buf uv_buf_t with the received data.
* addr struct sockaddr_in or struct sockaddr_in6.
* Valid for the duration of the callback only.
* flags One or more OR'ed UV_UDP_* constants.
* Right now only UV_UDP_PARTIAL is used.
*/
typedef void (*uv_udp_recv_cb)(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags);
/*
* Subclass of uv_handle_t
*/
struct uv_udp_s {
UV_HANDLE_FIELDS
UV_UDP_PRIVATE_FIELDS
};
/*
* Subclass of uv_req_t
*/
struct uv_udp_send_s {
UV_REQ_FIELDS
uv_udp_t* handle;
UV_UDP_SEND_PRIVATE_FIELDS
};
/*
* Initialize a new UDP handle. The actual socket is created lazily.
*
* Arguments:
* handle UDP handle to initialize.
*
* Returns:
* 0 on success, -1 on error.
*/
int uv_udp_init(uv_udp_t* handle);
/*
* Bind to a IPv4 address and port.
*
* Arguments:
* handle UDP handle. Should have been initialized with `uv_udp_init`.
* addr struct sockaddr_in with the address and port to bind to.
* flags Bind flags. One or more OR'ed UV_UDP_* constants.
*
* Returns:
* 0 on success, -1 on error.
*/
int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags);
/*
* Bind to a IPv6 address and port.
*
* Arguments:
* handle UDP handle. Should have been initialized with `uv_udp_init`.
* addr struct sockaddr_in with the address and port to bind to.
* flags One or more OR'ed UV_UDP_* constants.
*
* Returns:
* 0 on success, -1 on error.
*/
int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags);
/*
* Send data. If the socket has not previously been bound with `uv_udp_bind`
* or `uv_udp_bind6`, it is bound to 0.0.0.0 (the "all interfaces" address)
* and a random port number.
*
* Arguments:
* req UDP request handle. Need not be initialized.
* handle UDP handle. Should have been initialized with `uv_udp_init`.
* bufs List of buffers to send.
* bufcnt Number of buffers in `bufs`.
* addr Address of the remote peer. See `uv_ip4_addr`.
* send_cb Callback to invoke when the data has been sent out.
*
* Returns:
* 0 on success, -1 on error.
*/
int uv_udp_send(uv_udp_send_t* req,
uv_udp_t* handle,
uv_buf_t bufs[],
int bufcnt,
struct sockaddr_in addr,
uv_udp_send_cb send_cb);
/*
* Send data. If the socket has not previously been bound with `uv_udp_bind6`,
* it is bound to :::0 (the "all interfaces" address) and a random port number.
*
* Arguments:
* req UDP request handle. Need not be initialized.
* handle UDP handle. Should have been initialized with `uv_udp_init`.
* bufs List of buffers to send.
* bufcnt Number of buffers in `bufs`.
* addr Address of the remote peer. See `uv_ip6_addr`.
* send_cb Callback to invoke when the data has been sent out.
*
* Returns:
* 0 on success, -1 on error.
*/
int uv_udp_send6(uv_udp_send_t* req,
uv_udp_t* handle,
uv_buf_t bufs[],
int bufcnt,
struct sockaddr_in6 addr,
uv_udp_send_cb send_cb);
/*
* Send data. If the socket has not previously been bound with `uv_udp_bind`
* or `uv_udp_bind6`, it is bound to 0.0.0.0 (the "all interfaces" address)
* and a random port number.
*
* Arguments:
* handle UDP handle. Should have been initialized with `uv_udp_init`.
* alloc_cb Callback to invoke when temporary storage is needed.
* recv_cb Callback to invoke with received data.
*
* Returns:
* 0 on success, -1 on error.
*/
int uv_udp_recv_start(uv_udp_t* handle,
uv_alloc_cb alloc_cb,
uv_udp_recv_cb recv_cb);
/*
* Stop listening for incoming datagrams.
*
* Arguments:
* handle UDP handle. Should have been initialized with `uv_udp_init`.
*
* Returns:
* 0 on success, -1 on error.
*/
int uv_udp_recv_stop(uv_udp_t* handle);
/*
* uv_pipe_t is a subclass of uv_stream_t
*
@ -699,6 +886,7 @@ typedef struct {
uint64_t handle_init;
uint64_t stream_init;
uint64_t tcp_init;
uint64_t udp_init;
uint64_t pipe_init;
uint64_t prepare_init;
uint64_t check_init;

View File

@ -103,6 +103,28 @@ static void uv__stream_connect(uv_stream_t*);
static void uv__stream_io(EV_P_ ev_io* watcher, int revents);
static void uv__pipe_accept(EV_P_ ev_io* watcher, int revents);
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w);
static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w);
static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_run_pending(uv_udp_t* handle);
static void uv__udp_destroy(uv_udp_t* handle);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static void uv__udp_io(EV_P_ ev_io* w, int events);
static int uv__udp_bind(uv_udp_t* handle,
int domain,
struct sockaddr* addr,
socklen_t len,
unsigned flags);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain);
static int uv__udp_send(uv_udp_send_t* req,
uv_udp_t* handle,
uv_buf_t bufs[],
int bufcnt,
struct sockaddr* addr,
socklen_t addrlen,
uv_udp_send_cb send_cb);
#ifndef __GNUC__
#define __attribute__(a)
#endif
@ -173,6 +195,7 @@ static uv_err_code uv_translate_sys_error(int sys_errno) {
case ECONNRESET: return UV_ECONNRESET;
case EFAULT: return UV_EFAULT;
case EMFILE: return UV_EMFILE;
case EMSGSIZE: return UV_EMSGSIZE;
case EINVAL: return UV_EINVAL;
case ECONNREFUSED: return UV_ECONNREFUSED;
case EADDRINUSE: return UV_EADDRINUSE;
@ -201,6 +224,7 @@ static uv_err_t uv_err_new(uv_handle_t* handle, int sys_error) {
void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
uv_udp_t* udp;
uv_async_t* async;
uv_timer_t* timer;
uv_stream_t* stream;
@ -231,6 +255,14 @@ void uv_close(uv_handle_t* handle, uv_close_cb close_cb) {
assert(!ev_is_active(&stream->write_watcher));
break;
case UV_UDP:
udp = (uv_udp_t*)handle;
uv__udp_watcher_stop(udp, &udp->read_watcher);
uv__udp_watcher_stop(udp, &udp->write_watcher);
uv__close(udp->fd);
udp->fd = -1;
break;
case UV_PREPARE:
uv_prepare_stop((uv_prepare_t*) handle);
break;
@ -305,6 +337,489 @@ static void uv__handle_init(uv_handle_t* handle, uv_handle_type type) {
}
static void uv__udp_watcher_start(uv_udp_t* handle, ev_io* w) {
int flags;
assert(w == &handle->read_watcher
|| w == &handle->write_watcher);
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
w->data = handle;
ev_set_cb(w, uv__udp_io);
ev_io_set(w, handle->fd, flags);
ev_io_start(EV_DEFAULT_UC_ w);
}
static void uv__udp_watcher_stop(uv_udp_t* handle, ev_io* w) {
int flags;
assert(w == &handle->read_watcher
|| w == &handle->write_watcher);
flags = (w == &handle->read_watcher ? EV_READ : EV_WRITE);
ev_io_stop(EV_DEFAULT_UC_ w);
ev_io_set(w, -1, flags);
ev_set_cb(w, NULL);
w->data = (void*)0xDEADBABE;
}
static void uv__udp_destroy(uv_udp_t* handle) {
uv_udp_send_t* req;
ngx_queue_t* q;
uv__udp_run_completed(handle);
while (!ngx_queue_empty(&handle->write_queue)) {
q = ngx_queue_head(&handle->write_queue);
ngx_queue_remove(q);
req = ngx_queue_data(q, uv_udp_send_t, queue);
if (req->send_cb) {
/* FIXME proper error code like UV_EABORTED */
uv_err_new_artificial((uv_handle_t*)handle, UV_EINTR);
req->send_cb(req, -1);
}
}
/* Now tear down the handle. */
handle->flags = 0;
handle->recv_cb = NULL;
handle->alloc_cb = NULL;
/* but _do not_ touch close_cb */
if (handle->fd != -1) {
uv__close(handle->fd);
handle->fd = -1;
}
uv__udp_watcher_stop(handle, &handle->read_watcher);
uv__udp_watcher_stop(handle, &handle->write_watcher);
}
static void uv__udp_run_pending(uv_udp_t* handle) {
uv_udp_send_t* req;
ngx_queue_t* q;
struct msghdr h;
ssize_t size;
while (!ngx_queue_empty(&handle->write_queue)) {
q = ngx_queue_head(&handle->write_queue);
assert(q != NULL);
req = ngx_queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);
memset(&h, 0, sizeof h);
h.msg_name = &req->addr;
h.msg_namelen = req->addrlen;
h.msg_iov = (struct iovec*)req->bufs;
h.msg_iovlen = req->bufcnt;
do {
size = sendmsg(handle->fd, &h, 0);
}
while (size == -1 && errno == EINTR);
/* TODO try to write once or twice more in the
* hope that the socket becomes readable again?
*/
if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
break;
req->status = (size == -1 ? -errno : size);
#ifndef NDEBUG
/* Sanity check. */
if (size != -1) {
ssize_t nbytes;
int i;
for (nbytes = i = 0; i < req->bufcnt; i++)
nbytes += req->bufs[i].len;
assert(size == nbytes);
}
#endif
/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
ngx_queue_remove(&req->queue);
ngx_queue_insert_tail(&handle->write_completed_queue, &req->queue);
}
}
static void uv__udp_run_completed(uv_udp_t* handle) {
uv_udp_send_t* req;
ngx_queue_t* q;
while (!ngx_queue_empty(&handle->write_completed_queue)) {
q = ngx_queue_head(&handle->write_completed_queue);
assert(q != NULL);
ngx_queue_remove(q);
req = ngx_queue_data(q, uv_udp_send_t, queue);
assert(req != NULL);
if (req->bufs != req->bufsml)
free(req->bufs);
if (req->send_cb == NULL)
continue;
/* req->status >= 0 == bytes written
* req->status < 0 == errno
*/
if (req->status >= 0) {
req->send_cb(req, 0);
}
else {
uv_err_new((uv_handle_t*)handle, -req->status);
req->send_cb(req, -1);
}
}
}
static void uv__udp_recvmsg(uv_udp_t* handle) {
struct sockaddr_storage peer;
struct msghdr h;
ssize_t nread;
uv_buf_t buf;
int flags;
assert(handle->recv_cb != NULL);
assert(handle->alloc_cb != NULL);
do {
/* FIXME: hoist alloc_cb out the loop but for now follow uv__read() */
buf = handle->alloc_cb((uv_handle_t*)handle, 64 * 1024);
assert(buf.len > 0);
assert(buf.base != NULL);
memset(&h, 0, sizeof h);
h.msg_name = &peer;
h.msg_namelen = sizeof peer;
h.msg_iov = (struct iovec*)&buf;
h.msg_iovlen = 1;
do {
nread = recvmsg(handle->fd, &h, 0);
}
while (nread == -1 && errno == EINTR);
if (nread == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
uv_err_new((uv_handle_t*)handle, EAGAIN);
handle->recv_cb(handle, 0, buf, NULL, 0);
}
else {
uv_err_new((uv_handle_t*)handle, errno);
handle->recv_cb(handle, -1, buf, NULL, 0);
}
}
else {
flags = 0;
if (h.msg_flags & MSG_TRUNC)
flags |= UV_UDP_PARTIAL;
handle->recv_cb(handle,
nread,
buf,
(struct sockaddr*)&peer,
flags);
}
}
/* recv_cb callback may decide to pause or close the handle */
while (nread != -1
&& handle->fd != -1
&& handle->recv_cb != NULL);
}
static void uv__udp_sendmsg(uv_udp_t* handle) {
assert(!ngx_queue_empty(&handle->write_queue)
|| !ngx_queue_empty(&handle->write_completed_queue));
/* Write out pending data first. */
uv__udp_run_pending(handle);
/* Drain 'request completed' queue. */
uv__udp_run_completed(handle);
if (!ngx_queue_empty(&handle->write_completed_queue)) {
/* Schedule completion callbacks. */
ev_feed_event(EV_DEFAULT_ &handle->write_watcher, EV_WRITE);
}
else if (ngx_queue_empty(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__udp_watcher_stop(handle, &handle->write_watcher);
}
}
static void uv__udp_io(EV_P_ ev_io* w, int events) {
uv_udp_t* handle;
handle = w->data;
assert(handle != NULL);
assert(handle->type == UV_UDP);
assert(handle->fd >= 0);
assert(!(events & ~(EV_READ|EV_WRITE)));
if (events & EV_READ)
uv__udp_recvmsg(handle);
if (events & EV_WRITE)
uv__udp_sendmsg(handle);
}
static int uv__udp_bind(uv_udp_t* handle,
int domain,
struct sockaddr* addr,
socklen_t len,
unsigned flags) {
int saved_errno;
int status;
int yes;
int fd;
saved_errno = errno;
status = -1;
/* Check for bad flags. */
if (flags & ~UV_UDP_IPV6ONLY) {
uv_err_new((uv_handle_t*)handle, EINVAL);
goto out;
}
/* Cannot set IPv6-only mode on non-IPv6 socket. */
if ((flags & UV_UDP_IPV6ONLY) && domain != AF_INET6) {
uv_err_new((uv_handle_t*)handle, EINVAL);
goto out;
}
/* Check for already active socket. */
if (handle->fd != -1) {
uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY);
goto out;
}
if ((fd = uv__socket(domain, SOCK_DGRAM, 0)) == -1) {
uv_err_new((uv_handle_t*)handle, errno);
goto out;
}
if (flags & UV_UDP_IPV6ONLY) {
#ifdef IPV6_V6ONLY
yes = 1;
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof yes) == -1) {
uv_err_new((uv_handle_t*)handle, errno);
goto out;
}
#else
uv_err_new((uv_handle_t*)handle, ENOTSUP);
goto out;
#endif
}
if (bind(fd, addr, len) == -1) {
uv_err_new((uv_handle_t*)handle, errno);
goto out;
}
handle->fd = fd;
status = 0;
out:
if (status)
uv__close(fd);
errno = saved_errno;
return status;
}
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle, int domain) {
struct sockaddr_storage taddr;
socklen_t addrlen;
assert(domain == AF_INET || domain == AF_INET6);
if (handle->fd != -1)
return 0;
switch (domain) {
case AF_INET:
{
struct sockaddr_in* addr = (void*)&taddr;
memset(addr, 0, sizeof *addr);
addr->sin_family = AF_INET;
addr->sin_addr.s_addr = INADDR_ANY;
addrlen = sizeof *addr;
break;
}
case AF_INET6:
{
struct sockaddr_in6* addr = (void*)&taddr;
memset(addr, 0, sizeof *addr);
addr->sin6_family = AF_INET6;
addr->sin6_addr = in6addr_any;
addrlen = sizeof *addr;
break;
}
default:
assert(0 && "unsupported address family");
abort();
}
return uv__udp_bind(handle, domain, (struct sockaddr*)&taddr, addrlen, 0);
}
static int uv__udp_send(uv_udp_send_t* req,
uv_udp_t* handle,
uv_buf_t bufs[],
int bufcnt,
struct sockaddr* addr,
socklen_t addrlen,
uv_udp_send_cb send_cb) {
if (uv__udp_maybe_deferred_bind(handle, addr->sa_family))
return -1;
/* Don't use uv__req_init(), it zeroes the data field. */
uv_counters()->req_init++;
memcpy(&req->addr, addr, addrlen);
req->addrlen = addrlen;
req->send_cb = send_cb;
req->handle = handle;
req->bufcnt = bufcnt;
req->type = UV_UDP_SEND;
if (bufcnt <= UV_REQ_BUFSML_SIZE) {
req->bufs = req->bufsml;
}
else if ((req->bufs = malloc(bufcnt * sizeof(bufs[0]))) == NULL) {
uv_err_new((uv_handle_t*)handle, ENOMEM);
return -1;
}
memcpy(req->bufs, bufs, bufcnt * sizeof(bufs[0]));
ngx_queue_insert_tail(&handle->write_queue, &req->queue);
uv__udp_watcher_start(handle, &handle->write_watcher);
return 0;
}
int uv_udp_init(uv_udp_t* handle) {
memset(handle, 0, sizeof *handle);
uv__handle_init((uv_handle_t*)handle, UV_UDP);
uv_counters()->udp_init++;
handle->fd = -1;
ngx_queue_init(&handle->write_queue);
ngx_queue_init(&handle->write_completed_queue);
return 0;
}
int uv_udp_bind(uv_udp_t* handle, struct sockaddr_in addr, unsigned flags) {
return uv__udp_bind(handle,
AF_INET,
(struct sockaddr*)&addr,
sizeof addr,
flags);
}
int uv_udp_bind6(uv_udp_t* handle, struct sockaddr_in6 addr, unsigned flags) {
return uv__udp_bind(handle,
AF_INET6,
(struct sockaddr*)&addr,
sizeof addr,
flags);
}
int uv_udp_send(uv_udp_send_t* req,
uv_udp_t* handle,
uv_buf_t bufs[],
int bufcnt,
struct sockaddr_in addr,
uv_udp_send_cb send_cb) {
return uv__udp_send(req,
handle,
bufs,
bufcnt,
(struct sockaddr*)&addr,
sizeof addr,
send_cb);
}
int uv_udp_send6(uv_udp_send_t* req,
uv_udp_t* handle,
uv_buf_t bufs[],
int bufcnt,
struct sockaddr_in6 addr,
uv_udp_send_cb send_cb) {
return uv__udp_send(req,
handle,
bufs,
bufcnt,
(struct sockaddr*)&addr,
sizeof addr,
send_cb);
}
int uv_udp_recv_start(uv_udp_t* handle,
uv_alloc_cb alloc_cb,
uv_udp_recv_cb recv_cb) {
if (alloc_cb == NULL || recv_cb == NULL) {
uv_err_new_artificial((uv_handle_t*)handle, UV_EINVAL);
return -1;
}
if (ev_is_active(&handle->read_watcher)) {
uv_err_new_artificial((uv_handle_t*)handle, UV_EALREADY);
return -1;
}
if (uv__udp_maybe_deferred_bind(handle, AF_INET))
return -1;
handle->alloc_cb = alloc_cb;
handle->recv_cb = recv_cb;
uv__udp_watcher_start(handle, &handle->read_watcher);
return 0;
}
int uv_udp_recv_stop(uv_udp_t* handle) {
uv__udp_watcher_stop(handle, &handle->read_watcher);
handle->alloc_cb = NULL;
handle->recv_cb = NULL;
return 0;
}
int uv_tcp_init(uv_tcp_t* tcp) {
uv__handle_init((uv_handle_t*)tcp, UV_TCP);
uv_counters()->tcp_init++;
@ -332,8 +847,10 @@ int uv_tcp_init(uv_tcp_t* tcp) {
}
static int uv__bind(uv_tcp_t* tcp, int domain, struct sockaddr* addr,
int addrsize) {
static int uv__tcp_bind(uv_tcp_t* tcp,
int domain,
struct sockaddr* addr,
int addrsize) {
int saved_errno;
int status;
@ -379,8 +896,10 @@ int uv_tcp_bind(uv_tcp_t* tcp, struct sockaddr_in addr) {
return -1;
}
return uv__bind(tcp, AF_INET, (struct sockaddr*)&addr,
sizeof(struct sockaddr_in));
return uv__tcp_bind(tcp,
AF_INET,
(struct sockaddr*)&addr,
sizeof(struct sockaddr_in));
}
@ -390,8 +909,10 @@ int uv_tcp_bind6(uv_tcp_t* tcp, struct sockaddr_in6 addr) {
return -1;
}
return uv__bind(tcp, AF_INET6, (struct sockaddr*)&addr,
sizeof(struct sockaddr_in6));
return uv__tcp_bind(tcp,
AF_INET6,
(struct sockaddr*)&addr,
sizeof(struct sockaddr_in6));
}
@ -590,6 +1111,13 @@ void uv__finish_close(uv_handle_t* handle) {
assert(!ev_is_active(&((uv_stream_t*)handle)->write_watcher));
break;
case UV_UDP:
assert(!ev_is_active(&((uv_udp_t*)handle)->read_watcher));
assert(!ev_is_active(&((uv_udp_t*)handle)->write_watcher));
assert(((uv_udp_t*)handle)->fd == -1);
uv__udp_destroy((uv_udp_t*)handle);
break;
case UV_PROCESS:
assert(!ev_is_active(&((uv_process_t*)handle)->child_watcher));
break;
@ -823,7 +1351,7 @@ static void uv__read(uv_stream_t* stream) {
*/
while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) {
assert(stream->alloc_cb);
buf = stream->alloc_cb(stream, 64 * 1024);
buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);
assert(buf.len > 0);
assert(buf.base);

View File

@ -773,7 +773,7 @@ void uv_process_pipe_read_req(uv_pipe_t* handle, uv_req_t* req) {
break;
}
buf = handle->alloc_cb((uv_stream_t*)handle, avail);
buf = handle->alloc_cb((uv_handle_t*) handle, avail);
assert(buf.len > 0);
if (ReadFile(handle->handle,

View File

@ -291,7 +291,7 @@ static void uv_tcp_queue_read(uv_tcp_t* handle) {
*/
if (active_tcp_streams < uv_active_tcp_streams_threshold) {
handle->flags &= ~UV_HANDLE_ZERO_READ;
handle->read_buffer = handle->alloc_cb((uv_stream_t*)handle, 65536);
handle->read_buffer = handle->alloc_cb((uv_handle_t*) handle, 65536);
assert(handle->read_buffer.len > 0);
buf = handle->read_buffer;
} else {
@ -659,7 +659,7 @@ void uv_process_tcp_read_req(uv_tcp_t* handle, uv_req_t* req) {
/* Do nonblocking reads until the buffer is empty */
while (handle->flags & UV_HANDLE_READING) {
buf = handle->alloc_cb((uv_stream_t*)handle, 65536);
buf = handle->alloc_cb((uv_handle_t*) handle, 65536);
assert(buf.len > 0);
flags = 0;
if (WSARecv(handle->socket,

View File

@ -29,6 +29,16 @@ BENCHMARK_DECLARE (tcp_pump100_client)
BENCHMARK_DECLARE (tcp_pump1_client)
BENCHMARK_DECLARE (pipe_pump100_client)
BENCHMARK_DECLARE (pipe_pump1_client)
BENCHMARK_DECLARE (udp_packet_storm_1v1)
BENCHMARK_DECLARE (udp_packet_storm_1v10)
BENCHMARK_DECLARE (udp_packet_storm_1v100)
BENCHMARK_DECLARE (udp_packet_storm_1v1000)
BENCHMARK_DECLARE (udp_packet_storm_10v10)
BENCHMARK_DECLARE (udp_packet_storm_10v100)
BENCHMARK_DECLARE (udp_packet_storm_10v1000)
BENCHMARK_DECLARE (udp_packet_storm_100v100)
BENCHMARK_DECLARE (udp_packet_storm_100v1000)
BENCHMARK_DECLARE (udp_packet_storm_1000v1000)
BENCHMARK_DECLARE (gethostbyname)
BENCHMARK_DECLARE (getaddrinfo)
BENCHMARK_DECLARE (spawn)
@ -68,6 +78,17 @@ TASK_LIST_START
BENCHMARK_ENTRY (pipe_pound_1000)
BENCHMARK_HELPER (pipe_pound_1000, pipe_echo_server)
BENCHMARK_ENTRY (udp_packet_storm_1v1)
BENCHMARK_ENTRY (udp_packet_storm_1v10)
BENCHMARK_ENTRY (udp_packet_storm_1v100)
BENCHMARK_ENTRY (udp_packet_storm_1v1000)
BENCHMARK_ENTRY (udp_packet_storm_10v10)
BENCHMARK_ENTRY (udp_packet_storm_10v100)
BENCHMARK_ENTRY (udp_packet_storm_10v1000)
BENCHMARK_ENTRY (udp_packet_storm_100v100)
BENCHMARK_ENTRY (udp_packet_storm_100v1000)
BENCHMARK_ENTRY (udp_packet_storm_1000v1000)
BENCHMARK_ENTRY (gethostbyname)
BENCHMARK_HELPER (gethostbyname, dns_server)

View File

@ -52,7 +52,7 @@ static int completed_pingers = 0;
static int64_t start_time;
static uv_buf_t buf_alloc(uv_stream_t* tcp, size_t size) {
static uv_buf_t buf_alloc(uv_handle_t* tcp, size_t size) {
buf_t* ab;
ab = buf_freelist;

View File

@ -72,13 +72,13 @@ static uint64_t start; /* in ms */
static int closed_streams;
static int conns_failed;
static uv_buf_t alloc_cb(uv_stream_t* stream, size_t suggested_size);
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size);
static void connect_cb(uv_connect_t* conn_req, int status);
static void read_cb(uv_stream_t* stream, ssize_t nread, uv_buf_t buf);
static void close_cb(uv_handle_t* handle);
static uv_buf_t alloc_cb(uv_stream_t* stream, size_t suggested_size) {
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) {
static char slab[65536];
uv_buf_t buf;
buf.base = slab;

View File

@ -41,7 +41,7 @@ static void maybe_connect_some();
static uv_req_t* req_alloc();
static void req_free(uv_req_t* uv_req);
static uv_buf_t buf_alloc(uv_stream_t*, size_t size);
static uv_buf_t buf_alloc(uv_handle_t*, size_t size);
static void buf_free(uv_buf_t uv_buf_t);
@ -337,7 +337,7 @@ typedef struct buf_list_s {
static buf_list_t* buf_freelist = NULL;
static uv_buf_t buf_alloc(uv_stream_t* stream, size_t size) {
static uv_buf_t buf_alloc(uv_handle_t* handle, size_t size) {
buf_list_t* buf;
buf = buf_freelist;

View File

@ -69,7 +69,7 @@ static void exit_cb(uv_process_t* process, int exit_status, int term_signal) {
}
uv_buf_t on_alloc(uv_stream_t* tcp, size_t suggested_size) {
uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = output + output_used;
buf.len = OUTPUT_SIZE - output_used;

View File

@ -0,0 +1,245 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "task.h"
#include "uv.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define EXPECTED "RANG TANG DING DONG I AM THE JAPANESE SANDMAN" /* "Take eight!" */
#define TEST_DURATION 5000 /* ms */
#define MAX_SENDERS 1000
#define MAX_RECEIVERS 1000
#define BASE_PORT 12345
#define ARRAY_SIZE(a) (sizeof((a)) / sizeof((a)[0]))
static int n_senders_;
static int n_receivers_;
static uv_udp_t senders[MAX_SENDERS];
static uv_udp_t receivers[MAX_RECEIVERS];
static uv_buf_t bufs[5];
static int send_cb_called;
static int recv_cb_called;
static int close_cb_called;
typedef struct {
struct sockaddr_in addr;
} sender_state_t;
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) {
static char slab[65536];
ASSERT(suggested_size <= sizeof slab);
return uv_buf_init(slab, sizeof slab);
}
static void send_cb(uv_udp_send_t* req, int status) {
sender_state_t* ss;
int r;
if (status == -1) {
ASSERT(uv_last_error().code == UV_EINTR); /* FIXME change error code */
return;
}
ASSERT(req != NULL);
ASSERT(status == 0);
ss = req->data;
r = uv_udp_send(req, req->handle, bufs, ARRAY_SIZE(bufs), ss->addr, send_cb);
ASSERT(r == 0);
req->data = ss;
send_cb_called++;
}
static void recv_cb(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags) {
if (nread == 0)
return;
if (nread == -1) {
ASSERT(uv_last_error().code == UV_EINTR); /* FIXME change error code */
return;
}
ASSERT(addr->sa_family == AF_INET);
ASSERT(!memcmp(buf.base, EXPECTED, nread));
recv_cb_called++;
}
static void close_cb(uv_handle_t* handle) {
ASSERT(handle != NULL);
close_cb_called++;
}
static void timeout_cb(uv_timer_t* timer, int status) {
int i;
for (i = 0; i < n_senders_; i++)
uv_close((uv_handle_t*)&senders[i], close_cb);
for (i = 0; i < n_receivers_; i++)
uv_close((uv_handle_t*)&receivers[i], close_cb);
}
static int do_packet_storm(int n_senders, int n_receivers) {
uv_timer_t timeout;
sender_state_t *ss;
uv_udp_send_t* req;
uv_udp_t* handle;
int i;
int r;
ASSERT(n_senders <= MAX_SENDERS);
ASSERT(n_receivers <= MAX_RECEIVERS);
uv_init();
n_senders_ = n_senders;
n_receivers_ = n_receivers;
r = uv_timer_init(&timeout);
ASSERT(r == 0);
r = uv_timer_start(&timeout, timeout_cb, TEST_DURATION, 0);
ASSERT(r == 0);
/* Timer should not keep loop alive. */
uv_unref();
for (i = 0; i < n_receivers; i++) {
struct sockaddr_in addr;
handle = &receivers[i];
r = uv_udp_init(handle);
ASSERT(r == 0);
addr = uv_ip4_addr("0.0.0.0", BASE_PORT + i);
r = uv_udp_bind(handle, addr, 0);
ASSERT(r == 0);
r = uv_udp_recv_start(handle, alloc_cb, recv_cb);
ASSERT(r == 0);
}
bufs[0] = uv_buf_init(EXPECTED + 0, 10);
bufs[1] = uv_buf_init(EXPECTED + 10, 10);
bufs[2] = uv_buf_init(EXPECTED + 20, 10);
bufs[3] = uv_buf_init(EXPECTED + 30, 10);
bufs[4] = uv_buf_init(EXPECTED + 40, 5);
for (i = 0; i < n_senders; i++) {
handle = &senders[i];
r = uv_udp_init(handle);
ASSERT(r == 0);
req = malloc(sizeof(*req) + sizeof(*ss));
ss = (void*)(req + 1);
ss->addr = uv_ip4_addr("0.0.0.0", BASE_PORT + (i % n_receivers));
r = uv_udp_send(req, handle, bufs, ARRAY_SIZE(bufs), ss->addr, send_cb);
ASSERT(r == 0);
req->data = ss;
}
uv_run();
printf("udp_packet_storm_%dv%d: %.0f/s received, %.0f/s sent\n",
n_receivers,
n_senders,
recv_cb_called / (TEST_DURATION / 1000.0),
send_cb_called / (TEST_DURATION / 1000.0));
return 0;
}
BENCHMARK_IMPL(udp_packet_storm_1v1) {
return do_packet_storm(1, 1);
}
BENCHMARK_IMPL(udp_packet_storm_1v10) {
return do_packet_storm(1, 10);
}
BENCHMARK_IMPL(udp_packet_storm_1v100) {
return do_packet_storm(1, 100);
}
BENCHMARK_IMPL(udp_packet_storm_1v1000) {
return do_packet_storm(1, 1000);
}
BENCHMARK_IMPL(udp_packet_storm_10v10) {
return do_packet_storm(10, 10);
}
BENCHMARK_IMPL(udp_packet_storm_10v100) {
return do_packet_storm(10, 100);
}
BENCHMARK_IMPL(udp_packet_storm_10v1000) {
return do_packet_storm(10, 1000);
}
BENCHMARK_IMPL(udp_packet_storm_100v100) {
return do_packet_storm(100, 100);
}
BENCHMARK_IMPL(udp_packet_storm_100v1000) {
return do_packet_storm(100, 1000);
}
BENCHMARK_IMPL(udp_packet_storm_1000v1000) {
return do_packet_storm(1000, 1000);
}

View File

@ -247,7 +247,7 @@ static void on_close(uv_handle_t* peer) {
}
static uv_buf_t buf_alloc(uv_stream_t* handle, size_t suggested_size) {
static uv_buf_t buf_alloc(uv_handle_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = (char*) malloc(suggested_size);
buf.len = suggested_size;

View File

@ -123,7 +123,7 @@ static void on_close(uv_handle_t* peer) {
}
static uv_buf_t echo_alloc(uv_stream_t* handle, size_t suggested_size) {
static uv_buf_t echo_alloc(uv_handle_t* handle, size_t suggested_size) {
return uv_buf_init(malloc(suggested_size), suggested_size);
}

View File

@ -45,7 +45,7 @@ static int bytes_received = 0;
static int shutdown_cb_called = 0;
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
uv_buf_t buf;
buf.len = size;
buf.base = (char*) malloc(size);

View File

@ -30,7 +30,7 @@ static int close_cb_called = 0;
static int connect_cb_called = 0;
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;

View File

@ -33,7 +33,7 @@ static uv_connect_t connect_req;
static uv_tcp_t tcpServer;
static uv_buf_t alloc(uv_stream_t* handle, size_t suggested_size) {
static uv_buf_t alloc(uv_handle_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = (char*) malloc(suggested_size);
buf.len = suggested_size;

View File

@ -36,6 +36,10 @@ TEST_DECLARE (tcp_bind6_error_addrnotavail)
TEST_DECLARE (tcp_bind6_error_fault)
TEST_DECLARE (tcp_bind6_error_inval)
TEST_DECLARE (tcp_bind6_localhost_ok)
TEST_DECLARE (udp_send_and_recv)
TEST_DECLARE (udp_dgram_too_big)
TEST_DECLARE (udp_dual_stack)
TEST_DECLARE (udp_ipv6_only)
TEST_DECLARE (pipe_bind_error_addrinuse)
TEST_DECLARE (pipe_bind_error_addrnotavail)
TEST_DECLARE (pipe_bind_error_inval)
@ -106,6 +110,11 @@ TASK_LIST_START
TEST_ENTRY (tcp_bind6_error_inval)
TEST_ENTRY (tcp_bind6_localhost_ok)
TEST_ENTRY (udp_send_and_recv)
TEST_ENTRY (udp_dgram_too_big)
TEST_ENTRY (udp_dual_stack)
TEST_ENTRY (udp_ipv6_only)
TEST_ENTRY (pipe_bind_error_addrinuse)
TEST_ENTRY (pipe_bind_error_addrnotavail)
TEST_ENTRY (pipe_bind_error_inval)

View File

@ -51,7 +51,7 @@ typedef struct {
void pinger_try_read(pinger_t* pinger);
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;

View File

@ -39,7 +39,7 @@ static int called_timer_close_cb;
static int called_timer_cb;
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;

View File

@ -66,7 +66,7 @@ static void kill_cb(uv_process_t* process, int exit_status, int term_signal) {
}
uv_buf_t on_alloc(uv_stream_t* tcp, size_t suggested_size) {
uv_buf_t on_alloc(uv_handle_t* handle, size_t suggested_size) {
uv_buf_t buf;
buf.base = output + output_used;
buf.len = OUTPUT_SIZE - output_used;

View File

@ -45,7 +45,7 @@ static int bytes_received = 0;
static int bytes_received_done = 0;
static uv_buf_t alloc_cb(uv_stream_t* tcp, size_t size) {
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t size) {
uv_buf_t buf;
buf.base = (char*)malloc(size);
buf.len = size;

View File

@ -0,0 +1,88 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CHECK_HANDLE(handle) \
ASSERT((uv_udp_t*)(handle) == &handle_)
#define CHECK_REQ(req) \
ASSERT((req) == &req_);
static uv_udp_t handle_;
static uv_udp_send_t req_;
static int send_cb_called;
static int close_cb_called;
static void close_cb(uv_handle_t* handle) {
CHECK_HANDLE(handle);
close_cb_called++;
}
static void send_cb(uv_udp_send_t* req, int status) {
CHECK_REQ(req);
CHECK_HANDLE(req->handle);
ASSERT(status == -1);
ASSERT(uv_last_error().code == UV_EMSGSIZE);
uv_close((uv_handle_t*)req->handle, close_cb);
send_cb_called++;
}
TEST_IMPL(udp_dgram_too_big) {
char dgram[65536]; /* 64K MTU is unlikely, even on localhost */
struct sockaddr_in addr;
uv_buf_t buf;
int r;
memset(dgram, 42, sizeof dgram); /* silence valgrind */
uv_init();
r = uv_udp_init(&handle_);
ASSERT(r == 0);
buf = uv_buf_init(dgram, sizeof dgram);
addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
r = uv_udp_send(&req_, &handle_, &buf, 1, addr, send_cb);
ASSERT(r == 0);
ASSERT(close_cb_called == 0);
ASSERT(send_cb_called == 0);
uv_run();
ASSERT(send_cb_called == 1);
ASSERT(close_cb_called == 1);
return 0;
}

158
test/test-udp-ipv6.c Normal file
View File

@ -0,0 +1,158 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CHECK_HANDLE(handle) \
ASSERT((uv_udp_t*)(handle) == &server \
|| (uv_udp_t*)(handle) == &client \
|| (uv_timer_t*)(handle) == &timeout)
#define CHECK_REQ(req) \
ASSERT((req) == &req_);
static uv_udp_t client;
static uv_udp_t server;
static uv_udp_send_t req_;
static uv_timer_t timeout;
static int send_cb_called;
static int recv_cb_called;
static int close_cb_called;
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) {
static char slab[65536];
CHECK_HANDLE(handle);
return uv_buf_init(slab, sizeof slab);
}
static void close_cb(uv_handle_t* handle) {
CHECK_HANDLE(handle);
close_cb_called++;
}
static void send_cb(uv_udp_send_t* req, int status) {
CHECK_REQ(req);
CHECK_HANDLE(req->handle);
ASSERT(status == 0);
send_cb_called++;
}
static void ipv6_recv_fail(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags) {
ASSERT(0 && "this function should not have been called");
}
static void ipv6_recv_ok(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags) {
CHECK_HANDLE(handle);
ASSERT(nread >= 0);
if (nread)
recv_cb_called++;
}
static void timeout_cb(uv_timer_t* timer, int status) {
uv_close((uv_handle_t*)&server, close_cb);
uv_close((uv_handle_t*)&client, close_cb);
uv_close((uv_handle_t*)&timeout, close_cb);
}
static void do_test(uv_udp_recv_cb recv_cb, int bind_flags) {
struct sockaddr_in6 addr6;
struct sockaddr_in addr;
uv_buf_t buf;
int r;
uv_init();
addr6 = uv_ip6_addr("::0", TEST_PORT);
r = uv_udp_init(&server);
ASSERT(r == 0);
r = uv_udp_bind6(&server, addr6, bind_flags);
ASSERT(r == 0);
r = uv_udp_recv_start(&server, alloc_cb, recv_cb);
ASSERT(r == 0);
r = uv_udp_init(&client);
ASSERT(r == 0);
buf = uv_buf_init("PING", 4);
addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
r = uv_udp_send(&req_, &client, &buf, 1, addr, send_cb);
ASSERT(r == 0);
r = uv_timer_init(&timeout);
ASSERT(r == 0);
r = uv_timer_start(&timeout, timeout_cb, 500, 0);
ASSERT(r == 0);
ASSERT(close_cb_called == 0);
ASSERT(send_cb_called == 0);
ASSERT(recv_cb_called == 0);
uv_run();
ASSERT(close_cb_called == 3);
}
TEST_IMPL(udp_dual_stack) {
do_test(ipv6_recv_ok, 0);
ASSERT(recv_cb_called == 1);
ASSERT(send_cb_called == 1);
return 0;
}
TEST_IMPL(udp_ipv6_only) {
do_test(ipv6_recv_fail, UV_UDP_IPV6ONLY);
ASSERT(recv_cb_called == 0);
ASSERT(send_cb_called == 1);
return 0;
}

View File

@ -0,0 +1,203 @@
/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/
#include "uv.h"
#include "task.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#define CHECK_HANDLE(handle) \
ASSERT((uv_udp_t*)(handle) == &server || (uv_udp_t*)(handle) == &client)
static uv_udp_t server;
static uv_udp_t client;
static int cl_send_cb_called;
static int cl_recv_cb_called;
static int sv_send_cb_called;
static int sv_recv_cb_called;
static int close_cb_called;
static uv_buf_t alloc_cb(uv_handle_t* handle, size_t suggested_size) {
static char slab[65536];
CHECK_HANDLE(handle);
ASSERT(suggested_size <= sizeof slab);
return uv_buf_init(slab, sizeof slab);
}
static void close_cb(uv_handle_t* handle) {
CHECK_HANDLE(handle);
close_cb_called++;
}
static void cl_recv_cb(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags) {
CHECK_HANDLE(handle);
ASSERT(flags == 0);
if (nread == 0) {
ASSERT(addr == NULL);
uv_close((uv_handle_t*)handle, close_cb);
}
else if (nread > 0) {
ASSERT(addr != NULL);
ASSERT(nread == 4);
ASSERT(!memcmp("PONG", buf.base, nread));
}
else {
ASSERT(0 && "unexpected error");
}
cl_recv_cb_called++;
}
static void cl_send_cb(uv_udp_send_t* req, int status) {
int r;
ASSERT(req != NULL);
ASSERT(status == 0);
CHECK_HANDLE(req->handle);
r = uv_udp_recv_start(req->handle, alloc_cb, cl_recv_cb);
ASSERT(r == 0);
cl_send_cb_called++;
}
static void sv_send_cb(uv_udp_send_t* req, int status) {
ASSERT(req != NULL);
ASSERT(status == 0);
CHECK_HANDLE(req->handle);
uv_close((uv_handle_t*)req->handle, close_cb);
free(req);
sv_send_cb_called++;
}
static void sv_recv_cb(uv_udp_t* handle,
ssize_t nread,
uv_buf_t buf,
struct sockaddr* addr,
unsigned flags) {
uv_udp_send_t* req;
int r;
CHECK_HANDLE(handle);
ASSERT(flags == 0);
if (nread > 0) {
ASSERT(addr != NULL);
ASSERT(nread == 4);
ASSERT(!memcmp("PING", buf.base, nread));
/* FIXME? `uv_udp_recv_stop` does what it says: recv_cb is not called
* anymore. That's problematic because the read buffer won't be returned
* either... Not sure I like that but it's consistent with `uv_read_stop`.
*/
r = uv_udp_recv_stop(handle);
ASSERT(r == 0);
req = malloc(sizeof *req);
ASSERT(req != NULL);
buf = uv_buf_init("PONG", 4);
r = uv_udp_send(req,
handle,
&buf,
1,
*(struct sockaddr_in*)addr,
sv_send_cb);
ASSERT(r == 0);
}
else if (nread == 0) {
ASSERT(addr == NULL);
}
else {
ASSERT(0 && "unexpected error");
}
sv_recv_cb_called++;
}
TEST_IMPL(udp_send_and_recv) {
struct sockaddr_in addr;
uv_udp_send_t req;
uv_buf_t buf;
int r;
addr = uv_ip4_addr("0.0.0.0", TEST_PORT);
uv_init();
r = uv_udp_init(&server);
ASSERT(r == 0);
r = uv_udp_bind(&server, addr, 0);
ASSERT(r == 0);
r = uv_udp_recv_start(&server, alloc_cb, sv_recv_cb);
ASSERT(r == 0);
addr = uv_ip4_addr("127.0.0.1", TEST_PORT);
r = uv_udp_init(&client);
ASSERT(r == 0);
/* client sends "PING", expects "PONG" */
buf = uv_buf_init("PING", 4);
r = uv_udp_send(&req, &client, &buf, 1, addr, cl_send_cb);
ASSERT(r == 0);
ASSERT(close_cb_called == 0);
ASSERT(cl_send_cb_called == 0);
ASSERT(cl_recv_cb_called == 0);
ASSERT(sv_send_cb_called == 0);
ASSERT(sv_recv_cb_called == 0);
uv_run();
ASSERT(cl_send_cb_called == 1);
ASSERT(cl_recv_cb_called == 2); /* dgram + EOF == 2 */
ASSERT(sv_send_cb_called == 1);
ASSERT(sv_recv_cb_called == 1); /* dgram, no EOF == 1 */
ASSERT(close_cb_called == 2);
return 0;
}