return std::move(result);
}
+static void get_hint_entities(const std::set<string>& zone_names, const std::set<rgw_bucket>& buckets,
+ std::set<rgw_sync_bucket_entity> *hint_entities)
+{
+ for (auto& zone : zone_names) {
+ for (auto& b : buckets) {
+ string zid;
+ if (!store->svc()->zone->find_zone_id_by_name(zone, &zid)) {
+ cerr << "WARNING: cannot find zone id for zone=" << zone << ", skippping" << std::endl;
+ continue;
+ }
+
+ RGWBucketInfo hint_bucket_info;
+ rgw_bucket hint_bucket;
+ int ret = init_bucket(b, hint_bucket_info, hint_bucket);
+ if (ret < 0) {
+ ldout(store->ctx(), 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
+ continue;
+ }
+
+ hint_entities->insert(rgw_sync_bucket_entity(zid, hint_bucket));
+ }
+ }
+}
+
static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter)
{
+ string zone_name;
std::optional<string> zone_id;
if (opt_target_zone) {
cerr << "WARNING: cannot find zone id for zone=" << *opt_target_zone << std::endl;
return -ENOENT;
}
+ zone_name = *opt_target_zone;
zone_id = zid;
+ } else {
+ zone_name = store->svc ()->zone->zone_name();
}
auto zone_policy_handler = store->svc()->zone->get_sync_policy_handler(zone_id);
auto source_hints_vec = convert_bucket_set_to_str_vec(handler->get_source_hints());
auto target_hints_vec = convert_bucket_set_to_str_vec(handler->get_target_hints());
- RGWBucketSyncFlowManager::pipe_set *resolved_sources;
- RGWBucketSyncFlowManager::pipe_set *resolved_dests;
+ RGWBucketSyncFlowManager::pipe_set resolved_sources;
+ RGWBucketSyncFlowManager::pipe_set resolved_dests;
- for (auto& b : handler->get_source_hints()) {
- RGWBucketInfo hint_bucket_info;
- rgw_bucket hint_bucket;
- int ret = init_bucket(b, hint_bucket_info, hint_bucket);
- if (ret < 0) {
- ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
- continue;
+ rgw_sync_bucket_entity self_entity(zone_name, opt_bucket);
+
+ set<string> source_zones;
+ set<string> target_zones;
+
+ zone_policy_handler->reflect(nullptr, nullptr,
+ nullptr, nullptr,
+ &source_zones,
+ &target_zones,
+ false); /* relaxed: also get all zones that we allow to sync to/from */
+
+ std::set<rgw_sync_bucket_entity> hint_entities;
+
+ get_hint_entities(source_zones, handler->get_source_hints(), &hint_entities);
+ get_hint_entities(target_zones, handler->get_target_hints(), &hint_entities);
+
+ for (auto& hint_entity : hint_entities) {
+ if (!hint_entity.zone ||
+ !hint_entity.bucket) {
+ continue; /* shouldn't really happen */
}
+ auto& zid = *hint_entity.zone;
+ auto& hint_bucket = *hint_entity.bucket;
+
RGWBucketSyncPolicyHandlerRef hint_bucket_handler;
- hint_bucket_handler.reset(handler->alloc_child(hint_bucket_indo));
+ int r = store->ctl()->bucket->get_sync_policy_handler(zid, hint_bucket, &hint_bucket_handler, null_yield);
+ if (r < 0) {
+ ldout(store->ctx(), 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl;
+ continue;
+ }
+ hint_bucket_handler->get_pipes(&resolved_dests,
+ &resolved_sources,
+ self_entity); /* flipping resolved dests and sources as these are
+ relative to the remote entity */
}
{
}
{
Formatter::ObjectSection resolved_hints_section(*formatter, "resolved-hints");
- encode_json("resolved-hints", *sources, formatter);
- encode_json("dests", *dests, formatter);
+ encode_json("sources", resolved_sources, formatter);
+ encode_json("dests", resolved_dests, formatter);
}
}
void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
RGWBucketSyncFlowManager::pipe_set *source_pipes,
RGWBucketSyncFlowManager::pipe_set *dest_pipes,
- std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const
{
entity.bucket = effective_bucket.value_or(rgw_bucket());
if (parent) {
- parent->reflect(effective_bucket, source_pipes, dest_pipes, filter_peer_bucket, only_enabled);
+ parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled);
}
for (auto& item : flow_groups) {
if (!pipe.dest.match_bucket(effective_bucket)) {
continue;
}
- if (!pipe.source.match_bucket(filter_peer_bucket)) {
- continue;
- }
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
if (!pipe.source.match_bucket(effective_bucket)) {
continue;
}
- if (!pipe.dest.match_bucket(filter_peer_bucket)) {
- continue;
- }
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy);
}
-int RGWBucketSyncPolicyHandler::init(std::optional<rgw_bucket> filter_peer_bucket,
- optional_yield y)
+int RGWBucketSyncPolicyHandler::init(optional_yield y)
{
int r = bucket_sync_svc->get_bucket_sync_hints(bucket.value_or(rgw_bucket()),
&source_hints,
&targets,
&source_zones,
&target_zones,
- filter_peer_bucket,
true);
return 0;
map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
std::set<string> *psource_zones,
std::set<string> *ptarget_zones,
- std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const
{
RGWBucketSyncFlowManager::pipe_set _sources_by_name;
std::set<string> _source_zones;
std::set<string> _target_zones;
- flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, filter_peer_bucket, only_enabled);
+ flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled);
for (auto& pipe : _sources_by_name.pipes) {
if (!pipe.source.zone) {
}
}
+void RGWBucketSyncPolicyHandler::get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets,
+ std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes (with zone name) */
+ if (!filter_peer) {
+ *sources = sources_by_name;
+ *targets = targets_by_name;
+ return;
+ }
+
+ auto& filter = *filter_peer;
+
+ for (auto& source_pipe : sources_by_name.pipes) {
+ if (source_pipe.source.match(filter)) {
+ sources->pipes.insert(source_pipe);
+ }
+ }
+
+ for (auto& target_pipe : targets_by_name.pipes) {
+ if (target_pipe.dest.match(filter)) {
+ targets->pipes.insert(target_pipe);
+ }
+ }
+}
+
bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
{
if (!bucket) {
void reflect(std::optional<rgw_bucket> effective_bucket,
pipe_set *flow_by_source,
pipe_set *flow_by_dest,
- std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const;
};
RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket,
std::optional<rgw_sync_policy_info> sync_policy) const;
- int init(std::optional<rgw_bucket> filter_peer_bucket, optional_yield y);
+ int init(optional_yield y);
void reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name,
RGWBucketSyncFlowManager::pipe_set *ptargets_by_name,
map<string, RGWBucketSyncFlowManager::pipe_set> *ptargets,
std::set<string> *psource_zones,
std::set<string> *ptarget_zones,
- std::optional<rgw_bucket> filter_peer_bucket,
bool only_enabled) const;
const std::set<string>& get_source_zones() const {
*sources = &sources_by_name;
*targets = &targets_by_name;
}
+ void get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets,
+ std::optional<rgw_sync_bucket_entity> filter_peer);
const std::set<rgw_bucket>& get_source_hints() const {
return source_hints;
zone = z;
}
+ static bool match_bucket_id(const string& bid1, const string& bid2) {
+ return (bid1.empty() || bid2.empty() || (bid1 == bid2));
+ }
+
bool match_bucket(std::optional<rgw_bucket> b) const {
if (!b) {
return true;
return (match_str(bucket->tenant, b->tenant) &&
match_str(bucket->name, b->name) &&
- match_str(bucket->bucket_id, b->bucket_id));
+ match_bucket_id(bucket->bucket_id, b->bucket_id));
+ }
+
+ bool match(const rgw_sync_bucket_entity& entity) const {
+ if (!entity.zone) {
+ return match_bucket(entity.bucket);
+ }
+ return (match_zone(*entity.zone) && match_bucket(entity.bucket));
}
const bool operator<(const rgw_sync_bucket_entity& e) const {
return r;
}
- index.get_entities(bucket, dests);
+ index.get_entities(bucket, sources);
if (!bucket.bucket_id.empty()) {
rgw_bucket b = bucket;
b.bucket_id.clear();
- index.get_entities(bucket, dests);
+ index.get_entities(b, sources);
}
}
if (!bucket.bucket_id.empty()) {
rgw_bucket b = bucket;
b.bucket_id.clear();
- index.get_entities(bucket, dests);
+ index.get_entities(b, dests);
}
}