From b01de7341f40e1f30d78178442b0b87a46b3b7a7 Mon Sep 17 00:00:00 2001 From: John Barboza Date: Mon, 10 Apr 2017 08:55:37 -0400 Subject: [PATCH] zos: implement uv_fs_event* functions This commit uses the Register File Interest feature on z/OS to enable users to monitor file system events. The poll call is used to check for file descriptors as well as a message queue that z/OS will report file system events on. The last item on the list used by poll will contain the message queue id instead of a file descriptor. Limitation: Writes to a directory (that is, file creation and deletion) do not generate a change message for a registered directory. PR-URL: https://github.com/libuv/libuv/pull/1311 Reviewed-By: Ben Noordhuis --- Makefile.am | 1 - docs/src/fs_event.rst | 6 ++ include/uv-os390.h | 3 + src/unix/os390-syscalls.c | 73 ++++++++++++++++++++-- src/unix/os390-syscalls.h | 1 + src/unix/os390.c | 128 +++++++++++++++++++++++++++++++++++++- test/task.h | 2 +- test/test-fork.c | 4 +- test/test-fs-event.c | 4 ++ test/test-spawn.c | 3 - uv.gyp | 1 - 11 files changed, 212 insertions(+), 14 deletions(-) diff --git a/Makefile.am b/Makefile.am index 128ac566..2093ac23 100644 --- a/Makefile.am +++ b/Makefile.am @@ -458,7 +458,6 @@ libuv_la_CFLAGS += -D_UNIX03_THREADS \ -qFLOAT=IEEE libuv_la_LDFLAGS += -qXPLINK libuv_la_SOURCES += src/unix/pthread-fixes.c \ - src/unix/no-fsevents.c \ src/unix/os390.c \ src/unix/os390-syscalls.c \ src/unix/proctitle.c diff --git a/docs/src/fs_event.rst b/docs/src/fs_event.rst index 2af3e980..bd076aae 100644 --- a/docs/src/fs_event.rst +++ b/docs/src/fs_event.rst @@ -19,7 +19,13 @@ the best backend for the job on each platform. See documentation_ for more details. + The z/OS file system events monitoring infrastructure does not notify of file + creation/deletion within a directory that is being monitored. + See the `IBM Knowledge centre`_ for more details. + .. _documentation: http://www.ibm.com/developerworks/aix/library/au-aix_event_infrastructure/ + .. _`IBM Knowledge centre`: https://www.ibm.com/support/knowledgecenter/en/SSLTBW_2.2.0/com.ibm.zos.v2r1.bpxb100/ioc.htm + diff --git a/include/uv-os390.h b/include/uv-os390.h index 58f92611..39e7384d 100644 --- a/include/uv-os390.h +++ b/include/uv-os390.h @@ -27,4 +27,7 @@ #define UV_PLATFORM_LOOP_FIELDS \ void* ep; \ +#define UV_PLATFORM_FS_EVENT_FIELDS \ + char rfis_rftok[8]; \ + #endif /* UV_MVS_H */ diff --git a/src/unix/os390-syscalls.c b/src/unix/os390-syscalls.c index 5bc48938..21558ea8 100644 --- a/src/unix/os390-syscalls.c +++ b/src/unix/os390-syscalls.c @@ -25,6 +25,8 @@ #include #include #include +#include +#include #define CW_CONDVAR 32 @@ -103,10 +105,19 @@ static void maybe_resize(uv__os390_epoll* lst, unsigned int len) { unsigned int newsize; unsigned int i; struct pollfd* newlst; + struct pollfd event; if (len <= lst->size) return; + if (lst->size == 0) + event.fd = -1; + else { + /* Extract the message queue at the end. */ + event = lst->items[lst->size - 1]; + lst->items[lst->size - 1].fd = -1; + } + newsize = next_power_of_two(len); newlst = uv__realloc(lst->items, newsize * sizeof(lst->items[0])); @@ -115,11 +126,40 @@ static void maybe_resize(uv__os390_epoll* lst, unsigned int len) { for (i = lst->size; i < newsize; ++i) newlst[i].fd = -1; + /* Restore the message queue at the end */ + newlst[newsize - 1] = event; + lst->items = newlst; lst->size = newsize; } +static void init_message_queue(uv__os390_epoll* lst) { + struct { + long int header; + char body; + } msg; + + /* initialize message queue */ + lst->msg_queue = msgget(IPC_PRIVATE, 0622 | IPC_CREAT); + if (lst->msg_queue == -1) + abort(); + + /* + On z/OS, the message queue will be affiliated with the process only + when a send is performed on it. Once this is done, the system + can be queried for all message queues belonging to our process id. + */ + msg.header = 1; + if (msgsnd(lst->msg_queue, &msg, sizeof(msg.body), 0) != 0) + abort(); + + /* Clean up the dummy message sent above */ + if (msgrcv(lst->msg_queue, &msg, sizeof(msg.body), 0, 0) != sizeof(msg.body)) + abort(); +} + + static void before_fork(void) { uv_mutex_lock(&global_epoll_lock); } @@ -139,8 +179,13 @@ static void child_fork(void) { /* reset epoll list */ while (!QUEUE_EMPTY(&global_epoll_queue)) { + uv__os390_epoll* lst; q = QUEUE_HEAD(&global_epoll_queue); QUEUE_REMOVE(q); + lst = QUEUE_DATA(q, uv__os390_epoll, member); + uv__free(lst->items); + lst->items = NULL; + lst->size = 0; } uv_mutex_unlock(&global_epoll_lock); @@ -166,6 +211,10 @@ uv__os390_epoll* epoll_create1(int flags) { /* initialize list */ lst->size = 0; lst->items = NULL; + init_message_queue(lst); + maybe_resize(lst, 1); + lst->items[lst->size - 1].fd = lst->msg_queue; + lst->items[lst->size - 1].events = POLLIN; uv_once(&once, epoll_init); uv_mutex_lock(&global_epoll_lock); QUEUE_INSERT_TAIL(&global_epoll_queue, &lst->member); @@ -182,15 +231,20 @@ int epoll_ctl(uv__os390_epoll* lst, struct epoll_event *event) { uv_mutex_lock(&global_epoll_lock); - if(op == EPOLL_CTL_DEL) { + if (op == EPOLL_CTL_DEL) { if (fd >= lst->size || lst->items[fd].fd == -1) { uv_mutex_unlock(&global_epoll_lock); errno = ENOENT; return -1; } lst->items[fd].fd = -1; - } else if(op == EPOLL_CTL_ADD) { - maybe_resize(lst, fd + 1); + } else if (op == EPOLL_CTL_ADD) { + + /* Resizing to 'fd + 1' would expand the list to contain at least + * 'fd'. But we need to guarantee that the last index on the list + * is reserved for the message queue. So specify 'fd + 2' instead. + */ + maybe_resize(lst, fd + 2); if (lst->items[fd].fd != -1) { uv_mutex_unlock(&global_epoll_lock); errno = EEXIST; @@ -198,7 +252,7 @@ int epoll_ctl(uv__os390_epoll* lst, } lst->items[fd].fd = fd; lst->items[fd].events = event->events; - } else if(op == EPOLL_CTL_MOD) { + } else if (op == EPOLL_CTL_MOD) { if (fd >= lst->size || lst->items[fd].fd == -1) { uv_mutex_unlock(&global_epoll_lock); errno = ENOENT; @@ -215,17 +269,19 @@ int epoll_ctl(uv__os390_epoll* lst, int epoll_wait(uv__os390_epoll* lst, struct epoll_event* events, int maxevents, int timeout) { - size_t size; + nmsgsfds_t size; struct pollfd* pfds; int pollret; int reventcount; - size = lst->size; + size = _SET_FDS_MSGS(size, 1, lst->size - 1); pfds = lst->items; pollret = poll(pfds, size, timeout); if (pollret <= 0) return pollret; + pollret = _NFDS(pollret) + _NMSGS(pollret); + reventcount = 0; for (int i = 0; i < lst->size && i < maxevents && reventcount < pollret; ++i) { @@ -261,9 +317,14 @@ int epoll_file_close(int fd) { } void epoll_queue_close(uv__os390_epoll* lst) { + /* Remove epoll instance from global queue */ uv_mutex_lock(&global_epoll_lock); QUEUE_REMOVE(&lst->member); uv_mutex_unlock(&global_epoll_lock); + + /* Free resources */ + msgctl(lst->msg_queue, IPC_RMID, NULL); + lst->msg_queue = -1; uv__free(lst->items); lst->items = NULL; } diff --git a/src/unix/os390-syscalls.h b/src/unix/os390-syscalls.h index 5ce6a681..6e34a88c 100644 --- a/src/unix/os390-syscalls.h +++ b/src/unix/os390-syscalls.h @@ -50,6 +50,7 @@ typedef struct { QUEUE member; struct pollfd* items; unsigned long size; + int msg_queue; } uv__os390_epoll; /* epoll api */ diff --git a/src/unix/os390.c b/src/unix/os390.c index 127656db..081438e8 100644 --- a/src/unix/os390.c +++ b/src/unix/os390.c @@ -26,6 +26,8 @@ #include #include #include +#include +#include #if defined(__clang__) #include "csrsic.h" #else @@ -684,11 +686,124 @@ int uv__io_check_fd(uv_loop_t* loop, int fd) { return 0; } + +void uv__fs_event_close(uv_fs_event_t* handle) { + uv_fs_event_stop(handle); +} + + +int uv_fs_event_init(uv_loop_t* loop, uv_fs_event_t* handle) { + uv__handle_init(loop, (uv_handle_t*)handle, UV_FS_EVENT); + return 0; +} + + +int uv_fs_event_start(uv_fs_event_t* handle, uv_fs_event_cb cb, + const char* filename, unsigned int flags) { + uv__os390_epoll* ep; + _RFIS reg_struct; + char* path; + int rc; + + if (uv__is_active(handle)) + return -EINVAL; + + ep = handle->loop->ep; + assert(ep->msg_queue != -1); + + reg_struct.__rfis_cmd = _RFIS_REG; + reg_struct.__rfis_qid = ep->msg_queue; + reg_struct.__rfis_type = 1; + memcpy(reg_struct.__rfis_utok, &handle, sizeof(handle)); + + path = uv__strdup(filename); + if (path == NULL) + return -ENOMEM; + + rc = __w_pioctl(path, _IOCC_REGFILEINT, sizeof(reg_struct), ®_struct); + if (rc != 0) + return -errno; + + uv__handle_start(handle); + handle->path = path; + handle->cb = cb; + memcpy(handle->rfis_rftok, reg_struct.__rfis_rftok, + sizeof(handle->rfis_rftok)); + + return 0; +} + + +int uv_fs_event_stop(uv_fs_event_t* handle) { + uv__os390_epoll* ep; + _RFIS reg_struct; + int rc; + + if (!uv__is_active(handle)) + return 0; + + ep = handle->loop->ep; + assert(ep->msg_queue != -1); + + reg_struct.__rfis_cmd = _RFIS_UNREG; + reg_struct.__rfis_qid = ep->msg_queue; + reg_struct.__rfis_type = 1; + memcpy(reg_struct.__rfis_rftok, handle->rfis_rftok, + sizeof(handle->rfis_rftok)); + + /* + * This call will take "/" as the path argument in case we + * don't care to supply the correct path. The system will simply + * ignore it. + */ + rc = __w_pioctl("/", _IOCC_REGFILEINT, sizeof(reg_struct), ®_struct); + if (rc != 0 && errno != EALREADY && errno != ENOENT) + abort(); + + uv__handle_stop(handle); + + return 0; +} + + +static int os390_message_queue_handler(uv__os390_epoll* ep) { + uv_fs_event_t* handle; + int msglen; + int events; + _RFIM msg; + + if (ep->msg_queue == -1) + return 0; + + msglen = msgrcv(ep->msg_queue, &msg, sizeof(msg), 0, IPC_NOWAIT); + + if (msglen == -1 && errno == ENOMSG) + return 0; + + if (msglen == -1) + abort(); + + events = 0; + if (msg.__rfim_event == _RFIM_ATTR || msg.__rfim_event == _RFIM_WRITE) + events = UV_CHANGE; + else if (msg.__rfim_event == _RFIM_RENAME) + events = UV_RENAME; + else + /* Some event that we are not interested in. */ + return 0; + + handle = *(uv_fs_event_t**)(msg.__rfim_utok); + handle->cb(handle, uv__basename_r(handle->path), events, 0); + return 1; +} + + void uv__io_poll(uv_loop_t* loop, int timeout) { static const int max_safe_timeout = 1789569; struct epoll_event events[1024]; struct epoll_event* pe; struct epoll_event e; + uv__os390_epoll* ep; int real_timeout; QUEUE* q; uv__io_t* w; @@ -802,6 +917,12 @@ void uv__io_poll(uv_loop_t* loop, int timeout) { if (fd == -1) continue; + ep = loop->ep; + if (fd == ep->msg_queue) { + os390_message_queue_handler(ep); + continue; + } + assert(fd >= 0); assert((unsigned) fd < loop->nwatchers); @@ -866,7 +987,12 @@ void uv__set_process_title(const char* title) { } int uv__io_fork(uv_loop_t* loop) { - uv__platform_loop_delete(loop); + /* + Nullify the msg queue but don't close it because + it is still being used by the parent. + */ + loop->ep = NULL; + uv__platform_loop_delete(loop); return uv__platform_loop_init(loop); } diff --git a/test/task.h b/test/task.h index 67eb9804..af99d92f 100644 --- a/test/task.h +++ b/test/task.h @@ -209,7 +209,7 @@ UNUSED static int can_ipv6(void) { return supported; } -#if defined(__MVS__) || defined(__CYGWIN__) || defined(__MSYS__) +#if defined(__CYGWIN__) || defined(__MSYS__) # define NO_FS_EVENTS "Filesystem watching not supported on this platform." #endif diff --git a/test/test-fork.c b/test/test-fork.c index ba85b531..924c65b2 100644 --- a/test/test-fork.c +++ b/test/test-fork.c @@ -533,10 +533,12 @@ TEST_IMPL(fork_fs_events_file_parent_child) { #if defined(NO_FS_EVENTS) RETURN_SKIP(NO_FS_EVENTS); #endif -#if defined(__sun) || defined(_AIX) +#if defined(__sun) || defined(_AIX) || defined(__MVS__) /* It's not possible to implement this without additional * bookkeeping on SunOS. For AIX it is possible, but has to be * written. See https://github.com/libuv/libuv/pull/846#issuecomment-287170420 + * TODO: On z/OS, we need to open another message queue and subscribe to the + * same events as the parent. */ return 0; #else diff --git a/test/test-fs-event.c b/test/test-fs-event.c index fba6b544..dc47b3a6 100644 --- a/test/test-fs-event.c +++ b/test/test-fs-event.c @@ -396,6 +396,8 @@ static void timer_cb_watch_twice(uv_timer_t* handle) { TEST_IMPL(fs_event_watch_dir) { #if defined(NO_FS_EVENTS) RETURN_SKIP(NO_FS_EVENTS); +#elif defined(__MVS__) + RETURN_SKIP("Directory watching not supported on this platform."); #endif uv_loop_t* loop = uv_default_loop(); @@ -820,6 +822,8 @@ static void fs_event_cb_close(uv_fs_event_t* handle, const char* filename, TEST_IMPL(fs_event_close_in_callback) { #if defined(NO_FS_EVENTS) RETURN_SKIP(NO_FS_EVENTS); +#elif defined(__MVS__) + RETURN_SKIP("Directory watching not supported on this platform."); #endif uv_loop_t* loop; int r; diff --git a/test/test-spawn.c b/test/test-spawn.c index 9a30f6cb..4086ab15 100644 --- a/test/test-spawn.c +++ b/test/test-spawn.c @@ -1562,9 +1562,6 @@ TEST_IMPL(spawn_fs_open) { #ifndef _WIN32 TEST_IMPL(closed_fd_events) { -#if defined(__MVS__) - RETURN_SKIP("Filesystem watching not supported on this platform."); -#endif uv_stdio_container_t stdio[3]; uv_pipe_t pipe_handle; int fd[2]; diff --git a/uv.gyp b/uv.gyp index dade2125..fbb384a4 100644 --- a/uv.gyp +++ b/uv.gyp @@ -340,7 +340,6 @@ ['OS=="zos"', { 'sources': [ 'src/unix/pthread-fixes.c', - 'src/unix/no-fsevents.c', 'src/unix/os390.c', 'src/unix/os390-syscalls.c' ]