zos: implement uv_fs_event* functions

This commit uses the Register File Interest feature on z/OS
to enable users to monitor file system events.
The poll call is used to check for file descriptors as well
as a message queue that z/OS will report file system events
on. The last item on the list used by poll will contain the
message queue id instead of a file descriptor.

Limitation:
Writes to a directory (that is, file creation and deletion)
do not generate a change message for a registered directory.

PR-URL: https://github.com/libuv/libuv/pull/1311
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
This commit is contained in:
John Barboza 2017-04-10 08:55:37 -04:00 committed by Santiago Gimeno
parent e771ede069
commit b01de7341f
No known key found for this signature in database
GPG Key ID: F28C3C8DA33C03BE
11 changed files with 212 additions and 14 deletions

View File

@ -458,7 +458,6 @@ libuv_la_CFLAGS += -D_UNIX03_THREADS \
-qFLOAT=IEEE
libuv_la_LDFLAGS += -qXPLINK
libuv_la_SOURCES += src/unix/pthread-fixes.c \
src/unix/no-fsevents.c \
src/unix/os390.c \
src/unix/os390-syscalls.c \
src/unix/proctitle.c

View File

@ -19,7 +19,13 @@ the best backend for the job on each platform.
See documentation_ for more details.
The z/OS file system events monitoring infrastructure does not notify of file
creation/deletion within a directory that is being monitored.
See the `IBM Knowledge centre`_ for more details.
.. _documentation: http://www.ibm.com/developerworks/aix/library/au-aix_event_infrastructure/
.. _`IBM Knowledge centre`: https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.2.0/com.ibm.zos.v2r1.bpxb100/ioc.htm

View File

@ -27,4 +27,7 @@
#define UV_PLATFORM_LOOP_FIELDS \
void* ep; \
#define UV_PLATFORM_FS_EVENT_FIELDS \
char rfis_rftok[8]; \
#endif /* UV_MVS_H */

View File

@ -25,6 +25,8 @@
#include <stdlib.h>
#include <assert.h>
#include <search.h>
#include <termios.h>
#include <sys/msg.h>
#define CW_CONDVAR 32
@ -103,10 +105,19 @@ static void maybe_resize(uv__os390_epoll* lst, unsigned int len) {
unsigned int newsize;
unsigned int i;
struct pollfd* newlst;
struct pollfd event;
if (len <= lst->size)
return;
if (lst->size == 0)
event.fd = -1;
else {
/* Extract the message queue at the end. */
event = lst->items[lst->size - 1];
lst->items[lst->size - 1].fd = -1;
}
newsize = next_power_of_two(len);
newlst = uv__realloc(lst->items, newsize * sizeof(lst->items[0]));
@ -115,11 +126,40 @@ static void maybe_resize(uv__os390_epoll* lst, unsigned int len) {
for (i = lst->size; i < newsize; ++i)
newlst[i].fd = -1;
/* Restore the message queue at the end */
newlst[newsize - 1] = event;
lst->items = newlst;
lst->size = newsize;
}
static void init_message_queue(uv__os390_epoll* lst) {
struct {
long int header;
char body;
} msg;
/* initialize message queue */
lst->msg_queue = msgget(IPC_PRIVATE, 0622 | IPC_CREAT);
if (lst->msg_queue == -1)
abort();
/*
On z/OS, the message queue will be affiliated with the process only
when a send is performed on it. Once this is done, the system
can be queried for all message queues belonging to our process id.
*/
msg.header = 1;
if (msgsnd(lst->msg_queue, &msg, sizeof(msg.body), 0) != 0)
abort();
/* Clean up the dummy message sent above */
if (msgrcv(lst->msg_queue, &msg, sizeof(msg.body), 0, 0) != sizeof(msg.body))
abort();
}
static void before_fork(void) {
uv_mutex_lock(&global_epoll_lock);
}
@ -139,8 +179,13 @@ static void child_fork(void) {
/* reset epoll list */
while (!QUEUE_EMPTY(&global_epoll_queue)) {
uv__os390_epoll* lst;
q = QUEUE_HEAD(&global_epoll_queue);
QUEUE_REMOVE(q);
lst = QUEUE_DATA(q, uv__os390_epoll, member);
uv__free(lst->items);
lst->items = NULL;
lst->size = 0;
}
uv_mutex_unlock(&global_epoll_lock);
@ -166,6 +211,10 @@ uv__os390_epoll* epoll_create1(int flags) {
/* initialize list */
lst->size = 0;
lst->items = NULL;
init_message_queue(lst);
maybe_resize(lst, 1);
lst->items[lst->size - 1].fd = lst->msg_queue;
lst->items[lst->size - 1].events = POLLIN;
uv_once(&once, epoll_init);
uv_mutex_lock(&global_epoll_lock);
QUEUE_INSERT_TAIL(&global_epoll_queue, &lst->member);
@ -182,15 +231,20 @@ int epoll_ctl(uv__os390_epoll* lst,
struct epoll_event *event) {
uv_mutex_lock(&global_epoll_lock);
if(op == EPOLL_CTL_DEL) {
if (op == EPOLL_CTL_DEL) {
if (fd >= lst->size || lst->items[fd].fd == -1) {
uv_mutex_unlock(&global_epoll_lock);
errno = ENOENT;
return -1;
}
lst->items[fd].fd = -1;
} else if(op == EPOLL_CTL_ADD) {
maybe_resize(lst, fd + 1);
} else if (op == EPOLL_CTL_ADD) {
/* Resizing to 'fd + 1' would expand the list to contain at least
* 'fd'. But we need to guarantee that the last index on the list
* is reserved for the message queue. So specify 'fd + 2' instead.
*/
maybe_resize(lst, fd + 2);
if (lst->items[fd].fd != -1) {
uv_mutex_unlock(&global_epoll_lock);
errno = EEXIST;
@ -198,7 +252,7 @@ int epoll_ctl(uv__os390_epoll* lst,
}
lst->items[fd].fd = fd;
lst->items[fd].events = event->events;
} else if(op == EPOLL_CTL_MOD) {
} else if (op == EPOLL_CTL_MOD) {
if (fd >= lst->size || lst->items[fd].fd == -1) {
uv_mutex_unlock(&global_epoll_lock);
errno = ENOENT;
@ -215,17 +269,19 @@ int epoll_ctl(uv__os390_epoll* lst,
int epoll_wait(uv__os390_epoll* lst, struct epoll_event* events,
int maxevents, int timeout) {
size_t size;
nmsgsfds_t size;
struct pollfd* pfds;
int pollret;
int reventcount;
size = lst->size;
size = _SET_FDS_MSGS(size, 1, lst->size - 1);
pfds = lst->items;
pollret = poll(pfds, size, timeout);
if (pollret <= 0)
return pollret;
pollret = _NFDS(pollret) + _NMSGS(pollret);
reventcount = 0;
for (int i = 0;
i < lst->size && i < maxevents && reventcount < pollret; ++i) {
@ -261,9 +317,14 @@ int epoll_file_close(int fd) {
}
void epoll_queue_close(uv__os390_epoll* lst) {
/* Remove epoll instance from global queue */
uv_mutex_lock(&global_epoll_lock);
QUEUE_REMOVE(&lst->member);
uv_mutex_unlock(&global_epoll_lock);
/* Free resources */
msgctl(lst->msg_queue, IPC_RMID, NULL);
lst->msg_queue = -1;
uv__free(lst->items);
lst->items = NULL;
}

View File

@ -50,6 +50,7 @@ typedef struct {
QUEUE member;
struct pollfd* items;
unsigned long size;
int msg_queue;
} uv__os390_epoll;
/* epoll api */

View File

@ -26,6 +26,8 @@
#include <unistd.h>
#include <sys/ps.h>
#include <builtins.h>
#include <termios.h>
#include <sys/msg.h>
#if defined(__clang__)
#include "csrsic.h"
#else
@ -684,11 +686,124 @@ int uv__io_check_fd(uv_loop_t* loop, int fd) {
return 0;
}
void uv__fs_event_close(uv_fs_event_t* handle) {
uv_fs_event_stop(handle);
}
int uv_fs_event_init(uv_loop_t* loop, uv_fs_event_t* handle) {
uv__handle_init(loop, (uv_handle_t*)handle, UV_FS_EVENT);
return 0;
}
int uv_fs_event_start(uv_fs_event_t* handle, uv_fs_event_cb cb,
const char* filename, unsigned int flags) {
uv__os390_epoll* ep;
_RFIS reg_struct;
char* path;
int rc;
if (uv__is_active(handle))
return -EINVAL;
ep = handle->loop->ep;
assert(ep->msg_queue != -1);
reg_struct.__rfis_cmd = _RFIS_REG;
reg_struct.__rfis_qid = ep->msg_queue;
reg_struct.__rfis_type = 1;
memcpy(reg_struct.__rfis_utok, &handle, sizeof(handle));
path = uv__strdup(filename);
if (path == NULL)
return -ENOMEM;
rc = __w_pioctl(path, _IOCC_REGFILEINT, sizeof(reg_struct), &reg_struct);
if (rc != 0)
return -errno;
uv__handle_start(handle);
handle->path = path;
handle->cb = cb;
memcpy(handle->rfis_rftok, reg_struct.__rfis_rftok,
sizeof(handle->rfis_rftok));
return 0;
}
int uv_fs_event_stop(uv_fs_event_t* handle) {
uv__os390_epoll* ep;
_RFIS reg_struct;
int rc;
if (!uv__is_active(handle))
return 0;
ep = handle->loop->ep;
assert(ep->msg_queue != -1);
reg_struct.__rfis_cmd = _RFIS_UNREG;
reg_struct.__rfis_qid = ep->msg_queue;
reg_struct.__rfis_type = 1;
memcpy(reg_struct.__rfis_rftok, handle->rfis_rftok,
sizeof(handle->rfis_rftok));
/*
* This call will take "/" as the path argument in case we
* don't care to supply the correct path. The system will simply
* ignore it.
*/
rc = __w_pioctl("/", _IOCC_REGFILEINT, sizeof(reg_struct), &reg_struct);
if (rc != 0 && errno != EALREADY && errno != ENOENT)
abort();
uv__handle_stop(handle);
return 0;
}
static int os390_message_queue_handler(uv__os390_epoll* ep) {
uv_fs_event_t* handle;
int msglen;
int events;
_RFIM msg;
if (ep->msg_queue == -1)
return 0;
msglen = msgrcv(ep->msg_queue, &msg, sizeof(msg), 0, IPC_NOWAIT);
if (msglen == -1 && errno == ENOMSG)
return 0;
if (msglen == -1)
abort();
events = 0;
if (msg.__rfim_event == _RFIM_ATTR || msg.__rfim_event == _RFIM_WRITE)
events = UV_CHANGE;
else if (msg.__rfim_event == _RFIM_RENAME)
events = UV_RENAME;
else
/* Some event that we are not interested in. */
return 0;
handle = *(uv_fs_event_t**)(msg.__rfim_utok);
handle->cb(handle, uv__basename_r(handle->path), events, 0);
return 1;
}
void uv__io_poll(uv_loop_t* loop, int timeout) {
static const int max_safe_timeout = 1789569;
struct epoll_event events[1024];
struct epoll_event* pe;
struct epoll_event e;
uv__os390_epoll* ep;
int real_timeout;
QUEUE* q;
uv__io_t* w;
@ -802,6 +917,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) {
if (fd == -1)
continue;
ep = loop->ep;
if (fd == ep->msg_queue) {
os390_message_queue_handler(ep);
continue;
}
assert(fd >= 0);
assert((unsigned) fd < loop->nwatchers);
@ -866,7 +987,12 @@ void uv__set_process_title(const char* title) {
}
int uv__io_fork(uv_loop_t* loop) {
uv__platform_loop_delete(loop);
/*
Nullify the msg queue but don't close it because
it is still being used by the parent.
*/
loop->ep = NULL;
uv__platform_loop_delete(loop);
return uv__platform_loop_init(loop);
}

View File

@ -209,7 +209,7 @@ UNUSED static int can_ipv6(void) {
return supported;
}
#if defined(__MVS__) || defined(__CYGWIN__) || defined(__MSYS__)
#if defined(__CYGWIN__) || defined(__MSYS__)
# define NO_FS_EVENTS "Filesystem watching not supported on this platform."
#endif

View File

@ -533,10 +533,12 @@ TEST_IMPL(fork_fs_events_file_parent_child) {
#if defined(NO_FS_EVENTS)
RETURN_SKIP(NO_FS_EVENTS);
#endif
#if defined(__sun) || defined(_AIX)
#if defined(__sun) || defined(_AIX) || defined(__MVS__)
/* It's not possible to implement this without additional
* bookkeeping on SunOS. For AIX it is possible, but has to be
* written. See https://github.com/libuv/libuv/pull/846#issuecomment-287170420
* TODO: On z/OS, we need to open another message queue and subscribe to the
* same events as the parent.
*/
return 0;
#else

View File

@ -396,6 +396,8 @@ static void timer_cb_watch_twice(uv_timer_t* handle) {
TEST_IMPL(fs_event_watch_dir) {
#if defined(NO_FS_EVENTS)
RETURN_SKIP(NO_FS_EVENTS);
#elif defined(__MVS__)
RETURN_SKIP("Directory watching not supported on this platform.");
#endif
uv_loop_t* loop = uv_default_loop();
@ -820,6 +822,8 @@ static void fs_event_cb_close(uv_fs_event_t* handle, const char* filename,
TEST_IMPL(fs_event_close_in_callback) {
#if defined(NO_FS_EVENTS)
RETURN_SKIP(NO_FS_EVENTS);
#elif defined(__MVS__)
RETURN_SKIP("Directory watching not supported on this platform.");
#endif
uv_loop_t* loop;
int r;

View File

@ -1562,9 +1562,6 @@ TEST_IMPL(spawn_fs_open) {
#ifndef _WIN32
TEST_IMPL(closed_fd_events) {
#if defined(__MVS__)
RETURN_SKIP("Filesystem watching not supported on this platform.");
#endif
uv_stdio_container_t stdio[3];
uv_pipe_t pipe_handle;
int fd[2];

1
uv.gyp
View File

@ -340,7 +340,6 @@
['OS=="zos"', {
'sources': [
'src/unix/pthread-fixes.c',
'src/unix/no-fsevents.c',
'src/unix/os390.c',
'src/unix/os390-syscalls.c'
]