git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/rest: track connection failures per-IP instead of per-endpoint
author: Oguzhan Ozmen <oozmen@bloomberg.net>
Tue, 3 Mar 2026 00:45:59 +0000 (00:45 +0000)
committer: Oguzhan Ozmen <oozmen@bloomberg.net>
Tue, 2 Jun 2026 22:16:20 +0000 (22:16 +0000)
Previously, when a connection to a zone endpoint failed, the entire
endpoint was marked as unavailable for a timeout period. Since we now
resolve endpoints to all their IP addresses (via DNS A/AAAA records),
we can be more granular: track failures at the individual IP level.

Introduce ResolvedIP struct that pairs each IP's connect_to string
with its own failure timestamp. When selecting an IP for a request,
round-robin skips IPs that have recently failed, allowing traffic to
continue flowing to healthy nodes even when some are down.

An endpoint-level last_failure_time is maintained as a fast-path
optimization to avoid scanning all IPs when none have failed recently.

Signed-off-by: Oguzhan Ozmen <oozmen@bloomberg.net>
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h

index 459559b4cf34fc4e8e4b59ef7971249e80eb2866..412a33d95a113024cacc32c75a65003918d21f62 100644 (file)
@@ -60,14 +60,14 @@ void RGWRESTConn::resolve_endpoints() {
       std::string port_str = std::to_string(res_ep.port);
       std::string host_port_prefix = res_ep.host + ":" + port_str + ":";
 
+      res_ep.resolved_ips.reserve(results.size());
       for (const auto& entry : results) {
         auto ip_str = entry.endpoint().address().to_string();
-        res_ep.ips.push_back(ip_str);
-        res_ep.connect_to_strings.emplace_back(host_port_prefix + ip_str + ":" + port_str);
+        res_ep.resolved_ips.emplace_back(host_port_prefix + ip_str + ":" + port_str);
         ldout(cct, 2) << "endpoint_url=" << ep_url << " resolved to ip=" << ip_str << dendl;
       }
       ldout(cct, 2) << "endpoint=" << ep_url << " resolved to "
-                << res_ep.ips.size() << " IP addresses" << dendl;
+                << res_ep.resolved_ips.size() << " IP addresses" << dendl;
     } else {
       ldout(cct, 0) << "WARNING: RGWRESTConn no IP addresses found for endpoint=" << ep_url
                     << (ec ? " err=" + ec.message() : "") << dendl;
@@ -89,7 +89,6 @@ RGWRESTConn::RGWRESTConn(CephContext *_cct, rgw::sal::Driver* driver,
   for (const auto& ep_url : remote_endpoints) {
     ResolvedEndpoint res_ep;
     res_ep.url = ep_url;
-    res_ep.status.store(ceph::real_clock::zero());  // Initial status: connectable
     resolved_endpoints.push_back(std::move(res_ep));
   }
   resolve_endpoints();
@@ -118,7 +117,6 @@ RGWRESTConn::RGWRESTConn(CephContext *_cct,
   for (const auto& ep_url : remote_endpoints) {
     ResolvedEndpoint res_ep;
     res_ep.url = ep_url;
-    res_ep.status.store(ceph::real_clock::zero());  // Initial status: connectable
     resolved_endpoints.push_back(std::move(res_ep));
   }
   resolve_endpoints();
@@ -165,10 +163,38 @@ void RGWRESTConn::populate_connect_to(RGWEndpoint& endpoint, ResolvedEndpoint& r
     return;
   }
 
-  if (!resolved_endpoint.connect_to_strings.empty()) {
-    size_t idx = resolved_endpoint.rr_index++;
-    endpoint.set_connect_to(resolved_endpoint.connect_to_strings[idx % resolved_endpoint.connect_to_strings.size()]);
+  if (resolved_endpoint.resolved_ips.empty()) {
+    return;
+  }
+
+  static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
+  const size_t num_ips = resolved_endpoint.resolved_ips.size();
+
+  // Round-robin through IPs, skipping any that are marked down
+  for (size_t i = 0; i < num_ips; ++i) {
+    size_t idx = resolved_endpoint.endpoint_ips_round_robin_counter++ % num_ips;
+    ResolvedIP& ip_status = resolved_endpoint.resolved_ips[idx];
+
+    const auto& last_fail = ip_status.last_failure.load();
+    if (ceph::real_clock::is_zero(last_fail)) {
+      endpoint.set_connect_to(ip_status.connect_to);  // IP is up
+      return;
+    }
+
+    auto diff = ceph::to_seconds<double>(ceph::real_clock::now() - last_fail);
+    if (diff >= CONN_STATUS_EXPIRE_SECS) {
+      // Failure expired, mark IP as up and use it
+      ip_status.mark_up();
+      ldout(cct, 5) << "IP " << ip_status.connect_to << " failure expired, marking up" << dendl;
+      endpoint.set_connect_to(ip_status.connect_to);
+      return;
+    }
   }
+
+  // All IPs are down - do not populate connect_to; i.e.,
+  // let libcurl handle it without connect_to hint.
+  ldout(cct, 5) << "All IPs down for endpoint=" << resolved_endpoint.url
+    << " - skip connect_to hint" << dendl;
 }
 
 int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint)
@@ -178,6 +204,37 @@ int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint)
     return -EINVAL;
   }
 
+  static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
+  auto now = ceph::real_clock::now();
+
+  // Helper to check if an endpoint has at least one available IP
+  auto endpoint_has_available_ip = [&](ResolvedEndpoint& res_ep) -> bool {
+    // If no IP resolution, endpoint is available (will use DNS directly)
+    if (res_ep.resolved_ips.empty()) {
+      return true;
+    }
+
+    // Fast path: if no recent failures at endpoint level, all IPs are available
+    const auto& ep_last_fail = res_ep.last_failure_time.load();
+    if (ceph::real_clock::is_zero(ep_last_fail) ||
+        ceph::to_seconds<double>(now - ep_last_fail) >= CONN_STATUS_EXPIRE_SECS) {
+      return true;
+    }
+
+    // Slow path: check individual IPs (only when there's a recent failure)
+    for (auto& ip_status : res_ep.resolved_ips) {
+      const auto& last_fail = ip_status.last_failure.load();
+      if (ceph::real_clock::is_zero(last_fail)) {
+        return true;  // This IP is up
+      }
+      auto diff = ceph::to_seconds<double>(now - last_fail);
+      if (diff >= CONN_STATUS_EXPIRE_SECS) {
+        return true;  // This IP's failure has expired
+      }
+    }
+    return false;  // All IPs are down
+  };
+
   size_t num = 0;
   size_t selected_idx = 0;
   while (num < resolved_endpoints.size()) {
@@ -185,33 +242,18 @@ int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint)
     selected_idx = i % resolved_endpoints.size();
 
     ResolvedEndpoint& res_ep = resolved_endpoints[selected_idx];
-    const std::string& ep_url = res_ep.url;
-    endpoint.set_url(ep_url);
 
-    const auto& upd_time = res_ep.status.load();
-
-    if (ceph::real_clock::is_zero(upd_time)) {
+    if (endpoint_has_available_ip(res_ep)) {
+      endpoint.set_url(res_ep.url);
       break;
     }
 
-    auto diff = ceph::to_seconds<double>(ceph::real_clock::now() - upd_time);
-
-    ldout(cct, 20) << "endpoint url=" << ep_url
-                   << " last endpoint status update time="
-                   << ceph::real_clock::to_double(upd_time)
-                   << " diff=" << diff << dendl;
-
-    static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
-    if (diff >= CONN_STATUS_EXPIRE_SECS) {
-      res_ep.status.store(ceph::real_clock::zero());
-      ldout(cct, 10) << endpoint << " unconnectable status expired. mark it connectable" << dendl;
-      break;
-    }
+    ldout(cct, 5) << "endpoint url=" << res_ep.url << " all IPs down, trying next" << dendl;
     num++;
-  };
+  }
 
   if (num == resolved_endpoints.size()) {
-    ldout(cct, 5) << "ERROR: no valid endpoint" << dendl;
+    ldout(cct, 1) << "ERROR: no valid endpoint (all IPs down for all endpoints)" << dendl;
     return -EINVAL;
   }
 
@@ -231,17 +273,34 @@ RGWEndpoint RGWRESTConn::get_endpoint()
 void RGWRESTConn::set_endpoint_unconnectable(const RGWEndpoint& endpoint)
 {
   const string& orig_url = endpoint.get_original_url();
+  const string& connect_to = endpoint.get_connect_to();
 
   ResolvedEndpoint* res_ep = find_resolved_endpoint(orig_url);
   if (orig_url.empty() || !res_ep) {
-    ldout(cct, 0) << "ERROR: endpoint is not a valid or doesn't have status: "
-                  << endpoint << dendl;
+    ldout(cct, 0) << "ERROR: endpoint is not valid or not found: "
+      << endpoint << dendl;
     return;
   }
 
-  res_ep->status.store(ceph::real_clock::now());
+  // Update endpoint-level last_failure_time for fast-path optimization
+  auto now = ceph::real_clock::now();
+  res_ep->last_failure_time.store(now);
 
-  ldout(cct, 10) << "set endpoint unconnectable. url=" << orig_url << dendl;
+  // If we have a connect_to string, mark that specific IP as down as well
+  if (!connect_to.empty()) {
+    ResolvedIP* res_ip = res_ep->find_ip_status(connect_to);
+    if (res_ip) {
+      res_ip->mark_down();
+      ldout(cct, 10) << "set IP unconnectable: " << connect_to << dendl;
+      return;
+    }
+  }
+
+  // Fallback: mark all IPs for this endpoint as down
+  for (auto& res_ip : res_ep->resolved_ips) {
+    res_ip.mark_down();
+  }
+  ldout(cct, 10) << "set all IPs unconnectable for endpoint url=" << orig_url << dendl;
 }
 
 void RGWRESTConn::populate_params(param_vec_t& params, const rgw_owner* uid, const string& zonegroup)
index c4a9389f1bcf9562f35119becc98ee5f8bb393eb..66c849f1a1e97d13b6b00e8a6c7c5367c2664755 100644 (file)
@@ -65,48 +65,91 @@ inline param_vec_t make_param_list(const std::map<std::string, std::string> *pp)
   return params;
 }
 
+/**
+ * ResolvedIP - Per-IP connection status tracking.
+ *
+ * Each resolved IP address has its own failure status. An IP is considered
+ * "down" if last_failure is non-zero and less than CONN_STATUS_EXPIRE_SECS old.
+ * After the timeout, the IP becomes eligible for retry.
+ */
+struct ResolvedIP {
+  std::string connect_to;  // Pre-computed "host:port:ip:port" for CURLOPT_CONNECT_TO
+  mutable std::atomic<ceph::real_time> last_failure;
+
+  ResolvedIP() : last_failure(ceph::real_clock::zero()) {}
+
+  explicit ResolvedIP(std::string _connect_to)
+    : connect_to(std::move(_connect_to)), last_failure(ceph::real_clock::zero()) {}
+
+  // Move & assignment operations (required because std::atomic is not movable)
+  ResolvedIP(ResolvedIP&& o) noexcept
+    : connect_to(std::move(o.connect_to)), last_failure(o.last_failure.load()) {}
+
+  ResolvedIP& operator=(ResolvedIP&& o) noexcept {
+    connect_to = std::move(o.connect_to);
+    last_failure.store(o.last_failure.load());
+    return *this;
+  }
+
+  // Delete copy (std::atomic is not copyable)
+  ResolvedIP(const ResolvedIP&) = delete;
+  ResolvedIP& operator=(const ResolvedIP&) = delete;
+
+  void mark_down() const { last_failure.store(ceph::real_clock::now()); }
+  void mark_up() const { last_failure.store(ceph::real_clock::zero()); }
+};
+
+/**
+ * ResolvedEndpoint - A zone endpoint URL with its resolved IP addresses.
+ *
+ * Tracks per-IP connection status. An endpoint is considered "down" only when
+ * ALL of its IPs are marked as failed (within the retry timeout window).
+ */
 struct ResolvedEndpoint {
   std::string url;                // e.g., "https://s3.abc.com:8443"
   std::string scheme;             // e.g., "https"
   std::string host;               // e.g., "s3.abc.com"
   int port = -1;                  // e.g., 8443
-  std::vector<std::string> ips; // the IPs the endpoint resolves to
-  std::vector<std::string> connect_to_strings;  // Pre-computed full connect_to strings for each IP
-  size_t rr_index = 0;            // round-robin index for IPs
-
-  /* endpoint health state: the endpoint is not able to connect if the timestamp is not real_clock::zero */
-  std::atomic<ceph::real_time> status;
+  std::vector<ResolvedIP> resolved_ips;  // Per-IP connect_to strings with health status
+  mutable size_t endpoint_ips_round_robin_counter = 0;    // round-robin index for IPs
+  mutable std::atomic<ceph::real_time> last_failure_time; // most recent IP failure seen on this endpoint
 
-  ResolvedEndpoint() = default;
+  ResolvedEndpoint() : last_failure_time(ceph::real_clock::zero()) {}
 
-  // Custom move constructor (required because std::atomic is not movable)
+  // Custom move constructor (required because of atomics in ResolvedIP)
   ResolvedEndpoint(ResolvedEndpoint&& other) noexcept
     : url(std::move(other.url)),
       scheme(std::move(other.scheme)),
       host(std::move(other.host)),
       port(other.port),
-      ips(std::move(other.ips)),
-      connect_to_strings(std::move(other.connect_to_strings)),
-      rr_index(other.rr_index),
-      status(other.status.load())
+      resolved_ips(std::move(other.resolved_ips)),
+      endpoint_ips_round_robin_counter(other.endpoint_ips_round_robin_counter),
+      last_failure_time(other.last_failure_time.load())
   {}
 
-  // Custom move assignment (required because std::atomic is not movable)
+  // Custom move assignment
   ResolvedEndpoint& operator=(ResolvedEndpoint&& other) noexcept {
     url = std::move(other.url);
     scheme = std::move(other.scheme);
     host = std::move(other.host);
     port = other.port;
-    ips = std::move(other.ips);
-    connect_to_strings = std::move(other.connect_to_strings);
-    rr_index = other.rr_index;
-    status.store(other.status.load());
+    resolved_ips = std::move(other.resolved_ips);
+    endpoint_ips_round_robin_counter = other.endpoint_ips_round_robin_counter;
+    last_failure_time.store(other.last_failure_time.load());
     return *this;
   }
 
-  // Delete copy operations (std::atomic is not copyable)
+  // Delete copy operations
   ResolvedEndpoint(const ResolvedEndpoint&) = delete;
   ResolvedEndpoint& operator=(const ResolvedEndpoint&) = delete;
+
+  // Find IP status by connect_to string
+  ResolvedIP* find_ip_status(const std::string& connect_to_str) {
+    for (auto& ip : resolved_ips) {
+      if (ip.connect_to == connect_to_str) return &ip;
+    }
+    return nullptr;
+  }
 };
 
 class RGWRESTConn