socketpair: add eventfd and use SOCK_NONBLOCK for socketpair()

Currently, we use `pipe` for `wakeup_create`, which requires ***two***
file descriptors. Furthermore, given its complexity inside, `pipe` is a
bit heavyweight for just a simple event wait/notify mechanism.

`eventfd` would be a more suitable solution for this kind of scenario,
kernel also advocates for developers to use `eventfd` instead of `pipe`
in some simple use cases:

    Applications can use an eventfd file descriptor instead of a pipe
    (see pipe(2) in all cases where a pipe is used simply to signal
    events. The kernel overhead of an eventfd file descriptor is much
    lower than that of a pipe, and only one file descriptor is required
    (versus the two required for a pipe).

This change adds the new backend of `eventfd` for `wakeup_create` and
uses it where available, eliminating the overhead of `pipe`. Also, it
optimizes the `wakeup_create` to eliminate the system calls that make
file descriptors non-blocking by moving the logic of setting
non-blocking flags on file descriptors to `socketpair.c` and using
`SOCK_NONBLOCK` for `socketpair(2)`, `EFD_NONBLOCK` for `eventfd(2)`.

Ref:
https://man7.org/linux/man-pages/man7/pipe.7.html
https://man7.org/linux/man-pages/man2/eventfd.2.html
https://man7.org/linux/man-pages/man2/socketpair.2.html
https://www.gnu.org/software/gnulib/manual/html_node/eventfd.html

Closes #13874
This commit is contained in:
Andy Pan 2024-06-02 02:26:11 +08:00 committed by Daniel Stenberg
parent b71916b859
commit 23fe1a52dc
No known key found for this signature in database
GPG Key ID: 5CC908FDB71E12C2
10 changed files with 149 additions and 41 deletions

View File

@ -88,6 +88,7 @@ set(HAVE_GETPWUID_R 0)
set(HAVE_STRERROR_R 0)
set(HAVE_SIGINTERRUPT 0)
set(HAVE_PIPE 0)
set(HAVE_EVENTFD 0)
set(HAVE_IF_NAMETOINDEX 0)
set(HAVE_GETRLIMIT 0)
set(HAVE_SETRLIMIT 0)
@ -121,6 +122,7 @@ set(HAVE_POLL_H 0)
set(HAVE_POLL_FINE 0)
set(HAVE_PWD_H 0)
set(HAVE_STRINGS_H 0) # mingw-w64 has it (wrapper to string.h)
set(HAVE_SYS_EVENTFD_H 0)
set(HAVE_SYS_FILIO_H 0)
set(HAVE_SYS_WAIT_H 0)
set(HAVE_SYS_IOCTL_H 0)

View File

@ -1168,6 +1168,7 @@ if(WIN32)
endif()
endif()
check_include_file_concat("sys/eventfd.h" HAVE_SYS_EVENTFD_H)
check_include_file_concat("sys/filio.h" HAVE_SYS_FILIO_H)
check_include_file_concat("sys/wait.h" HAVE_SYS_WAIT_H)
check_include_file_concat("sys/ioctl.h" HAVE_SYS_IOCTL_H)
@ -1288,6 +1289,7 @@ check_symbol_exists(getaddrinfo "${CURL_INCLUDES};stdlib.h;string.h" HAVE_GET
check_symbol_exists(getifaddrs "${CURL_INCLUDES};stdlib.h" HAVE_GETIFADDRS)
check_symbol_exists(freeaddrinfo "${CURL_INCLUDES}" HAVE_FREEADDRINFO)
check_symbol_exists(pipe "${CURL_INCLUDES}" HAVE_PIPE)
check_symbol_exists(eventfd "${CURL_INCLUDES};sys/eventfd.h" HAVE_EVENTFD)
check_symbol_exists(ftruncate "${CURL_INCLUDES}" HAVE_FTRUNCATE)
check_symbol_exists(_fseeki64 "${CURL_INCLUDES};stdio.h" HAVE__FSEEKI64)
check_symbol_exists(getpeername "${CURL_INCLUDES}" HAVE_GETPEERNAME)

View File

@ -3575,6 +3575,7 @@ AC_CHECK_HEADERS(
stdbool.h \
sys/filio.h \
sys/wait.h \
sys/eventfd.h \
setjmp.h,
dnl to do if not found
[],
@ -3780,6 +3781,7 @@ AC_CHECK_DECLS([getpwuid_r], [], [AC_DEFINE(HAVE_DECL_GETPWUID_R_MISSING, 1, "Se
AC_CHECK_FUNCS([\
_fseeki64 \
arc4random \
eventfd \
fnmatch \
fseeko \
geteuid \

View File

@ -168,7 +168,7 @@ struct thread_sync_data {
duplicate */
#ifndef CURL_DISABLE_SOCKETPAIR
struct Curl_easy *data;
curl_socket_t sock_pair[2]; /* socket pair */
curl_socket_t sock_pair[2]; /* eventfd/pipes/socket pair */
#endif
int sock_error;
struct Curl_addrinfo *res;
@ -251,7 +251,7 @@ int init_thread_sync_data(struct thread_data *td,
#ifndef CURL_DISABLE_SOCKETPAIR
/* create socket pair or pipe */
if(wakeup_create(&tsd->sock_pair[0]) < 0) {
if(wakeup_create(tsd->sock_pair, FALSE) < 0) {
tsd->sock_pair[0] = CURL_SOCKET_BAD;
tsd->sock_pair[1] = CURL_SOCKET_BAD;
goto err_exit;
@ -302,6 +302,14 @@ query_complete(DWORD err, DWORD bytes, LPWSAOVERLAPPED overlapped)
struct Curl_addrinfo *ca;
struct Curl_addrinfo *cafirst = NULL;
struct Curl_addrinfo *calast = NULL;
#ifndef CURL_DISABLE_SOCKETPAIR
#ifdef USE_EVENTFD
const void *buf;
const uint64_t val = 1;
#else
char buf[1];
#endif
#endif
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wcast-align"
@ -421,11 +429,14 @@ query_complete(DWORD err, DWORD bytes, LPWSAOVERLAPPED overlapped)
}
else {
#ifndef CURL_DISABLE_SOCKETPAIR
char buf[1];
if(tsd->sock_pair[1] != CURL_SOCKET_BAD) {
/* DNS has been resolved, signal client task */
#ifdef USE_EVENTFD
buf = &val;
#else
buf[0] = 1;
if(swrite(tsd->sock_pair[1], buf, sizeof(buf)) < 0) {
#endif
/* DNS has been resolved, signal client task */
if(wakeup_write(tsd->sock_pair[1], buf, sizeof(buf)) < 0) {
/* update sock_erro to errno */
tsd->sock_error = SOCKERRNO;
}
@ -460,7 +471,12 @@ CURL_STDCALL getaddrinfo_thread(void *arg)
char service[12];
int rc;
#ifndef CURL_DISABLE_SOCKETPAIR
#ifdef USE_EVENTFD
const void *buf;
const uint64_t val = 1;
#else
char buf[1];
#endif
#endif
msnprintf(service, sizeof(service), "%d", tsd->port);
@ -486,9 +502,13 @@ CURL_STDCALL getaddrinfo_thread(void *arg)
else {
#ifndef CURL_DISABLE_SOCKETPAIR
if(tsd->sock_pair[1] != CURL_SOCKET_BAD) {
/* DNS has been resolved, signal client task */
#ifdef USE_EVENTFD
buf = &val;
#else
buf[0] = 1;
if(wakeup_write(tsd->sock_pair[1], buf, sizeof(buf)) < 0) {
#endif
/* DNS has been resolved, signal client task */
if(wakeup_write(tsd->sock_pair[1], buf, sizeof(buf)) < 0) {
/* update sock_erro to errno */
tsd->sock_error = SOCKERRNO;
}

View File

@ -419,6 +419,9 @@
/* Define to 1 if you have the `pipe' function. */
#cmakedefine HAVE_PIPE 1
/* Define to 1 if you have the `eventfd' function. */
#cmakedefine HAVE_EVENTFD 1
/* If you have a fine poll */
#cmakedefine HAVE_POLL_FINE 1
@ -539,6 +542,9 @@
/* Define to 1 if you have the timeval struct. */
#cmakedefine HAVE_STRUCT_TIMEVAL 1
/* Define to 1 if you have the <sys/eventfd.h> header file. */
#cmakedefine HAVE_SYS_EVENTFD_H 1
/* Define to 1 if you have the <sys/filio.h> header file. */
#cmakedefine HAVE_SYS_FILIO_H 1

View File

@ -426,14 +426,7 @@ struct Curl_multi *Curl_multi_handle(size_t hashsize, /* socket hash */
goto error;
#else
#ifdef ENABLE_WAKEUP
if(wakeup_create(multi->wakeup_pair) < 0) {
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
}
else if(curlx_nonblock(multi->wakeup_pair[0], TRUE) < 0 ||
curlx_nonblock(multi->wakeup_pair[1], TRUE) < 0) {
wakeup_close(multi->wakeup_pair[0]);
wakeup_close(multi->wakeup_pair[1]);
if(wakeup_create(multi->wakeup_pair, TRUE) < 0) {
multi->wakeup_pair[0] = CURL_SOCKET_BAD;
multi->wakeup_pair[1] = CURL_SOCKET_BAD;
}
@ -1639,6 +1632,15 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
it has to be careful only to access parts of the
Curl_multi struct that are constant */
#if defined(ENABLE_WAKEUP) && !defined(USE_WINSOCK)
#ifdef USE_EVENTFD
const void *buf;
const uint64_t val = 1;
#else
char buf[1];
#endif
#endif
/* GOOD_MULTI_HANDLE can be safely called */
if(!GOOD_MULTI_HANDLE(multi))
return CURLM_BAD_HANDLE;
@ -1652,8 +1654,11 @@ CURLMcode curl_multi_wakeup(struct Curl_multi *multi)
making it safe to access from another thread after the init part
and before cleanup */
if(multi->wakeup_pair[1] != CURL_SOCKET_BAD) {
char buf[1];
#ifdef USE_EVENTFD
buf = &val;
#else
buf[0] = 1;
#endif
while(1) {
/* swrite() is not thread-safe in general, because concurrent calls
can have their messages interleaved, but in this case the content
@ -2882,8 +2887,10 @@ CURLMcode curl_multi_cleanup(struct Curl_multi *multi)
#else
#ifdef ENABLE_WAKEUP
wakeup_close(multi->wakeup_pair[0]);
#ifndef USE_EVENTFD
wakeup_close(multi->wakeup_pair[1]);
#endif
#endif
#endif
multi_xfer_bufs_free(multi);

View File

@ -158,8 +158,9 @@ struct Curl_multi {
WSAEVENT wsa_event; /* winsock event used for waits */
#else
#ifdef ENABLE_WAKEUP
curl_socket_t wakeup_pair[2]; /* pipe()/socketpair() used for wakeup
0 is used for read, 1 is used for write */
curl_socket_t wakeup_pair[2]; /* eventfd()/pipe()/socketpair() used for
wakeup 0 is used for read, 1 is used
for write */
#endif
#endif
unsigned int max_concurrent_streams;

View File

@ -27,14 +27,31 @@
#include "urldata.h"
#include "rand.h"
#if defined(HAVE_PIPE) && defined(HAVE_FCNTL)
#include <fcntl.h>
#if defined(USE_EVENTFD)
#ifdef HAVE_SYS_EVENTFD_H
#include <sys/eventfd.h>
#endif
int Curl_pipe(curl_socket_t socks[2])
int Curl_eventfd(curl_socket_t socks[2], bool nonblocking)
{
int efd = eventfd(0, nonblocking ? EFD_CLOEXEC | EFD_NONBLOCK : EFD_CLOEXEC);
if(efd == -1) {
socks[0] = socks[1] = CURL_SOCKET_BAD;
return -1;
}
socks[0] = socks[1] = efd;
return 0;
}
#elif defined(HAVE_PIPE)
#ifdef HAVE_FCNTL
#include <fcntl.h>
#endif
int Curl_pipe(curl_socket_t socks[2], bool nonblocking)
{
if(pipe(socks))
return -1;
#ifdef HAVE_FCNTL
if(fcntl(socks[0], F_SETFD, FD_CLOEXEC) ||
fcntl(socks[1], F_SETFD, FD_CLOEXEC) ) {
close(socks[0]);
@ -42,6 +59,16 @@ int Curl_pipe(curl_socket_t socks[2])
socks[0] = socks[1] = CURL_SOCKET_BAD;
return -1;
}
#endif
if(nonblocking) {
if(curlx_nonblock(socks[0], TRUE) < 0 ||
curlx_nonblock(socks[1], TRUE) < 0) {
close(socks[0]);
close(socks[1]);
socks[0] = socks[1] = CURL_SOCKET_BAD;
return -1;
}
}
return 0;
}
@ -80,7 +107,7 @@ int Curl_pipe(curl_socket_t socks[2])
#include "memdebug.h"
int Curl_socketpair(int domain, int type, int protocol,
curl_socket_t socks[2])
curl_socket_t socks[2], bool nonblocking)
{
union {
struct sockaddr_in inaddr;
@ -198,6 +225,10 @@ int Curl_socketpair(int domain, int type, int protocol,
} while(1);
}
if(nonblocking)
if(curlx_nonblock(socks[0], TRUE) < 0 ||
curlx_nonblock(socks[1], TRUE) < 0)
goto error;
sclose(listener);
return 0;
@ -207,5 +238,25 @@ error:
sclose(socks[1]);
return -1;
}
#else
int Curl_socketpair(int domain, int type, int protocol,
curl_socket_t socks[2], bool nonblocking)
{
#ifdef SOCK_NONBLOCK
type = nonblocking ? type | SOCK_NONBLOCK : type;
#endif
if(socketpair(domain, type, protocol, socks))
return -1;
#ifndef SOCK_NONBLOCK
if(nonblocking) {
if(curlx_nonblock(socks[0], TRUE) < 0 ||
curlx_nonblock(socks[1], TRUE) < 0) {
close(socks[0]);
close(socks[1]);
return -1;
}
}
#endif
return 0;
}
#endif /* ! HAVE_SOCKETPAIR */

View File

@ -26,19 +26,42 @@
#include "curl_setup.h"
#ifdef HAVE_PIPE
#if defined(HAVE_EVENTFD) && \
defined(__x86_64__) && \
defined(__aarch64__) && \
defined(__ia64__) && \
defined(__ppc64__) && \
defined(__mips64) && \
defined(__sparc64__) && \
defined(__riscv_64e) && \
defined(__s390x__)
/* Use eventfd only with 64-bit CPU architectures because eventfd has a
* stringent rule of requiring the 8-byte buffer when calling read(2) and
* write(2) on it. In some rare cases, the C standard library implementation
* on a 32-bit system might choose to define uint64_t as a 32-bit type for
* various reasons (memory limitations, compatibility with older code),
* which makes eventfd broken.
*/
#define USE_EVENTFD 1
#define wakeup_write write
#define wakeup_read read
#define wakeup_close close
#define wakeup_create(p) Curl_pipe(p)
#define wakeup_create(p,nb) Curl_eventfd(p,nb)
#ifdef HAVE_FCNTL
#include <curl/curl.h>
int Curl_pipe(curl_socket_t socks[2]);
#else
#define Curl_pipe(p) pipe(p)
#endif
int Curl_eventfd(curl_socket_t socks[2], bool nonblocking);
#elif defined(HAVE_PIPE)
#define wakeup_write write
#define wakeup_read read
#define wakeup_close close
#define wakeup_create(p,nb) Curl_pipe(p,nb)
#include <curl/curl.h>
int Curl_pipe(curl_socket_t socks[2], bool nonblocking);
#else /* HAVE_PIPE */
@ -60,19 +83,13 @@ int Curl_pipe(curl_socket_t socks[2]);
#define SOCKETPAIR_TYPE SOCK_STREAM
#endif
#define wakeup_create(p)\
Curl_socketpair(SOCKETPAIR_FAMILY, SOCKETPAIR_TYPE, 0, p)
#define wakeup_create(p,nb)\
Curl_socketpair(SOCKETPAIR_FAMILY, SOCKETPAIR_TYPE, 0, p, nb)
#endif /* HAVE_PIPE */
#ifndef HAVE_SOCKETPAIR
#include <curl/curl.h>
int Curl_socketpair(int domain, int type, int protocol,
curl_socket_t socks[2]);
#else
#define Curl_socketpair(a,b,c,d) socketpair(a,b,c,d)
#endif
curl_socket_t socks[2], bool nonblocking);
#endif /* HEADER_CURL_SOCKETPAIR_H */

View File

@ -883,7 +883,7 @@ static CURLcode cf_msh3_connect(struct Curl_cfilter *cf,
CF_DATA_SAVE(save, cf, data);
if(ctx->sock[SP_LOCAL] == CURL_SOCKET_BAD) {
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, &ctx->sock[0]) < 0) {
if(Curl_socketpair(AF_UNIX, SOCK_STREAM, 0, &ctx->sock[0], FALSE) < 0) {
ctx->sock[SP_LOCAL] = CURL_SOCKET_BAD;
ctx->sock[SP_REMOTE] = CURL_SOCKET_BAD;
return CURLE_COULDNT_CONNECT;