std::string port_str = std::to_string(res_ep.port);
std::string host_port_prefix = res_ep.host + ":" + port_str + ":";
+ res_ep.resolved_ips.reserve(results.size());
for (const auto& entry : results) {
auto ip_str = entry.endpoint().address().to_string();
- res_ep.ips.push_back(ip_str);
- res_ep.connect_to_strings.emplace_back(host_port_prefix + ip_str + ":" + port_str);
+ res_ep.resolved_ips.emplace_back(host_port_prefix + ip_str + ":" + port_str);
ldout(cct, 2) << "endpoint_url=" << ep_url << " resolved to ip=" << ip_str << dendl;
}
ldout(cct, 2) << "endpoint=" << ep_url << " resolved to "
- << res_ep.ips.size() << " IP addresses" << dendl;
+ << res_ep.resolved_ips.size() << " IP addresses" << dendl;
} else {
ldout(cct, 0) << "WARNING: RGWRESTConn no IP addresses found for endpoint=" << ep_url
<< (ec ? " err=" + ec.message() : "") << dendl;
for (const auto& ep_url : remote_endpoints) {
ResolvedEndpoint res_ep;
res_ep.url = ep_url;
- res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable
resolved_endpoints.push_back(std::move(res_ep));
}
resolve_endpoints();
for (const auto& ep_url : remote_endpoints) {
ResolvedEndpoint res_ep;
res_ep.url = ep_url;
- res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable
resolved_endpoints.push_back(std::move(res_ep));
}
resolve_endpoints();
return;
}
- if (!resolved_endpoint.connect_to_strings.empty()) {
- size_t idx = resolved_endpoint.rr_index++;
- endpoint.set_connect_to(resolved_endpoint.connect_to_strings[idx % resolved_endpoint.connect_to_strings.size()]);
+ if (resolved_endpoint.resolved_ips.empty()) {
+ return;
+ }
+
+ static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
+ const size_t num_ips = resolved_endpoint.resolved_ips.size();
+
+ // Round-robin through IPs, skipping any that are marked down
+ for (size_t i = 0; i < num_ips; ++i) {
+ size_t idx = resolved_endpoint.endpoint_ips_round_robin_counter++ % num_ips;
+ ResolvedIP& ip_status = resolved_endpoint.resolved_ips[idx];
+
+ const auto& last_fail = ip_status.last_failure.load();
+ if (ceph::real_clock::is_zero(last_fail)) {
+ endpoint.set_connect_to(ip_status.connect_to); // IP is up
+ return;
+ }
+
+ auto diff = ceph::to_seconds<double>(ceph::real_clock::now() - last_fail);
+ if (diff >= CONN_STATUS_EXPIRE_SECS) {
+ // Failure expired, mark IP as up and use it
+ ip_status.mark_up();
+ ldout(cct, 5) << "IP " << ip_status.connect_to << " failure expired, marking up" << dendl;
+ endpoint.set_connect_to(ip_status.connect_to);
+ return;
+ }
}
+
+ // All IPs are down - do not populate connect_to; i.e.,
+ // let libcurl handle it without connect_to hint.
+ ldout(cct, 5) << "All IPs down for endpoint=" << resolved_endpoint.url
+ << " - skip connect_to hint" << dendl;
}
int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint)
return -EINVAL;
}
+ static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
+ auto now = ceph::real_clock::now();
+
+ // Helper to check if an endpoint has at least one available IP
+ auto endpoint_has_available_ip = [&](ResolvedEndpoint& res_ep) -> bool {
+ // If no IP resolution, endpoint is available (will use DNS directly)
+ if (res_ep.resolved_ips.empty()) {
+ return true;
+ }
+
+ // Fast path: if no recent failures at endpoint level, all IPs are available
+ const auto& ep_last_fail = res_ep.last_failure_time.load();
+ if (ceph::real_clock::is_zero(ep_last_fail) ||
+ ceph::to_seconds<double>(now - ep_last_fail) >= CONN_STATUS_EXPIRE_SECS) {
+ return true;
+ }
+
+ // Slow path: check individual IPs (only when there's a recent failure)
+ for (auto& ip_status : res_ep.resolved_ips) {
+ const auto& last_fail = ip_status.last_failure.load();
+ if (ceph::real_clock::is_zero(last_fail)) {
+ return true; // This IP is up
+ }
+ auto diff = ceph::to_seconds<double>(now - last_fail);
+ if (diff >= CONN_STATUS_EXPIRE_SECS) {
+ return true; // This IP's failure has expired
+ }
+ }
+ return false; // All IPs are down
+ };
+
size_t num = 0;
size_t selected_idx = 0;
while (num < resolved_endpoints.size()) {
selected_idx = i % resolved_endpoints.size();
ResolvedEndpoint& res_ep = resolved_endpoints[selected_idx];
- const std::string& ep_url = res_ep.url;
- endpoint.set_url(ep_url);
- const auto& upd_time = res_ep.status.load();
-
- if (ceph::real_clock::is_zero(upd_time)) {
+ if (endpoint_has_available_ip(res_ep)) {
+ endpoint.set_url(res_ep.url);
break;
}
- auto diff = ceph::to_seconds<double>(ceph::real_clock::now() - upd_time);
-
- ldout(cct, 20) << "endpoint url=" << ep_url
- << " last endpoint status update time="
- << ceph::real_clock::to_double(upd_time)
- << " diff=" << diff << dendl;
-
- static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
- if (diff >= CONN_STATUS_EXPIRE_SECS) {
- res_ep.status.store(ceph::real_clock::zero());
- ldout(cct, 10) << endpoint << " unconnectable status expired. mark it connectable" << dendl;
- break;
- }
+ ldout(cct, 5) << "endpoint url=" << res_ep.url << " all IPs down, trying next" << dendl;
num++;
- };
+ }
if (num == resolved_endpoints.size()) {
- ldout(cct, 5) << "ERROR: no valid endpoint" << dendl;
+ ldout(cct, 1) << "ERROR: no valid endpoint (all IPs down for all endpoints)" << dendl;
return -EINVAL;
}
void RGWRESTConn::set_endpoint_unconnectable(const RGWEndpoint& endpoint)
{
const string& orig_url = endpoint.get_original_url();
+ const string& connect_to = endpoint.get_connect_to(); // connect_to hint carried by this endpoint; may be empty (checked below)
ResolvedEndpoint* res_ep = find_resolved_endpoint(orig_url);
if (orig_url.empty() || !res_ep) {
- ldout(cct, 0) << "ERROR: endpoint is not a valid or doesn't have status: "
- << endpoint << dendl;
+ ldout(cct, 0) << "ERROR: endpoint is not valid or not found: "
+ << endpoint << dendl;
return;
}
- res_ep->status.store(ceph::real_clock::now());
+ // Stamp the endpoint-level last_failure_time; the endpoint_has_available_ip check reads it as a fast path before scanning individual IPs
+ auto now = ceph::real_clock::now();
+ res_ep->last_failure_time.store(now);
- ldout(cct, 10) << "set endpoint unconnectable. url=" << orig_url << dendl;
+ // If we have a connect_to string, look up the resolved IP whose connect_to matches it exactly and mark only that IP as down
+ if (!connect_to.empty()) {
+ ResolvedIP* res_ip = res_ep->find_ip_status(connect_to);
+ if (res_ip) {
+ res_ip->mark_down(); // mark_down() stamps the IP with its own real_clock::now(), not the 'now' stored above
+ ldout(cct, 10) << "set IP unconnectable: " << connect_to << dendl;
+ return; // the other IPs of this endpoint are left untouched
+ }
+ }
+
+ // Fallback (connect_to is empty or matched no resolved IP): mark all IPs for this endpoint as down
+ for (auto& res_ip : res_ep->resolved_ips) {
+ res_ip.mark_down();
+ }
+ ldout(cct, 10) << "set all IPs unconnectable for endpoint url=" << orig_url << dendl;
}
void RGWRESTConn::populate_params(param_vec_t& params, const rgw_owner* uid, const string& zonegroup)
return params;
}
+/**
+ * ResolvedIP - health record for a single resolved IP address.
+ *
+ * Pairs the pre-built connect_to string of one IP with the time of its most
+ * recent failure. A zero last_failure means no failure is recorded. A non-zero
+ * last_failure that is less than CONN_STATUS_EXPIRE_SECS old means the IP is
+ * considered "down"; once that window has elapsed the IP becomes eligible for
+ * retry.
+ */
+struct ResolvedIP {
+  // Pre-computed "host:port:ip:port" for CURLOPT_CONNECT_TO
+  std::string connect_to;
+  // Time of the last recorded failure, real_clock::zero() when there is none.
+  // Declared mutable so the const mark_down()/mark_up() helpers can update it.
+  mutable std::atomic<ceph::real_time> last_failure{ceph::real_clock::zero()};
+
+  // Starts with an empty connect_to and no recorded failure.
+  ResolvedIP() = default;
+
+  // Takes ownership of the given connect_to string; no failure recorded yet.
+  explicit ResolvedIP(std::string connect_to_str)
+    : connect_to(std::move(connect_to_str)) {}
+
+  // Copying is disabled (std::atomic is not copyable).
+  ResolvedIP(const ResolvedIP&) = delete;
+  ResolvedIP& operator=(const ResolvedIP&) = delete;
+
+  // Moves are hand-written because std::atomic is not movable: the string is
+  // moved and the failure timestamp is carried over by value.
+  ResolvedIP(ResolvedIP&& other) noexcept
+    : connect_to(std::move(other.connect_to)),
+      last_failure(other.last_failure.load()) {}
+
+  ResolvedIP& operator=(ResolvedIP&& other) noexcept {
+    last_failure.store(other.last_failure.load());
+    connect_to = std::move(other.connect_to);
+    return *this;
+  }
+
+  // Clear the failure record.
+  void mark_up() const {
+    last_failure.store(ceph::real_clock::zero());
+  }
+
+  // Record a failure at the current time.
+  void mark_down() const {
+    last_failure.store(ceph::real_clock::now());
+  }
+};
+
+/**
+ * ResolvedEndpoint - A zone endpoint URL with its resolved IP addresses.
+ *
+ * Tracks per-IP connection status. An endpoint is considered "down" only when
+ * ALL of its IPs are marked as failed (within the retry timeout window).
+ *
+ * Members of note:
+ *  - resolved_ips: one ResolvedIP per resolved address. When it is empty the
+ *    endpoint is treated as available and no connect_to hint is set.
+ *  - endpoint_ips_round_robin_counter: post-incremented on every IP pick and
+ *    reduced modulo resolved_ips.size().
+ *    NOTE(review): this is a plain size_t, not an atomic; confirm that callers
+ *    serialize access if endpoints can be picked from several threads.
+ *  - last_failure_time: stored with the current time whenever a failure is
+ *    reported for this endpoint; read as a fast-path check before the
+ *    individual IPs are scanned.
+ *
+ * Copying is disabled. The move operations carry the atomic over by
+ * load()/store() of its current value.
+ */
struct ResolvedEndpoint {
std::string url; // e.g., "https://s3.abc.com:8443"
std::string scheme; // e.g., "https"
std::string host; // e.g., "s3.abc.com"
int port = -1; // e.g., 8443
- std::vector<std::string> ips; // the IPs the endpoint resolves to
- std::vector<std::string> connect_to_strings; // Pre-computed full connect_to strings for each IP
- size_t rr_index = 0; // round-robin index for IPs
-
- /* endpoint health state: the endpoint is not able to connect if the timestamp is not real_clock::zero */
- std::atomic<ceph::real_time> status;
+ std::vector<ResolvedIP> resolved_ips; // Per-IP connect_to strings with health status
+ mutable size_t endpoint_ips_round_robin_counter = 0; // round-robin index for IPs (used modulo resolved_ips.size())
+ mutable std::atomic<ceph::real_time> last_failure_time; // most recent IP failure seen on this endpoint; real_clock::zero() when none
- ResolvedEndpoint() = default;
+ ResolvedEndpoint() : last_failure_time(ceph::real_clock::zero()) {}
- // Custom move constructor (required because std::atomic is not movable)
+ // Custom move constructor (required because the std::atomic member last_failure_time is not movable)
ResolvedEndpoint(ResolvedEndpoint&& other) noexcept
: url(std::move(other.url)),
scheme(std::move(other.scheme)),
host(std::move(other.host)),
port(other.port),
- ips(std::move(other.ips)),
- connect_to_strings(std::move(other.connect_to_strings)),
- rr_index(other.rr_index),
- status(other.status.load())
+ resolved_ips(std::move(other.resolved_ips)),
+ endpoint_ips_round_robin_counter(other.endpoint_ips_round_robin_counter),
+ last_failure_time(other.last_failure_time.load())
{}
- // Custom move assignment (required because std::atomic is not movable)
+ // Custom move assignment (same reason; the atomic's value is carried over via load()/store())
ResolvedEndpoint& operator=(ResolvedEndpoint&& other) noexcept {
url = std::move(other.url);
scheme = std::move(other.scheme);
host = std::move(other.host);
port = other.port;
- ips = std::move(other.ips);
- connect_to_strings = std::move(other.connect_to_strings);
- rr_index = other.rr_index;
- status.store(other.status.load());
+ resolved_ips = std::move(other.resolved_ips);
+ endpoint_ips_round_robin_counter = other.endpoint_ips_round_robin_counter;
+ last_failure_time.store(other.last_failure_time.load());
return *this;
}
- // Delete copy operations (std::atomic is not copyable)
+ // Delete copy operations (std::atomic is not copyable)
ResolvedEndpoint(const ResolvedEndpoint&) = delete;
ResolvedEndpoint& operator=(const ResolvedEndpoint&) = delete;
+
+ // Find the ResolvedIP whose connect_to equals connect_to_str exactly (linear scan); returns nullptr if none matches
+ ResolvedIP* find_ip_status(const std::string& connect_to_str) {
+ for (auto& ip : resolved_ips) {
+ if (ip.connect_to == connect_to_str) return &ip;
+ }
+ return nullptr;
+ }
};
class RGWRESTConn