};
-class RGWBucketSyncFlowManager {
- RGWSI_Zone *zone_svc;
- std::optional<rgw_bucket> bucket;
-
- RGWBucketSyncFlowManager *parent{nullptr};
-
- map<string, group_pipe_map> flow_groups;
-
- struct pipe_flow {
- vector<group_pipe_map *> flow_groups;
- vector<rgw_sync_bucket_pipe> pipe;
- };
-
- bool allowed_data_flow(const string& source_zone,
- std::optional<rgw_bucket> source_bucket,
- const string& dest_zone,
- std::optional<rgw_bucket> dest_bucket,
- bool check_activated) {
- bool found = false;
- bool found_activated = false;
+bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const string& dest_zone,
+ std::optional<rgw_bucket> dest_bucket,
+ bool check_activated) {
+ bool found = false;
+ bool found_activated = false;
- for (auto m : flow_groups) {
- auto& fm = m.second;
- auto pipes = fm.find_pipes(source_zone, source_bucket,
- dest_zone, dest_bucket);
+ for (auto m : flow_groups) {
+ auto& fm = m.second;
+ auto pipes = fm.find_pipes(source_zone, source_bucket,
+ dest_zone, dest_bucket);
- bool is_found = !pipes.empty();
+ bool is_found = !pipes.empty();
- if (is_found) {
- switch (fm.status) {
+ if (is_found) {
+ switch (fm.status) {
case rgw_sync_policy_group::Status::FORBIDDEN:
return false;
case rgw_sync_policy_group::Status::ENABLED:
break;
default:
break; /* unknown -- ignore */
- }
}
}
-
- if (check_activated && found_activated) {
- return true;
- }
-
- return found;
}
- using flow_map_t = map<rgw_bucket, pipe_flow>;
-
- flow_map_t flow_by_source;
- flow_map_t flow_by_dest;
+ if (check_activated && found_activated) {
+ return true;
+ }
+ return found;
+}
- /*
- * find all the matching flows om a flow map for a specific bucket
- */
- flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional<rgw_bucket> bucket) {
- if (bucket) {
- auto iter = m.find(*bucket);
+/*
+ * find all the matching flows om a flow map for a specific bucket
+ */
+RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bucket_flow(RGWBucketSyncFlowManager::flow_map_t& m, std::optional<rgw_bucket> bucket) {
+ if (bucket) {
+ auto iter = m.find(*bucket);
- if (iter != m.end()) {
- return iter;
- }
+ if (iter != m.end()) {
+ return iter;
}
-
- return m.find(rgw_bucket());
}
+ return m.find(rgw_bucket());
+}
- void update_flow_maps(const rgw_sync_bucket_pipe& pipe,
- group_pipe_map *flow_group) {
- auto source_bucket = pipe.source.get_bucket();
- auto dest_bucket = pipe.dest.get_bucket();
- if (!bucket ||
- *bucket != source_bucket) {
- auto& by_source = flow_by_source[source_bucket];
- by_source.flow_groups.push_back(flow_group);
- by_source.pipe.push_back(pipe);
- }
+void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe,
+ group_pipe_map *flow_group) {
+ auto source_bucket = pipe.source.get_bucket();
+ auto dest_bucket = pipe.dest.get_bucket();
- if (!bucket ||
- *bucket != dest_bucket) {
- auto& by_dest = flow_by_dest[dest_bucket];
- by_dest.flow_groups.push_back(flow_group);
- by_dest.pipe.push_back(pipe);
- }
+ if (!bucket ||
+ *bucket != source_bucket) {
+ auto& by_source = flow_by_source[source_bucket];
+ by_source.flow_groups.push_back(flow_group);
+ by_source.pipe.push_back(pipe);
}
-#warning FIXME
-#if 0
- bool allowed_sync_flow(std::optional<rgw_bucket> bucket,
- const string& source,
- const string& dest) {
- auto& zone = zone_svc->zone_name();
-
- if (source == zone) {
- for (auto& item : range) {
- auto& pf = range.second;
-
- auto& pipe = pf.pipe;
- }
- }
+ if (!bucket ||
+ *bucket != dest_bucket) {
+ auto& by_dest = flow_by_dest[dest_bucket];
+ by_dest.flow_groups.push_back(flow_group);
+ by_dest.pipe.push_back(pipe);
}
-#endif
-
-
-
-public:
-
- RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc,
- std::optional<rgw_bucket> _bucket,
- RGWBucketSyncFlowManager *_parent) : zone_svc(_zone_svc),
- bucket(_bucket),
- parent(_parent) {}
-
- void init(const rgw_sync_policy_info& sync_policy) {
- for (auto& item : sync_policy.groups) {
- auto& group = item.second;
- auto& flow_group_map = flow_groups[group.id];
-
- flow_group_map.init(zone_svc->zone_name(), bucket, group,
- [&](const string& source_zone,
- std::optional<rgw_bucket> source_bucket,
- const string& dest_zone,
- std::optional<rgw_bucket> dest_bucket) {
- if (!parent) {
- return true;
- }
- return parent->allowed_data_flow(source_zone,
- source_bucket,
- dest_zone,
- dest_bucket,
- false); /* just check that it's not disabled */
- });
-
- if (!group.pipes.empty()) {
- for (auto& pipe : group.pipes) {
- if (!pipe.contains_bucket(bucket)) {
- continue;
- }
+}
- update_flow_maps(pipe, &flow_group_map);
+void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
+ for (auto& item : sync_policy.groups) {
+ auto& group = item.second;
+ auto& flow_group_map = flow_groups[group.id];
+
+ flow_group_map.init(zone_svc->zone_name(), bucket, group,
+ [&](const string& source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const string& dest_zone,
+ std::optional<rgw_bucket> dest_bucket) {
+ if (!parent) {
+ return true;
+ }
+ return parent->allowed_data_flow(source_zone,
+ source_bucket,
+ dest_zone,
+ dest_bucket,
+ false); /* just check that it's not disabled */
+ });
+
+ if (!group.pipes.empty()) {
+ for (auto& pipe : group.pipes) {
+ if (!pipe.contains_bucket(bucket)) {
+ continue;
}
- } else {
- update_flow_maps(rgw_sync_bucket_pipe(), &flow_group_map);
- }
- }
- }
-#warning implement me
-#if 0
- bool get_bucket_sources(const rgw_bucket& bucket,
- std::vector<rgw_sync_bucket_entity> *sources) {
- auto iter = find_bucket_flow(flow_by_dest, bucket);
- if (iter == flow_by_dest.end()) {
- if (!parent) {
- return false;
+ update_flow_maps(pipe, &flow_group_map);
}
-
- return parent->get_bucket_sources(bucket, sources);
+ } else {
+ update_flow_maps(rgw_sync_bucket_pipe(), &flow_group_map);
}
-
- auto& pipe_flow = iter->second;
-
-
}
-#endif
-};
+}
+RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc,
+ std::optional<rgw_bucket> _bucket,
+ RGWBucketSyncFlowManager *_parent) : zone_svc(_zone_svc),
+ bucket(_bucket),
+ parent(_parent) {}
int RGWBucketSyncPolicyHandler::init()
{
#warning FIXME
#include "rgw_common.h"
class RGWSI_Zone;
+struct group_pipe_map;
+struct rgw_sync_bucket_pipe;;
+struct rgw_sync_policy_info;
+class RGWBucketSyncFlowManager {
+ RGWSI_Zone *zone_svc;
+ std::optional<rgw_bucket> bucket;
+
+ RGWBucketSyncFlowManager *parent{nullptr};
+
+ map<string, group_pipe_map> flow_groups;
+
+ struct pipe_flow {
+ vector<group_pipe_map *> flow_groups;
+ vector<rgw_sync_bucket_pipe> pipe;
+ };
+
+ bool allowed_data_flow(const string& source_zone,
+ std::optional<rgw_bucket> source_bucket,
+ const string& dest_zone,
+ std::optional<rgw_bucket> dest_bucket,
+ bool check_activated);
+
+ using flow_map_t = map<rgw_bucket, pipe_flow>;
+
+ flow_map_t flow_by_source;
+ flow_map_t flow_by_dest;
+
+ /*
+ * find all the matching flows om a flow map for a specific bucket
+ */
+ flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional<rgw_bucket> bucket);
+
+ void update_flow_maps(const rgw_sync_bucket_pipe& pipe,
+ group_pipe_map *flow_group);
+
+public:
+
+ RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc,
+ std::optional<rgw_bucket> _bucket,
+ RGWBucketSyncFlowManager *_parent);
+
+ void init(const rgw_sync_policy_info& sync_policy);
+};
class RGWBucketSyncPolicyHandler {
RGWSI_Zone *zone_svc;