From a19c1a4b8261d94f779fa979d7c54d92794b4415 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 22 Oct 2019 13:11:22 -0700 Subject: [PATCH] rgw: flow manager generates explicit pipes Have multiple pipes instead of having a single pipe that can describe multiple different zones have multiple pipes, each point at a single zone (or all zones). Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 39 +++++--- src/rgw/rgw_bucket_sync.cc | 179 +++++++++++++++++++++++++------------ src/rgw/rgw_bucket_sync.h | 62 ++++--------- src/rgw/rgw_json_enc.cc | 12 ++- src/rgw/rgw_sync_policy.h | 88 ++++++++++++++---- 5 files changed, 246 insertions(+), 134 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index e83ca9221d90..1f9ca3471603 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2330,8 +2330,8 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m auto& bucket = entry.first; auto& pflow = entry.second; - encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f); #if 0 + encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f); { Formatter::ArraySection fg(*f, "flow_groups"); for (auto& flow_group : pflow.flow_groups) { @@ -2339,7 +2339,9 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m } } #endif - encode_json("pipes", pflow.pipe, f); + for (auto& pipe : pflow.pipes) { + encode_json("pipe", pipe, f); + } } } @@ -2359,28 +2361,43 @@ static int sync_info(std::optional opt_target_zone, std::optional bucket_flow; - if (opt_bucket) { + std::optional eff_bucket = opt_bucket; + + if (eff_bucket) { rgw_bucket bucket; RGWBucketInfo bucket_info; - int ret = init_bucket(*opt_bucket, bucket_info, bucket); - if (ret < 0) { + int ret = init_bucket(*eff_bucket, bucket_info, bucket); + if (ret < 0 && ret != -ENOENT) { cerr << "ERROR: init_bucket failed: " << cpp_strerror(-ret) << std::endl; return ret; } - if (ret >= 0 && - bucket_info.sync_policy) { - bucket_flow.emplace(zone_name, opt_bucket, &zone_flow); + rgw_sync_policy_info default_policy; + rgw_sync_policy_info *policy; - bucket_flow->init(*bucket_info.sync_policy); + + if (ret == -ENOENT) { + cerr << "WARNING: bucket not found, simulating result" << std::endl; + bucket = *eff_bucket; + } else { + eff_bucket = bucket_info.bucket; + } + + if (bucket_info.sync_policy) { + policy = (rgw_sync_policy_info *)bucket_info.sync_policy.get(); + bucket_flow.emplace(zone_name, bucket, &zone_flow); + + bucket_flow->init(*policy); flow_mgr = &(*bucket_flow); } } - auto& sources = flow_mgr->get_sources(); - auto& dests = flow_mgr->get_dests(); + RGWBucketSyncFlowManager::flow_map_t sources; + RGWBucketSyncFlowManager::flow_map_t dests; + + flow_mgr->reflect(eff_bucket, &sources, &dests); { Formatter::ObjectSection os(*formatter, "result"); diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 4302ca178548..ad978e949939 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -14,6 +14,17 @@ string rgw_sync_bucket_entity::bucket_key() const { return rgw_sync_bucket_entities::bucket_key(bucket); } +void rgw_sync_bucket_entity::apply_bucket(std::optional b) +{ + if (!b) { + return; + } + + if (!bucket || + bucket->name.empty()) { + bucket = b; + } +} void rgw_sync_bucket_entities::add_zones(const std::vector& new_zones) { for (auto& z : new_zones) { @@ -31,6 +42,33 @@ void rgw_sync_bucket_entities::add_zones(const std::vector& new_zones) { } } +std::vector rgw_sync_bucket_entities::expand() const +{ + std::vector result; + rgw_bucket b = get_bucket(); + if (all_zones) { + rgw_sync_bucket_entity e; + e.all_zones = true; + e.bucket = b; + result.push_back(e); + return std::move(result); + } + + if (!zones) { + return result; + } + + for (auto& z : *zones) { + rgw_sync_bucket_entity e; + e.all_zones = false; + e.bucket = b; + e.zone = z; + result.push_back(e); + } + + return result; +} + void rgw_sync_bucket_entities::remove_zones(const std::vector& rm_zones) { all_zones = false; @@ -252,6 +290,26 @@ void rgw_sync_policy_group::remove_pipe(const string& pipe_id) } } +std::vector rgw_sync_bucket_pipes::expand() const +{ + std::vector result; + + auto sources = source.expand(); + auto dests = dest.expand(); + + for (auto& s : sources) { + for (auto& d : dests) { + rgw_sync_bucket_pipe pipe; + pipe.source = std::move(s); + pipe.dest = std::move(d); + result.push_back(pipe); + } + } + + return result; +} + + static std::vector filter_relevant_pipes(const std::vector& pipes, const string& source_zone, const string& dest_zone) @@ -272,12 +330,6 @@ static bool is_wildcard_bucket(const rgw_bucket& bucket) return bucket.name.empty(); } -void rgw_sync_group_pipe_map::zone_bucket::dump(ceph::Formatter *f) const -{ - encode_json("zone", zone, f); - encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f); -} - void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const { encode_json("zone", zone, f); @@ -286,6 +338,16 @@ void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const encode_json("dests", dests, f); } +ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) { + os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or("") << "}"; + return os; +} + +ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) { + os << "{s=" << pipe.source << ",d=" << pipe.dest << "}"; + return os; +} + ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) { os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set()) << "}"; return os; @@ -310,12 +372,14 @@ void rgw_sync_group_pipe_map::try_add_to_pipe_map(const string& source_zone, } auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone); - for (auto& pipe : relevant_pipes) { - zone_bucket zb; - if (!call_filter_cb(pipe, &zb)) { - continue; + for (auto& pipes : relevant_pipes) { + for (auto& pipe : pipes.expand()) { + rgw_sync_bucket_entity zb; + if (!call_filter_cb(pipe, &zb)) { + continue; + } + pipe_map->insert(make_pair(zb, pipe)); } - pipe_map->insert(make_pair(zb, pipe)); } } @@ -328,8 +392,8 @@ void rgw_sync_group_pipe_map::try_add_source(const string& source_zone, return try_add_to_pipe_map(source_zone, dest_zone, pipes, &sources, filter_cb, - [&](const rgw_sync_bucket_pipes& pipe, zone_bucket *zb) { - *zb = zone_bucket{source_zone, pipe.source.get_bucket()}; + [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { + *zb = rgw_sync_bucket_entity{source_zone, pipe.source.get_bucket()}; return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket()); }); } @@ -343,8 +407,8 @@ void rgw_sync_group_pipe_map::try_add_dest(const string& source_zone, return try_add_to_pipe_map(source_zone, dest_zone, pipes, &dests, filter_cb, - [&](const rgw_sync_bucket_pipes& pipe, zone_bucket *zb) { - *zb = zone_bucket{dest_zone, pipe.dest.get_bucket()}; + [&](const rgw_sync_bucket_pipe& pipe, rgw_sync_bucket_entity *zb) { + *zb = rgw_sync_bucket_entity{dest_zone, pipe.dest.get_bucket()}; return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket); }); } @@ -353,17 +417,17 @@ using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t; pair rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m, const string& zone, - std::optional b) + std::optional b) const { if (!b) { - return m.equal_range(zone_bucket{zone, rgw_bucket()}); + return m.equal_range(rgw_sync_bucket_entity{zone, rgw_bucket()}); } - auto zb = zone_bucket{zone, *bucket}; + auto zb = rgw_sync_bucket_entity{zone, *b}; auto range = m.equal_range(zb); if (range.first == range.second && - !is_wildcard_bucket(*bucket)) { + !is_wildcard_bucket(*b)) { /* couldn't find the specific bucket, try to find by wildcard */ zb.bucket = rgw_bucket(); range = m.equal_range(zb); @@ -381,7 +445,7 @@ void rgw_sync_group_pipe_map::init(const string& _zone, zone = _zone; bucket = _bucket; - zone_bucket zb(zone, bucket); + rgw_sync_bucket_entity zb(zone, bucket); status = group.status; @@ -429,10 +493,10 @@ void rgw_sync_group_pipe_map::init(const string& _zone, /* * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} */ -vector rgw_sync_group_pipe_map::find_source_pipes(const string& source_zone, +vector rgw_sync_group_pipe_map::find_source_pipes(const string& source_zone, std::optional source_bucket, - std::optional dest_bucket) { - vector result; + std::optional dest_bucket) const { + vector result; auto range = find_pipes(sources, source_zone, source_bucket); @@ -449,10 +513,10 @@ vector rgw_sync_group_pipe_map::find_source_pipes(const s * find all relevant pipes in other zones that pull from a specific * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} */ -vector rgw_sync_group_pipe_map::find_dest_pipes(std::optional source_bucket, +vector rgw_sync_group_pipe_map::find_dest_pipes(std::optional source_bucket, const string& dest_zone, - std::optional dest_bucket) { - vector result; + std::optional dest_bucket) const { + vector result; auto range = find_pipes(dests, dest_zone, dest_bucket); @@ -469,10 +533,10 @@ vector rgw_sync_group_pipe_map::find_dest_pipes(std::opti /* * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} */ -vector rgw_sync_group_pipe_map::find_pipes(const string& source_zone, +vector rgw_sync_group_pipe_map::find_pipes(const string& source_zone, std::optional source_bucket, const string& dest_zone, - std::optional dest_bucket) { + std::optional dest_bucket) const { if (dest_zone == zone) { return find_source_pipes(source_zone, source_bucket, dest_bucket); } @@ -481,7 +545,7 @@ vector rgw_sync_group_pipe_map::find_pipes(const string& return find_dest_pipes(source_bucket, dest_zone, dest_bucket); } - return vector(); + return vector(); } void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const @@ -496,14 +560,15 @@ void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const } #endif - encode_json("pipe", pipe, f); + encode_json("pipes", pipes, f); } bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone, std::optional source_bucket, const string& dest_zone, std::optional dest_bucket, - bool check_activated) { + bool check_activated) const +{ bool found = false; bool found_activated = false; @@ -588,10 +653,6 @@ void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipes& pip #endif void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { - rgw_sync_bucket_entity entity; - entity.zone = zone_name; - entity.bucket = bucket.value_or(rgw_bucket()); - for (auto& item : sync_policy.groups) { auto& group = item.second; auto& flow_group_map = flow_groups[group.id]; @@ -610,44 +671,46 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { dest_bucket, false); /* just check that it's not disabled */ }); + } +} + +void RGWBucketSyncFlowManager::reflect(std::optional effective_bucket, + flow_map_t *flow_by_source, + flow_map_t *flow_by_dest) + +{ + rgw_sync_bucket_entity entity; + entity.zone = zone_name; + entity.bucket = effective_bucket.value_or(rgw_bucket()); + + if (parent) { + parent->reflect(effective_bucket, flow_by_source, flow_by_dest); + } + for (auto& item : flow_groups) { + auto& flow_group_map = item.second; for (auto& entry : flow_group_map.sources) { rgw_sync_bucket_pipe pipe; rgw_sync_bucket_entity source; - pipe.source.zone = entry.first.zone; - pipe.source.bucket = entry.first.bucket; + pipe.source = entry.first; + pipe.source.apply_bucket(effective_bucket); pipe.dest = entity; - auto& by_source = flow_by_source[pipe.source.bucket]; - by_source.pipe.push_back(pipe); + auto& by_source = (*flow_by_source)[pipe.source.get_bucket()]; + by_source.pipes.insert(pipe); } for (auto& entry : flow_group_map.dests) { rgw_sync_bucket_pipe pipe; rgw_sync_bucket_entity dest; - pipe.dest.zone = entry.first.zone; - pipe.dest.bucket = entry.first.bucket; + pipe.dest = entry.first; + pipe.dest.apply_bucket(effective_bucket); pipe.source = entity; - auto& by_dest = flow_by_source[pipe.dest.bucket]; - by_dest.pipe.push_back(pipe); + auto& by_dest = (*flow_by_dest)[pipe.dest.get_bucket()]; + by_dest.pipes.insert(pipe); } } - -#if 0 - if (!group.pipes.empty()) { - for (auto& pipe : group.pipes) { - if (!pipe.contains_zone_bucket(zone_name, bucket)) { - continue; - } - - update_flow_maps(pipe, flow_group_map); - } - } else { - update_flow_maps(rgw_sync_bucket_pipes(), flow_group_map); - } - } -#endif } diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 81547dea96f8..f13d438bc7a4 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -30,29 +30,7 @@ struct rgw_sync_group_pipe_map { rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN}; - struct zone_bucket { - string zone; /* zone name */ - rgw_bucket bucket; /* bucket, if empty then wildcard */ - - zone_bucket() {} - zone_bucket(const string& _zone, - std::optional _bucket) : zone(_zone), - bucket(_bucket.value_or(rgw_bucket())) {} - - - bool operator<(const zone_bucket& zb) const { - if (zone < zb.zone) { - return true; - } - if (zone > zb.zone) { - return false; - } - return (bucket < zb.bucket); - } - void dump(ceph::Formatter *f) const; - }; - - using zb_pipe_map_t = std::multimap; + using zb_pipe_map_t = std::multimap; zb_pipe_map_t sources; /* all the pipes where zone is pulling from */ zb_pipe_map_t dests; /* all the pipes that pull from zone */ @@ -69,9 +47,9 @@ struct rgw_sync_group_pipe_map { template void try_add_source(const string& source_zone, - const string& dest_zone, - const std::vector& pipes, - CB filter_cb); + const string& dest_zone, + const std::vector& pipes, + CB filter_cb); template void try_add_dest(const string& source_zone, @@ -81,7 +59,7 @@ struct rgw_sync_group_pipe_map { pair find_pipes(const zb_pipe_map_t& m, const string& zone, - std::optional b); + std::optional b) const; template void init(const string& _zone, @@ -92,31 +70,31 @@ struct rgw_sync_group_pipe_map { /* * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket} */ - vector find_source_pipes(const string& source_zone, + vector find_source_pipes(const string& source_zone, std::optional source_bucket, - std::optional dest_bucket); + std::optional dest_bucket) const; /* * find all relevant pipes in other zones that pull from a specific * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket} */ - vector find_dest_pipes(std::optional source_bucket, - const string& dest_zone, - std::optional dest_bucket); + vector find_dest_pipes(std::optional source_bucket, + const string& dest_zone, + std::optional dest_bucket) const; /* * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket} */ - vector find_pipes(const string& source_zone, + vector find_pipes(const string& source_zone, std::optional source_bucket, const string& dest_zone, - std::optional dest_bucket); + std::optional dest_bucket) const; }; class RGWBucketSyncFlowManager { public: struct pipe_flow { - std::vector pipe; + std::set pipes; void dump(ceph::Formatter *f) const; }; @@ -136,10 +114,7 @@ private: std::optional source_bucket, const string& dest_zone, std::optional dest_bucket, - bool check_activated); - - flow_map_t flow_by_source; - flow_map_t flow_by_dest; + bool check_activated) const; /* * find all the matching flows om a flow map for a specific bucket @@ -155,13 +130,10 @@ public: RGWBucketSyncFlowManager *_parent); void init(const rgw_sync_policy_info& sync_policy); + void reflect(std::optional effective_bucket, + flow_map_t *flow_by_source, + flow_map_t *flow_by_dest); - const flow_map_t& get_sources() { - return flow_by_source; - } - const flow_map_t& get_dests() { - return flow_by_dest; - } }; class RGWBucketSyncPolicyHandler { diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 980d23fd1257..d045ae6487d4 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -875,10 +875,14 @@ void rgw_sync_bucket_entity::decode_json(JSONObj *obj) { JSONDecoder::decode_json("zone", zone, obj); string s; - JSONDecoder::decode_json("bucket", s, obj); - int ret = rgw_bucket_parse_bucket_key(nullptr, s, &bucket, nullptr); - if (ret < 0) { - bucket = rgw_bucket(); + if (JSONDecoder::decode_json("bucket", s, obj)) { + rgw_bucket b; + int ret = rgw_bucket_parse_bucket_key(nullptr, s, &b, nullptr); + if (ret >= 0) { + bucket = b; + } else { + bucket.reset(); + } } } void rgw_sync_bucket_entities::dump(Formatter *f) const diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index e96c833149a4..da5d1fc1d086 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -220,11 +220,25 @@ struct rgw_sync_directional_rule { WRITE_CLASS_ENCODER(rgw_sync_directional_rule) struct rgw_sync_bucket_entity { - string zone; /* define specific zones */ - rgw_bucket bucket; /* define specific bucket */ + std::optional zone; /* define specific zones */ + std::optional bucket; /* define specific bucket */ + + static bool match_str(const string& s1, const string& s2) { /* empty string is wildcard */ + return (s1.empty() || + s2.empty() || + s1 == s2); + } + + bool all_zones{false}; + + rgw_sync_bucket_entity() {} + rgw_sync_bucket_entity(const string& _zone, + std::optional _bucket) : zone(_zone), + bucket(_bucket.value_or(rgw_bucket())) {} void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); + encode(all_zones, bl); encode(zone, bl); encode(bucket, bl); ENCODE_FINISH(bl); @@ -232,6 +246,7 @@ struct rgw_sync_bucket_entity { void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); + decode(all_zones, bl); decode(zone, bl); decode(bucket, bl); DECODE_FINISH(bl); @@ -240,17 +255,55 @@ struct rgw_sync_bucket_entity { void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); + rgw_bucket get_bucket() const { + return bucket.value_or(rgw_bucket()); + } + string bucket_key() const; - const bool operator<(const rgw_sync_bucket_entity& e) const { - if (zone < e.zone) { + bool match_zone(const string& z) const { + if (all_zones) { return true; } - if (zone > e.zone) { + if (!zone) { return false; } + + return (*zone == z); + } + + void apply_zone(const string& z) { + all_zones = false; + zone = z; + } + + bool match_bucket(std::optional b) const { + if (!b) { + return true; + } + + if (!bucket) { + return true; + } + + return (match_str(bucket->tenant, b->tenant) && + match_str(bucket->name, b->name) && + match_str(bucket->bucket_id, b->bucket_id)); + } + + const bool operator<(const rgw_sync_bucket_entity& e) const { + if (all_zones != e.all_zones) { + if (zone < e.zone) { + return true; + } + if (zone > e.zone) { + return false; + } + } return (bucket < e.bucket); } + + void apply_bucket(std::optional _b); }; WRITE_CLASS_ENCODER(rgw_sync_bucket_entity) @@ -273,6 +326,16 @@ public: DECODE_FINISH(bl); } + const bool operator<(const rgw_sync_bucket_pipe& p) const { + if (source < p.source) { + return true; + } + if (p.source < source) { + return false; + } + return (dest < p.dest); + } + void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); }; @@ -292,6 +355,7 @@ public: bool all_zones{false}; + void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(bucket, bl); @@ -344,6 +408,8 @@ public: return (zones->find(zone) != zones->end()); } + std::vector expand() const; + rgw_bucket get_bucket() const { return bucket.value_or(rgw_bucket()); } @@ -405,17 +471,7 @@ public: void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); - void get_bucket_pair(rgw_bucket *source_bucket, - rgw_bucket *dest_bucket) const { - *source_bucket = source.get_bucket(); - *dest_bucket = dest.get_bucket(); - - symmetrical_copy_if_empty(source_bucket->tenant, dest_bucket->tenant); - symmetrical_copy_if_empty(source_bucket->name, dest_bucket->name); - if (source_bucket->name == dest_bucket->name) { /* doesn't make sense to copy bucket id if not same bucket name */ - symmetrical_copy_if_empty(source_bucket->bucket_id, dest_bucket->bucket_id); - } - } + std::vector expand() const; }; WRITE_CLASS_ENCODER(rgw_sync_bucket_pipes) -- 2.47.3