using namespace std;
void RGWRESTConn::resolve_endpoints() {
- resolved_endpoints.reserve(endpoints.size());
-
- for (const auto& ep_url : endpoints) {
- ResolvedEndpoint res_ep;
- res_ep.url = ep_url;
-
+ for (auto& [ep_url, res_ep] : resolved_endpoints) {
// parse URL
boost::system::result<boost::urls::url_view> r = boost::urls::parse_uri(ep_url);
if (r.has_error()) {
ldout(cct, 0) << "WARNING: RGWRESTConn no IP addresses found for endpoint=" << ep_url
<< (ec ? " err=" + ec.message() : "") << dendl;
}
-
- resolved_endpoints.emplace(ep_url, std::move(res_ep));
}
}
std::optional<string> _api_name,
HostStyle _host_style)
: cct(_cct),
- endpoints(remote_endpoints.begin(), remote_endpoints.end()),
+ endpoint_urls(remote_endpoints.begin(), remote_endpoints.end()),
remote_id(_remote_id),
api_name(_api_name),
host_style(_host_style)
{
- endpoints_status.reserve(remote_endpoints.size());
- std::for_each(remote_endpoints.begin(), remote_endpoints.end(),
- [this](const auto& url) {
- this->endpoints_status.emplace(url, ceph::real_clock::zero());
- });
- if (cct->_conf->rgw_rest_conn_connect_to_resolved_ips) {
- resolve_endpoints();
+ resolved_endpoints.reserve(remote_endpoints.size());
+ for (const auto& ep_url : remote_endpoints) {
+ ResolvedEndpoint& res_ep = resolved_endpoints[ep_url];
+ res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable
}
+ resolve_endpoints();
if (driver) {
key = driver->get_zone()->get_system_key();
std::optional<string> _api_name,
HostStyle _host_style)
: cct(_cct),
- endpoints(remote_endpoints.begin(), remote_endpoints.end()),
+ endpoint_urls(remote_endpoints.begin(), remote_endpoints.end()),
key(_cred),
self_zone_group(_zone_group),
remote_id(_remote_id),
api_name(_api_name),
host_style(_host_style)
{
- endpoints_status.reserve(remote_endpoints.size());
- std::for_each(remote_endpoints.begin(), remote_endpoints.end(),
- [this](const auto& url) {
- this->endpoints_status.emplace(url, ceph::real_clock::zero());
- });
- if (cct->_conf->rgw_rest_conn_connect_to_resolved_ips) {
- resolve_endpoints();
+ resolved_endpoints.reserve(remote_endpoints.size());
+ for (const auto& ep_url : remote_endpoints) {
+ ResolvedEndpoint& res_ep = resolved_endpoints[ep_url];
+ res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable
}
+ resolve_endpoints();
}
RGWRESTConn::RGWRESTConn(RGWRESTConn&& other)
: cct(other.cct),
- endpoints(std::move(other.endpoints)),
+ endpoint_urls(std::move(other.endpoint_urls)),
+ endpoint_urls_counter(other.endpoint_urls_counter.load()),
resolved_endpoints(std::move(other.resolved_endpoints)),
- endpoints_status(std::move(other.endpoints_status)),
key(std::move(other.key)),
self_zone_group(std::move(other.self_zone_group)),
- remote_id(std::move(other.remote_id)),
- counter(other.counter.load())
+ remote_id(std::move(other.remote_id))
{
}
RGWRESTConn& RGWRESTConn::operator=(RGWRESTConn&& other)
{
cct = other.cct;
- endpoints = std::move(other.endpoints);
+ endpoint_urls = std::move(other.endpoint_urls);
+ endpoint_urls_counter = other.endpoint_urls_counter.load();
resolved_endpoints = std::move(other.resolved_endpoints);
- endpoints_status = std::move(other.endpoints_status);
key = std::move(other.key);
self_zone_group = std::move(other.self_zone_group);
remote_id = std::move(other.remote_id);
- counter = other.counter.load();
return *this;
}
int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint)
{
- if (endpoints.empty()) {
+ if (endpoint_urls.empty()) {
ldout(cct, 0) << "ERROR: endpoints not configured for upstream zone" << dendl;
return -EINVAL;
}
size_t num = 0;
- while (num < endpoints.size()) {
- int i = ++counter;
+ while (num < endpoint_urls.size()) {
+ int i = ++endpoint_urls_counter;
- const string& ep_url = endpoints[i % endpoints.size()];
+ const string& ep_url = endpoint_urls[i % endpoint_urls.size()];
endpoint.set_url(ep_url);
- if (endpoints_status.find(ep_url) == endpoints_status.end()) {
+ if (resolved_endpoints.find(ep_url) == resolved_endpoints.end()) {
ldout(cct, 1) << "ERROR: missing status for endpoint " << ep_url << dendl;
num++;
continue;
}
- const auto& upd_time = endpoints_status[ep_url].load();
+ const auto& upd_time = resolved_endpoints[ep_url].status.load();
if (ceph::real_clock::is_zero(upd_time)) {
break;
static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
if (diff >= CONN_STATUS_EXPIRE_SECS) {
- endpoints_status[ep_url].store(ceph::real_clock::zero());
+ resolved_endpoints[ep_url].status.store(ceph::real_clock::zero());
ldout(cct, 10) << "endpoint " << endpoint.get_url() << " unconnectable status expired. mark it connectable" << dendl;
break;
}
num++;
};
- if (num == endpoints.size()) {
+ if (num == endpoint_urls.size()) {
ldout(cct, 5) << "ERROR: no valid endpoint" << dendl;
return -EINVAL;
}
{
const string& url = endpoint.get_url();
- if (url.empty() || endpoints_status.find(url) == endpoints_status.end()) {
+ if (url.empty() || resolved_endpoints.find(url) == resolved_endpoints.end()) {
ldout(cct, 0) << "ERROR: endpoint is not a valid or doesn't have status. endpoint="
<< url << dendl;
return;
}
- endpoints_status[url].store(ceph::real_clock::now());
+ resolved_endpoints[url].status.store(ceph::real_clock::now());
ldout(cct, 10) << "set endpoint unconnectable. url=" << url << dendl;
}
std::string url; // e.g., "https://s3.abc.com:8443"
std::string scheme; // e.g., "https"
std::string host; // e.g., "s3.abc.com"
- int port = -1; // e.g., 443
+ int port = -1; // e.g., 8443
std::vector<std::string> ips; // the IPs the endpoint resolves to
std::vector<std::string> connect_to_strings; // Pre-computed full connect_to strings for each IP
size_t rr_index = 0; // round-robin index for IPs
+
+ /* endpoint health state: the endpoint is not able to connect if the timestamp is not real_clock::zero */
+ std::atomic<ceph::real_time> status;
+
+ ResolvedEndpoint() = default;
+
+ // Custom move constructor (required because std::atomic is not movable)
+ ResolvedEndpoint(ResolvedEndpoint&& other) noexcept
+ : url(std::move(other.url)),
+ scheme(std::move(other.scheme)),
+ host(std::move(other.host)),
+ port(other.port),
+ ips(std::move(other.ips)),
+ connect_to_strings(std::move(other.connect_to_strings)),
+ rr_index(other.rr_index),
+ status(other.status.load())
+ {}
+
+ // Custom move assignment (required because std::atomic is not movable)
+ ResolvedEndpoint& operator=(ResolvedEndpoint&& other) noexcept {
+ url = std::move(other.url);
+ scheme = std::move(other.scheme);
+ host = std::move(other.host);
+ port = other.port;
+ ips = std::move(other.ips);
+ connect_to_strings = std::move(other.connect_to_strings);
+ rr_index = other.rr_index;
+ status.store(other.status.load());
+ return *this;
+ }
+
+ // Delete copy operations (std::atomic is not copyable)
+ ResolvedEndpoint(const ResolvedEndpoint&) = delete;
+ ResolvedEndpoint& operator=(const ResolvedEndpoint&) = delete;
};
class RGWRESTConn
{
- /* the endpoint is not able to connect if the timestamp is not real_clock::zero */
- using endpoint_status_map = std::unordered_map<std::string, std::atomic<ceph::real_time>>;
-
CephContext *cct;
- std::vector<std::string> endpoints;
+ std::vector<std::string> endpoint_urls; // For ordered round-robin
+ std::atomic<int64_t> endpoint_urls_counter = { 0 }; // Round-robin counter for endpoint_urls
std::unordered_map<std::string, ResolvedEndpoint> resolved_endpoints;
- endpoint_status_map endpoints_status;
RGWAccessKey key;
std::string self_zone_group;
std::string remote_id;
std::optional<std::string> api_name;
HostStyle host_style;
- std::atomic<int64_t> counter = { 0 };
void resolve_endpoints(void);
CephContext *get_ctx() {
return cct;
}
- size_t get_endpoint_count() const { return endpoints.size(); }
+ size_t get_endpoint_count() const { return endpoint_urls.size(); }
virtual void populate_params(param_vec_t& params, const rgw_owner* uid, const std::string& zonegroup);