From: Oguzhan Ozmen Date: Sat, 7 Feb 2026 01:45:20 +0000 (+0000) Subject: rgw/rest: consolidate endpoint status tracking into ResolvedEndpoint X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=030e62b8f14c232d1ba08928e5c16a337d51dba0;p=ceph.git rgw/rest: consolidate endpoint status tracking into ResolvedEndpoint Refactor RGWRESTConn to eliminate the separate endpoints_status map by moving the connection status (std::atomic) directly into the ResolvedEndpoint struct. This reduces redundancy and simplifies endpoint state management. Signed-off-by: Oguzhan Ozmen --- diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 71996fcb401..fb54bc57a81 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -15,12 +15,7 @@ using namespace std; void RGWRESTConn::resolve_endpoints() { - resolved_endpoints.reserve(endpoints.size()); - - for (const auto& ep_url : endpoints) { - ResolvedEndpoint res_ep; - res_ep.url = ep_url; - + for (auto& [ep_url, res_ep] : resolved_endpoints) { // parse URL boost::system::result r = boost::urls::parse_uri(ep_url); if (r.has_error()) { @@ -78,8 +73,6 @@ void RGWRESTConn::resolve_endpoints() { ldout(cct, 0) << "WARNING: RGWRESTConn no IP addresses found for endpoint=" << ep_url << (ec ? " err=" + ec.message() : "") << dendl; } - - resolved_endpoints.emplace(ep_url, std::move(res_ep)); } } @@ -89,19 +82,17 @@ RGWRESTConn::RGWRESTConn(CephContext *_cct, rgw::sal::Driver* driver, std::optional _api_name, HostStyle _host_style) : cct(_cct), - endpoints(remote_endpoints.begin(), remote_endpoints.end()), + endpoint_urls(remote_endpoints.begin(), remote_endpoints.end()), remote_id(_remote_id), api_name(_api_name), host_style(_host_style) { - endpoints_status.reserve(remote_endpoints.size()); - std::for_each(remote_endpoints.begin(), remote_endpoints.end(), - [this](const auto& url) { - this->endpoints_status.emplace(url, ceph::real_clock::zero()); - }); - if (cct->_conf->rgw_rest_conn_connect_to_resolved_ips) { - resolve_endpoints(); + resolved_endpoints.reserve(remote_endpoints.size()); + for (const auto& ep_url : remote_endpoints) { + ResolvedEndpoint& res_ep = resolved_endpoints[ep_url]; + res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable } + resolve_endpoints(); if (driver) { key = driver->get_zone()->get_system_key(); @@ -117,45 +108,41 @@ RGWRESTConn::RGWRESTConn(CephContext *_cct, std::optional _api_name, HostStyle _host_style) : cct(_cct), - endpoints(remote_endpoints.begin(), remote_endpoints.end()), + endpoint_urls(remote_endpoints.begin(), remote_endpoints.end()), key(_cred), self_zone_group(_zone_group), remote_id(_remote_id), api_name(_api_name), host_style(_host_style) { - endpoints_status.reserve(remote_endpoints.size()); - std::for_each(remote_endpoints.begin(), remote_endpoints.end(), - [this](const auto& url) { - this->endpoints_status.emplace(url, ceph::real_clock::zero()); - }); - if (cct->_conf->rgw_rest_conn_connect_to_resolved_ips) { - resolve_endpoints(); + resolved_endpoints.reserve(remote_endpoints.size()); + for (const auto& ep_url : remote_endpoints) { + ResolvedEndpoint& res_ep = resolved_endpoints[ep_url]; + res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable } + resolve_endpoints(); } RGWRESTConn::RGWRESTConn(RGWRESTConn&& other) : cct(other.cct), - endpoints(std::move(other.endpoints)), + endpoint_urls(std::move(other.endpoint_urls)), + endpoint_urls_counter(other.endpoint_urls_counter.load()), resolved_endpoints(std::move(other.resolved_endpoints)), - endpoints_status(std::move(other.endpoints_status)), key(std::move(other.key)), self_zone_group(std::move(other.self_zone_group)), - remote_id(std::move(other.remote_id)), - counter(other.counter.load()) + remote_id(std::move(other.remote_id)) { } RGWRESTConn& RGWRESTConn::operator=(RGWRESTConn&& other) { cct = other.cct; - endpoints = std::move(other.endpoints); + endpoint_urls = std::move(other.endpoint_urls); + endpoint_urls_counter = other.endpoint_urls_counter.load(); resolved_endpoints = std::move(other.resolved_endpoints); - endpoints_status = std::move(other.endpoints_status); key = std::move(other.key); self_zone_group = std::move(other.self_zone_group); remote_id = std::move(other.remote_id); - counter = other.counter.load(); return *this; } @@ -179,25 +166,25 @@ void RGWRESTConn::get_connect_to_mapping_for_url(RGWEndpoint& endpoint) int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint) { - if (endpoints.empty()) { + if (endpoint_urls.empty()) { ldout(cct, 0) << "ERROR: endpoints not configured for upstream zone" << dendl; return -EINVAL; } size_t num = 0; - while (num < endpoints.size()) { - int i = ++counter; + while (num < endpoint_urls.size()) { + int i = ++endpoint_urls_counter; - const string& ep_url = endpoints[i % endpoints.size()]; + const string& ep_url = endpoint_urls[i % endpoint_urls.size()]; endpoint.set_url(ep_url); - if (endpoints_status.find(ep_url) == endpoints_status.end()) { + if (resolved_endpoints.find(ep_url) == resolved_endpoints.end()) { ldout(cct, 1) << "ERROR: missing status for endpoint " << ep_url << dendl; num++; continue; } - const auto& upd_time = endpoints_status[ep_url].load(); + const auto& upd_time = resolved_endpoints[ep_url].status.load(); if (ceph::real_clock::is_zero(upd_time)) { break; @@ -212,14 +199,14 @@ int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint) static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2; if (diff >= CONN_STATUS_EXPIRE_SECS) { - endpoints_status[ep_url].store(ceph::real_clock::zero()); + resolved_endpoints[ep_url].status.store(ceph::real_clock::zero()); ldout(cct, 10) << "endpoint " << endpoint.get_url() << " unconnectable status expired. mark it connectable" << dendl; break; } num++; }; - if (num == endpoints.size()) { + if (num == endpoint_urls.size()) { ldout(cct, 5) << "ERROR: no valid endpoint" << dendl; return -EINVAL; } @@ -242,13 +229,13 @@ void RGWRESTConn::set_endpoint_unconnectable(const RGWEndpoint& endpoint) { const string& url = endpoint.get_url(); - if (url.empty() || endpoints_status.find(url) == endpoints_status.end()) { + if (url.empty() || resolved_endpoints.find(url) == resolved_endpoints.end()) { ldout(cct, 0) << "ERROR: endpoint is not a valid or doesn't have status. endpoint=" << url << dendl; return; } - endpoints_status[url].store(ceph::real_clock::now()); + resolved_endpoints[url].status.store(ceph::real_clock::now()); ldout(cct, 10) << "set endpoint unconnectable. url=" << url << dendl; } diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index f29151c97c7..700d06f5407 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -69,27 +69,57 @@ struct ResolvedEndpoint { std::string url; // e.g., "https://s3.abc.com:8443" std::string scheme; // e.g., "https" std::string host; // e.g., "s3.abc.com" - int port = -1; // e.g., 443 + int port = -1; // e.g., 8443 std::vector ips; // the IPs the endpoint resolves to std::vector connect_to_strings; // Pre-computed full connect_to strings for each IP size_t rr_index = 0; // round-robin index for IPs + + /* endpoint health state: the endpoint is not able to connect if the timestamp is not real_clock::zero */ + std::atomic status; + + ResolvedEndpoint() = default; + + // Custom move constructor (required because std::atomic is not movable) + ResolvedEndpoint(ResolvedEndpoint&& other) noexcept + : url(std::move(other.url)), + scheme(std::move(other.scheme)), + host(std::move(other.host)), + port(other.port), + ips(std::move(other.ips)), + connect_to_strings(std::move(other.connect_to_strings)), + rr_index(other.rr_index), + status(other.status.load()) + {} + + // Custom move assignment (required because std::atomic is not movable) + ResolvedEndpoint& operator=(ResolvedEndpoint&& other) noexcept { + url = std::move(other.url); + scheme = std::move(other.scheme); + host = std::move(other.host); + port = other.port; + ips = std::move(other.ips); + connect_to_strings = std::move(other.connect_to_strings); + rr_index = other.rr_index; + status.store(other.status.load()); + return *this; + } + + // Delete copy operations (std::atomic is not copyable) + ResolvedEndpoint(const ResolvedEndpoint&) = delete; + ResolvedEndpoint& operator=(const ResolvedEndpoint&) = delete; }; class RGWRESTConn { - /* the endpoint is not able to connect if the timestamp is not real_clock::zero */ - using endpoint_status_map = std::unordered_map>; - CephContext *cct; - std::vector endpoints; + std::vector endpoint_urls; // For ordered round-robin + std::atomic endpoint_urls_counter = { 0 }; // Round-robin counter for endpoint_urls std::unordered_map resolved_endpoints; - endpoint_status_map endpoints_status; RGWAccessKey key; std::string self_zone_group; std::string remote_id; std::optional api_name; HostStyle host_style; - std::atomic counter = { 0 }; void resolve_endpoints(void); @@ -139,7 +169,7 @@ public: CephContext *get_ctx() { return cct; } - size_t get_endpoint_count() const { return endpoints.size(); } + size_t get_endpoint_count() const { return endpoint_urls.size(); } virtual void populate_params(param_vec_t& params, const rgw_owner* uid, const std::string& zonegroup);