From d7ff4e6986cab9a26b03e4796b100b8341ba774f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 18 Oct 2019 18:49:28 -0700 Subject: [PATCH] rgw: bucket flow manager initialization changes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 2 ++ src/rgw/rgw_bucket_sync.cc | 47 ++++++++++++++++++++++++++++++-------- src/rgw/rgw_bucket_sync.h | 6 ++--- src/rgw/rgw_sync_policy.h | 11 +++++++-- 4 files changed, 51 insertions(+), 15 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 8b5baed2a29..2739b0c87f2 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2331,12 +2331,14 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m auto& pflow = entry.second; encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f); +#if 0 { Formatter::ArraySection fg(*f, "flow_groups"); for (auto& flow_group : pflow.flow_groups) { encode_json("entry", *flow_group, f); } } +#endif encode_json("pipes", pflow.pipe, f); } } diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index fda52b1bdeb..7cd007cf98b 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -481,12 +481,15 @@ vector rgw_sync_group_pipe_map::find_pipes(const string& s void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const { +#warning cleanup +#if 0 { Formatter::ArraySection os(*f, "flow_groups"); for (auto& g : flow_groups) { encode_json("group", *g, f); } } +#endif encode_json("pipe", pipe, f); } @@ -546,20 +549,17 @@ RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bu } -void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe, - rgw_sync_group_pipe_map *flow_group) { +void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe) { auto source_bucket = pipe.source.get_bucket(); auto dest_bucket = pipe.dest.get_bucket(); - if (!flow_group->sources.empty()) { + if (pipe.match_dest(zone_name, bucket)) { /* we're the dest */ auto& by_source = flow_by_source[source_bucket]; - by_source.flow_groups.push_back(flow_group); by_source.pipe.push_back(pipe); } - if (!flow_group->dests.empty()) { + if (pipe.match_source(zone_name, bucket)) { /* we're the source */ auto& by_dest = flow_by_dest[dest_bucket]; - by_dest.flow_groups.push_back(flow_group); by_dest.pipe.push_back(pipe); } @@ -581,6 +581,10 @@ void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe } void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { + rgw_sync_bucket_entity entity; + entity.zones = std::set( { zone_name } ); + entity.bucket = bucket; + for (auto& item : sync_policy.groups) { auto& group = item.second; auto& flow_group_map = flow_groups[group.id]; @@ -600,18 +604,43 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { false); /* just check that it's not disabled */ }); + for (auto& entry : flow_group_map.sources) { + rgw_sync_bucket_pipe pipe; + rgw_sync_bucket_entity source; + pipe.source.zones = std::set( { entry.first.zone } ); + pipe.source.bucket = entry.first.bucket; + pipe.dest = entity; + + auto& by_source = flow_by_source[pipe.source.get_bucket()]; + by_source.pipe.push_back(pipe); + } + + for (auto& entry : flow_group_map.dests) { + rgw_sync_bucket_pipe pipe; + rgw_sync_bucket_entity dest; + pipe.dest.zones = std::set( { entry.first.zone } ); + pipe.dest.bucket = entry.first.bucket; + pipe.source = entity; + + auto& by_dest = flow_by_source[pipe.dest.get_bucket()]; + by_dest.pipe.push_back(pipe); + } + } + +#if 0 if (!group.pipes.empty()) { for (auto& pipe : group.pipes) { - if (!pipe.contains_bucket(bucket)) { + if (!pipe.contains_zone_bucket(zone_name, bucket)) { continue; } - update_flow_maps(pipe, &flow_group_map); + update_flow_maps(pipe, flow_group_map); } } else { - update_flow_maps(rgw_sync_bucket_pipe(), &flow_group_map); + update_flow_maps(rgw_sync_bucket_pipe(), flow_group_map); } } +#endif } diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index d9fcedbd2fd..b6d3f4400f2 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -116,8 +116,7 @@ struct rgw_sync_group_pipe_map { class RGWBucketSyncFlowManager { public: struct pipe_flow { - vector flow_groups; - vector pipe; + std::vector pipe; void dump(ceph::Formatter *f) const; }; @@ -147,8 +146,7 @@ private: */ flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional bucket); - void update_flow_maps(const rgw_sync_bucket_pipe& pipe, - rgw_sync_group_pipe_map *flow_group); + void update_flow_maps(const rgw_sync_bucket_pipe& pipe); public: diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 1093cef0c66..fdb803fd75c 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -331,9 +331,16 @@ public: return (source.match_zone(zone) || dest.match_zone(zone)); } + bool match_source(const string& zone, std::optional b) const { + return (source.match_zone(zone) && source.match_bucket(b)); + } + + bool match_dest(const string& zone, std::optional b) const { + return (dest.match_zone(zone) && dest.match_bucket(b)); + } + bool contains_zone_bucket(const string& zone, std::optional b) const { - return ((source.match_zone(zone) && source.match_bucket(b)) || - (dest.match_zone(zone) && dest.match_bucket(b))); + return (match_source(zone, b) || match_dest(zone, b)); } void dump(ceph::Formatter *f) const; -- 2.39.5