return 0;
}
-void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m, Formatter *f)
+void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pset, Formatter *f)
{
Formatter::ObjectSection top_section(*f, name);
Formatter::ArraySection as(*f, "entries");
- for (auto& entry : m) {
- Formatter::ObjectSection os(*f, "entry");
- auto& bucket = entry.first;
- auto& pflow = entry.second;
-
-#if 0
- encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f);
- {
- Formatter::ArraySection fg(*f, "flow_groups");
- for (auto& flow_group : pflow.flow_groups) {
- encode_json("entry", *flow_group, f);
- }
- }
-#endif
- for (auto& pipe : pflow.pipes) {
- encode_json("pipe", pipe, f);
- }
+ for (auto& pipe : pset.pipes) {
+ encode_json("pipe", pipe, f);
}
}
}
}
- RGWBucketSyncFlowManager::flow_map_t sources;
- RGWBucketSyncFlowManager::flow_map_t dests;
+ RGWBucketSyncFlowManager::pipe_set sources;
+ RGWBucketSyncFlowManager::pipe_set dests;
flow_mgr->reflect(eff_bucket, &sources, &dests);
if (opt_cmd == OPT::SYNC_GROUP_MODIFY) {
auto iter = sync_policy.groups.find(*opt_group_id);
- if (iter != sync_policy.groups.end()) {
+ if (iter == sync_policy.groups.end()) {
cerr << "ERROR: could not find group '" << *opt_group_id << "'" << std::endl;
return ENOENT;
}
#define dout_subsys ceph_subsys_rgw
+ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
+ os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or("") << ",az=" << (int)e.all_zones << "}";
+ return os;
+}
+
+ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
+ os << "{s=" << pipe.source << ",d=" << pipe.dest << "}";
+ return os;
+}
+
+ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) {
+ os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<string>()) << "}";
+ return os;
+}
+
+ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) {
+ os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
+ return os;
+}
+
static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes,
const string& source_zone,
const string& dest_zone)
encode_json("dests", dests, f);
}
-ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
- os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or("") << "}";
- return os;
-}
-
-ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
- os << "{s=" << pipe.source << ",d=" << pipe.dest << "}";
- return os;
-}
-
-ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) {
- os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<string>()) << "}";
- return os;
-}
-
-ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) {
- os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
- return os;
-}
-
template <typename CB1, typename CB2>
void rgw_sync_group_pipe_map::try_add_to_pipe_map(const string& source_zone,
void rgw_sync_group_pipe_map::init(const string& _zone,
std::optional<rgw_bucket> _bucket,
const rgw_sync_policy_group& group,
+ rgw_sync_data_flow_group *_default_flow,
+ std::set<std::string> *_pall_zones,
CB filter_cb) {
zone = _zone;
bucket = _bucket;
+ default_flow = _default_flow;
+ pall_zones = _pall_zones;
rgw_sync_bucket_entity zb(zone, bucket);
}
}
- if (group.data_flow.empty()) {
- return;
+ const rgw_sync_data_flow_group *pflow;
+
+ if (!group.data_flow.empty()) {
+ pflow = &group.data_flow;
+ } else {
+ if (!default_flow) {
+ return;
+ }
+ pflow = default_flow;
}
- auto& flow = group.data_flow;
+ auto& flow = *pflow;
+
+ pall_zones->insert(zone);
/* symmetrical */
if (flow.symmetrical) {
if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
for (auto& z : symmetrical_group.zones) {
if (z != zone) {
+ pall_zones->insert(z);
try_add_source(z, zone, zone_pipes, filter_cb);
try_add_dest(zone, z, zone_pipes, filter_cb);
}
if (flow.directional) {
for (auto& rule : *flow.directional) {
if (rule.source_zone == zone) {
+ pall_zones->insert(rule.dest_zone);
try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
} else if (rule.dest_zone == zone) {
+ pall_zones->insert(rule.source_zone);
try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
}
}
return vector<rgw_sync_bucket_pipe>();
}
-void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const
+void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
{
encode_json("pipes", pipes, f);
}
return found;
}
-/*
- * find all the matching flows om a flow map for a specific bucket
- */
-RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bucket_flow(RGWBucketSyncFlowManager::flow_map_t& m, std::optional<rgw_bucket> bucket) {
- if (bucket) {
- auto iter = m.find(*bucket);
-
- if (iter != m.end()) {
- return iter;
- }
+void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
+ std::optional<rgw_sync_data_flow_group> default_flow;
+ if (parent) {
+ default_flow.emplace();
+ default_flow->init_default(parent->all_zones);
}
- return m.find(rgw_bucket());
-}
-
-void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
for (auto& item : sync_policy.groups) {
auto& group = item.second;
auto& flow_group_map = flow_groups[group.id];
flow_group_map.init(zone_name, bucket, group,
+ (default_flow ? &(*default_flow) : nullptr),
+ &all_zones,
[&](const string& source_zone,
std::optional<rgw_bucket> source_bucket,
const string& dest_zone,
}
void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
- flow_map_t *flow_by_source,
- flow_map_t *flow_by_dest)
+ RGWBucketSyncFlowManager::pipe_set *source_pipes,
+ RGWBucketSyncFlowManager::pipe_set *dest_pipes)
{
rgw_sync_bucket_entity entity;
entity.bucket = effective_bucket.value_or(rgw_bucket());
if (parent) {
- parent->reflect(effective_bucket, flow_by_source, flow_by_dest);
+ parent->reflect(effective_bucket, source_pipes, dest_pipes);
}
for (auto& item : flow_groups) {
auto& flow_group_map = item.second;
+
+ /* only return enabled groups */
+ if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED) {
+ continue;
+ }
+
for (auto& entry : flow_group_map.sources) {
rgw_sync_bucket_pipe pipe = entry.second;
if (!pipe.dest.match_bucket(effective_bucket)) {
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
- auto& by_source = (*flow_by_source)[pipe.source.get_bucket()];
- by_source.pipes.insert(pipe);
+ source_pipes->pipes.insert(pipe);
}
for (auto& entry : flow_group_map.dests) {
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
- auto& by_dest = (*flow_by_dest)[pipe.dest.get_bucket()];
- by_dest.pipes.insert(pipe);
+ dest_pipes->pipes.insert(pipe);
}
}
}
zb_pipe_map_t sources; /* all the pipes where zone is pulling from */
zb_pipe_map_t dests; /* all the pipes that pull from zone */
+ std::set<string> *pall_zones{nullptr};
+ rgw_sync_data_flow_group *default_flow{nullptr}; /* flow to use if policy doesn't define it,
+ used in the case of bucket sync policy, not at the
+ zonegroup level */
+
void dump(ceph::Formatter *f) const;
template <typename CB1, typename CB2>
void init(const string& _zone,
std::optional<rgw_bucket> _bucket,
const rgw_sync_policy_group& group,
+ rgw_sync_data_flow_group *_default_flow,
+ std::set<std::string> *_pall_zones,
CB filter_cb);
/*
class RGWBucketSyncFlowManager {
public:
- struct pipe_flow {
+ struct pipe_set {
std::set<rgw_sync_bucket_pipe> pipes;
void dump(ceph::Formatter *f) const;
};
- using flow_map_t = map<rgw_bucket, pipe_flow>;
-
private:
string zone_name;
map<string, rgw_sync_group_pipe_map> flow_groups;
+ std::set<std::string> all_zones;
+
bool allowed_data_flow(const string& source_zone,
std::optional<rgw_bucket> source_bucket,
const string& dest_zone,
/*
* find all the matching flows om a flow map for a specific bucket
*/
- flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional<rgw_bucket> bucket);
-
void update_flow_maps(const rgw_sync_bucket_pipes& pipe);
public:
void init(const rgw_sync_policy_info& sync_policy);
void reflect(std::optional<rgw_bucket> effective_bucket,
- flow_map_t *flow_by_source,
- flow_map_t *flow_by_dest);
+ pipe_set *flow_by_source,
+ pipe_set *flow_by_dest);
};