From: Yehuda Sadeh Date: Wed, 23 Oct 2019 01:34:46 +0000 (-0700) Subject: rgw: sync policy mgr: more fixes X-Git-Tag: v15.1.0~22^2~85 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=c5deedf771e8e8b4585a6c59cc923e60aa3232ea;p=ceph-ci.git rgw: sync policy mgr: more fixes Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 1f9ca347160..8778b8c61d9 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2320,28 +2320,13 @@ static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZo return 0; } -void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m, Formatter *f) +void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pset, Formatter *f) { Formatter::ObjectSection top_section(*f, name); Formatter::ArraySection as(*f, "entries"); - for (auto& entry : m) { - Formatter::ObjectSection os(*f, "entry"); - auto& bucket = entry.first; - auto& pflow = entry.second; - -#if 0 - encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f); - { - Formatter::ArraySection fg(*f, "flow_groups"); - for (auto& flow_group : pflow.flow_groups) { - encode_json("entry", *flow_group, f); - } - } -#endif - for (auto& pipe : pflow.pipes) { - encode_json("pipe", pipe, f); - } + for (auto& pipe : pset.pipes) { + encode_json("pipe", pipe, f); } } @@ -2394,8 +2379,8 @@ static int sync_info(std::optional opt_target_zone, std::optionalreflect(eff_bucket, &sources, &dests); @@ -7885,7 +7870,7 @@ next: if (opt_cmd == OPT::SYNC_GROUP_MODIFY) { auto iter = sync_policy.groups.find(*opt_group_id); - if (iter != sync_policy.groups.end()) { + if (iter == sync_policy.groups.end()) { cerr << "ERROR: could not find group '" << *opt_group_id << "'" << std::endl; return ENOENT; } diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 87592392f5c..f910b016c74 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -10,6 +10,26 @@ #define dout_subsys ceph_subsys_rgw +ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) { + os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or("") << ",az=" << (int)e.all_zones << "}"; + return os; +} + +ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) { + os << "{s=" << pipe.source << ",d=" << pipe.dest << "}"; + return os; +} + +ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) { + os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set()) << "}"; + return os; +} + +ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) { + os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}"; + return os; +} + static std::vector filter_relevant_pipes(const std::vector& pipes, const string& source_zone, const string& dest_zone) @@ -42,26 +62,6 @@ void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const encode_json("dests", dests, f); } -ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) { - os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or("") << "}"; - return os; -} - -ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) { - os << "{s=" << pipe.source << ",d=" << pipe.dest << "}"; - return os; -} - -ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) { - os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set()) << "}"; - return os; -} - -ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) { - os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}"; - return os; -} - template void rgw_sync_group_pipe_map::try_add_to_pipe_map(const string& source_zone, @@ -143,9 +143,13 @@ template void rgw_sync_group_pipe_map::init(const string& _zone, std::optional _bucket, const rgw_sync_policy_group& group, + rgw_sync_data_flow_group *_default_flow, + std::set *_pall_zones, CB filter_cb) { zone = _zone; bucket = _bucket; + default_flow = _default_flow; + pall_zones = _pall_zones; rgw_sync_bucket_entity zb(zone, bucket); @@ -160,11 +164,20 @@ void rgw_sync_group_pipe_map::init(const string& _zone, } } - if (group.data_flow.empty()) { - return; + const rgw_sync_data_flow_group *pflow; + + if (!group.data_flow.empty()) { + pflow = &group.data_flow; + } else { + if (!default_flow) { + return; + } + pflow = default_flow; } - auto& flow = group.data_flow; + auto& flow = *pflow; + + pall_zones->insert(zone); /* symmetrical */ if (flow.symmetrical) { @@ -172,6 +185,7 @@ void rgw_sync_group_pipe_map::init(const string& _zone, if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) { for (auto& z : symmetrical_group.zones) { if (z != zone) { + pall_zones->insert(z); try_add_source(z, zone, zone_pipes, filter_cb); try_add_dest(zone, z, zone_pipes, filter_cb); } @@ -184,8 +198,10 @@ void rgw_sync_group_pipe_map::init(const string& _zone, if (flow.directional) { for (auto& rule : *flow.directional) { if (rule.source_zone == zone) { + pall_zones->insert(rule.dest_zone); try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb); } else if (rule.dest_zone == zone) { + pall_zones->insert(rule.source_zone); try_add_source(rule.source_zone, zone, zone_pipes, filter_cb); } } @@ -250,7 +266,7 @@ vector rgw_sync_group_pipe_map::find_pipes(const string& s return vector(); } -void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const +void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const { encode_json("pipes", pipes, f); } @@ -295,27 +311,20 @@ bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone, return found; } -/* - * find all the matching flows om a flow map for a specific bucket - */ -RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bucket_flow(RGWBucketSyncFlowManager::flow_map_t& m, std::optional bucket) { - if (bucket) { - auto iter = m.find(*bucket); - - if (iter != m.end()) { - return iter; - } +void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { + std::optional default_flow; + if (parent) { + default_flow.emplace(); + default_flow->init_default(parent->all_zones); } - return m.find(rgw_bucket()); -} - -void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { for (auto& item : sync_policy.groups) { auto& group = item.second; auto& flow_group_map = flow_groups[group.id]; flow_group_map.init(zone_name, bucket, group, + (default_flow ? &(*default_flow) : nullptr), + &all_zones, [&](const string& source_zone, std::optional source_bucket, const string& dest_zone, @@ -333,8 +342,8 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { } void RGWBucketSyncFlowManager::reflect(std::optional effective_bucket, - flow_map_t *flow_by_source, - flow_map_t *flow_by_dest) + RGWBucketSyncFlowManager::pipe_set *source_pipes, + RGWBucketSyncFlowManager::pipe_set *dest_pipes) { rgw_sync_bucket_entity entity; @@ -342,11 +351,17 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke entity.bucket = effective_bucket.value_or(rgw_bucket()); if (parent) { - parent->reflect(effective_bucket, flow_by_source, flow_by_dest); + parent->reflect(effective_bucket, source_pipes, dest_pipes); } for (auto& item : flow_groups) { auto& flow_group_map = item.second; + + /* only return enabled groups */ + if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED) { + continue; + } + for (auto& entry : flow_group_map.sources) { rgw_sync_bucket_pipe pipe = entry.second; if (!pipe.dest.match_bucket(effective_bucket)) { @@ -356,8 +371,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke pipe.source.apply_bucket(effective_bucket); pipe.dest.apply_bucket(effective_bucket); - auto& by_source = (*flow_by_source)[pipe.source.get_bucket()]; - by_source.pipes.insert(pipe); + source_pipes->pipes.insert(pipe); } for (auto& entry : flow_group_map.dests) { @@ -370,8 +384,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke pipe.source.apply_bucket(effective_bucket); pipe.dest.apply_bucket(effective_bucket); - auto& by_dest = (*flow_by_dest)[pipe.dest.get_bucket()]; - by_dest.pipes.insert(pipe); + dest_pipes->pipes.insert(pipe); } } } diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index f13d438bc7a..7e705c030d5 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -35,6 +35,11 @@ struct rgw_sync_group_pipe_map { zb_pipe_map_t sources; /* all the pipes where zone is pulling from */ zb_pipe_map_t dests; /* all the pipes that pull from zone */ + std::set *pall_zones{nullptr}; + rgw_sync_data_flow_group *default_flow{nullptr}; /* flow to use if policy doesn't define it, + used in the case of bucket sync policy, not at the + zonegroup level */ + void dump(ceph::Formatter *f) const; template @@ -65,6 +70,8 @@ struct rgw_sync_group_pipe_map { void init(const string& _zone, std::optional _bucket, const rgw_sync_policy_group& group, + rgw_sync_data_flow_group *_default_flow, + std::set *_pall_zones, CB filter_cb); /* @@ -93,14 +100,12 @@ struct rgw_sync_group_pipe_map { class RGWBucketSyncFlowManager { public: - struct pipe_flow { + struct pipe_set { std::set pipes; void dump(ceph::Formatter *f) const; }; - using flow_map_t = map; - private: string zone_name; @@ -110,6 +115,8 @@ private: map flow_groups; + std::set all_zones; + bool allowed_data_flow(const string& source_zone, std::optional source_bucket, const string& dest_zone, @@ -119,8 +126,6 @@ private: /* * find all the matching flows om a flow map for a specific bucket */ - flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional bucket); - void update_flow_maps(const rgw_sync_bucket_pipes& pipe); public: @@ -131,8 +136,8 @@ public: void init(const rgw_sync_policy_info& sync_policy); void reflect(std::optional effective_bucket, - flow_map_t *flow_by_source, - flow_map_t *flow_by_dest); + pipe_set *flow_by_source, + pipe_set *flow_by_dest); }; diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index 1cbf79179a1..d2485aa2562 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -158,8 +158,8 @@ std::vector rgw_sync_bucket_pipes::expand() const for (auto& s : sources) { for (auto& d : dests) { rgw_sync_bucket_pipe pipe; - pipe.source = std::move(s); - pipe.dest = std::move(d); + pipe.source = s; + pipe.dest = d; result.push_back(pipe); } } @@ -208,6 +208,9 @@ void rgw_sync_data_flow_group::remove_symmetrical(const string& flow_id, std::op if (iter->id == flow_id) { if (!zones) { groups.erase(iter); + if (groups.empty()) { + symmetrical.reset(); + } return; } break; @@ -227,6 +230,9 @@ void rgw_sync_data_flow_group::remove_symmetrical(const string& flow_id, std::op if (flow_group.zones.empty()) { groups.erase(iter); } + if (groups.empty()) { + symmetrical.reset(); + } } bool rgw_sync_data_flow_group::find_directional(const string& source_zone, const string& dest_zone, bool create, rgw_sync_directional_rule **flow_group) @@ -270,11 +276,19 @@ void rgw_sync_data_flow_group::remove_directional(const string& source_zone, con if (source_zone == rule.source_zone && dest_zone == rule.dest_zone) { directional->erase(iter); + if (directional->empty()) { + directional.reset(); + } return; } } } +void rgw_sync_data_flow_group::init_default(const std::set& zones) +{ + symmetrical.emplace(); + symmetrical->push_back(rgw_sync_symmetric_group("default", zones)); +} bool rgw_sync_policy_group::find_pipe(const string& pipe_id, bool create, rgw_sync_bucket_pipes **pipe) { diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 9313789b8ef..57081a62aae 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -22,6 +22,11 @@ struct rgw_sync_symmetric_group { string id; std::set zones; + rgw_sync_symmetric_group() {} + rgw_sync_symmetric_group(const string& _id, + const std::set _zones) : id(_id), zones(_zones) {} + + void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(id, bl); @@ -137,13 +142,17 @@ struct rgw_sync_bucket_entity { } const bool operator<(const rgw_sync_bucket_entity& e) const { - if (all_zones != e.all_zones) { - if (zone < e.zone) { - return true; - } - if (zone > e.zone) { - return false; - } + if (all_zones && !e.all_zones) { + return false; + } + if (!all_zones && e.all_zones) { + return true; + } + if (zone < e.zone) { + return true; + } + if (zone > e.zone) { + return false; } return (bucket < e.bucket); } @@ -329,6 +338,8 @@ struct rgw_sync_data_flow_group { void remove_symmetrical(const string& flow_id, std::optional > zones); bool find_directional(const string& source_zone, const string& dest_zone, bool create, rgw_sync_directional_rule **flow_group); void remove_directional(const string& source_zone, const string& dest_zone); + + void init_default(const std::set& zones); }; WRITE_CLASS_ENCODER(rgw_sync_data_flow_group)