From 1d72020c8ebd7f7b22983b580ccb0046739d8867 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 12 Nov 2019 15:50:00 -0800 Subject: [PATCH] rgw: sync: full sync checks for rules prefixes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket_sync.cc | 100 ++++++++++++++++++++++--------------- src/rgw/rgw_bucket_sync.h | 44 ++++++++++++---- src/rgw/rgw_data_sync.cc | 57 ++++++++++++++++++++- src/rgw/rgw_data_sync.h | 4 ++ src/rgw/rgw_sync_policy.cc | 39 +++++++++++++++ src/rgw/rgw_sync_policy.h | 5 ++ 6 files changed, 197 insertions(+), 52 deletions(-) diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 144cd640b7a..8ec6001bd03 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -267,13 +267,17 @@ vector rgw_sync_group_pipe_map::find_pipes(const string& s return vector(); } -void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe) +void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe) { + pipes.push_back(pipe); + + auto ppipe = &pipes.back(); auto prefix = ppipe->params.filter.prefix.value_or(string()); - prefix_by_size.insert(make_pair(prefix.size(), ppipe)); + prefix_refs.insert(make_pair(prefix, ppipe)); - for (auto& tag : ppipe->params.filter.tags) { + for (auto& t : ppipe->params.filter.tags) { + string tag = t.key + "=" + t.value; auto titer = tag_refs.find(tag); if (titer != tag_refs.end() && ppipe->params.priority > titer->second->params.priority) { @@ -284,55 +288,80 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe) } } -void RGWBucketSyncFlowManager::pipe_set::finish_init() +#warning add support for tags +bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key, + const vector& tags, + rgw_sync_pipe_params *params) const { - for (auto& entry : rules) { - entry.second.finish_init(); + auto iter = prefix_refs.lower_bound(key.name); + if (iter == prefix_refs.end()) { + return false; } -} - -void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe) -{ - auto prefix = ppipe->params.filter.prefix.value_or(string()); - auto iter = prefix_refs.lower_bound(prefix); - while (iter != prefix_refs.end()) { - auto cur_iter = iter++; + auto max = prefix_refs.end(); - auto& cur_pipe = *cur_iter->second; - auto cur_prefix = cur_pipe.params.filter.prefix.value_or(string()); + std::optional priority; - if (!boost::starts_with(cur_prefix, prefix)) { - return; + for (; iter != prefix_refs.end(); ++iter) { + auto& prefix = iter->first; + if (!boost::starts_with(key.name, prefix)) { + break; } - if (cur_pipe.params.priority > ppipe->params.priority) { + auto& rule_params = iter->second->params; + auto& filter = rule_params.filter; + + if (!filter.check_tags(tags)) { continue; } - prefix_refs.erase(cur_iter); + if (rule_params.priority > priority) { + priority = rule_params.priority; + max = iter; + } + } + + if (max == prefix_refs.end()) { + return false; } + + *params = max->second->params; + return true; } -void RGWBucketSyncFlowManager::pipe_rules::finish_init() +/* + * return either the current prefix for s, or the next one if s is not within a prefix + */ + +RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const { - /* go from the bigger prefixes to the shorter ones, this way we know that we covered all - * overlapping prefixes - */ - for (auto iter = prefix_by_size.rbegin(); iter != prefix_by_size.rend(); ++iter) { - resolve_prefix(iter->second); + if (prefix_refs.empty()) { + return prefix_refs.end(); + } + auto next = prefix_refs.upper_bound(s); + auto iter = next; + if (iter != prefix_refs.begin()) { + --iter; + } + if (!boost::starts_with(s, iter->first)) { + return next; } + + return iter; } void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) { - auto ppipe = &pipe_map[pipe.id]; - *ppipe = pipe; + pipe_map[pipe.id] = pipe; + + auto& rules_ref = rules[endpoints_pair(pipe)]; - auto prules = &rules[endpoints_pair(*ppipe)]; + if (!rules_ref) { + rules_ref = make_shared(); + } - prules->insert(ppipe); + rules_ref->insert(pipe); - pipe_handler h(prules, pipe); + pipe_handler h(rules_ref, pipe); handlers.insert(h); } @@ -460,9 +489,6 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke dest_pipes->insert(pipe); } } - - source_pipes->finish_init(); - dest_pipes->finish_init(); } @@ -632,9 +658,6 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso } _sources[*new_pipe.source.zone].insert(new_pipe); } - for (auto& s : _sources) { - s.second.finish_init(); - } for (auto& entry : _targets_by_name.pipe_map) { auto& pipe = entry.second; @@ -649,9 +672,6 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso } _targets[*new_pipe.dest.zone].insert(new_pipe); } - for (auto& t : _targets) { - t.second.finish_init(); - } if (psources_by_name) { *psources_by_name = std::move(_sources_by_name); diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 46a5a58952b..23446bffadd 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -136,36 +136,59 @@ public: * pipe_rules: deal with a set of pipes that have common endpoints_pair */ class pipe_rules { - void resolve_prefix(rgw_sync_bucket_pipe *ppipe); + std::vector pipes; public: - std::multimap prefix_by_size; + using prefix_map_t = multimap; - map tag_refs; - map prefix_refs; + map tag_refs; + prefix_map_t prefix_refs; - void insert(rgw_sync_bucket_pipe *pipe); + void insert(const rgw_sync_bucket_pipe& pipe); + + bool find_obj_params(const rgw_obj_key& key, + const vector& tags, + rgw_sync_pipe_params *params) const; + + void scan_prefixes(std::vector *prefixes) const; - void finish_init(); + prefix_map_t::const_iterator prefix_begin() const { + return prefix_refs.begin(); + } + prefix_map_t::const_iterator prefix_search(const std::string& s) const; + prefix_map_t::const_iterator prefix_end() const { + return prefix_refs.end(); + } }; + using pipe_rules_ref = std::shared_ptr; + /* * pipe_handler: extends endpoints_rule to point at the corresponding rules handler */ struct pipe_handler : public endpoints_pair { - pipe_rules *rules; + pipe_rules_ref rules; pipe_handler() {} - pipe_handler(pipe_rules *_rules, + pipe_handler(pipe_rules_ref& _rules, const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe), rules(_rules) {} bool specific() const { return source.specific() && dest.specific(); } + + bool find_obj_params(const rgw_obj_key& key, + const std::vector& tags, + rgw_sync_pipe_params *params) const { + if (!rules) { + return false; + } + return rules->find_obj_params(key, tags, params); + } }; struct pipe_set { - std::map rules; + std::map rules; std::map pipe_map; std::set handlers; @@ -179,7 +202,6 @@ public: } void insert(const rgw_sync_bucket_pipe& pipe); - void finish_init(); iterator begin() const { return handlers.begin(); @@ -229,7 +251,7 @@ public: }; -ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) { +static inline ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) { os << e.dest << " -> " << e.source; return os; } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index dea3cec3cad..7f4f82357fa 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3144,6 +3144,51 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { rgw_zone_set zones_trace; RGWSyncTraceNodeRef tn; + + struct _prefix_handler { + RGWBucketSyncFlowManager::pipe_rules_ref rules; + RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator iter; + std::optional cur_prefix; + + void set_rules(RGWBucketSyncFlowManager::pipe_rules_ref& _rules) { + rules = _rules; + } + + bool revalidate_marker(rgw_obj_key *marker) { + if (cur_prefix && + boost::starts_with(marker->name, *cur_prefix)) { + return true; + } + if (!rules) { + return false; + } + iter = rules->prefix_search(marker->name); + if (iter == rules->prefix_end()) { + return false; + } + cur_prefix = iter->first; + marker->name = *cur_prefix; + marker->instance.clear(); + return true; + } + + bool check_key(const rgw_obj_key& key) { + if (!rules) { + return false; + } + if (cur_prefix && + boost::starts_with(key.name, *cur_prefix)) { + return true; + } + iter = rules->prefix_search(key.name); + if (iter == rules->prefix_end()) { + return false; + } + cur_prefix = iter->first; + return boost::starts_with(key.name, iter->first); + } + } prefix_handler; + public: RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, @@ -3160,6 +3205,7 @@ public: SSTR(bucket_shard_str{bs}))) { zones_trace.insert(sc->source_zone); marker_tracker.set_tn(tn); + prefix_handler.set_rules(sync_pipe.get_rules()); } int operate() override; @@ -3179,6 +3225,11 @@ int RGWBucketShardFullSyncCR::operate() } set_status("listing remote bucket"); tn->log(20, "listing bucket for full sync"); + + if (!prefix_handler.revalidate_marker(&list_marker)) { + break; + } + yield call(new RGWListBucketShardCR(sc, bs, list_marker, &list_result)); if (retcode < 0 && retcode != -ENOENT) { @@ -3198,11 +3249,15 @@ int RGWBucketShardFullSyncCR::operate() tn->log(20, SSTR("[full sync] syncing object: " << bucket_shard_str{bs} << "/" << entries_iter->key)); entry = &(*entries_iter); - total_entries++; list_marker = entries_iter->key; + if (!prefix_handler.check_key(entries_iter->key)) { + continue; + } + total_entries++; if (!marker_tracker.start(entry->key, total_entries, real_time())) { tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?")); } else { +#warning look in here using SyncCR = RGWBucketSyncSingleEntryCR; yield spawn(new SyncCR(sc, sync_pipe, entry->key, false, /* versioned, only matters for object removal */ diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 4ff43c0ed7c..4ea13440068 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -44,6 +44,10 @@ struct rgw_bucket_sync_pipe { rgw_bucket_sync_pair_info info; RGWBucketInfo source_bucket_info; RGWBucketInfo dest_bucket_info; + + RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() { + return info.handler.rules; + } }; inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) { diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index 47763e286c0..5d0cad7e1e4 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -31,6 +31,21 @@ bool rgw_sync_pipe_filter_tag::from_str(const string& s) return true; } +bool rgw_sync_pipe_filter_tag::operator==(const string& s) const +{ + if (s.empty()) { + return false; + } + + auto pos = s.find('='); + if (pos == string::npos) { + return value.empty() && (s == key); + } + + return s.compare(0, pos, s) == 0 && + s.compare(pos + 1, s.size() - pos - 1, value) == 0; +} + void rgw_sync_pipe_filter::encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); @@ -100,6 +115,30 @@ bool rgw_sync_pipe_filter::is_subset_of(const rgw_sync_pipe_filter& f) const return true; } +bool rgw_sync_pipe_filter::check_tag(const string& s) const +{ + if (tags.empty()) { /* tag filter wasn't defined */ + return true; + } + + for (auto& t : tags) { + if (t == s) { + return true; + } + } + return false; +} + +bool rgw_sync_pipe_filter::check_tags(const std::vector& tags) const +{ + for (auto& t : tags) { + if (check_tag(t)) { + return true; + } + } + return false; +} + void rgw_sync_bucket_entity::apply_bucket(std::optional b) { if (!b) { diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index c61ce95bc6f..d312f7e7908 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -208,6 +208,8 @@ struct rgw_sync_pipe_filter_tag { } return (value < t.value); } + + bool operator==(const string& s) const; }; WRITE_CLASS_ENCODER(rgw_sync_pipe_filter_tag) @@ -227,6 +229,9 @@ struct rgw_sync_pipe_filter { void decode_json(JSONObj *obj); bool is_subset_of(const rgw_sync_pipe_filter& f) const; + + bool check_tag(const string& s) const; + bool check_tags(const std::vector& tags) const; }; WRITE_CLASS_ENCODER(rgw_sync_pipe_filter) -- 2.39.5