From 5ff5ecedbe84acb36564dcbee1f6c5557ffa849c Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 11 Nov 2019 16:19:56 -0800 Subject: [PATCH] rgw: sync: pipe rules handler Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket_sync.cc | 52 +++++++++++++++++++++++++++++++ src/rgw/rgw_bucket_sync.h | 38 ++++++++++++++++++++++ src/rgw/rgw_data_sync.cc | 29 +---------------- src/rgw/rgw_json_enc.cc | 4 +-- src/rgw/rgw_sync_policy.cc | 31 ++++++++++++++++-- src/rgw/rgw_sync_policy.h | 64 ++++++++++++++++++++------------------ 6 files changed, 154 insertions(+), 64 deletions(-) diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index e20d26da19e..79a3f65ed0b 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -267,6 +267,58 @@ vector rgw_sync_group_pipe_map::find_pipes(const string& s return vector(); } +void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe) +{ + auto ppipe = &pipe_map[pipe.id]; + + auto prefix = pipe.params.filter.prefix.value_or(string()); + + prefix_by_size.insert(make_pair(prefix.size(), ppipe)); + + for (auto& tag : pipe.params.filter.tags) { + auto titer = tag_refs.find(tag); + if (titer != tag_refs.end() && + pipe.params.priority > titer->second->params.priority) { + titer->second = ppipe; + } else { + tag_refs[tag] = ppipe; + } + } +} + +void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe) +{ + auto prefix = ppipe->params.filter.prefix.value_or(string()); + auto iter = prefix_refs.lower_bound(prefix); + + while (iter != prefix_refs.end()) { + auto cur_iter = iter++; + + auto& cur_pipe = *cur_iter->second; + auto cur_prefix = cur_pipe.params.filter.prefix.value_or(string()); + + if (!boost::starts_with(cur_prefix, prefix)) { + return; + } + + if (cur_pipe.params.priority > ppipe->params.priority) { + continue; + } + + prefix_refs.erase(cur_iter); + } +} + +void RGWBucketSyncFlowManager::pipe_rules::finish_init() +{ + /* go from the bigger prefixes to the shorter ones, this way we know that we covered all + * overlapping prefixes + */ + for (auto iter = prefix_by_size.rbegin(); iter != prefix_by_size.rend(); ++iter) { + resolve_prefix(iter->second); + } +} + void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const { encode_json("pipes", pipes, f); diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index d413ab04e50..7dcb757eccd 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -111,7 +111,45 @@ public: class RGWBucketSyncFlowManager { friend class RGWBucketSyncPolicyHandler; public: + struct endpoints_pair { + rgw_sync_bucket_entity source; + rgw_sync_bucket_entity dest; + + endpoints_pair() {} + endpoints_pair(const rgw_sync_bucket_pipe& pipe) { + source = pipe.source; + dest = pipe.dest; + } + + bool operator<(const endpoints_pair& e) const { + if (source < e.source) { + return true; + } + if (e.source < source) { + return false; + } + return (dest < e.dest); + } + }; + + class pipe_rules { + void resolve_prefix(rgw_sync_bucket_pipe *ppipe); + + public: + std::map pipe_map; /* id to pipe */ + + std::multimap prefix_by_size; + + map tag_refs; + map prefix_refs; + + void insert(const rgw_sync_bucket_pipe& pipe); + + void finish_init(); + }; + struct pipe_set { + std::map rules; std::set pipes; using iterator = std::set::iterator; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index bbc4bc67175..7659877d721 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1277,7 +1277,6 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { string entry_marker; rgw_bucket_shard source_bs; - rgw_bucket_sync_pair_info sync_pair; int sync_status; @@ -3765,38 +3764,12 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, int RGWRunBucketSourcesSyncCR::operate() { reenter(this) { -#if 0 - yield { - set_status("acquiring sync lock"); - auto store = sync_env->store; - lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - sources_obj, - "sync_lock", - cct->_conf->rgw_sync_lease_period, - this)); - lease_stack.reset(spawn(lease_cr.get(), false)); - } - while (!lease_cr->is_locked()) { - if (lease_cr->is_done()) { - tn->log(5, "failed to take lease"); - set_status("lease lock failed, early abort"); - return set_cr_error(lease_cr->get_ret_status()); - } - set_sleeping(true); - yield; - } - - tn->log(10, "took lease"); -#endif yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); -#if 0 - lease_cr->go_down(); - drain_all(); -#endif return set_cr_error(retcode); } + ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl; if (pipes.empty()) { diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 06c93880571..4a6abb6d513 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -886,13 +886,13 @@ void rgw_sync_bucket_entity::decode_json(JSONObj *obj) } } -void rgw_sync_pipe_filter::_tag::dump(Formatter *f) const +void rgw_sync_pipe_filter_tag::dump(Formatter *f) const { encode_json("key", key, f); encode_json("value", value, f); } -void rgw_sync_pipe_filter::_tag::decode_json(JSONObj *obj) +void rgw_sync_pipe_filter_tag::decode_json(JSONObj *obj) { JSONDecoder::decode_json("key", key, obj); JSONDecoder::decode_json("value", value, obj); diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index cbc979df789..47763e286c0 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -11,7 +11,7 @@ string rgw_sync_bucket_entity::bucket_key() const return rgw_sync_bucket_entities::bucket_key(bucket); } -bool rgw_sync_pipe_filter::_tag::from_str(const string& s) +bool rgw_sync_pipe_filter_tag::from_str(const string& s) { if (s.empty()) { return false; @@ -61,20 +61,45 @@ void rgw_sync_pipe_filter::set_tags(std::list& tags_add, std::list& tags_rm) { for (auto& t : tags_rm) { - _tag tag; + rgw_sync_pipe_filter_tag tag; if (tag.from_str(t)) { tags.erase(tag); } } for (auto& t : tags_add) { - _tag tag; + rgw_sync_pipe_filter_tag tag; if (tag.from_str(t)) { tags.insert(tag); } } } +bool rgw_sync_pipe_filter::is_subset_of(const rgw_sync_pipe_filter& f) const +{ + if (f.prefix) { + if (!prefix) { + return false; + } + /* f.prefix exists, and this->prefix is either equal or bigger, + * therefore this->prefix also set */ + + if (!boost::starts_with(*prefix, *f.prefix)) { + return false; + } + } + + /* prefix is subset, now check tags. All our tags should exist in f.tags */ + + for (auto& t : tags) { + if (f.tags.find(t) == f.tags.end()) { + return false; + } + } + + return true; +} + void rgw_sync_bucket_entity::apply_bucket(std::optional b) { if (!b) { diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 0fca7408fa5..c61ce95bc6f 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -176,43 +176,44 @@ struct rgw_sync_bucket_entity { }; WRITE_CLASS_ENCODER(rgw_sync_bucket_entity) -struct rgw_sync_pipe_filter { - struct _tag { - string key; - string value; - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(key, bl); - encode(value, bl); - ENCODE_FINISH(bl); - } +struct rgw_sync_pipe_filter_tag { + string key; + string value; - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(key, bl); - decode(value, bl); - DECODE_FINISH(bl); - } + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(key, bl); + encode(value, bl); + ENCODE_FINISH(bl); + } - void dump(ceph::Formatter *f) const; - void decode_json(JSONObj *obj); + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(key, bl); + decode(value, bl); + DECODE_FINISH(bl); + } - bool from_str(const string& s); + void dump(ceph::Formatter *f) const; + void decode_json(JSONObj *obj); + + bool from_str(const string& s); - bool operator<(const _tag& t) const { - if (key < t.key) { - return true; - } - if (t.key < key) { - return false; - } - return (value < t.value); + bool operator<(const rgw_sync_pipe_filter_tag& t) const { + if (key < t.key) { + return true; + } + if (t.key < key) { + return false; } - }; + return (value < t.value); + } +}; +WRITE_CLASS_ENCODER(rgw_sync_pipe_filter_tag) +struct rgw_sync_pipe_filter { std::optional prefix; - std::set<_tag> tags; + std::set tags; void set_prefix(std::optional opt_prefix, bool prefix_rm); @@ -224,9 +225,10 @@ struct rgw_sync_pipe_filter { void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); + + bool is_subset_of(const rgw_sync_pipe_filter& f) const; }; WRITE_CLASS_ENCODER(rgw_sync_pipe_filter) -WRITE_CLASS_ENCODER(rgw_sync_pipe_filter::_tag) struct rgw_sync_pipe_acl_translation { rgw_user owner; -- 2.39.5