void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const
{
+#warning cleanup
+#if 0
{
Formatter::ArraySection os(*f, "flow_groups");
for (auto& g : flow_groups) {
encode_json("group", *g, f);
}
}
+#endif
encode_json("pipe", pipe, f);
}
}
-void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe,
- rgw_sync_group_pipe_map *flow_group) {
+void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe) {
auto source_bucket = pipe.source.get_bucket();
auto dest_bucket = pipe.dest.get_bucket();
- if (!flow_group->sources.empty()) {
+ if (pipe.match_dest(zone_name, bucket)) { /* we're the dest */
auto& by_source = flow_by_source[source_bucket];
- by_source.flow_groups.push_back(flow_group);
by_source.pipe.push_back(pipe);
}
- if (!flow_group->dests.empty()) {
+ if (pipe.match_source(zone_name, bucket)) { /* we're the source */
auto& by_dest = flow_by_dest[dest_bucket];
- by_dest.flow_groups.push_back(flow_group);
by_dest.pipe.push_back(pipe);
}
}
void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
+ rgw_sync_bucket_entity entity;
+ entity.zones = std::set<string>( { zone_name } );
+ entity.bucket = bucket;
+
for (auto& item : sync_policy.groups) {
auto& group = item.second;
auto& flow_group_map = flow_groups[group.id];
false); /* just check that it's not disabled */
});
+ for (auto& entry : flow_group_map.sources) {
+ rgw_sync_bucket_pipe pipe;
+ rgw_sync_bucket_entity source;
+ pipe.source.zones = std::set<string>( { entry.first.zone } );
+ pipe.source.bucket = entry.first.bucket;
+ pipe.dest = entity;
+
+ auto& by_source = flow_by_source[pipe.source.get_bucket()];
+ by_source.pipe.push_back(pipe);
+ }
+
+ for (auto& entry : flow_group_map.dests) {
+ rgw_sync_bucket_pipe pipe;
+ rgw_sync_bucket_entity dest;
+ pipe.dest.zones = std::set<string>( { entry.first.zone } );
+ pipe.dest.bucket = entry.first.bucket;
+ pipe.source = entity;
+
+ auto& by_dest = flow_by_source[pipe.dest.get_bucket()];
+ by_dest.pipe.push_back(pipe);
+ }
+ }
+
+#if 0
if (!group.pipes.empty()) {
for (auto& pipe : group.pipes) {
- if (!pipe.contains_bucket(bucket)) {
+ if (!pipe.contains_zone_bucket(zone_name, bucket)) {
continue;
}
- update_flow_maps(pipe, &flow_group_map);
+ update_flow_maps(pipe, flow_group_map);
}
} else {
- update_flow_maps(rgw_sync_bucket_pipe(), &flow_group_map);
+ update_flow_maps(rgw_sync_bucket_pipe(), flow_group_map);
}
}
+#endif
}
return (source.match_zone(zone) || dest.match_zone(zone));
}
+ bool match_source(const string& zone, std::optional<rgw_bucket> b) const {
+ return (source.match_zone(zone) && source.match_bucket(b));
+ }
+
+ bool match_dest(const string& zone, std::optional<rgw_bucket> b) const {
+ return (dest.match_zone(zone) && dest.match_bucket(b));
+ }
+
bool contains_zone_bucket(const string& zone, std::optional<rgw_bucket> b) const {
- return ((source.match_zone(zone) && source.match_bucket(b)) ||
- (dest.match_zone(zone) && dest.match_bucket(b)));
+ return (match_source(zone, b) || match_dest(zone, b));
}
void dump(ceph::Formatter *f) const;