From: Oguzhan Ozmen Date: Tue, 3 Mar 2026 00:45:59 +0000 (+0000) Subject: rgw/rest: track connection failures per-IP instead of per-endpoint X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2d7bc7818c5976702cbddcb228f6ac13dcee8697;p=ceph.git rgw/rest: track connection failures per-IP instead of per-endpoint Previously, when a connection to a zone endpoint failed, the entire endpoint was marked as unavailable for a timeout period. Since we now resolve endpoints to all their IP addresses (via DNS A/AAAA records), we can be more granular: track failures at the individual IP level. Introduce ResolvedIP struct that pairs each IP's connect_to string with its own failure timestamp. When selecting an IP for a request, round-robin skips IPs that have recently failed, allowing traffic to continue flowing to healthy nodes even when some are down. An endpoint-level last_failure_time is maintained as a fast-path optimization to avoid scanning all IPs when none have failed recently. Signed-off-by: Oguzhan Ozmen --- diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 459559b4cf3..412a33d95a1 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -60,14 +60,14 @@ void RGWRESTConn::resolve_endpoints() { std::string port_str = std::to_string(res_ep.port); std::string host_port_prefix = res_ep.host + ":" + port_str + ":"; + res_ep.resolved_ips.reserve(results.size()); for (const auto& entry : results) { auto ip_str = entry.endpoint().address().to_string(); - res_ep.ips.push_back(ip_str); - res_ep.connect_to_strings.emplace_back(host_port_prefix + ip_str + ":" + port_str); + res_ep.resolved_ips.emplace_back(host_port_prefix + ip_str + ":" + port_str); ldout(cct, 2) << "endpoint_url=" << ep_url << " resolved to ip=" << ip_str << dendl; } ldout(cct, 2) << "endpoint=" << ep_url << " resolved to " - << res_ep.ips.size() << " IP addresses" << dendl; + << res_ep.resolved_ips.size() << " IP addresses" << dendl; } else { ldout(cct, 0) << "WARNING: RGWRESTConn no IP addresses found for endpoint=" << ep_url << (ec ? " err=" + ec.message() : "") << dendl; @@ -89,7 +89,6 @@ RGWRESTConn::RGWRESTConn(CephContext *_cct, rgw::sal::Driver* driver, for (const auto& ep_url : remote_endpoints) { ResolvedEndpoint res_ep; res_ep.url = ep_url; - res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable resolved_endpoints.push_back(std::move(res_ep)); } resolve_endpoints(); @@ -118,7 +117,6 @@ RGWRESTConn::RGWRESTConn(CephContext *_cct, for (const auto& ep_url : remote_endpoints) { ResolvedEndpoint res_ep; res_ep.url = ep_url; - res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable resolved_endpoints.push_back(std::move(res_ep)); } resolve_endpoints(); @@ -165,10 +163,38 @@ void RGWRESTConn::populate_connect_to(RGWEndpoint& endpoint, ResolvedEndpoint& r return; } - if (!resolved_endpoint.connect_to_strings.empty()) { - size_t idx = resolved_endpoint.rr_index++; - endpoint.set_connect_to(resolved_endpoint.connect_to_strings[idx % resolved_endpoint.connect_to_strings.size()]); + if (resolved_endpoint.resolved_ips.empty()) { + return; + } + + static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2; + const size_t num_ips = resolved_endpoint.resolved_ips.size(); + + // Round-robin through IPs, skipping any that are marked down + for (size_t i = 0; i < num_ips; ++i) { + size_t idx = resolved_endpoint.endpoint_ips_round_robin_counter++ % num_ips; + ResolvedIP& ip_status = resolved_endpoint.resolved_ips[idx]; + + const auto& last_fail = ip_status.last_failure.load(); + if (ceph::real_clock::is_zero(last_fail)) { + endpoint.set_connect_to(ip_status.connect_to); // IP is up + return; + } + + auto diff = ceph::to_seconds(ceph::real_clock::now() - last_fail); + if (diff >= CONN_STATUS_EXPIRE_SECS) { + // Failure expired, mark IP as up and use it + ip_status.mark_up(); + ldout(cct, 5) << "IP " << ip_status.connect_to << " failure expired, marking up" << dendl; + endpoint.set_connect_to(ip_status.connect_to); + return; + } } + + // All IPs are down - do not populate connect_to; i.e., + // let libcurl handle it without connect_to hint. + ldout(cct, 5) << "All IPs down for endpoint=" << resolved_endpoint.url + << " - skip connect_to hint" << dendl; } int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint) @@ -178,6 +204,37 @@ int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint) return -EINVAL; } + static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2; + auto now = ceph::real_clock::now(); + + // Helper to check if an endpoint has at least one available IP + auto endpoint_has_available_ip = [&](ResolvedEndpoint& res_ep) -> bool { + // If no IP resolution, endpoint is available (will use DNS directly) + if (res_ep.resolved_ips.empty()) { + return true; + } + + // Fast path: if no recent failures at endpoint level, all IPs are available + const auto& ep_last_fail = res_ep.last_failure_time.load(); + if (ceph::real_clock::is_zero(ep_last_fail) || + ceph::to_seconds(now - ep_last_fail) >= CONN_STATUS_EXPIRE_SECS) { + return true; + } + + // Slow path: check individual IPs (only when there's a recent failure) + for (auto& ip_status : res_ep.resolved_ips) { + const auto& last_fail = ip_status.last_failure.load(); + if (ceph::real_clock::is_zero(last_fail)) { + return true; // This IP is up + } + auto diff = ceph::to_seconds(now - last_fail); + if (diff >= CONN_STATUS_EXPIRE_SECS) { + return true; // This IP's failure has expired + } + } + return false; // All IPs are down + }; + size_t num = 0; size_t selected_idx = 0; while (num < resolved_endpoints.size()) { @@ -185,33 +242,18 @@ int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint) selected_idx = i % resolved_endpoints.size(); ResolvedEndpoint& res_ep = resolved_endpoints[selected_idx]; - const std::string& ep_url = res_ep.url; - endpoint.set_url(ep_url); - const auto& upd_time = res_ep.status.load(); - - if (ceph::real_clock::is_zero(upd_time)) { + if (endpoint_has_available_ip(res_ep)) { + endpoint.set_url(res_ep.url); break; } - auto diff = ceph::to_seconds(ceph::real_clock::now() - upd_time); - - ldout(cct, 20) << "endpoint url=" << ep_url - << " last endpoint status update time=" - << ceph::real_clock::to_double(upd_time) - << " diff=" << diff << dendl; - - static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2; - if (diff >= CONN_STATUS_EXPIRE_SECS) { - res_ep.status.store(ceph::real_clock::zero()); - ldout(cct, 10) << endpoint << " unconnectable status expired. mark it connectable" << dendl; - break; - } + ldout(cct, 5) << "endpoint url=" << res_ep.url << " all IPs down, trying next" << dendl; num++; - }; + } if (num == resolved_endpoints.size()) { - ldout(cct, 5) << "ERROR: no valid endpoint" << dendl; + ldout(cct, 1) << "ERROR: no valid endpoint (all IPs down for all endpoints)" << dendl; return -EINVAL; } @@ -231,17 +273,34 @@ RGWEndpoint RGWRESTConn::get_endpoint() void RGWRESTConn::set_endpoint_unconnectable(const RGWEndpoint& endpoint) { const string& orig_url = endpoint.get_original_url(); + const string& connect_to = endpoint.get_connect_to(); ResolvedEndpoint* res_ep = find_resolved_endpoint(orig_url); if (orig_url.empty() || !res_ep) { - ldout(cct, 0) << "ERROR: endpoint is not a valid or doesn't have status: " - << endpoint << dendl; + ldout(cct, 0) << "ERROR: endpoint is not valid or not found: " + << endpoint << dendl; return; } - res_ep->status.store(ceph::real_clock::now()); + // Update endpoint-level last_failure_time for fast-path optimization + auto now = ceph::real_clock::now(); + res_ep->last_failure_time.store(now); - ldout(cct, 10) << "set endpoint unconnectable. url=" << orig_url << dendl; + // If we have a connect_to string, mark that specific IP as down as well + if (!connect_to.empty()) { + ResolvedIP* res_ip = res_ep->find_ip_status(connect_to); + if (res_ip) { + res_ip->mark_down(); + ldout(cct, 10) << "set IP unconnectable: " << connect_to << dendl; + return; + } + } + + // Fallback: mark all IPs for this endpoint as down + for (auto& res_ip : res_ep->resolved_ips) { + res_ip.mark_down(); + } + ldout(cct, 10) << "set all IPs unconnectable for endpoint url=" << orig_url << dendl; } void RGWRESTConn::populate_params(param_vec_t& params, const rgw_owner* uid, const string& zonegroup) diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index c4a9389f1bc..66c849f1a1e 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -65,48 +65,91 @@ inline param_vec_t make_param_list(const std::map *pp) return params; } +/** + * ResolvedIP - Per-IP connection status tracking. + * + * Each resolved IP address has its own failure status. An IP is considered + * "down" if last_failure is non-zero and less than CONN_STATUS_EXPIRE_SECS old. + * After the timeout, the IP becomes eligible for retry. + */ +struct ResolvedIP { + std::string connect_to; // Pre-computed "host:port:ip:port" for CURLOPT_CONNECT_TO + mutable std::atomic last_failure; + + ResolvedIP() : last_failure(ceph::real_clock::zero()) {} + + explicit ResolvedIP(std::string _connect_to) + : connect_to(std::move(_connect_to)), last_failure(ceph::real_clock::zero()) {} + + // Move & assignment operations (required because std::atomic is not movable) + ResolvedIP(ResolvedIP&& o) noexcept + : connect_to(std::move(o.connect_to)), last_failure(o.last_failure.load()) {} + + ResolvedIP& operator=(ResolvedIP&& o) noexcept { + connect_to = std::move(o.connect_to); + last_failure.store(o.last_failure.load()); + return *this; + } + + // Delete copy (std::atomic is not copyable) + ResolvedIP(const ResolvedIP&) = delete; + ResolvedIP& operator=(const ResolvedIP&) = delete; + + void mark_down() const { last_failure.store(ceph::real_clock::now()); } + void mark_up() const { last_failure.store(ceph::real_clock::zero()); } +}; + +/** + * ResolvedEndpoint - A zone endpoint URL with its resolved IP addresses. + * + * Tracks per-IP connection status. An endpoint is considered "down" only when + * ALL of its IPs are marked as failed (within the retry timeout window). + */ struct ResolvedEndpoint { std::string url; // e.g., "https://s3.abc.com:8443" std::string scheme; // e.g., "https" std::string host; // e.g., "s3.abc.com" int port = -1; // e.g., 8443 - std::vector ips; // the IPs the endpoint resolves to - std::vector connect_to_strings; // Pre-computed full connect_to strings for each IP - size_t rr_index = 0; // round-robin index for IPs - - /* endpoint health state: the endpoint is not able to connect if the timestamp is not real_clock::zero */ - std::atomic status; + std::vector resolved_ips; // Per-IP connect_to strings with health status + mutable size_t endpoint_ips_round_robin_counter = 0; // round-robin index for IPs + mutable std::atomic last_failure_time; // most recent IP failure seen on this endpoint - ResolvedEndpoint() = default; + ResolvedEndpoint() : last_failure_time(ceph::real_clock::zero()) {} - // Custom move constructor (required because std::atomic is not movable) + // Custom move constructor (required because of atomics in ResolvedIP) ResolvedEndpoint(ResolvedEndpoint&& other) noexcept : url(std::move(other.url)), scheme(std::move(other.scheme)), host(std::move(other.host)), port(other.port), - ips(std::move(other.ips)), - connect_to_strings(std::move(other.connect_to_strings)), - rr_index(other.rr_index), - status(other.status.load()) + resolved_ips(std::move(other.resolved_ips)), + endpoint_ips_round_robin_counter(other.endpoint_ips_round_robin_counter), + last_failure_time(other.last_failure_time.load()) {} - // Custom move assignment (required because std::atomic is not movable) + // Custom move assignment ResolvedEndpoint& operator=(ResolvedEndpoint&& other) noexcept { url = std::move(other.url); scheme = std::move(other.scheme); host = std::move(other.host); port = other.port; - ips = std::move(other.ips); - connect_to_strings = std::move(other.connect_to_strings); - rr_index = other.rr_index; - status.store(other.status.load()); + resolved_ips = std::move(other.resolved_ips); + endpoint_ips_round_robin_counter = other.endpoint_ips_round_robin_counter; + last_failure_time.store(other.last_failure_time.load()); return *this; } - // Delete copy operations (std::atomic is not copyable) + // Delete copy operations ResolvedEndpoint(const ResolvedEndpoint&) = delete; ResolvedEndpoint& operator=(const ResolvedEndpoint&) = delete; + + // Find IP status by connect_to string + ResolvedIP* find_ip_status(const std::string& connect_to_str) { + for (auto& ip : resolved_ips) { + if (ip.connect_to == connect_to_str) return &ip; + } + return nullptr; + } }; class RGWRESTConn