using namespace std;
void RGWRESTConn::resolve_endpoints() {
- for (auto& [ep_url, res_ep] : resolved_endpoints) {
+ for (auto& res_ep : resolved_endpoints) {
+ const std::string& ep_url = res_ep.url;
// parse URL
boost::system::result<boost::urls::url_view> r = boost::urls::parse_uri(ep_url);
if (r.has_error()) {
std::optional<string> _api_name,
HostStyle _host_style)
: cct(_cct),
- endpoint_urls(remote_endpoints.begin(), remote_endpoints.end()),
remote_id(_remote_id),
api_name(_api_name),
host_style(_host_style)
{
resolved_endpoints.reserve(remote_endpoints.size());
for (const auto& ep_url : remote_endpoints) {
- ResolvedEndpoint& res_ep = resolved_endpoints[ep_url];
+ ResolvedEndpoint res_ep;
+ res_ep.url = ep_url;
res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable
+ resolved_endpoints.push_back(std::move(res_ep));
}
resolve_endpoints();
std::optional<string> _api_name,
HostStyle _host_style)
: cct(_cct),
- endpoint_urls(remote_endpoints.begin(), remote_endpoints.end()),
key(_cred),
self_zone_group(_zone_group),
remote_id(_remote_id),
{
resolved_endpoints.reserve(remote_endpoints.size());
for (const auto& ep_url : remote_endpoints) {
- ResolvedEndpoint& res_ep = resolved_endpoints[ep_url];
+ ResolvedEndpoint res_ep;
+ res_ep.url = ep_url;
res_ep.status.store(ceph::real_clock::zero()); // Initial status: connectable
+ resolved_endpoints.push_back(std::move(res_ep));
}
resolve_endpoints();
}
RGWRESTConn::RGWRESTConn(RGWRESTConn&& other)
: cct(other.cct),
- endpoint_urls(std::move(other.endpoint_urls)),
- endpoint_urls_counter(other.endpoint_urls_counter.load()),
+ endpoint_round_robin_counter(other.endpoint_round_robin_counter.load()),
resolved_endpoints(std::move(other.resolved_endpoints)),
key(std::move(other.key)),
self_zone_group(std::move(other.self_zone_group)),
RGWRESTConn& RGWRESTConn::operator=(RGWRESTConn&& other)
{
cct = other.cct;
- endpoint_urls = std::move(other.endpoint_urls);
- endpoint_urls_counter = other.endpoint_urls_counter.load();
+ endpoint_round_robin_counter = other.endpoint_round_robin_counter.load();
resolved_endpoints = std::move(other.resolved_endpoints);
key = std::move(other.key);
self_zone_group = std::move(other.self_zone_group);
return *this;
}
-void RGWRESTConn::get_connect_to_mapping_for_url(RGWEndpoint& endpoint)
+ResolvedEndpoint* RGWRESTConn::find_resolved_endpoint(const std::string& url)
+{
+ for (auto& res_ep : resolved_endpoints) {
+ if (res_ep.url == url) {
+ return &res_ep;
+ }
+ }
+ return nullptr;
+}
+
+void RGWRESTConn::populate_connect_to(RGWEndpoint& endpoint, ResolvedEndpoint& resolved_endpoint)
{
if (!cct->_conf->rgw_rest_conn_connect_to_resolved_ips) {
return;
}
- std::string connect_to;
-
- auto it = resolved_endpoints.find(endpoint.get_url());
- if (it != resolved_endpoints.end() && !it->second.connect_to_strings.empty()) {
- auto& res_ep = it->second;
- size_t idx = res_ep.rr_index++;
- connect_to = res_ep.connect_to_strings[idx % res_ep.connect_to_strings.size()];
+ if (!resolved_endpoint.connect_to_strings.empty()) {
+ size_t idx = resolved_endpoint.rr_index++;
+ endpoint.set_connect_to(resolved_endpoint.connect_to_strings[idx % resolved_endpoint.connect_to_strings.size()]);
}
-
- endpoint.set_connect_to(connect_to);
}
int RGWRESTConn::get_endpoint(RGWEndpoint& endpoint)
{
- if (endpoint_urls.empty()) {
+ if (resolved_endpoints.empty()) {
ldout(cct, 0) << "ERROR: endpoints not configured for upstream zone" << dendl;
return -EINVAL;
}
size_t num = 0;
- while (num < endpoint_urls.size()) {
- int i = ++endpoint_urls_counter;
+ size_t selected_idx = 0;
+ while (num < resolved_endpoints.size()) {
+ int i = ++endpoint_round_robin_counter;
+ selected_idx = i % resolved_endpoints.size();
- const string& ep_url = endpoint_urls[i % endpoint_urls.size()];
+ ResolvedEndpoint& res_ep = resolved_endpoints[selected_idx];
+ const std::string& ep_url = res_ep.url;
endpoint.set_url(ep_url);
- if (resolved_endpoints.find(ep_url) == resolved_endpoints.end()) {
- ldout(cct, 1) << "ERROR: missing status for endpoint " << ep_url << dendl;
- num++;
- continue;
- }
-
- const auto& upd_time = resolved_endpoints[ep_url].status.load();
+ const auto& upd_time = res_ep.status.load();
if (ceph::real_clock::is_zero(upd_time)) {
break;
static constexpr uint32_t CONN_STATUS_EXPIRE_SECS = 2;
if (diff >= CONN_STATUS_EXPIRE_SECS) {
- resolved_endpoints[ep_url].status.store(ceph::real_clock::zero());
+ res_ep.status.store(ceph::real_clock::zero());
ldout(cct, 10) << endpoint << " unconnectable status expired. mark it connectable" << dendl;
break;
}
num++;
};
- if (num == endpoint_urls.size()) {
+ if (num == resolved_endpoints.size()) {
ldout(cct, 5) << "ERROR: no valid endpoint" << dendl;
return -EINVAL;
}
- get_connect_to_mapping_for_url(endpoint);
+ populate_connect_to(endpoint, resolved_endpoints[selected_idx]);
ldout(cct, 20) << "get_endpoint picked " << endpoint << dendl;
return 0;
{
const string& orig_url = endpoint.get_original_url();
- if (orig_url.empty() || resolved_endpoints.find(orig_url) == resolved_endpoints.end()) {
+ ResolvedEndpoint* res_ep = find_resolved_endpoint(orig_url);
+ if (orig_url.empty() || !res_ep) {
ldout(cct, 0) << "ERROR: endpoint is not a valid or doesn't have status: "
<< endpoint << dendl;
return;
}
- resolved_endpoints[orig_url].status.store(ceph::real_clock::now());
+ res_ep->status.store(ceph::real_clock::now());
ldout(cct, 10) << "set endpoint unconnectable. url=" << orig_url << dendl;
}
class RGWRESTConn
{
CephContext *cct;
- std::vector<std::string> endpoint_urls; // For ordered round-robin
- std::atomic<int64_t> endpoint_urls_counter = { 0 }; // Round-robin counter for endpoint_urls
- std::unordered_map<std::string, ResolvedEndpoint> resolved_endpoints;
+ std::atomic<int64_t> endpoint_round_robin_counter = { 0 }; // Round-robin counter for resolved_endpoints
+ std::vector<ResolvedEndpoint> resolved_endpoints;
RGWAccessKey key;
std::string self_zone_group;
std::string remote_id;
int get_endpoint(RGWEndpoint& endpoint);
RGWEndpoint get_endpoint();
- const std::unordered_map<std::string, ResolvedEndpoint>& get_resolved_endpoints() const { return resolved_endpoints; }
+ const std::vector<ResolvedEndpoint>& get_resolved_endpoints() const { return resolved_endpoints; }
+ ResolvedEndpoint* find_resolved_endpoint(const std::string& url);
void set_endpoint_unconnectable(const RGWEndpoint& endpoint);
const std::string& get_self_zonegroup() {
return self_zone_group;
CephContext *get_ctx() {
return cct;
}
- size_t get_endpoint_count() const { return endpoint_urls.size(); }
+ size_t get_endpoint_count() const { return resolved_endpoints.size(); }
virtual void populate_params(param_vec_t& params, const rgw_owner* uid, const std::string& zonegroup);
ceph::real_time *mtime, optional_yield y);
/* pick an IP to 'connect-to' given the endpoint url */
- void get_connect_to_mapping_for_url(RGWEndpoint& endpoint);
+ void populate_connect_to(RGWEndpoint& endpoint, ResolvedEndpoint& res_ep);
struct get_obj_params {
const rgw_owner *uid{nullptr};