diff --git a/docs/Makefile.am b/docs/Makefile.am index 45ebd13edc..e0a869bc0c 100644 --- a/docs/Makefile.am +++ b/docs/Makefile.am @@ -52,6 +52,7 @@ INTERNALDOCS = \ internals/HASH.md \ internals/LLIST.md \ internals/MQTT.md \ + internals/MULTI-EV.md \ internals/NEW-PROTOCOL.md \ internals/README.md \ internals/SPLAY.md \ diff --git a/docs/internals/MULTI-EV.md b/docs/internals/MULTI-EV.md new file mode 100644 index 0000000000..745955d5b6 --- /dev/null +++ b/docs/internals/MULTI-EV.md @@ -0,0 +1,127 @@ + + +# Multi Event Based + +A libcurl multi is operating "event based" when the application uses +and event library like `libuv` to monitor the sockets and file descriptors +libcurl uses to trigger transfer operations. How that works from the +applications point of view is described in libcurl-multi(3). + +This documents is about the internal handling. + +## Source Locations + +All code related to event based handling is found in `lib/multi_ev.c` +and `lib/multi_ev.h`. The header defines a set of internal functions +and `struct curl_multi_ev` that is embedded in each multi handle. + +There is `Curl_multi_ev_init()` and `Curl_multi_ev_cleanup()` to manage +the overall life cycle, call on creation and destruction of the multi +handle. + +## Tracking Events + +First, the various functions in `lib/multi_ev.h` only ever really do +something when the libcurl application has registered its callback +in `multi->socket_cb`. + +This is important as this callback gets informed about *changes* to sockets. +When a new socket is added, an existing is removed, or the `POLLIN/OUT` +flags change, `multi->socket_cb` needs to be invoked. `multi_ev` has to +track what it already reported to detect changes. + +Most applications are expected to go "event based" right from the start, +but the libcurl API does not prohibit an application to start another +way and then go for events later on, even in the middle of a transfer. + +### Transfer Events + +Most event that happen are in connection with a transfer. A transfer +opens a connection, which opens a socket, and waits for this socket +to become writable (`POLLOUT`) when using TCP, for example. + +The multi then calls `Curl_multi_ev_assess_xfer(multi, data)` to +let the multi event code detect what sockets the transfer is interested in. +If indeed a `multi->socket_cb` is set, the *current* transfer pollset is +retrieved via `Curl_multi_getsock()`. This current pollset is then +compared to the *previous* pollset. If relevant changes are detected, +`multi->socket_cb` gets informed about those. These can be: + + * a socket is in the current set, but not the previous one + * a socket was also in the previous one, but IN/OUT flags changed + * a socket in the previous one is no longer part of the current + +`multi_ev.c` keeps a `struct mev_sh_entry` for each sockets in a hash +with the socket as key. It tracks in each entry which transfers are +interested in this particular socket. How many transfer want to read +and/or write and what the summarized `POLLIN/POLLOUT` action, that +had been reported to `multi->socket_cb` was. + +This is necessary as a socket may be in use by several transfers +at the same time (think HTTP/2 on the same connection). When a transfer +is done and gets removed from the socket entry, it decrements +the reader and/or writer count (depending on what it was last +interested in). This *may* result in the entry's summarized action +to change, or not. + +### Connection Events + +There are also events not connected to any transfer that need to be tracked. +The multi connection cache, concerned with clean shutdowns of connections, +is interested in socket events during the shutdown. + +To allow use of the libcurl infrastructure, the connection cache operates +using an *internal* easy handle that is not a transfer as such. The +internal handle is used for all connection shutdown operations, being tied +to a particular connection only for a short time. This means tracking +the last pollset for an internal handle is useless. + +Instead, the connection cache uses `Curl_multi_ev_assess_conn()` to have +multi event handling check the connection and track a "last pollset" +for the connection alone. + +## Event Processing + +When the libcurl application is informed by the event library that +a particular socket has an event, it calls `curl_multi_socket_action()` +to make libcurl react to it. This internally invokes +`Curl_multi_ev_expire_xfers()` which expires all transfers that +are interested in the given socket, so the multi handle runs them. + +In addition `Curl_multi_ev_expire_xfers()` returns a `bool` to let +the multi know that connections are also interested in the socket, so +the connection pool should be informed as well. + +## All Things Pass + +When a transfer is done, e.g. removed from its multi handle, the +multi calls `Curl_multi_ev_xfer_done()`. This cleans up the pollset +tracking for the transfer. + +When a connection is done, and before it is destroyed, +`Curl_multi_ev_conn_done()` is called. This cleans up the pollset +tracking for this connection. + +When a socket is about to be closed, `Curl_multi_ev_socket_done()` +is called to cleanup the socket entry and all information kept there. + +These calls do not have to happen in any particular order. A transfer's +socket may be around while the transfer is ongoing. Or it might disappear +in the middle of things. Also, a transfer might be interested in several +sockets at the same time (resolving, eye balling, ftp are all examples of +those). + +### And Come Again + +While transfer and connection identifier are practically unique in a +libcurl application, sockets are not. Operating systems are keen on reusing +their resources, and the next socket may get the same identifier as +one just having been closed with high likelihood. + +This means that multi event handling needs to be informed *before* a close, +clean up all its tracking and be ready to see that same socket identifier +again right after. diff --git a/docs/libcurl/curl_global_trace.md b/docs/libcurl/curl_global_trace.md index acf966f3ce..a2db6005db 100644 --- a/docs/libcurl/curl_global_trace.md +++ b/docs/libcurl/curl_global_trace.md @@ -101,13 +101,34 @@ trace. Tracing of DNS operations to resolve hostnames and HTTPS records. +## `lib-ids` + +Adds transfer and connection identifiers as prefix to every call to +CURLOPT_DEBUGFUNCTION(3). The format is `[n-m]` where `n` is the identifier +of the transfer and `m` is the identifier of the connection. A literal `x` +is used for internal transfers or when no connection is assigned. + +For example, `[5-x]` is the prefix for transfer 5 that has no +connection. The command line tool `curl`uses the same format for its +`--trace-ids` option. + +`lib-ids` is intended for libcurl applications that handle multiple +transfers but have no own way to identify in trace output which transfer +a trace event is connected to. + ## `doh` Former name for DNS-over-HTTP operations. Now an alias for `dns`. +## `multi` + +Traces multi operations managing transfers' state changes and sockets poll +states. + ## `read` -Traces reading of upload data from the application in order to send it to the server. +Traces reading of upload data from the application in order to send it to the +server. ## `ssls` diff --git a/lib/Makefile.inc b/lib/Makefile.inc index 778e7881b0..cb48733ae7 100644 --- a/lib/Makefile.inc +++ b/lib/Makefile.inc @@ -198,6 +198,7 @@ LIB_CFILES = \ mprintf.c \ mqtt.c \ multi.c \ + multi_ev.c \ netrc.c \ nonblock.c \ noproxy.c \ @@ -335,6 +336,7 @@ LIB_HFILES = \ mime.h \ mqtt.h \ multihandle.h \ + multi_ev.h \ multiif.h \ netrc.h \ nonblock.h \ diff --git a/lib/asyn-ares.c b/lib/asyn-ares.c index 6d071969c3..2aa597f2ba 100644 --- a/lib/asyn-ares.c +++ b/lib/asyn-ares.c @@ -245,7 +245,7 @@ static void sock_state_cb(void *data, ares_socket_t socket_fd, struct Curl_easy *easy = data; if(!readable && !writable) { DEBUGASSERT(easy); - Curl_multi_closed(easy, socket_fd); + Curl_multi_will_close(easy, socket_fd); } } diff --git a/lib/asyn-thread.c b/lib/asyn-thread.c index 2ca28041d0..5982983082 100644 --- a/lib/asyn-thread.c +++ b/lib/asyn-thread.c @@ -394,7 +394,7 @@ static void destroy_async_data(struct Curl_easy *data) * ensure CURLMOPT_SOCKETFUNCTION fires CURL_POLL_REMOVE * before the FD is invalidated to avoid EBADF on EPOLL_CTL_DEL */ - Curl_multi_closed(data, sock_rd); + Curl_multi_will_close(data, sock_rd); wakeup_close(sock_rd); #endif diff --git a/lib/asyn.h b/lib/asyn.h index b963da533c..481c869bdb 100644 --- a/lib/asyn.h +++ b/lib/asyn.h @@ -177,7 +177,7 @@ void Curl_resolver_kill(struct Curl_easy *data); /* Curl_resolver_getsock() * - * This function is called from the multi_getsock() function. 'sock' is a + * This function is called from the Curl_multi_getsock() function. 'sock' is a * pointer to an array to hold the file descriptors, with 'numsock' being the * size of that array (in number of entries). This function is supposed to * return bitmask indicating what file descriptors (referring to array indexes diff --git a/lib/cf-socket.c b/lib/cf-socket.c index b7b218f896..ec8a76e1aa 100644 --- a/lib/cf-socket.c +++ b/lib/cf-socket.c @@ -422,7 +422,7 @@ static int socket_close(struct Curl_easy *data, struct connectdata *conn, if(use_callback && conn && conn->fclosesocket) { int rc; - Curl_multi_closed(data, sock); + Curl_multi_will_close(data, sock); Curl_set_in_callback(data, TRUE); rc = conn->fclosesocket(conn->closesocket_client, sock); Curl_set_in_callback(data, FALSE); @@ -431,7 +431,7 @@ static int socket_close(struct Curl_easy *data, struct connectdata *conn, if(conn) /* tell the multi-socket code about this */ - Curl_multi_closed(data, sock); + Curl_multi_will_close(data, sock); sclose(sock); @@ -997,7 +997,7 @@ static void cf_socket_close(struct Curl_cfilter *cf, struct Curl_easy *data) struct cf_socket_ctx *ctx = cf->ctx; if(ctx && CURL_SOCKET_BAD != ctx->sock) { - CURL_TRC_CF(data, cf, "cf_socket_close(%" FMT_SOCKET_T ")", ctx->sock); + CURL_TRC_CF(data, cf, "cf_socket_close, fd=%" FMT_SOCKET_T, ctx->sock); if(ctx->sock == cf->conn->sock[cf->sockindex]) cf->conn->sock[cf->sockindex] = CURL_SOCKET_BAD; socket_close(data, cf->conn, !ctx->accepted, ctx->sock); @@ -1019,7 +1019,7 @@ static CURLcode cf_socket_shutdown(struct Curl_cfilter *cf, if(cf->connected) { struct cf_socket_ctx *ctx = cf->ctx; - CURL_TRC_CF(data, cf, "cf_socket_shutdown(%" FMT_SOCKET_T ")", ctx->sock); + CURL_TRC_CF(data, cf, "cf_socket_shutdown, fd=%" FMT_SOCKET_T, ctx->sock); /* On TCP, and when the socket looks well and non-blocking mode * can be enabled, receive dangling bytes before close to avoid * entering RST states unnecessarily. */ diff --git a/lib/cfilters.c b/lib/cfilters.c index 1c9e7bc776..c7f9ab71f8 100644 --- a/lib/cfilters.c +++ b/lib/cfilters.c @@ -200,8 +200,8 @@ CURLcode Curl_conn_shutdown(struct Curl_easy *data, int sockindex, bool *done) *done = FALSE; now = Curl_now(); if(!Curl_shutdown_started(data, sockindex)) { - DEBUGF(infof(data, "shutdown start on%s connection", - sockindex ? " secondary" : "")); + CURL_TRC_M(data, "shutdown start on%s connection", + sockindex ? " secondary" : ""); Curl_shutdown_start(data, sockindex, &now); } else { @@ -476,7 +476,7 @@ CURLcode Curl_conn_connect(struct Curl_easy *data, /* In general, we want to send after connect, wait on that. */ if(sockfd != CURL_SOCKET_BAD) Curl_pollset_set_out_only(data, &ps, sockfd); - Curl_conn_adjust_pollset(data, &ps); + Curl_conn_adjust_pollset(data, data->conn, &ps); result = Curl_pollfds_add_ps(&cpfds, &ps); if(result) goto out; @@ -626,14 +626,15 @@ void Curl_conn_cf_adjust_pollset(struct Curl_cfilter *cf, } void Curl_conn_adjust_pollset(struct Curl_easy *data, - struct easy_pollset *ps) + struct connectdata *conn, + struct easy_pollset *ps) { int i; DEBUGASSERT(data); - DEBUGASSERT(data->conn); + DEBUGASSERT(conn); for(i = 0; i < 2; ++i) { - Curl_conn_cf_adjust_pollset(data->conn->cfilter[i], data, ps); + Curl_conn_cf_adjust_pollset(conn->cfilter[i], data, ps); } } diff --git a/lib/cfilters.h b/lib/cfilters.h index 19edea9b98..0b9098a5d1 100644 --- a/lib/cfilters.h +++ b/lib/cfilters.h @@ -455,7 +455,8 @@ void Curl_conn_cf_adjust_pollset(struct Curl_cfilter *cf, * Adjust pollset from filters installed at transfer's connection. */ void Curl_conn_adjust_pollset(struct Curl_easy *data, - struct easy_pollset *ps); + struct connectdata *conn, + struct easy_pollset *ps); /** * Curl_poll() the filter chain at `cf` with timeout `timeout_ms`. diff --git a/lib/conncache.c b/lib/conncache.c index 2df29d9385..d926eb69e3 100644 --- a/lib/conncache.c +++ b/lib/conncache.c @@ -32,6 +32,7 @@ #include "cfilters.h" #include "progress.h" #include "multiif.h" +#include "multi_ev.h" #include "sendf.h" #include "conncache.h" #include "http_negotiate.h" @@ -94,8 +95,8 @@ static void cpool_run_conn_shutdown(struct Curl_easy *data, bool *done); static void cpool_run_conn_shutdown_handler(struct Curl_easy *data, struct connectdata *conn); -static CURLMcode cpool_update_shutdown_ev(struct Curl_multi *multi, - struct Curl_easy *data, +static CURLMcode cpool_update_shutdown_ev(struct cpool *cpool, + struct Curl_multi *multi, struct connectdata *conn); static void cpool_shutdown_all(struct cpool *cpool, struct Curl_easy *data, int timeout_ms); @@ -148,10 +149,10 @@ static void cpool_bundle_free_entry(void *freethis) } int Curl_cpool_init(struct cpool *cpool, - Curl_cpool_disconnect_cb *disconnect_cb, - struct Curl_multi *multi, - struct Curl_share *share, - size_t size) + Curl_cpool_disconnect_cb *disconnect_cb, + struct Curl_multi *multi, + struct Curl_share *share, + size_t size) { DEBUGASSERT(!!multi != !!share); /* either one */ Curl_hash_init(&cpool->dest2bundle, size, Curl_hash_str, @@ -317,10 +318,10 @@ int Curl_cpool_check_limits(struct Curl_easy *data, if(!oldest_idle) break; /* disconnect the old conn and continue */ - DEBUGF(infof(data, "Discarding connection #%" + CURL_TRC_M(data, "Discarding connection #%" FMT_OFF_T " from %zu to reach destination " "limit of %zu", oldest_idle->connection_id, - Curl_llist_count(&bundle->conns), dest_limit)); + Curl_llist_count(&bundle->conns), dest_limit); Curl_cpool_disconnect(data, oldest_idle, FALSE); /* in case the bundle was destroyed in disconnect, look it up again */ @@ -341,10 +342,10 @@ int Curl_cpool_check_limits(struct Curl_easy *data, if(!oldest_idle) break; /* disconnect the old conn and continue */ - DEBUGF(infof(data, "Discarding connection #%" - FMT_OFF_T " from %zu to reach total " - "limit of %zu", - oldest_idle->connection_id, cpool->num_conn, total_limit)); + CURL_TRC_M(data, "Discarding connection #%" + FMT_OFF_T " from %zu to reach total " + "limit of %zu", + oldest_idle->connection_id, cpool->num_conn, total_limit); Curl_cpool_disconnect(data, oldest_idle, FALSE); shutdowns = Curl_llist_count(&cpool->shutdowns); } @@ -652,8 +653,6 @@ static void cpool_shutdown_discard_all(struct cpool *cpool) while(e) { conn = Curl_node_elem(e); Curl_node_remove(e); - DEBUGF(infof(cpool->idata, "discard connection #%" FMT_OFF_T, - conn->connection_id)); cpool_close_and_destroy(cpool, conn, NULL, FALSE); e = Curl_llist_head(&cpool->shutdowns); } @@ -735,9 +734,9 @@ static void cpool_discard_conn(struct cpool *cpool, * are other users of it */ if(CONN_INUSE(conn) && !aborted) { - DEBUGF(infof(data, "[CCACHE] not discarding #%" FMT_OFF_T - " still in use by %zu transfers", conn->connection_id, - CONN_INUSE(conn))); + CURL_TRC_M(data, "[CPOOL] not discarding #%" FMT_OFF_T + " still in use by %zu transfers", conn->connection_id, + CONN_INUSE(conn)); return; } @@ -758,8 +757,7 @@ static void cpool_discard_conn(struct cpool *cpool, /* Attempt to shutdown the connection right away. */ Curl_attach_connection(data, conn); cpool_run_conn_shutdown(data, conn, &done); - DEBUGF(infof(data, "[CCACHE] shutdown #%" FMT_OFF_T ", done=%d", - conn->connection_id, done)); + CURL_TRC_M(data, "[CPOOL] shutdown, done=%d", done); Curl_detach_connection(data); } @@ -773,30 +771,26 @@ static void cpool_discard_conn(struct cpool *cpool, if(data->multi && data->multi->max_total_connections > 0 && (data->multi->max_total_connections <= (long)(cpool->num_conn + Curl_llist_count(&cpool->shutdowns)))) { - DEBUGF(infof(data, "[CCACHE] discarding oldest shutdown connection " - "due to connection limit of %ld", - data->multi->max_total_connections)); + CURL_TRC_M(data, "[CPOOL] discarding oldest shutdown connection " + "due to connection limit of %ld", + data->multi->max_total_connections); cpool_shutdown_destroy_oldest(cpool); } if(data->multi && data->multi->socket_cb) { DEBUGASSERT(cpool == &data->multi->cpool); - /* Start with an empty shutdown pollset, so out internal closure handle - * is added to the sockets. */ - memset(&conn->shutdown_poll, 0, sizeof(conn->shutdown_poll)); - if(cpool_update_shutdown_ev(data->multi, cpool->idata, conn)) { - DEBUGF(infof(data, "[CCACHE] update events for shutdown failed, " - "discarding #%" FMT_OFF_T, - conn->connection_id)); + if(cpool_update_shutdown_ev(cpool, data->multi, conn)) { + CURL_TRC_M(data, "[CPOOL] update events failed, discarding #%" + FMT_OFF_T, conn->connection_id); cpool_close_and_destroy(cpool, conn, data, FALSE); return; } } Curl_llist_append(&cpool->shutdowns, conn, &conn->cpool_node); - DEBUGF(infof(data, "[CCACHE] added #%" FMT_OFF_T - " to shutdowns, now %zu conns in shutdown", - conn->connection_id, Curl_llist_count(&cpool->shutdowns))); + CURL_TRC_M(data, "[CPOOL] added #%" FMT_OFF_T + " to shutdowns, now %zu conns in shutdown", + conn->connection_id, Curl_llist_count(&cpool->shutdowns)); } void Curl_cpool_disconnect(struct Curl_easy *data, @@ -930,7 +924,7 @@ static CURLcode cpool_add_pollfds(struct cpool *cpool, conn = Curl_node_elem(e); memset(&ps, 0, sizeof(ps)); Curl_attach_connection(cpool->idata, conn); - Curl_conn_adjust_pollset(cpool->idata, &ps); + Curl_conn_adjust_pollset(cpool->idata, conn, &ps); Curl_detach_connection(cpool->idata); result = Curl_pollfds_add_ps(cpfds, &ps); @@ -971,7 +965,7 @@ unsigned int Curl_cpool_add_waitfds(struct cpool *cpool, conn = Curl_node_elem(e); memset(&ps, 0, sizeof(ps)); Curl_attach_connection(cpool->idata, conn); - Curl_conn_adjust_pollset(cpool->idata, &ps); + Curl_conn_adjust_pollset(cpool->idata, conn, &ps); Curl_detach_connection(cpool->idata); need += Curl_waitfds_add_ps(cwfds, &ps); @@ -997,7 +991,7 @@ void Curl_cpool_setfds(struct cpool *cpool, struct connectdata *conn = Curl_node_elem(e); memset(&ps, 0, sizeof(ps)); Curl_attach_connection(cpool->idata, conn); - Curl_conn_adjust_pollset(cpool->idata, &ps); + Curl_conn_adjust_pollset(cpool->idata, conn, &ps); Curl_detach_connection(cpool->idata); for(i = 0; i < ps.num; i++) { @@ -1036,15 +1030,14 @@ static void cpool_perform(struct cpool *cpool) return; DEBUGASSERT(data); - DEBUGF(infof(data, "[CCACHE] perform, %zu connections being shutdown", - Curl_llist_count(&cpool->shutdowns))); + CURL_TRC_M(data, "[CPOOL] perform, %zu connections being shutdown", + Curl_llist_count(&cpool->shutdowns)); while(e) { enext = Curl_node_next(e); conn = Curl_node_elem(e); Curl_attach_connection(data, conn); cpool_run_conn_shutdown(data, conn, &done); - DEBUGF(infof(data, "[CCACHE] shutdown #%" FMT_OFF_T ", done=%d", - conn->connection_id, done)); + CURL_TRC_M(data, "[CPOOL] shutdown, done=%d", done); Curl_detach_connection(data); if(done) { Curl_node_remove(e); @@ -1067,14 +1060,6 @@ static void cpool_perform(struct cpool *cpool) Curl_expire(data, next_from_now_ms, EXPIRE_RUN_NOW); } -void Curl_cpool_multi_perform(struct Curl_multi *multi) -{ - CPOOL_LOCK(&multi->cpool); - cpool_perform(&multi->cpool); - CPOOL_UNLOCK(&multi->cpool); -} - - /* * Close and destroy the connection. Run the shutdown sequence once, * of so requested. @@ -1108,8 +1093,7 @@ static void cpool_close_and_destroy(struct cpool *cpool, } if(cpool) - DEBUGF(infof(data, "[CCACHE] closing #%" FMT_OFF_T, - conn->connection_id)); + CURL_TRC_M(data, "[CPOOL] closing connection"); else DEBUGF(infof(data, "closing connection #%" FMT_OFF_T, conn->connection_id)); @@ -1117,60 +1101,54 @@ static void cpool_close_and_destroy(struct cpool *cpool, Curl_conn_close(data, FIRSTSOCKET); Curl_detach_connection(data); + if(cpool && cpool->multi) + Curl_multi_ev_conn_done(cpool->multi, data, conn); + else if(data->multi) + Curl_multi_ev_conn_done(data->multi, data, conn); + Curl_conn_free(data, conn); if(cpool && cpool->multi) { - DEBUGF(infof(data, "[CCACHE] trigger multi connchanged")); + CURL_TRC_M(data, "[CPOOL] trigger multi connchanged"); Curl_multi_connchanged(cpool->multi); } } -static CURLMcode cpool_update_shutdown_ev(struct Curl_multi *multi, - struct Curl_easy *data, +static CURLMcode cpool_update_shutdown_ev(struct cpool *cpool, + struct Curl_multi *multi, struct connectdata *conn) { - struct easy_pollset ps; CURLMcode mresult; - DEBUGASSERT(data); + DEBUGASSERT(cpool); DEBUGASSERT(multi); DEBUGASSERT(multi->socket_cb); - memset(&ps, 0, sizeof(ps)); - Curl_attach_connection(data, conn); - Curl_conn_adjust_pollset(data, &ps); - Curl_detach_connection(data); - - mresult = Curl_multi_pollset_ev(multi, data, &ps, &conn->shutdown_poll); - - if(!mresult) /* Remember for next time */ - memcpy(&conn->shutdown_poll, &ps, sizeof(ps)); + Curl_attach_connection(cpool->idata, conn); + mresult = Curl_multi_ev_assess_conn(multi, cpool->idata, conn); + Curl_detach_connection(cpool->idata); return mresult; } -void Curl_cpool_multi_socket(struct Curl_multi *multi, - curl_socket_t s, int ev_bitmask) +static void cpool_multi_socket(struct Curl_multi *multi, curl_socket_t s) { struct cpool *cpool = &multi->cpool; - struct Curl_easy *data = cpool->idata; struct Curl_llist_node *e; struct connectdata *conn; bool done; - (void)ev_bitmask; DEBUGASSERT(multi->socket_cb); CPOOL_LOCK(cpool); e = Curl_llist_head(&cpool->shutdowns); while(e) { conn = Curl_node_elem(e); if(s == conn->sock[FIRSTSOCKET] || s == conn->sock[SECONDARYSOCKET]) { - Curl_attach_connection(data, conn); - cpool_run_conn_shutdown(data, conn, &done); - DEBUGF(infof(data, "[CCACHE] shutdown #%" FMT_OFF_T ", done=%d", - conn->connection_id, done)); - Curl_detach_connection(data); - if(done || cpool_update_shutdown_ev(multi, data, conn)) { + Curl_attach_connection(cpool->idata, conn); + cpool_run_conn_shutdown(cpool->idata, conn, &done); + CURL_TRC_M(cpool->idata, "[CPOOL] shutdown, done=%d", done); + Curl_detach_connection(cpool->idata); + if(done || cpool_update_shutdown_ev(cpool, multi, conn)) { Curl_node_remove(e); cpool_close_and_destroy(cpool, conn, NULL, FALSE); } @@ -1181,6 +1159,17 @@ void Curl_cpool_multi_socket(struct Curl_multi *multi, CPOOL_UNLOCK(cpool); } +void Curl_cpool_multi_perform(struct Curl_multi *multi, curl_socket_t s) +{ + CPOOL_LOCK(&multi->cpool); + if((s == CURL_SOCKET_TIMEOUT) || (!multi->socket_cb)) + cpool_perform(&multi->cpool); + else + cpool_multi_socket(multi, s); + CPOOL_UNLOCK(&multi->cpool); +} + + #define NUM_POLLS_ON_STACK 10 static CURLcode cpool_shutdown_wait(struct cpool *cpool, int timeout_ms) @@ -1212,14 +1201,13 @@ static void cpool_shutdown_all(struct cpool *cpool, return; (void)data; - DEBUGF(infof(data, "cpool shutdown all")); + CURL_TRC_M(data, "[CPOOL] shutdown all"); /* Move all connections into the shutdown queue */ for(conn = cpool_get_live_conn(cpool); conn; conn = cpool_get_live_conn(cpool)) { /* Move conn from live set to shutdown or destroy right away */ - DEBUGF(infof(data, "moving connection #%" FMT_OFF_T - " to shutdown queue", conn->connection_id)); + CURL_TRC_M(data, "[CPOOL] moving connection to shutdown queue"); cpool_remove_conn(cpool, conn); cpool_discard_conn(cpool, data, conn, FALSE); } @@ -1231,21 +1219,21 @@ static void cpool_shutdown_all(struct cpool *cpool, cpool_perform(cpool); if(!Curl_llist_head(&cpool->shutdowns)) { - DEBUGF(infof(data, "cpool shutdown ok")); + CURL_TRC_M(data, "[CPOOL] shutdown finished cleanly"); break; } /* wait for activity, timeout or "nothing" */ timespent = Curl_timediff(Curl_now(), started); if(timespent >= (timediff_t)timeout_ms) { - DEBUGF(infof(data, "cpool shutdown %s", - (timeout_ms > 0) ? "timeout" : "best effort done")); + CURL_TRC_M(data, "[CPOOL] shutdown finished, %s", + (timeout_ms > 0) ? "timeout" : "best effort done"); break; } remain_ms = timeout_ms - (int)timespent; if(cpool_shutdown_wait(cpool, remain_ms)) { - DEBUGF(infof(data, "cpool shutdown all, abort")); + CURL_TRC_M(data, "[CPOOL] shutdown finished, aborted"); break; } } diff --git a/lib/conncache.h b/lib/conncache.h index 5f239bc0b0..d12328cd41 100644 --- a/lib/conncache.h +++ b/lib/conncache.h @@ -191,13 +191,9 @@ void Curl_cpool_setfds(struct cpool *cpool, int *maxfd); /** - * Perform maintenance on connections in the pool. Specifically, - * progress the shutdown of connections in the queue. + * Run connections on socket. If socket is CURL_SOCKET_TIMEOUT, run + * maintenance on all connections. */ -void Curl_cpool_multi_perform(struct Curl_multi *multi); - -void Curl_cpool_multi_socket(struct Curl_multi *multi, - curl_socket_t s, int ev_bitmask); - +void Curl_cpool_multi_perform(struct Curl_multi *multi, curl_socket_t s); #endif /* HEADER_CURL_CONNCACHE_H */ diff --git a/lib/curl_trc.c b/lib/curl_trc.c index 2dcd6a974d..ea74964e65 100644 --- a/lib/curl_trc.c +++ b/lib/curl_trc.c @@ -53,12 +53,10 @@ #include "curl_memory.h" #include "memdebug.h" -void Curl_debug(struct Curl_easy *data, curl_infotype type, - char *ptr, size_t size) +static void trc_write(struct Curl_easy *data, curl_infotype type, + char *ptr, size_t size) { if(data->set.verbose) { - static const char s_infotype[CURLINFO_END][3] = { - "* ", "< ", "> ", "{ ", "} ", "{ ", "} " }; if(data->set.fdebug) { bool inCallback = Curl_is_in_callback(data); Curl_set_in_callback(data, TRUE); @@ -66,6 +64,8 @@ void Curl_debug(struct Curl_easy *data, curl_infotype type, Curl_set_in_callback(data, inCallback); } else { + static const char s_infotype[CURLINFO_END][3] = { + "* ", "< ", "> ", "{ ", "} ", "{ ", "} " }; switch(type) { case CURLINFO_TEXT: case CURLINFO_HEADER_OUT: @@ -80,6 +80,100 @@ void Curl_debug(struct Curl_easy *data, curl_infotype type, } } +/* max length we trace before ending in '...' */ +#define TRC_LINE_MAX 2048 + +#define CURL_TRC_FMT_IDSC "[x-%" CURL_FORMAT_CURL_OFF_T "] " +#define CURL_TRC_FMT_IDSD "[%" CURL_FORMAT_CURL_OFF_T "-x] " +#define CURL_TRC_FMT_IDSDC "[%" CURL_FORMAT_CURL_OFF_T "-%" \ + CURL_FORMAT_CURL_OFF_T "] " + +static struct curl_trc_feat Curl_trc_feat_ids = { + "LIB-IDS", + CURL_LOG_LVL_NONE, +}; +#define CURL_TRC_IDS(data) \ + (Curl_trc_is_verbose(data) && \ + Curl_trc_feat_ids.log_level >= CURL_LOG_LVL_INFO) + +static size_t trc_print_ids(struct Curl_easy *data, char *buf, size_t maxlen) +{ + curl_off_t cid = data->conn ? + data->conn->connection_id : data->state.recent_conn_id; + if(data->id >= 0) { + if(cid >= 0) + return msnprintf(buf, maxlen, CURL_TRC_FMT_IDSDC, data->id, cid); + else + return msnprintf(buf, maxlen, CURL_TRC_FMT_IDSD, data->id); + } + else if(cid >= 0) + return msnprintf(buf, maxlen, CURL_TRC_FMT_IDSC, cid); + else { + return msnprintf(buf, maxlen, "[x-x] "); + } +} + +static size_t trc_end_buf(char *buf, size_t len, size_t maxlen, bool addnl) +{ + /* make sure we end the trace line in `buf` properly. It needs + * to end with a terminating '\0' or '\n\0' */ + if(len >= (maxlen - (addnl ? 2 : 1))) { + len = maxlen - 5; + buf[len++] = '.'; + buf[len++] = '.'; + buf[len++] = '.'; + buf[len++] = '\n'; + } + else if(addnl) + buf[len++] = '\n'; + buf[len] = '\0'; + return len; +} + +void Curl_debug(struct Curl_easy *data, curl_infotype type, + char *ptr, size_t size) +{ + if(data->set.verbose) { + static const char s_infotype[CURLINFO_END][3] = { + "* ", "< ", "> ", "{ ", "} ", "{ ", "} " }; + char buf[TRC_LINE_MAX]; + size_t len; + if(data->set.fdebug) { + bool inCallback = Curl_is_in_callback(data); + + if(CURL_TRC_IDS(data) && (size < TRC_LINE_MAX)) { + len = trc_print_ids(data, buf, TRC_LINE_MAX); + len += msnprintf(buf + len, TRC_LINE_MAX - len, "%.*s", + (int)size, ptr); + len = trc_end_buf(buf, len, TRC_LINE_MAX, FALSE); + Curl_set_in_callback(data, TRUE); + (void)(*data->set.fdebug)(data, type, buf, len, data->set.debugdata); + Curl_set_in_callback(data, inCallback); + } + else { + Curl_set_in_callback(data, TRUE); + (void)(*data->set.fdebug)(data, type, ptr, size, data->set.debugdata); + Curl_set_in_callback(data, inCallback); + } + } + else { + switch(type) { + case CURLINFO_TEXT: + case CURLINFO_HEADER_OUT: + case CURLINFO_HEADER_IN: + if(CURL_TRC_IDS(data)) { + len = trc_print_ids(data, buf, TRC_LINE_MAX); + fwrite(buf, len, 1, data->set.err); + } + fwrite(s_infotype[type], 2, 1, data->set.err); + fwrite(ptr, size, 1, data->set.err); + break; + default: /* nada */ + break; + } + } + } +} /* Curl_failf() is for messages stating why we failed. * The message SHALL NOT include any LF or CR. @@ -89,7 +183,7 @@ void Curl_failf(struct Curl_easy *data, const char *fmt, ...) DEBUGASSERT(!strchr(fmt, '\n')); if(data->set.verbose || data->set.errorbuffer) { va_list ap; - int len; + size_t len; char error[CURL_ERROR_SIZE + 2]; va_start(ap, fmt); len = mvsnprintf(error, CURL_ERROR_SIZE, fmt, ap); @@ -100,36 +194,41 @@ void Curl_failf(struct Curl_easy *data, const char *fmt, ...) } error[len++] = '\n'; error[len] = '\0'; - Curl_debug(data, CURLINFO_TEXT, error, len); + trc_write(data, CURLINFO_TEXT, error, len); va_end(ap); } } #if !defined(CURL_DISABLE_VERBOSE_STRINGS) -/* Curl_infof() is for info message along the way */ -#define MAXINFO 2048 -static void trc_infof(struct Curl_easy *data, struct curl_trc_feat *feat, - const char * const fmt, va_list ap) CURL_PRINTF(3, 0); +static void trc_infof(struct Curl_easy *data, + struct curl_trc_feat *feat, + const char *opt_id, int opt_id_idx, + const char * const fmt, va_list ap) CURL_PRINTF(5, 0); -static void trc_infof(struct Curl_easy *data, struct curl_trc_feat *feat, +static void trc_infof(struct Curl_easy *data, + struct curl_trc_feat *feat, + const char *opt_id, int opt_id_idx, const char * const fmt, va_list ap) { - int len = 0; - char buffer[MAXINFO + 5]; + size_t len = 0; + char buf[TRC_LINE_MAX]; + + if(CURL_TRC_IDS(data)) + len += trc_print_ids(data, buf + len, TRC_LINE_MAX - len); if(feat) - len = msnprintf(buffer, (MAXINFO + 1), "[%s] ", feat->name); - len += mvsnprintf(buffer + len, (MAXINFO + 1) - len, fmt, ap); - if(len >= MAXINFO) { /* too long, shorten with '...' */ - --len; - buffer[len++] = '.'; - buffer[len++] = '.'; - buffer[len++] = '.'; + len += msnprintf(buf + len, TRC_LINE_MAX - len, "[%s] ", feat->name); + if(opt_id) { + if(opt_id_idx > 0) + len += msnprintf(buf + len, TRC_LINE_MAX - len, "[%s-%d] ", + opt_id, opt_id_idx); + else + len += msnprintf(buf + len, TRC_LINE_MAX - len, "[%s] ", opt_id); } - buffer[len++] = '\n'; - buffer[len] = '\0'; - Curl_debug(data, CURLINFO_TEXT, buffer, len); + len += mvsnprintf(buf + len, TRC_LINE_MAX - len, fmt, ap); + len = trc_end_buf(buf, len, TRC_LINE_MAX, TRUE); + trc_write(data, CURLINFO_TEXT, buf, len); } void Curl_infof(struct Curl_easy *data, const char *fmt, ...) @@ -138,7 +237,7 @@ void Curl_infof(struct Curl_easy *data, const char *fmt, ...) if(Curl_trc_is_verbose(data)) { va_list ap; va_start(ap, fmt); - trc_infof(data, data->state.feat, fmt, ap); + trc_infof(data, data->state.feat, NULL, 0, fmt, ap); va_end(ap); } } @@ -149,25 +248,16 @@ void Curl_trc_cf_infof(struct Curl_easy *data, struct Curl_cfilter *cf, DEBUGASSERT(cf); if(Curl_trc_cf_is_verbose(cf, data)) { va_list ap; - int len = 0; - char buffer[MAXINFO + 2]; - if(data->state.feat) - len += msnprintf(buffer + len, MAXINFO - len, "[%s] ", - data->state.feat->name); - if(cf->sockindex) - len += msnprintf(buffer + len, MAXINFO - len, "[%s-%d] ", - cf->cft->name, cf->sockindex); - else - len += msnprintf(buffer + len, MAXINFO - len, "[%s] ", cf->cft->name); va_start(ap, fmt); - len += mvsnprintf(buffer + len, MAXINFO - len, fmt, ap); + trc_infof(data, data->state.feat, cf->cft->name, cf->sockindex, fmt, ap); va_end(ap); - buffer[len++] = '\n'; - buffer[len] = '\0'; - Curl_debug(data, CURLINFO_TEXT, buffer, len); } } +struct curl_trc_feat Curl_trc_feat_multi = { + "MULTI", + CURL_LOG_LVL_NONE, +}; struct curl_trc_feat Curl_trc_feat_read = { "READ", CURL_LOG_LVL_NONE, @@ -182,13 +272,54 @@ struct curl_trc_feat Curl_trc_feat_dns = { }; +static const char * const Curl_trc_mstate_names[]={ + "INIT", + "PENDING", + "SETUP", + "CONNECT", + "RESOLVING", + "CONNECTING", + "TUNNELING", + "PROTOCONNECT", + "PROTOCONNECTING", + "DO", + "DOING", + "DOING_MORE", + "DID", + "PERFORMING", + "RATELIMITING", + "DONE", + "COMPLETED", + "MSGSENT", +}; + +const char *Curl_trc_mstate_name(int state) +{ + if((state >= 0) && ((size_t)state < CURL_ARRAYSIZE(Curl_trc_mstate_names))) + return Curl_trc_mstate_names[(size_t)state]; + return "?"; +} + +void Curl_trc_multi(struct Curl_easy *data, const char *fmt, ...) +{ + DEBUGASSERT(!strchr(fmt, '\n')); + if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_multi)) { + const char *sname = (data->id >= 0) ? + Curl_trc_mstate_name(data->mstate) : NULL; + va_list ap; + va_start(ap, fmt); + trc_infof(data, &Curl_trc_feat_multi, sname, 0, fmt, ap); + va_end(ap); + } +} + void Curl_trc_read(struct Curl_easy *data, const char *fmt, ...) { DEBUGASSERT(!strchr(fmt, '\n')); if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_read)) { va_list ap; va_start(ap, fmt); - trc_infof(data, &Curl_trc_feat_read, fmt, ap); + trc_infof(data, &Curl_trc_feat_read, NULL, 0, fmt, ap); va_end(ap); } } @@ -199,7 +330,7 @@ void Curl_trc_write(struct Curl_easy *data, const char *fmt, ...) if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_write)) { va_list ap; va_start(ap, fmt); - trc_infof(data, &Curl_trc_feat_write, fmt, ap); + trc_infof(data, &Curl_trc_feat_write, NULL, 0, fmt, ap); va_end(ap); } } @@ -210,7 +341,7 @@ void Curl_trc_dns(struct Curl_easy *data, const char *fmt, ...) if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_dns)) { va_list ap; va_start(ap, fmt); - trc_infof(data, &Curl_trc_feat_dns, fmt, ap); + trc_infof(data, &Curl_trc_feat_dns, NULL, 0, fmt, ap); va_end(ap); } } @@ -227,7 +358,7 @@ void Curl_trc_ftp(struct Curl_easy *data, const char *fmt, ...) if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ftp)) { va_list ap; va_start(ap, fmt); - trc_infof(data, &Curl_trc_feat_ftp, fmt, ap); + trc_infof(data, &Curl_trc_feat_ftp, NULL, 0, fmt, ap); va_end(ap); } } @@ -245,7 +376,7 @@ void Curl_trc_smtp(struct Curl_easy *data, const char *fmt, ...) if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_smtp)) { va_list ap; va_start(ap, fmt); - trc_infof(data, &Curl_trc_feat_smtp, fmt, ap); + trc_infof(data, &Curl_trc_feat_smtp, NULL, 0, fmt, ap); va_end(ap); } } @@ -263,7 +394,7 @@ void Curl_trc_ssls(struct Curl_easy *data, const char *fmt, ...) if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ssls)) { va_list ap; va_start(ap, fmt); - trc_infof(data, &Curl_trc_feat_ssls, fmt, ap); + trc_infof(data, &Curl_trc_feat_ssls, NULL, 0, fmt, ap); va_end(ap); } } @@ -281,7 +412,7 @@ void Curl_trc_ws(struct Curl_easy *data, const char *fmt, ...) if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_ws)) { va_list ap; va_start(ap, fmt); - trc_infof(data, &Curl_trc_feat_ws, fmt, ap); + trc_infof(data, &Curl_trc_feat_ws, NULL, 0, fmt, ap); va_end(ap); } } @@ -291,6 +422,7 @@ void Curl_trc_ws(struct Curl_easy *data, const char *fmt, ...) #define TRC_CT_PROTOCOL (1<<(0)) #define TRC_CT_NETWORK (1<<(1)) #define TRC_CT_PROXY (1<<(2)) +#define TRC_CT_INTERNALS (1<<(3)) struct trc_feat_def { struct curl_trc_feat *feat; @@ -298,6 +430,8 @@ struct trc_feat_def { }; static struct trc_feat_def trc_feats[] = { + { &Curl_trc_feat_ids, TRC_CT_INTERNALS }, + { &Curl_trc_feat_multi, TRC_CT_NETWORK }, { &Curl_trc_feat_read, TRC_CT_NONE }, { &Curl_trc_feat_write, TRC_CT_NONE }, { &Curl_trc_feat_dns, TRC_CT_NETWORK }, @@ -468,6 +602,11 @@ void Curl_trc_cf_infof(struct Curl_easy *data, struct curl_trc_feat; +void Curl_trc_multi(struct Curl_easy *data, const char *fmt, ...) +{ + (void)data; (void)fmt; +} + void Curl_trc_write(struct Curl_easy *data, const char *fmt, ...) { (void)data; (void)fmt; diff --git a/lib/curl_trc.h b/lib/curl_trc.h index 5412c7a5f8..c87bdb7dd3 100644 --- a/lib/curl_trc.h +++ b/lib/curl_trc.h @@ -82,6 +82,10 @@ void Curl_infof(struct Curl_easy *data, */ void Curl_trc_cf_infof(struct Curl_easy *data, struct Curl_cfilter *cf, const char *fmt, ...) CURL_PRINTF(3, 4); +void Curl_trc_multi(struct Curl_easy *data, + const char *fmt, ...) CURL_PRINTF(2, 3); +const char *Curl_trc_mstate_name(int state); +#define CURL_MSTATE_NAME(s) Curl_trc_mstate_name((int)(s)) void Curl_trc_write(struct Curl_easy *data, const char *fmt, ...) CURL_PRINTF(2, 3); void Curl_trc_read(struct Curl_easy *data, @@ -114,6 +118,9 @@ void Curl_trc_ws(struct Curl_easy *data, #define infof(data, ...) \ do { if(Curl_trc_is_verbose(data)) \ Curl_infof(data, __VA_ARGS__); } while(0) +#define CURL_TRC_M(data, ...) \ + do { if(Curl_trc_ft_is_verbose(data, &Curl_trc_feat_multi)) \ + Curl_trc_multi(data, __VA_ARGS__); } while(0) #define CURL_TRC_CF(data, cf, ...) \ do { if(Curl_trc_cf_is_verbose(cf, data)) \ Curl_trc_cf_infof(data, cf, __VA_ARGS__); } while(0) @@ -151,6 +158,7 @@ void Curl_trc_ws(struct Curl_easy *data, #else /* CURL_HAVE_C99 */ #define infof Curl_infof +#define CURL_TRC_M Curl_trc_multi #define CURL_TRC_CF Curl_trc_cf_infof #define CURL_TRC_WRITE Curl_trc_write #define CURL_TRC_READ Curl_trc_read @@ -178,6 +186,7 @@ struct curl_trc_feat { const char *name; int log_level; }; +extern struct curl_trc_feat Curl_trc_feat_multi; extern struct curl_trc_feat Curl_trc_feat_read; extern struct curl_trc_feat Curl_trc_feat_write; extern struct curl_trc_feat Curl_trc_feat_dns; @@ -199,6 +208,7 @@ extern struct curl_trc_feat Curl_trc_feat_dns; #define Curl_trc_is_verbose(d) (FALSE) #define Curl_trc_cf_is_verbose(x,y) (FALSE) #define Curl_trc_ft_is_verbose(x,y) (FALSE) +#define CURL_MSTATE_NAME(x) ((void)(x), "-") #endif /* !defined(CURL_DISABLE_VERBOSE_STRINGS) */ diff --git a/lib/easy.c b/lib/easy.c index 72caeff3f3..36619d44cc 100644 --- a/lib/easy.c +++ b/lib/easy.c @@ -1195,10 +1195,10 @@ CURLcode curl_easy_pause(CURL *d, int action) } out: - if(!result && !data->state.done && keep_changed) - /* This transfer may have been moved in or out of the bundle, update the - corresponding socket callback, if used */ - result = Curl_updatesocket(data); + if(!result && !data->state.done && keep_changed && data->multi) + /* pause/unpausing may result in multi event changes */ + if(Curl_multi_ev_assess_xfer(data->multi, data)) + result = CURLE_ABORTED_BY_CALLBACK; if(recursive) /* this might have called a callback recursively which might have set this diff --git a/lib/multi.c b/lib/multi.c index be52d54f54..71bcd5bc10 100644 --- a/lib/multi.c +++ b/lib/multi.c @@ -36,6 +36,7 @@ #include "share.h" #include "psl.h" #include "multiif.h" +#include "multi_ev.h" #include "sendf.h" #include "timeval.h" #include "http.h" @@ -94,8 +95,6 @@ static void move_pending_to_connect(struct Curl_multi *multi, struct Curl_easy *data); -static CURLMcode singlesocket(struct Curl_multi *multi, - struct Curl_easy *data); static CURLMcode add_next_timeout(struct curltime now, struct Curl_multi *multi, struct Curl_easy *d); @@ -104,31 +103,6 @@ static CURLMcode multi_timeout(struct Curl_multi *multi, long *timeout_ms); static void process_pending_handles(struct Curl_multi *multi); static void multi_xfer_bufs_free(struct Curl_multi *multi); -static void expire_ex(struct Curl_easy *data, const struct curltime *nowp, - timediff_t milli, expire_id id); - -#if defined( DEBUGBUILD) && !defined(CURL_DISABLE_VERBOSE_STRINGS) -static const char * const multi_statename[]={ - "INIT", - "PENDING", - "SETUP", - "CONNECT", - "RESOLVING", - "CONNECTING", - "TUNNELING", - "PROTOCONNECT", - "PROTOCONNECTING", - "DO", - "DOING", - "DOING_MORE", - "DID", - "PERFORMING", - "RATELIMITING", - "DONE", - "COMPLETED", - "MSGSENT", -}; -#endif /* function pointer called once when switching TO a state */ typedef void (*init_multistate_func)(struct Curl_easy *data); @@ -179,26 +153,18 @@ static void mstate(struct Curl_easy *data, CURLMstate state NULL /* MSGSENT */ }; -#if defined(DEBUGBUILD) && defined(CURL_DISABLE_VERBOSE_STRINGS) - (void) lineno; -#endif - if(oldstate == state) /* do not bother when the new state is the same as the old state */ return; - data->mstate = state; - -#if defined(DEBUGBUILD) && !defined(CURL_DISABLE_VERBOSE_STRINGS) - if(data->mstate >= MSTATE_PENDING && - data->mstate < MSTATE_COMPLETED) { - infof(data, - "STATE: %s => %s handle %p; line %d", - multi_statename[oldstate], multi_statename[data->mstate], - (void *)data, lineno); - } +#ifdef DEBUGBUILD + CURL_TRC_M(data, "-> [%s] (line %d)", CURL_MSTATE_NAME(state), lineno); +#else + CURL_TRC_M(data, "-> [%s]", CURL_MSTATE_NAME(state)); #endif + data->mstate = state; + if(state == MSTATE_COMPLETED) { /* changing to COMPLETED means there is one less easy handle 'alive' */ DEBUGASSERT(data->multi->num_alive > 0); @@ -220,163 +186,6 @@ static void mstate(struct Curl_easy *data, CURLMstate state #define multistate(x,y) mstate(x,y, __LINE__) #endif -/* - * We add one of these structs to the sockhash for each socket - */ - -struct Curl_sh_entry { - struct Curl_hash transfers; /* hash of transfers using this socket */ - unsigned int action; /* what combined action READ/WRITE this socket waits - for */ - unsigned int users; /* number of transfers using this */ - void *socketp; /* settable by users with curl_multi_assign() */ - unsigned int readers; /* this many transfers want to read */ - unsigned int writers; /* this many transfers want to write */ -}; - -/* look up a given socket in the socket hash, skip invalid sockets */ -static struct Curl_sh_entry *sh_getentry(struct Curl_hash *sh, - curl_socket_t s) -{ - if(s != CURL_SOCKET_BAD) { - /* only look for proper sockets */ - return Curl_hash_pick(sh, (char *)&s, sizeof(curl_socket_t)); - } - return NULL; -} - -#define TRHASH_SIZE 13 - -/* the given key here is a struct Curl_easy pointer */ -static size_t trhash(void *key, size_t key_length, size_t slots_num) -{ - unsigned char bytes = ((unsigned char *)key)[key_length - 1] ^ - ((unsigned char *)key)[0]; - return (bytes % slots_num); -} - -static size_t trhash_compare(void *k1, size_t k1_len, void *k2, size_t k2_len) -{ - (void)k2_len; - return !memcmp(k1, k2, k1_len); -} - -static void trhash_dtor(void *nada) -{ - (void)nada; -} - -/* - * The sockhash has its own separate subhash in each entry that need to be - * safely destroyed first. - */ -static void sockhash_destroy(struct Curl_hash *h) -{ - struct Curl_hash_iterator iter; - struct Curl_hash_element *he; - - DEBUGASSERT(h); - Curl_hash_start_iterate(h, &iter); - he = Curl_hash_next_element(&iter); - while(he) { - struct Curl_sh_entry *sh = (struct Curl_sh_entry *)he->ptr; - Curl_hash_destroy(&sh->transfers); - he = Curl_hash_next_element(&iter); - } - Curl_hash_destroy(h); -} - - -/* make sure this socket is present in the hash for this handle */ -static struct Curl_sh_entry *sh_addentry(struct Curl_hash *sh, - curl_socket_t s) -{ - struct Curl_sh_entry *there = sh_getentry(sh, s); - struct Curl_sh_entry *check; - - if(there) { - /* it is present, return fine */ - return there; - } - - /* not present, add it */ - check = calloc(1, sizeof(struct Curl_sh_entry)); - if(!check) - return NULL; /* major failure */ - - Curl_hash_init(&check->transfers, TRHASH_SIZE, trhash, trhash_compare, - trhash_dtor); - - /* make/add new hash entry */ - if(!Curl_hash_add(sh, (char *)&s, sizeof(curl_socket_t), check)) { - Curl_hash_destroy(&check->transfers); - free(check); - return NULL; /* major failure */ - } - - return check; /* things are good in sockhash land */ -} - - -/* delete the given socket + handle from the hash */ -static void sh_delentry(struct Curl_sh_entry *entry, - struct Curl_hash *sh, curl_socket_t s) -{ - Curl_hash_destroy(&entry->transfers); - - /* We remove the hash entry. This will end up in a call to - sh_freeentry(). */ - Curl_hash_delete(sh, (char *)&s, sizeof(curl_socket_t)); -} - -/* - * free a sockhash entry - */ -static void sh_freeentry(void *freethis) -{ - struct Curl_sh_entry *p = (struct Curl_sh_entry *) freethis; - - free(p); -} - -static size_t fd_key_compare(void *k1, size_t k1_len, void *k2, size_t k2_len) -{ - (void) k1_len; (void) k2_len; - - return (*((curl_socket_t *) k1)) == (*((curl_socket_t *) k2)); -} - -static size_t hash_fd(void *key, size_t key_length, size_t slots_num) -{ - curl_socket_t fd = *((curl_socket_t *) key); - (void) key_length; - - return (fd % (curl_socket_t)slots_num); -} - -/* - * sh_init() creates a new socket hash and returns the handle for it. - * - * Quote from README.multi_socket: - * - * "Some tests at 7000 and 9000 connections showed that the socket hash lookup - * is somewhat of a bottle neck. Its current implementation may be a bit too - * limiting. It simply has a fixed-size array, and on each entry in the array - * it has a linked list with entries. The hash only checks which list to scan - * through. The code I had used so for used a list with merely 7 slots (as - * that is what the DNS hash uses) but with 7000 connections that would make - * an average of 1000 nodes in each list to run through. I upped that to 97 - * slots (I believe a prime is suitable) and noticed a significant speed - * increase. I need to reconsider the hash implementation or use a rather - * large default value like this. At 9000 connections I was still below 10us - * per call." - * - */ -static void sh_init(struct Curl_hash *hash, size_t hashsize) -{ - Curl_hash_init(hash, hashsize, hash_fd, fd_key_compare, - sh_freeentry); -} /* multi->proto_hash destructor. Should never be called as elements * MUST be added with their own destructor */ @@ -400,7 +209,7 @@ static void multi_addmsg(struct Curl_multi *multi, struct Curl_message *msg) Curl_llist_append(&multi->msglist, msg, &msg->list); } -struct Curl_multi *Curl_multi_handle(size_t hashsize, /* socket hash */ +struct Curl_multi *Curl_multi_handle(size_t ev_hashsize, /* event hash */ size_t chashsize, /* connection hash */ size_t dnssize, /* dns hash */ size_t sesssize) /* TLS session cache */ @@ -413,8 +222,7 @@ struct Curl_multi *Curl_multi_handle(size_t hashsize, /* socket hash */ multi->magic = CURL_MULTI_HANDLE; Curl_init_dnscache(&multi->hostcache, dnssize); - - sh_init(&multi->sockhash, hashsize); + Curl_multi_ev_init(multi, ev_hashsize); Curl_hash_init(&multi->proto_hash, 23, Curl_hash_str, Curl_str_key_compare, ph_freeentry); @@ -452,7 +260,7 @@ struct Curl_multi *Curl_multi_handle(size_t hashsize, /* socket hash */ error: - sockhash_destroy(&multi->sockhash); + Curl_multi_ev_cleanup(multi); Curl_hash_destroy(&multi->proto_hash); Curl_hash_destroy(&multi->hostcache); Curl_cpool_destroy(&multi->cpool); @@ -588,6 +396,7 @@ CURLMcode curl_multi_add_handle(CURLM *m, CURL *d) Curl_cpool_xfer_init(data); multi_warn_debug(multi, data); + CURL_TRC_M(data, "added, transfers=%u", multi->num_easy); return CURLM_OK; } @@ -621,9 +430,8 @@ static void multi_done_locked(struct connectdata *conn, if(CONN_INUSE(conn)) { /* Stop if still used. */ - DEBUGF(infof(data, "Connection still in use %zu, " - "no more multi_done now!", - Curl_llist_count(&conn->easyq))); + CURL_TRC_M(data, "Connection still in use %zu, no more multi_done now!", + Curl_llist_count(&conn->easyq)); return; } @@ -660,12 +468,12 @@ static void multi_done_locked(struct connectdata *conn, #endif ) || conn->bits.close || (mdctx->premature && !Curl_conn_is_multiplex(conn, FIRSTSOCKET))) { - DEBUGF(infof(data, "multi_done, not reusing connection=%" - FMT_OFF_T ", forbid=%d" - ", close=%d, premature=%d, conn_multiplex=%d", - conn->connection_id, data->set.reuse_forbid, - conn->bits.close, mdctx->premature, - Curl_conn_is_multiplex(conn, FIRSTSOCKET))); + CURL_TRC_M(data, "multi_done, not reusing connection=%" + FMT_OFF_T ", forbid=%d" + ", close=%d, premature=%d, conn_multiplex=%d", + conn->connection_id, data->set.reuse_forbid, + conn->bits.close, mdctx->premature, + Curl_conn_is_multiplex(conn, FIRSTSOCKET)); connclose(conn, "disconnecting"); Curl_cpool_disconnect(data, conn, mdctx->premature); } @@ -703,14 +511,8 @@ static CURLcode multi_done(struct Curl_easy *data, memset(&mdctx, 0, sizeof(mdctx)); -#if defined(DEBUGBUILD) && !defined(CURL_DISABLE_VERBOSE_STRINGS) - DEBUGF(infof(data, "multi_done[%s]: status: %d prem: %d done: %d", - multi_statename[data->mstate], - (int)status, (int)premature, data->state.done)); -#else - DEBUGF(infof(data, "multi_done: status: %d prem: %d done: %d", - (int)status, (int)premature, data->state.done)); -#endif + CURL_TRC_M(data, "multi_done: status: %d prem: %d done: %d", + (int)status, (int)premature, data->state.done); if(data->state.done) /* Stop if multi_done() has already been called */ @@ -861,18 +663,14 @@ CURLMcode curl_multi_remove_handle(CURLM *m, CURL *d) Curl_wildcard_dtor(&data->wildcard); - /* change state without using multistate(), only to make singlesocket() do - what we want */ data->mstate = MSTATE_COMPLETED; - /* This ignores the return code even in case of problems because there is - nothing more to do about that, here */ - (void)singlesocket(multi, data); /* to let the application know what sockets - that vanish with this handle */ - /* Remove the association between the connection and the handle */ Curl_detach_connection(data); + /* Tell event handling that this transfer is definitely going away */ + Curl_multi_ev_xfer_done(multi, data); + if(data->set.connect_only && !data->multi_easy) { /* This removes a handle that was part the multi interface that used CONNECT_ONLY, that connection is now left alive but since this handle @@ -927,6 +725,8 @@ CURLMcode curl_multi_remove_handle(CURLM *m, CURL *d) if(rc) return rc; } + + CURL_TRC_M(data, "removed, transfers=%u", multi->num_easy); return CURLM_OK; } @@ -1068,13 +868,15 @@ static int perform_getsock(struct Curl_easy *data, curl_socket_t *sock) /* Initializes `poll_set` with the current socket poll actions needed * for transfer `data`. */ -static void multi_getsock(struct Curl_easy *data, - struct easy_pollset *ps) +void Curl_multi_getsock(struct Curl_easy *data, + struct easy_pollset *ps, + const char *caller) { bool expect_sockets = TRUE; - /* The no connection case can happen when this is called from - curl_multi_remove_handle() => singlesocket() => multi_getsock(). - */ + + /* If the transfer has no connection, this is fine. Happens when + called via curl_multi_remove_handle() => Curl_multi_ev_assess() => + Curl_multi_getsock(). */ Curl_pollset_reset(data, ps); if(!data->conn) return; @@ -1098,30 +900,30 @@ static void multi_getsock(struct Curl_easy *data, case MSTATE_CONNECTING: case MSTATE_TUNNELING: Curl_pollset_add_socks(data, ps, connecting_getsock); - Curl_conn_adjust_pollset(data, ps); + Curl_conn_adjust_pollset(data, data->conn, ps); break; case MSTATE_PROTOCONNECT: case MSTATE_PROTOCONNECTING: Curl_pollset_add_socks(data, ps, protocol_getsock); - Curl_conn_adjust_pollset(data, ps); + Curl_conn_adjust_pollset(data, data->conn, ps); break; case MSTATE_DO: case MSTATE_DOING: Curl_pollset_add_socks(data, ps, doing_getsock); - Curl_conn_adjust_pollset(data, ps); + Curl_conn_adjust_pollset(data, data->conn, ps); break; case MSTATE_DOING_MORE: Curl_pollset_add_socks(data, ps, domore_getsock); - Curl_conn_adjust_pollset(data, ps); + Curl_conn_adjust_pollset(data, data->conn, ps); break; case MSTATE_DID: /* same as PERFORMING in regard to polling */ case MSTATE_PERFORMING: Curl_pollset_add_socks(data, ps, perform_getsock); - Curl_conn_adjust_pollset(data, ps); + Curl_conn_adjust_pollset(data, data->conn, ps); break; case MSTATE_RATELIMITING: @@ -1143,6 +945,35 @@ static void multi_getsock(struct Curl_easy *data, break; } + switch(ps->num) { + case 0: + CURL_TRC_M(data, "%s pollset[], timeouts=%zu, paused %d/%d (r/w)", + caller, Curl_llist_count(&data->state.timeoutlist), + Curl_creader_is_paused(data), Curl_cwriter_is_paused(data)); + break; + case 1: + CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu", + caller, ps->sockets[0], + (ps->actions[0] & CURL_POLL_IN) ? "IN" : "", + (ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "", + Curl_llist_count(&data->state.timeoutlist)); + break; + case 2: + CURL_TRC_M(data, "%s pollset[fd=%" FMT_SOCKET_T " %s%s, " + "fd=%" FMT_SOCKET_T " %s%s], timeouts=%zu", + caller, ps->sockets[0], + (ps->actions[0] & CURL_POLL_IN) ? "IN" : "", + (ps->actions[0] & CURL_POLL_OUT) ? "OUT" : "", + ps->sockets[1], + (ps->actions[1] & CURL_POLL_IN) ? "IN" : "", + (ps->actions[1] & CURL_POLL_OUT) ? "OUT" : "", + Curl_llist_count(&data->state.timeoutlist)); + break; + default: + CURL_TRC_M(data, "%s pollset[fds=%u], timeouts=%zu", + caller, ps->num, Curl_llist_count(&data->state.timeoutlist)); + break; + } if(expect_sockets && !ps->num && !Curl_llist_count(&data->state.timeoutlist) && !Curl_cwriter_is_paused(data) && !Curl_creader_is_paused(data) && @@ -1177,26 +1008,27 @@ CURLMcode curl_multi_fdset(CURLM *m, for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) { struct Curl_easy *data = Curl_node_elem(e); + struct easy_pollset ps; - multi_getsock(data, &data->last_poll); + Curl_multi_getsock(data, &ps, "curl_multi_fdset"); - for(i = 0; i < data->last_poll.num; i++) { - if(!FDSET_SOCK(data->last_poll.sockets[i])) + for(i = 0; i < ps.num; i++) { + if(!FDSET_SOCK(ps.sockets[i])) /* pretend it does not exist */ continue; #if defined(__DJGPP__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Warith-conversion" #endif - if(data->last_poll.actions[i] & CURL_POLL_IN) - FD_SET(data->last_poll.sockets[i], read_fd_set); - if(data->last_poll.actions[i] & CURL_POLL_OUT) - FD_SET(data->last_poll.sockets[i], write_fd_set); + if(ps.actions[i] & CURL_POLL_IN) + FD_SET(ps.sockets[i], read_fd_set); + if(ps.actions[i] & CURL_POLL_OUT) + FD_SET(ps.sockets[i], write_fd_set); #if defined(__DJGPP__) #pragma GCC diagnostic pop #endif - if((int)data->last_poll.sockets[i] > this_max_fd) - this_max_fd = (int)data->last_poll.sockets[i]; + if((int)ps.sockets[i] > this_max_fd) + this_max_fd = (int)ps.sockets[i]; } } @@ -1230,8 +1062,10 @@ CURLMcode curl_multi_waitfds(CURLM *m, Curl_waitfds_init(&cwfds, ufds, size); for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) { struct Curl_easy *data = Curl_node_elem(e); - multi_getsock(data, &data->last_poll); - need += Curl_waitfds_add_ps(&cwfds, &data->last_poll); + struct easy_pollset ps; + + Curl_multi_getsock(data, &ps, "curl_multi_waitfds"); + need += Curl_waitfds_add_ps(&cwfds, &ps); } need += Curl_cpool_add_waitfds(&multi->cpool, &cwfds); @@ -1303,9 +1137,10 @@ static CURLMcode multi_wait(struct Curl_multi *multi, /* Add the curl handles to our pollfds first */ for(e = Curl_llist_head(&multi->process); e; e = Curl_node_next(e)) { struct Curl_easy *data = Curl_node_elem(e); + struct easy_pollset ps; - multi_getsock(data, &data->last_poll); - if(Curl_pollfds_add_ps(&cpfds, &data->last_poll)) { + Curl_multi_getsock(data, &ps, "multi_wait"); + if(Curl_pollfds_add_ps(&cpfds, &ps)) { result = CURLM_OUT_OF_MEMORY; goto out; } @@ -1440,23 +1275,14 @@ static CURLMcode multi_wait(struct Curl_multi *multi, #ifdef USE_WINSOCK /* Count up all our own sockets that had activity, and remove them from the event. */ - if(curl_nfds) { - for(e = Curl_llist_head(&multi->process); e && !result; - e = Curl_node_next(e)) { - struct Curl_easy *data = Curl_node_elem(e); - - for(i = 0; i < data->last_poll.num; i++) { - wsa_events.lNetworkEvents = 0; - if(WSAEnumNetworkEvents(data->last_poll.sockets[i], NULL, - &wsa_events) == 0) { - if(ret && !pollrc && wsa_events.lNetworkEvents) - retcode++; - } - WSAEventSelect(data->last_poll.sockets[i], multi->wsa_event, 0); - } + for(i = 0; i < curl_nfds; ++i) { + wsa_events.lNetworkEvents = 0; + if(WSAEnumNetworkEvents(cpfds.pfds[i].fd, NULL, &wsa_events) == 0) { + if(ret && !pollrc && wsa_events.lNetworkEvents) + retcode++; } + WSAEventSelect(cpfds.pfds[i].fd, multi->wsa_event, 0); } - WSAResetEvent(multi->wsa_event); #else #ifdef ENABLE_WAKEUP @@ -2246,7 +2072,7 @@ static CURLMcode state_resolving(struct Curl_multi *multi, socket(s) will again be used further down. If the name has not yet been resolved, it is likely that new sockets have been opened in an attempt to contact another resolver. */ - rc = singlesocket(multi, data); + rc = Curl_multi_ev_assess_xfer(multi, data); if(rc) return rc; @@ -2365,7 +2191,7 @@ static CURLMcode multi_runsingle(struct Curl_multi *multi, rc = CURLM_OK; if(multi_ischanged(multi, TRUE)) { - DEBUGF(infof(data, "multi changed, check CONNECT_PEND queue")); + CURL_TRC_M(data, "multi changed, check CONNECT_PEND queue"); process_pending_handles(multi); /* multiplexed */ } @@ -2731,6 +2557,7 @@ CURLMcode curl_multi_perform(CURLM *m, int *running_handles) struct Curl_llist_node *e; struct Curl_llist_node *n = NULL; struct Curl_multi *multi = m; + bool first = TRUE; SIGPIPE_VARIABLE(pipe_st); if(!GOOD_MULTI_HANDLE(multi)) @@ -2745,12 +2572,16 @@ CURLMcode curl_multi_perform(CURLM *m, int *running_handles) CURLMcode result; /* Do the loop and only alter the signal ignore state if the next handle has a different NO_SIGNAL state than the previous */ + if(first) { + CURL_TRC_M(data, "multi_perform(running=%u)", multi->num_alive); + first = FALSE; + } /* the current node might be unlinked in multi_runsingle(), get the next pointer now */ n = Curl_node_next(e); - if(data != multi->cpool.idata) { + if(data && data != multi->cpool.idata) { /* connection pool handle is processed below */ sigpipe_apply(data, &pipe_st); result = multi_runsingle(multi, &now, data); @@ -2760,7 +2591,7 @@ CURLMcode curl_multi_perform(CURLM *m, int *running_handles) } sigpipe_apply(multi->cpool.idata, &pipe_st); - Curl_cpool_multi_perform(multi); + Curl_cpool_multi_perform(multi, CURL_SOCKET_TIMEOUT); sigpipe_restore(&pipe_st); if(multi_ischanged(m, TRUE)) @@ -2862,7 +2693,7 @@ CURLMcode curl_multi_cleanup(CURLM *m) multi->magic = 0; /* not good anymore */ - sockhash_destroy(&multi->sockhash); + Curl_multi_ev_cleanup(multi); Curl_hash_destroy(&multi->proto_hash); Curl_hash_destroy(&multi->hostcache); Curl_psl_destroy(&multi->psl); @@ -2925,233 +2756,14 @@ CURLMsg *curl_multi_info_read(CURLM *m, int *msgs_in_queue) return NULL; } -/* - * singlesocket() checks what sockets we deal with and their "action state" - * and if we have a different state in any of those sockets from last time we - * call the callback accordingly. - */ -static CURLMcode singlesocket(struct Curl_multi *multi, - struct Curl_easy *data) -{ - struct easy_pollset cur_poll; - CURLMcode mresult; - /* Fill in the 'current' struct with the state as it is now: what sockets to - supervise and for what actions */ - multi_getsock(data, &cur_poll); - mresult = Curl_multi_pollset_ev(multi, data, &cur_poll, &data->last_poll); - - if(!mresult) /* Remember for next time */ - memcpy(&data->last_poll, &cur_poll, sizeof(cur_poll)); - return mresult; -} - -CURLMcode Curl_multi_pollset_ev(struct Curl_multi *multi, - struct Curl_easy *data, - struct easy_pollset *ps, - struct easy_pollset *last_ps) -{ - unsigned int i; - struct Curl_sh_entry *entry; - curl_socket_t s; - int rc; - - /* We have 0 .. N sockets already and we get to know about the 0 .. M - sockets we should have from now on. Detect the differences, remove no - longer supervised ones and add new ones */ - - /* walk over the sockets we got right now */ - for(i = 0; i < ps->num; i++) { - unsigned char cur_action = ps->actions[i]; - unsigned char last_action = 0; - int comboaction; - - s = ps->sockets[i]; - - /* get it from the hash */ - entry = sh_getentry(&multi->sockhash, s); - if(entry) { - /* check if new for this transfer */ - unsigned int j; - for(j = 0; j < last_ps->num; j++) { - if(s == last_ps->sockets[j]) { - last_action = last_ps->actions[j]; - break; - } - } - } - else { - /* this is a socket we did not have before, add it to the hash! */ - entry = sh_addentry(&multi->sockhash, s); - if(!entry) - /* fatal */ - return CURLM_OUT_OF_MEMORY; - } - if(last_action && (last_action != cur_action)) { - /* Socket was used already, but different action now */ - if(last_action & CURL_POLL_IN) { - DEBUGASSERT(entry->readers); - entry->readers--; - } - if(last_action & CURL_POLL_OUT) { - DEBUGASSERT(entry->writers); - entry->writers--; - } - if(cur_action & CURL_POLL_IN) { - entry->readers++; - } - if(cur_action & CURL_POLL_OUT) - entry->writers++; - } - else if(!last_action && - !Curl_hash_pick(&entry->transfers, (char *)&data, /* hash key */ - sizeof(struct Curl_easy *))) { - DEBUGASSERT(entry->users < 100000); /* detect weird values */ - /* a new transfer using this socket */ - entry->users++; - if(cur_action & CURL_POLL_IN) - entry->readers++; - if(cur_action & CURL_POLL_OUT) - entry->writers++; - /* add 'data' to the transfer hash on this socket! */ - if(!Curl_hash_add(&entry->transfers, (char *)&data, /* hash key */ - sizeof(struct Curl_easy *), data)) { - Curl_hash_destroy(&entry->transfers); - return CURLM_OUT_OF_MEMORY; - } - } - - comboaction = (entry->writers ? CURL_POLL_OUT : 0) | - (entry->readers ? CURL_POLL_IN : 0); - - /* socket existed before and has the same action set as before */ - if(last_action && ((int)entry->action == comboaction)) - /* same, continue */ - continue; - - if(multi->socket_cb) { - set_in_callback(multi, TRUE); - rc = multi->socket_cb(data, s, comboaction, multi->socket_userp, - entry->socketp); - - set_in_callback(multi, FALSE); - if(rc == -1) { - multi->dead = TRUE; - return CURLM_ABORTED_BY_CALLBACK; - } - } - - /* store the current action state */ - entry->action = (unsigned int)comboaction; - } - - /* Check for last_poll.sockets that no longer appear in ps->sockets. - * Need to remove the easy handle from the multi->sockhash->transfers and - * remove multi->sockhash entry when this was the last transfer */ - for(i = 0; i < last_ps->num; i++) { - unsigned int j; - bool stillused = FALSE; - s = last_ps->sockets[i]; - for(j = 0; j < ps->num; j++) { - if(s == ps->sockets[j]) { - /* this is still supervised */ - stillused = TRUE; - break; - } - } - if(stillused) - continue; - - entry = sh_getentry(&multi->sockhash, s); - /* if this is NULL here, the socket has been closed and notified so - already by Curl_multi_closed() */ - if(entry) { - unsigned char oldactions = last_ps->actions[i]; - /* this socket has been removed. Decrease user count */ - DEBUGASSERT(entry->users); - entry->users--; - if(oldactions & CURL_POLL_OUT) - entry->writers--; - if(oldactions & CURL_POLL_IN) - entry->readers--; - if(!entry->users) { - bool dead = FALSE; - if(multi->socket_cb) { - set_in_callback(multi, TRUE); - rc = multi->socket_cb(data, s, CURL_POLL_REMOVE, - multi->socket_userp, entry->socketp); - set_in_callback(multi, FALSE); - if(rc == -1) - dead = TRUE; - } - sh_delentry(entry, &multi->sockhash, s); - if(dead) { - multi->dead = TRUE; - return CURLM_ABORTED_BY_CALLBACK; - } - } - else { - /* still users, but remove this handle as a user of this socket */ - if(Curl_hash_delete(&entry->transfers, (char *)&data, - sizeof(struct Curl_easy *))) { - DEBUGASSERT(NULL); - } - } - } - } /* for loop over num */ - - return CURLM_OK; -} - -CURLcode Curl_updatesocket(struct Curl_easy *data) -{ - if(singlesocket(data->multi, data)) - return CURLE_ABORTED_BY_CALLBACK; - return CURLE_OK; -} - - -/* - * Curl_multi_closed() - * - * Used by the connect code to tell the multi_socket code that one of the - * sockets we were using is about to be closed. This function will then - * remove it from the sockethash for this handle to make the multi_socket API - * behave properly, especially for the case when libcurl will create another - * socket again and it gets the same file descriptor number. - */ - -void Curl_multi_closed(struct Curl_easy *data, curl_socket_t s) +void Curl_multi_will_close(struct Curl_easy *data, curl_socket_t s) { if(data) { - /* if there is still an easy handle associated with this connection */ struct Curl_multi *multi = data->multi; - DEBUGF(infof(data, "Curl_multi_closed, fd=%" FMT_SOCKET_T - " multi is %p", s, (void *)multi)); if(multi) { - /* this is set if this connection is part of a handle that is added to - a multi handle, and only then this is necessary */ - struct Curl_sh_entry *entry = sh_getentry(&multi->sockhash, s); - - DEBUGF(infof(data, "Curl_multi_closed, fd=%" FMT_SOCKET_T - " entry is %p", s, (void *)entry)); - if(entry) { - int rc = 0; - if(multi->socket_cb) { - set_in_callback(multi, TRUE); - rc = multi->socket_cb(data, s, CURL_POLL_REMOVE, - multi->socket_userp, entry->socketp); - set_in_callback(multi, FALSE); - } - - /* now remove it from the socket hash */ - sh_delentry(entry, &multi->sockhash, s); - if(rc == -1) - /* This just marks the multi handle as "dead" without returning an - error code primarily because this function is used from many - places where propagating an error back is tricky. */ - multi->dead = TRUE; - } + CURL_TRC_M(data, "Curl_multi_will_close fd=%" FMT_SOCKET_T, s); + Curl_multi_ev_socket_done(multi, data, s); } } } @@ -3253,9 +2865,8 @@ static CURLMcode multi_run_expired(struct multi_run_ctx *mrc) result = multi_runsingle(multi, &mrc->now, data); if(CURLM_OK >= result) { - /* get the socket(s) and check if the state has been changed since - last */ - result = singlesocket(multi, data); + /* reassess event handling of data */ + result = Curl_multi_ev_assess_xfer(multi, data); if(result) goto out; } @@ -3271,7 +2882,6 @@ static CURLMcode multi_socket(struct Curl_multi *multi, int *running_handles) { CURLMcode result = CURLM_OK; - struct Curl_easy *data = NULL; struct multi_run_ctx mrc; (void)ev_bitmask; @@ -3281,56 +2891,19 @@ static CURLMcode multi_socket(struct Curl_multi *multi, sigpipe_init(&mrc.pipe_st); if(checkall) { - struct Curl_llist_node *e; /* *perform() deals with running_handles on its own */ result = curl_multi_perform(multi, running_handles); - /* walk through each easy handle and do the socket state change magic - and callbacks */ if(result != CURLM_BAD_HANDLE) { - for(e = Curl_llist_head(&multi->process); e && !result; - e = Curl_node_next(e)) { - result = singlesocket(multi, Curl_node_elem(e)); - } + /* Reassess event status of all active transfers */ + result = Curl_multi_ev_assess_xfer_list(multi, &multi->process); } mrc.run_cpool = TRUE; goto out; } if(s != CURL_SOCKET_TIMEOUT) { - struct Curl_sh_entry *entry = sh_getentry(&multi->sockhash, s); - - if(!entry) { - /* Unmatched socket, we cannot act on it but we ignore this fact. In - real-world tests it has been proved that libevent can in fact give - the application actions even though the socket was just previously - asked to get removed, so thus we better survive stray socket actions - and just move on. */ - /* The socket might come from a connection that is being shut down - * by the multi's connection pool. */ - Curl_cpool_multi_socket(multi, s, ev_bitmask); - } - else { - struct Curl_hash_iterator iter; - struct Curl_hash_element *he; - - /* the socket can be shared by many transfers, iterate */ - Curl_hash_start_iterate(&entry->transfers, &iter); - for(he = Curl_hash_next_element(&iter); he; - he = Curl_hash_next_element(&iter)) { - data = (struct Curl_easy *)he->ptr; - DEBUGASSERT(data); - DEBUGASSERT(data->magic == CURLEASY_MAGIC_NUMBER); - - if(data == multi->cpool.idata) - mrc.run_cpool = TRUE; - else { - /* Expire with out current now, so we will get it below when - * asking the splaytree for expired transfers. */ - expire_ex(data, &mrc.now, 0, EXPIRE_RUN_NOW); - } - } - } + Curl_multi_ev_expire_xfers(multi, s, &mrc.now, &mrc.run_cpool); } else { /* Asked to run due to time-out. Clear the 'last_expire_ts' variable to @@ -3339,6 +2912,7 @@ static CURLMcode multi_socket(struct Curl_multi *multi, handles the case when the application asks libcurl to run the timeout prematurely. */ memset(&multi->last_expire_ts, 0, sizeof(multi->last_expire_ts)); + mrc.run_cpool = TRUE; } result = multi_run_expired(&mrc); @@ -3358,7 +2932,7 @@ static CURLMcode multi_socket(struct Curl_multi *multi, out: if(mrc.run_cpool) { sigpipe_apply(multi->cpool.idata, &mrc.pipe_st); - Curl_cpool_multi_perform(multi); + Curl_cpool_multi_perform(multi, s); } sigpipe_restore(&mrc.pipe_st); @@ -3675,9 +3249,9 @@ multi_addtimeout(struct Curl_easy *data, return CURLM_OK; } -static void expire_ex(struct Curl_easy *data, - const struct curltime *nowp, - timediff_t milli, expire_id id) +void Curl_expire_ex(struct Curl_easy *data, + const struct curltime *nowp, + timediff_t milli, expire_id id) { struct Curl_multi *multi = data->multi; struct curltime *curr_expire = &data->state.expiretime; @@ -3749,7 +3323,7 @@ static void expire_ex(struct Curl_easy *data, void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id id) { struct curltime now = Curl_now(); - expire_ex(data, &now, milli, id); + Curl_expire_ex(data, &now, milli, id); } /* @@ -3793,9 +3367,7 @@ bool Curl_expire_clear(struct Curl_easy *data) /* clear the timeout list too */ Curl_llist_destroy(list, NULL); -#ifdef DEBUGBUILD - infof(data, "Expire cleared"); -#endif + CURL_TRC_M(data, "Expire cleared"); nowp->tv_sec = 0; nowp->tv_usec = 0; return TRUE; @@ -3806,19 +3378,11 @@ bool Curl_expire_clear(struct Curl_easy *data) CURLMcode curl_multi_assign(CURLM *m, curl_socket_t s, void *hashp) { - struct Curl_sh_entry *there = NULL; struct Curl_multi *multi = m; if(!GOOD_MULTI_HANDLE(multi)) return CURLM_BAD_HANDLE; - there = sh_getentry(&multi->sockhash, s); - - if(!there) - return CURLM_BAD_SOCKET; - - there->socketp = hashp; - - return CURLM_OK; + return Curl_multi_ev_assign(multi, s, hashp); } static void move_pending_to_connect(struct Curl_multi *multi, diff --git a/lib/multi_ev.c b/lib/multi_ev.c new file mode 100644 index 0000000000..64a810e1b1 --- /dev/null +++ b/lib/multi_ev.c @@ -0,0 +1,610 @@ +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ + +#include "curl_setup.h" + +#include + +#include "urldata.h" +#include "cfilters.h" +#include "curl_trc.h" +#include "multiif.h" +#include "timeval.h" +#include "multi_ev.h" +#include "select.h" +#include "warnless.h" +#include "multihandle.h" +#include "socks.h" +/* The last 3 #include files should be in this order */ +#include "curl_printf.h" +#include "curl_memory.h" +#include "memdebug.h" + + +static void mev_in_callback(struct Curl_multi *multi, bool value) +{ + multi->in_callback = value; +} + +#define CURL_MEV_XFER_HASH_SIZE 13 +#define CURL_MEV_CONN_HASH_SIZE 3 + +/* Information about a socket for which we inform the libcurl application + * what to supervise (CURL_POLL_IN/CURL_POLL_OUT/CURL_POLL_REMOVE) + */ +struct mev_sh_entry { + struct Curl_hash xfers; /* hash of transfers using this socket */ + struct Curl_hash conns; /* hash of connections using this socket */ + void *user_data; /* libcurl app data via curl_multi_assign() */ + unsigned int action; /* CURL_POLL_IN/CURL_POLL_OUT we last told the + * libcurl application to watch out for */ + unsigned int readers; /* this many transfers want to read */ + unsigned int writers; /* this many transfers want to write */ +}; + +static size_t mev_sh_entry_hash(void *key, size_t key_length, size_t slots_num) +{ + curl_socket_t fd = *((curl_socket_t *) key); + (void) key_length; + return (fd % (curl_socket_t)slots_num); +} + +static size_t mev_sh_entry_compare(void *k1, size_t k1_len, + void *k2, size_t k2_len) +{ + (void) k1_len; (void) k2_len; + return (*((curl_socket_t *) k1)) == (*((curl_socket_t *) k2)); +} + +/* sockhash entry destructor callback */ +static void mev_sh_entry_dtor(void *freethis) +{ + struct mev_sh_entry *entry = (struct mev_sh_entry *)freethis; + Curl_hash_destroy(&entry->xfers); + Curl_hash_destroy(&entry->conns); + free(entry); +} + +/* look up a given socket in the socket hash, skip invalid sockets */ +static struct mev_sh_entry * +mev_sh_entry_get(struct Curl_hash *sh, curl_socket_t s) +{ + if(s != CURL_SOCKET_BAD) { + /* only look for proper sockets */ + return Curl_hash_pick(sh, (char *)&s, sizeof(curl_socket_t)); + } + return NULL; +} + +static void mev_nop_dtor(void *e) +{ + (void)e; /* does nothing */ +} + +/* make sure this socket is present in the hash for this handle */ +static struct mev_sh_entry * +mev_sh_entry_add(struct Curl_hash *sh, curl_socket_t s) +{ + struct mev_sh_entry *there = mev_sh_entry_get(sh, s); + struct mev_sh_entry *check; + + if(there) { + /* it is present, return fine */ + return there; + } + + /* not present, add it */ + check = calloc(1, sizeof(struct mev_sh_entry)); + if(!check) + return NULL; /* major failure */ + + Curl_hash_offt_init(&check->xfers, CURL_MEV_XFER_HASH_SIZE, mev_nop_dtor); + Curl_hash_offt_init(&check->conns, CURL_MEV_CONN_HASH_SIZE, mev_nop_dtor); + + /* make/add new hash entry */ + if(!Curl_hash_add(sh, (char *)&s, sizeof(curl_socket_t), check)) { + mev_sh_entry_dtor(check); + return NULL; /* major failure */ + } + + return check; /* things are good in sockhash land */ +} + +/* delete the given socket entry from the hash */ +static void mev_sh_entry_kill(struct Curl_multi *multi, curl_socket_t s) +{ + Curl_hash_delete(&multi->ev.sh_entries, (char *)&s, sizeof(curl_socket_t)); +} + +static size_t mev_sh_entry_user_count(struct mev_sh_entry *e) +{ + return Curl_hash_count(&e->xfers) + Curl_hash_count(&e->conns); +} + +static bool mev_sh_entry_xfer_known(struct mev_sh_entry *e, + struct Curl_easy *data) +{ + return !!Curl_hash_offt_get(&e->xfers, data->id); +} + +static bool mev_sh_entry_conn_known(struct mev_sh_entry *e, + struct connectdata *conn) +{ + return !!Curl_hash_offt_get(&e->conns, conn->connection_id); +} + +static bool mev_sh_entry_xfer_add(struct mev_sh_entry *e, + struct Curl_easy *data) +{ + /* detect weird values */ + DEBUGASSERT(mev_sh_entry_user_count(e) < 100000); + return !!Curl_hash_offt_set(&e->xfers, data->id, data); +} + +static bool mev_sh_entry_conn_add(struct mev_sh_entry *e, + struct connectdata *conn) +{ + /* detect weird values */ + DEBUGASSERT(mev_sh_entry_user_count(e) < 100000); + return !!Curl_hash_offt_set(&e->conns, conn->connection_id, conn); +} + + +static bool mev_sh_entry_xfer_remove(struct mev_sh_entry *e, + struct Curl_easy *data) +{ + return !Curl_hash_offt_remove(&e->xfers, data->id); +} + +/* Purge any information about socket `s`. + * Let the socket callback know as well when necessary */ +static CURLMcode mev_forget_socket(struct Curl_multi *multi, + struct Curl_easy *data, + curl_socket_t s, + const char *cause) +{ + struct mev_sh_entry *entry = mev_sh_entry_get(&multi->ev.sh_entries, s); + int rc = 0; + + if(!entry) /* we never knew or already forgot about this socket */ + return CURLM_OK; + + /* We managed this socket before, tell the socket callback to forget it. */ + if(multi->socket_cb) { + CURL_TRC_M(data, "ev %s, call(fd=%" FMT_SOCKET_T ", ev=REMOVE)", + cause, s); + mev_in_callback(multi, TRUE); + rc = multi->socket_cb(data, s, CURL_POLL_REMOVE, + multi->socket_userp, entry->user_data); + mev_in_callback(multi, FALSE); + } + + mev_sh_entry_kill(multi, s); + if(rc == -1) { + multi->dead = TRUE; + return CURLM_ABORTED_BY_CALLBACK; + } + return CURLM_OK; +} + +static CURLMcode mev_sh_entry_update(struct Curl_multi *multi, + struct Curl_easy *data, + struct mev_sh_entry *entry, + curl_socket_t s, + unsigned char last_action, + unsigned char cur_action) +{ + int rc, comboaction; + + /* we should only be called when the callback exists */ + DEBUGASSERT(multi->socket_cb); + if(!multi->socket_cb) + return CURLM_OK; + + /* Transfer `data` goes from `last_action` to `cur_action` on socket `s` + * with `multi->ev.sh_entries` entry `entry`. Update `entry` and trigger + * `multi->socket_cb` on change, if the callback is set. */ + if(last_action == cur_action) /* nothing from `data` changed */ + return CURLM_OK; + + if(last_action & CURL_POLL_IN) { + DEBUGASSERT(entry->readers); + if(!(cur_action & CURL_POLL_IN)) + entry->readers--; + } + else if(cur_action & CURL_POLL_IN) + entry->readers++; + + if(last_action & CURL_POLL_OUT) { + DEBUGASSERT(entry->writers); + if(!(cur_action & CURL_POLL_OUT)) + entry->writers--; + } + else if(cur_action & CURL_POLL_OUT) + entry->writers++; + + DEBUGASSERT(entry->readers <= mev_sh_entry_user_count(entry)); + DEBUGASSERT(entry->writers <= mev_sh_entry_user_count(entry)); + DEBUGASSERT(entry->writers + entry->readers); + + CURL_TRC_M(data, "ev update fd=%" FMT_SOCKET_T ", action '%s%s' -> '%s%s'" + " (%d/%d r/w)", s, + (last_action & CURL_POLL_IN) ? "IN" : "", + (last_action & CURL_POLL_OUT) ? "OUT" : "", + (cur_action & CURL_POLL_IN) ? "IN" : "", + (cur_action & CURL_POLL_OUT) ? "OUT" : "", + entry->readers, entry->writers); + + comboaction = (entry->writers ? CURL_POLL_OUT : 0) | + (entry->readers ? CURL_POLL_IN : 0); + if(((int)entry->action == comboaction)) /* nothing for socket changed */ + return CURLM_OK; + + CURL_TRC_M(data, "ev update call(fd=%" FMT_SOCKET_T ", ev=%s%s)", + s, (comboaction & CURL_POLL_IN) ? "IN" : "", + (comboaction & CURL_POLL_OUT) ? "OUT" : ""); + mev_in_callback(multi, TRUE); + rc = multi->socket_cb(data, s, comboaction, multi->socket_userp, + entry->user_data); + + mev_in_callback(multi, FALSE); + if(rc == -1) { + multi->dead = TRUE; + return CURLM_ABORTED_BY_CALLBACK; + } + entry->action = (unsigned int)comboaction; + return CURLM_OK; +} + +static CURLMcode mev_pollset_diff(struct Curl_multi *multi, + struct Curl_easy *data, + struct connectdata *conn, + struct easy_pollset *ps, + struct easy_pollset *prev_ps) +{ + struct mev_sh_entry *entry; + curl_socket_t s; + unsigned int i, j; + CURLMcode mresult; + + /* The transfer `data` reports in `ps` the sockets it is interested + * in and which combinatino of CURL_POLL_IN/CURL_POLL_OUT it wants + * to have monitored for events. + * There can be more than 1 transfer interested in the same socket + * and 1 transfer might be interested in more than 1 socket. + * `prev_ps` is the pollset copy from the previous call here. On + * the 1st call it will be empty. + */ + DEBUGASSERT(ps); + DEBUGASSERT(prev_ps); + + /* Handle changes to sockets the transfer is interested in. */ + for(i = 0; i < ps->num; i++) { + unsigned char last_action; + bool first_time = FALSE; /* data/conn appears first time on socket */ + + s = ps->sockets[i]; + /* Have we handled this socket before? */ + entry = mev_sh_entry_get(&multi->ev.sh_entries, s); + if(!entry) { + /* new socket, add new entry */ + first_time = TRUE; + entry = mev_sh_entry_add(&multi->ev.sh_entries, s); + if(!entry) /* fatal */ + return CURLM_OUT_OF_MEMORY; + CURL_TRC_M(data, "ev new entry fd=%" FMT_SOCKET_T, s); + } + else if(conn) { + first_time = !mev_sh_entry_conn_known(entry, data->conn); + } + else { + first_time = !mev_sh_entry_xfer_known(entry, data); + } + + /* What was the previous action the transfer had regarding this socket? + * If the transfer is new to the socket, disregard the information + * in `last_poll`, because the socket might have been destroyed and + * reopened. We'd have cleared the sh_entry for that, but the socket + * might still be mentioned in the hashed pollsets. */ + last_action = 0; + if(first_time) { + if(conn) { + if(!mev_sh_entry_conn_add(entry, data->conn)) + return CURLM_OUT_OF_MEMORY; + } + else { + if(!mev_sh_entry_xfer_add(entry, data)) + return CURLM_OUT_OF_MEMORY; + } + CURL_TRC_M(data, "ev entry fd=%" FMT_SOCKET_T ", added %s #%" FMT_OFF_T + ", total=%zu/%zu (xfer/conn)", s, + conn ? "connection" : "transfer", + conn ? conn->connection_id : data->id, + Curl_hash_count(&entry->xfers), + Curl_hash_count(&entry->conns)); + } + else { + for(j = 0; j < prev_ps->num; j++) { + if(s == prev_ps->sockets[j]) { + last_action = prev_ps->actions[j]; + break; + } + } + } + /* track readers/writers changes and report to socket callback */ + mresult = mev_sh_entry_update(multi, data, entry, s, + last_action, ps->actions[i]); + if(mresult) + return mresult; + } + + /* Handle changes to sockets the transfer is NO LONGER interested in. */ + for(i = 0; i < prev_ps->num; i++) { + bool stillused = FALSE; + + s = prev_ps->sockets[i]; + for(j = 0; j < ps->num; j++) { + if(s == ps->sockets[j]) { + /* socket is still supervised */ + stillused = TRUE; + break; + } + } + if(stillused) + continue; + + entry = mev_sh_entry_get(&multi->ev.sh_entries, s); + /* if entry does not exist, we were either never told about it or + * have already cleaned up this socket via Curl_multi_ev_socket_done(). + * In other words: this is perfectly normal */ + if(!entry) + continue; + + if(!mev_sh_entry_xfer_remove(entry, data)) { + /* `data` says in `prev_ps` that it had been using a socket, + * but `data` has not been registered for it. + * This should not happen if our book-keeping is correct? */ + CURL_TRC_M(data, "ev entry fd=%" FMT_SOCKET_T ", transfer lost " + "interest but is not registered", s); + DEBUGASSERT(NULL); + continue; + } + + if(mev_sh_entry_user_count(entry)) { + /* track readers/writers changes and report to socket callback */ + mresult = mev_sh_entry_update(multi, data, entry, s, + prev_ps->actions[i], 0); + if(mresult) + return mresult; + CURL_TRC_M(data, "ev entry fd=%" FMT_SOCKET_T ", removed transfer, " + "total=%zu/%zu (xfer/conn)", s, + Curl_hash_count(&entry->xfers), + Curl_hash_count(&entry->conns)); + } + else { + mresult = mev_forget_socket(multi, data, s, "last user gone"); + if(mresult) + return mresult; + } + } /* for loop over num */ + + /* Remember for next time */ + memcpy(prev_ps, ps, sizeof(*prev_ps)); + return CURLM_OK; +} + +static struct easy_pollset* +mev_add_new_pollset(struct Curl_hash *h, curl_off_t id) +{ + struct easy_pollset *ps; + + ps = calloc(1, sizeof(*ps)); + if(!ps) + return NULL; + if(!Curl_hash_offt_set(h, id, ps)) { + free(ps); + return NULL; + } + return ps; +} + +static struct easy_pollset * +mev_get_last_pollset(struct Curl_multi *multi, + struct Curl_easy *data, + struct connectdata *conn) +{ + if(data) { + if(conn) + return Curl_hash_offt_get(&multi->ev.conn_pollsets, + conn->connection_id); + else if(data) + return Curl_hash_offt_get(&multi->ev.xfer_pollsets, data->id); + } + return NULL; +} + +static void mev_init_cur_pollset(struct easy_pollset *ps, + struct Curl_easy *data, + struct connectdata *conn) +{ + memset(ps, 0, sizeof(*ps)); + if(conn) + Curl_conn_adjust_pollset(data, conn, ps); + else if(data) + Curl_multi_getsock(data, ps, "ev assess"); +} + +static CURLMcode mev_assess(struct Curl_multi *multi, + struct Curl_easy *data, + struct connectdata *conn) +{ + if(multi && multi->socket_cb) { + struct easy_pollset ps, *last_ps; + + mev_init_cur_pollset(&ps, data, conn); + last_ps = mev_get_last_pollset(multi, data, conn); + + if(!last_ps && ps.num) { + if(conn) + last_ps = mev_add_new_pollset(&multi->ev.conn_pollsets, + data->conn->connection_id); + else + last_ps = mev_add_new_pollset(&multi->ev.xfer_pollsets, data->id); + if(!last_ps) + return CURLM_OUT_OF_MEMORY; + } + + if(last_ps) + return mev_pollset_diff(multi, data, conn, &ps, last_ps); + else + DEBUGASSERT(!ps.num); + } + return CURLM_OK; +} + +CURLMcode Curl_multi_ev_assess_xfer(struct Curl_multi *multi, + struct Curl_easy *data) +{ + return mev_assess(multi, data, NULL); +} + +CURLMcode Curl_multi_ev_assess_conn(struct Curl_multi *multi, + struct Curl_easy *data, + struct connectdata *conn) +{ + return mev_assess(multi, data, conn); +} + +CURLMcode Curl_multi_ev_assess_xfer_list(struct Curl_multi *multi, + struct Curl_llist *list) +{ + struct Curl_llist_node *e; + CURLMcode result = CURLM_OK; + + if(multi && multi->socket_cb) { + for(e = Curl_llist_head(list); e && !result; e = Curl_node_next(e)) { + result = Curl_multi_ev_assess_xfer(multi, Curl_node_elem(e)); + } + } + return result; +} + + +CURLMcode Curl_multi_ev_assign(struct Curl_multi *multi, + curl_socket_t s, + void *user_data) +{ + struct mev_sh_entry *e = mev_sh_entry_get(&multi->ev.sh_entries, s); + if(!e) + return CURLM_BAD_SOCKET; + e->user_data = user_data; + return CURLM_OK; +} + + +void Curl_multi_ev_expire_xfers(struct Curl_multi *multi, + curl_socket_t s, + const struct curltime *nowp, + bool *run_cpool) +{ + struct mev_sh_entry *entry; + + DEBUGASSERT(s != CURL_SOCKET_TIMEOUT); + entry = mev_sh_entry_get(&multi->ev.sh_entries, s); + + /* Unmatched socket, we cannot act on it but we ignore this fact. In + real-world tests it has been proved that libevent can in fact give + the application actions even though the socket was just previously + asked to get removed, so thus we better survive stray socket actions + and just move on. */ + if(entry) { + struct Curl_hash_iterator iter; + struct Curl_hash_element *he; + + /* the socket can be shared by many transfers, iterate */ + Curl_hash_start_iterate(&entry->xfers, &iter); + for(he = Curl_hash_next_element(&iter); he; + he = Curl_hash_next_element(&iter)) { + struct Curl_easy *data = (struct Curl_easy *)he->ptr; + DEBUGASSERT(data); + DEBUGASSERT(data->magic == CURLEASY_MAGIC_NUMBER); + DEBUGASSERT(data->id >= 0); /* we should not track internal handles */ + + /* Expire with out current now, so we will get it below when + * asking the splaytree for expired transfers. */ + Curl_expire_ex(data, nowp, 0, EXPIRE_RUN_NOW); + } + + if(Curl_hash_count(&entry->conns)) + *run_cpool = TRUE; + } +} + +void Curl_multi_ev_socket_done(struct Curl_multi *multi, + struct Curl_easy *data, curl_socket_t s) +{ + mev_forget_socket(multi, data, s, "socket done"); +} + +void Curl_multi_ev_xfer_done(struct Curl_multi *multi, + struct Curl_easy *data) +{ + DEBUGASSERT(!data->conn); /* transfer should have been detached */ + if(data->id >= 0) { + (void)mev_assess(multi, data, NULL); + Curl_hash_offt_remove(&multi->ev.xfer_pollsets, data->id); + } +} + +void Curl_multi_ev_conn_done(struct Curl_multi *multi, + struct Curl_easy *data, + struct connectdata *conn) +{ + (void)mev_assess(multi, data, conn); + Curl_hash_offt_remove(&multi->ev.conn_pollsets, conn->connection_id); +} + +#define CURL_MEV_PS_HASH_SLOTS (991) /* nice prime */ + +static void mev_hash_pollset_free(void *entry) +{ + free(entry); +} + +void Curl_multi_ev_init(struct Curl_multi *multi, size_t hashsize) +{ + Curl_hash_init(&multi->ev.sh_entries, hashsize, mev_sh_entry_hash, + mev_sh_entry_compare, mev_sh_entry_dtor); + Curl_hash_offt_init(&multi->ev.xfer_pollsets, + CURL_MEV_PS_HASH_SLOTS, mev_hash_pollset_free); + Curl_hash_offt_init(&multi->ev.conn_pollsets, + CURL_MEV_PS_HASH_SLOTS, mev_hash_pollset_free); +} + +void Curl_multi_ev_cleanup(struct Curl_multi *multi) +{ + Curl_hash_destroy(&multi->ev.sh_entries); + Curl_hash_destroy(&multi->ev.xfer_pollsets); + Curl_hash_destroy(&multi->ev.conn_pollsets); +} diff --git a/lib/multi_ev.h b/lib/multi_ev.h new file mode 100644 index 0000000000..f0745db29e --- /dev/null +++ b/lib/multi_ev.h @@ -0,0 +1,79 @@ +#ifndef HEADER_CURL_MULTI_EV_H +#define HEADER_CURL_MULTI_EV_H +/*************************************************************************** + * _ _ ____ _ + * Project ___| | | | _ \| | + * / __| | | | |_) | | + * | (__| |_| | _ <| |___ + * \___|\___/|_| \_\_____| + * + * Copyright (C) Daniel Stenberg, , et al. + * + * This software is licensed as described in the file COPYING, which + * you should have received as part of this distribution. The terms + * are also available at https://curl.se/docs/copyright.html. + * + * You may opt to use, copy, modify, merge, publish, distribute and/or sell + * copies of the Software, and permit persons to whom the Software is + * furnished to do so, under the terms of the COPYING file. + * + * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY + * KIND, either express or implied. + * + * SPDX-License-Identifier: curl + * + ***************************************************************************/ + +struct Curl_easy; +struct Curl_multi; +struct easy_pollset; + +struct curl_multi_ev { + struct Curl_hash sh_entries; + struct Curl_hash xfer_pollsets; + struct Curl_hash conn_pollsets; +}; + +/* Setup/teardown of multi event book-keeping. */ +void Curl_multi_ev_init(struct Curl_multi *multi, size_t hashsize); +void Curl_multi_ev_cleanup(struct Curl_multi *multi); + +/* Assign a 'user_data' to be passed to the socket callback when + * invoked with the given socket. This will fail if this socket + * is not active, e.g. the application has not been told to monitor it. */ +CURLMcode Curl_multi_ev_assign(struct Curl_multi *multi, curl_socket_t s, + void *user_data); + +/* Assess the transfer by getting its current pollset, compute + * any changes to the last one and inform the application's socket + * callback if things have changed. */ +CURLMcode Curl_multi_ev_assess_xfer(struct Curl_multi *multi, + struct Curl_easy *data); +/* Assess all easy handles on the list */ +CURLMcode Curl_multi_ev_assess_xfer_list(struct Curl_multi *multi, + struct Curl_llist *list); +/* Assess the connection by getting its current pollset */ +CURLMcode Curl_multi_ev_assess_conn(struct Curl_multi *multi, + struct Curl_easy *data, + struct connectdata *conn); + +/* Expire all transfers tied to the given socket */ +void Curl_multi_ev_expire_xfers(struct Curl_multi *multi, + curl_socket_t s, + const struct curltime *nowp, + bool *run_cpool); + +/* Socket will be closed, forget anything we know about it. */ +void Curl_multi_ev_socket_done(struct Curl_multi *multi, + struct Curl_easy *data, curl_socket_t s); + +/* Transfer is removed from the multi */ +void Curl_multi_ev_xfer_done(struct Curl_multi *multi, + struct Curl_easy *data); + +/* Connection is being destroyed */ +void Curl_multi_ev_conn_done(struct Curl_multi *multi, + struct Curl_easy *data, + struct connectdata *conn); + +#endif /* HEADER_CURL_MULTI_EV_H */ diff --git a/lib/multihandle.h b/lib/multihandle.h index c0a3d09608..b6efd2fecd 100644 --- a/lib/multihandle.h +++ b/lib/multihandle.h @@ -27,6 +27,7 @@ #include "llist.h" #include "hash.h" #include "conncache.h" +#include "multi_ev.h" #include "psl.h" #include "socketpair.h" @@ -38,9 +39,9 @@ struct Curl_message { struct CURLMsg extmsg; }; -/* NOTE: if you add a state here, add the name to the statename[] array as - well! -*/ +/* NOTE: if you add a state here, add the name to the statenames[] array + * in curl_trc.c as well! + */ typedef enum { MSTATE_INIT, /* 0 - start in this state */ MSTATE_PENDING, /* 1 - no connections, waiting for one */ @@ -128,10 +129,9 @@ struct Curl_multi { char *xfer_sockbuf; /* the actual buffer */ size_t xfer_sockbuf_len; /* the allocated length */ - /* 'sockhash' is the lookup hash for socket descriptor => easy handles (note - the pluralis form, there can be more than one easy handle waiting on the - same actual socket) */ - struct Curl_hash sockhash; + /* multi event related things */ + struct curl_multi_ev ev; + /* `proto_hash` is a general key-value store for protocol implementations * with the lifetime of the multi handle. The number of elements kept here * should be in the order of supported protocols (and sub-protocols like diff --git a/lib/multiif.h b/lib/multiif.h index 89ede92c03..940d962c6c 100644 --- a/lib/multiif.h +++ b/lib/multiif.h @@ -28,8 +28,10 @@ * Prototypes for library-wide functions provided by multi.c */ -CURLcode Curl_updatesocket(struct Curl_easy *data); void Curl_expire(struct Curl_easy *data, timediff_t milli, expire_id); +void Curl_expire_ex(struct Curl_easy *data, + const struct curltime *nowp, + timediff_t milli, expire_id id); bool Curl_expire_clear(struct Curl_easy *data); void Curl_expire_done(struct Curl_easy *data, expire_id id); CURLMcode Curl_update_timer(struct Curl_multi *multi) WARN_UNUSED_RESULT; @@ -64,26 +66,13 @@ struct Curl_multi *Curl_multi_handle(size_t hashsize, /* mask for checking if read and/or write is set for index x */ #define GETSOCK_MASK_RW(x) (GETSOCK_READSOCK(x)|GETSOCK_WRITESOCK(x)) -/* - * Curl_multi_closed() - * - * Used by the connect code to tell the multi_socket code that one of the - * sockets we were using is about to be closed. This function will then - * remove it from the sockethash for this handle to make the multi_socket API - * behave properly, especially for the case when libcurl will create another - * socket again and it gets the same file descriptor number. +/** + * Let the multi handle know that the socket is about to be closed. + * The multi will then remove anything it knows about the socket, so + * when the OS is using this socket (number) again subsequently, + * the internal book keeping will not get confused. */ - -void Curl_multi_closed(struct Curl_easy *data, curl_socket_t s); - -/* Compare the two pollsets to notify the multi_socket API of changes - * in socket polling, e.g calling multi->socket_cb() with the changes if - * differences are seen. - */ -CURLMcode Curl_multi_pollset_ev(struct Curl_multi *multi, - struct Curl_easy *data, - struct easy_pollset *ps, - struct easy_pollset *last_ps); +void Curl_multi_will_close(struct Curl_easy *data, curl_socket_t s); /* * Add a handle and move it into PERFORM state at once. For pushed streams. @@ -96,6 +85,10 @@ CURLMcode Curl_multi_add_perform(struct Curl_multi *multi, /* Return the value of the CURLMOPT_MAX_CONCURRENT_STREAMS option */ unsigned int Curl_multi_max_concurrent_streams(struct Curl_multi *multi); +void Curl_multi_getsock(struct Curl_easy *data, + struct easy_pollset *ps, + const char *caller); + /** * Borrow the transfer buffer from the multi, suitable * for the given transfer `data`. The buffer may only be used in one diff --git a/lib/urldata.h b/lib/urldata.h index 8147070f94..6a7ddee62c 100644 --- a/lib/urldata.h +++ b/lib/urldata.h @@ -815,9 +815,6 @@ struct connectdata { struct curltime start[2]; /* when filter shutdown started */ unsigned int timeout_ms; /* 0 means no timeout */ } shutdown; - /* Last pollset used in connection shutdown. Used to detect changes - * for multi_socket API. */ - struct easy_pollset shutdown_poll; struct ssl_primary_config ssl_config; #ifndef CURL_DISABLE_PROXY @@ -1885,12 +1882,6 @@ struct Curl_easy { struct Curl_message msg; /* A single posted message. */ - /* Array with the plain socket numbers this handle takes care of, in no - particular order. Note that all sockets are added to the sockhash, where - the state etc are also kept. This array is mostly used to detect when a - socket is to be removed from the hash. See singlesocket(). */ - struct easy_pollset last_poll; - struct Names dns; struct Curl_multi *multi; /* if non-NULL, points to the multi handle struct to which this "belongs" when used by diff --git a/tests/http/test_15_tracing.py b/tests/http/test_15_tracing.py index a826044232..1054d0a26a 100644 --- a/tests/http/test_15_tracing.py +++ b/tests/http/test_15_tracing.py @@ -86,7 +86,7 @@ class TestTracing: m = re.match(r'^([0-9:.]+) \[0-[0x]] .+', line) if m is None: assert False, f'no match: {line}' - m = re.match(r'^([0-9:.]+) \[0-[0x]] . \[TCP].+', line) + m = re.match(r'^([0-9:.]+) \[0-[0x]] .+ \[TCP].+', line) if m is not None: found_tcp = True if not found_tcp: diff --git a/tests/http/test_19_shutdown.py b/tests/http/test_19_shutdown.py index 8dd4825e4c..7d32f0d0a5 100644 --- a/tests/http/test_19_shutdown.py +++ b/tests/http/test_19_shutdown.py @@ -25,6 +25,7 @@ ########################################################################### # import logging +import os import re import pytest @@ -56,7 +57,10 @@ class TestShutdown: def test_19_01_check_tcp_rst(self, env: Env, httpd, proto): if env.ci_run: pytest.skip("seems not to work in CI") - curl = CurlClient(env=env) + run_env = os.environ.copy() + if 'CURL_DEBUG' in run_env: + del run_env['CURL_DEBUG'] + curl = CurlClient(env=env, run_env=run_env) url = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-1]' r = curl.http_download(urls=[url], alpn_proto=proto, with_tcpdump=True, extra_args=[ '--parallel' @@ -71,10 +75,12 @@ class TestShutdown: def test_19_02_check_shutdown(self, env: Env, httpd, proto): if not env.curl_is_debug(): pytest.skip('only works for curl debug builds') - curl = CurlClient(env=env, run_env={ + run_env = os.environ.copy() + run_env.update({ 'CURL_GRACEFUL_SHUTDOWN': '2000', - 'CURL_DEBUG': 'ssl,tcp' + 'CURL_DEBUG': 'ssl,tcp,lib-ids,multi' }) + curl = CurlClient(env=env, run_env=run_env) url = f'https://{env.authority_for(env.domain1, proto)}/data.json?[0-1]' r = curl.http_download(urls=[url], alpn_proto=proto, with_tcpdump=True, extra_args=[ '--parallel' @@ -91,14 +97,14 @@ class TestShutdown: count = 10 curl = CurlClient(env=env, run_env={ 'CURL_GRACEFUL_SHUTDOWN': '2000', - 'CURL_DEBUG': 'ssl' + 'CURL_DEBUG': 'ssl,multi' }) url = f'https://{env.authority_for(env.domain1, proto)}/curltest/tweak/?'\ f'id=[0-{count-1}]&with_cl&close' r = curl.http_download(urls=[url], alpn_proto=proto) r.check_response(http_status=200, count=count) shutdowns = [line for line in r.trace_lines - if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)] + if re.match(r'.*\[CPOOL\] shutdown, done=1', line)] assert len(shutdowns) == count, f'{shutdowns}' # run downloads with CURLOPT_FORBID_REUSE set, meaning *we* close @@ -112,7 +118,7 @@ class TestShutdown: url = f'https://localhost:{env.https_port}/{docname}' client = LocalClient(name='hx-download', env=env, run_env={ 'CURL_GRACEFUL_SHUTDOWN': '2000', - 'CURL_DEBUG': 'ssl' + 'CURL_DEBUG': 'ssl,multi' }) if not client.exists(): pytest.skip(f'example client not built: {client.name}') @@ -121,7 +127,7 @@ class TestShutdown: ]) r.check_exit_code(0) shutdowns = [line for line in r.trace_lines - if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)] + if re.match(r'.*CPOOL\] shutdown, done=1', line)] assert len(shutdowns) == count, f'{shutdowns}' # run event-based downloads with CURLOPT_FORBID_REUSE set, meaning *we* close @@ -131,13 +137,13 @@ class TestShutdown: if not env.curl_is_debug(): pytest.skip('only works for curl debug builds') count = 10 - curl = CurlClient(env=env, run_env={ - # forbid connection reuse to trigger shutdowns after transfer - 'CURL_FORBID_REUSE': '1', - # make socket receives block 50% of the time to delay shutdown - 'CURL_DBG_SOCK_RBLOCK': '50', - 'CURL_DEBUG': 'ssl' - }) + run_env = os.environ.copy() + # forbid connection reuse to trigger shutdowns after transfer + run_env['CURL_FORBID_REUSE'] = '1' + # make socket receives block 50% of the time to delay shutdown + run_env['CURL_DBG_SOCK_RBLOCK'] = '50' + run_env['CURL_DEBUG'] = 'ssl,multi,lib-ids' + curl = CurlClient(env=env, run_env=run_env) url = f'https://{env.authority_for(env.domain1, proto)}/curltest/tweak/?'\ f'id=[0-{count-1}]&with_cl&' r = curl.http_download(urls=[url], alpn_proto=proto, extra_args=[ @@ -146,7 +152,7 @@ class TestShutdown: r.check_response(http_status=200, count=count) # check that we closed all connections closings = [line for line in r.trace_lines - if re.match(r'.*CCACHE\] closing #\d+', line)] + if re.match(r'.*CPOOL\] closing', line)] assert len(closings) == count, f'{closings}' # check that all connection sockets were removed from event removes = [line for line in r.trace_lines @@ -171,5 +177,5 @@ class TestShutdown: r.check_response(http_status=200, count=2) # check connection cache closings shutdowns = [line for line in r.trace_lines - if re.match(r'.*CCACHE\] shutdown #\d+, done=1', line)] + if re.match(r'.*CPOOL\] shutdown, done=1', line)] assert len(shutdowns) == 1, f'{shutdowns}' diff --git a/tests/unit/unit1652.c b/tests/unit/unit1652.c index 73d6436f14..6a292cc96c 100644 --- a/tests/unit/unit1652.c +++ b/tests/unit/unit1652.c @@ -117,33 +117,45 @@ fail_unless(strlen(output) == 1, "Empty string"); Curl_infof(testdata, "%s", (char *)NULL); fail_unless(verify(output, "(nil)") == 0, "Passing NULL as string"); +/* Note: libcurl's tracebuffer hold 2048 bytes, so the max strlen() we + * get out of it is 2047, since we need a \0 at the end. + * Curl_infof() in addition adds a \n at the end, making the effective + * output 2046 characters. + * Any input that long or longer will truncated, ending in '...\n'. + */ + /* A string just long enough to not be truncated */ memset(input, '\0', sizeof(input)); -memset(input, 'A', 2047); +memset(input, 'A', 2045); Curl_infof(testdata, "%s", input); -fail_unless(strlen(output) == 2048, "No truncation of infof input"); +fprintf(stderr, "output len %d: %s", (int)strlen(output), output); +/* output is input + \n */ +fail_unless(strlen(output) == 2046, "No truncation of infof input"); fail_unless(verify(output, input) == 0, "No truncation of infof input"); fail_unless(output[sizeof(output) - 1] == '\0', "No truncation of infof input"); /* Just over the limit without newline for truncation via '...' */ -memset(input + 2047, 'A', 4); +memset(input + 2045, 'A', 4); Curl_infof(testdata, "%s", input); -fail_unless(strlen(output) == 2051, "Truncation of infof input 1"); +fprintf(stderr, "output len %d: %s", (int)strlen(output), output); +fail_unless(strlen(output) == 2047, "Truncation of infof input 1"); fail_unless(output[sizeof(output) - 1] == '\0', "Truncation of infof input 1"); /* Just over the limit with newline for truncation via '...' */ -memset(input + 2047, 'A', 4); -memset(input + 2047 + 4, '\n', 1); +memset(input + 2045, 'A', 4); +memset(input + 2045 + 4, '\n', 1); Curl_infof(testdata, "%s", input); -fail_unless(strlen(output) == 2051, "Truncation of infof input 2"); +fprintf(stderr, "output len %d: %s", (int)strlen(output), output); +fail_unless(strlen(output) == 2047, "Truncation of infof input 2"); fail_unless(output[sizeof(output) - 1] == '\0', "Truncation of infof input 2"); /* Way over the limit for truncation via '...' */ memset(input, '\0', sizeof(input)); memset(input, 'A', sizeof(input) - 1); Curl_infof(testdata, "%s", input); -fail_unless(strlen(output) == 2051, "Truncation of infof input 3"); +fprintf(stderr, "output len %d: %s", (int)strlen(output), output); +fail_unless(strlen(output) == 2047, "Truncation of infof input 3"); fail_unless(output[sizeof(output) - 1] == '\0', "Truncation of infof input 3"); #if defined(CURL_GNUC_DIAG) && !defined(__clang__)