From: Yehuda Sadeh Date: Tue, 12 Nov 2019 01:50:17 +0000 (-0800) Subject: rgw: use pipe handlers instead of pipes off pipe_set X-Git-Tag: v15.1.0~22^2~63 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fa1e2dbceda459809eda781243265353690be2b5;p=ceph.git rgw: use pipe handlers instead of pipes off pipe_set Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index cfbf4892112a..bed629ee5c18 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2328,8 +2328,10 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pse Formatter::ObjectSection top_section(*f, name); Formatter::ArraySection as(*f, "entries"); - for (auto& pipe : pset.pipes) { - encode_json("pipe", pipe, f); + for (auto& pipe_handler : pset) { + Formatter::ObjectSection hs(*f, "handler"); + encode_json("source", pipe_handler.source, f); + encode_json("dest", pipe_handler.dest, f); } } @@ -2418,16 +2420,16 @@ static int sync_info(std::optional opt_target_zone, std::optional sources; + std::set dests; - handler->get_pipes(&sources, &dests); + handler->get_pipes(&sources, &dests, std::nullopt); auto source_hints_vec = convert_bucket_set_to_str_vec(handler->get_source_hints()); auto target_hints_vec = convert_bucket_set_to_str_vec(handler->get_target_hints()); - RGWBucketSyncFlowManager::pipe_set resolved_sources; - RGWBucketSyncFlowManager::pipe_set resolved_dests; + std::set resolved_sources; + std::set resolved_dests; rgw_sync_bucket_entity self_entity(zone_name, opt_bucket); @@ -2469,8 +2471,8 @@ static int sync_info(std::optional opt_target_zone, std::optionalsecond.id) { bucket_source_sync_status(store, zone, z->second, c->second, diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index 79a3f65ed0b2..144cd640b7a5 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -267,18 +267,16 @@ vector rgw_sync_group_pipe_map::find_pipes(const string& s return vector(); } -void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe) +void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe) { - auto ppipe = &pipe_map[pipe.id]; - - auto prefix = pipe.params.filter.prefix.value_or(string()); + auto prefix = ppipe->params.filter.prefix.value_or(string()); prefix_by_size.insert(make_pair(prefix.size(), ppipe)); - for (auto& tag : pipe.params.filter.tags) { + for (auto& tag : ppipe->params.filter.tags) { auto titer = tag_refs.find(tag); if (titer != tag_refs.end() && - pipe.params.priority > titer->second->params.priority) { + ppipe->params.priority > titer->second->params.priority) { titer->second = ppipe; } else { tag_refs[tag] = ppipe; @@ -286,6 +284,13 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pi } } +void RGWBucketSyncFlowManager::pipe_set::finish_init() +{ + for (auto& entry : rules) { + entry.second.finish_init(); + } +} + void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe) { auto prefix = ppipe->params.filter.prefix.value_or(string()); @@ -319,9 +324,22 @@ void RGWBucketSyncFlowManager::pipe_rules::finish_init() } } +void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) { + auto ppipe = &pipe_map[pipe.id]; + *ppipe = pipe; + + auto prules = &rules[endpoints_pair(*ppipe)]; + + prules->insert(ppipe); + + pipe_handler h(prules, pipe); + + handlers.insert(h); +} + void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const { - encode_json("pipes", pipes, f); + encode_json("pipes", pipe_map, f); } bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone, @@ -426,7 +444,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke pipe.source.apply_bucket(effective_bucket); pipe.dest.apply_bucket(effective_bucket); - source_pipes->pipes.insert(pipe); + source_pipes->insert(pipe); } for (auto& entry : flow_group_map.dests) { @@ -439,9 +457,12 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke pipe.source.apply_bucket(effective_bucket); pipe.dest.apply_bucket(effective_bucket); - dest_pipes->pipes.insert(pipe); + dest_pipes->insert(pipe); } } + + source_pipes->finish_init(); + dest_pipes->finish_init(); } @@ -597,7 +618,8 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled); - for (auto& pipe : _sources_by_name.pipes) { + for (auto& entry : _sources_by_name.pipe_map) { + auto& pipe = entry.second; if (!pipe.source.zone) { continue; } @@ -608,9 +630,14 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) { new_pipe.source.zone = zone_id; } - _sources[*new_pipe.source.zone].pipes.insert(new_pipe); + _sources[*new_pipe.source.zone].insert(new_pipe); + } + for (auto& s : _sources) { + s.second.finish_init(); } - for (auto& pipe : _targets_by_name.pipes) { + + for (auto& entry : _targets_by_name.pipe_map) { + auto& pipe = entry.second; if (!pipe.dest.zone) { continue; } @@ -620,7 +647,10 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) { new_pipe.dest.zone = zone_id; } - _targets[*new_pipe.dest.zone].pipes.insert(new_pipe); + _targets[*new_pipe.dest.zone].insert(new_pipe); + } + for (auto& t : _targets) { + t.second.finish_init(); } if (psources_by_name) { @@ -643,25 +673,21 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso } } -void RGWBucketSyncPolicyHandler::get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets, +void RGWBucketSyncPolicyHandler::get_pipes(std::set *sources, std::set *targets, std::optional filter_peer) { /* return raw pipes (with zone name) */ - if (!filter_peer) { - *sources = sources_by_name; - *targets = targets_by_name; - return; - } - - auto& filter = *filter_peer; - - for (auto& source_pipe : sources_by_name.pipes) { - if (source_pipe.source.match(filter)) { - sources->pipes.insert(source_pipe); + for (auto& entry : sources_by_name.pipe_map) { + auto& source_pipe = entry.second; + if (!filter_peer || + source_pipe.source.match(*filter_peer)) { + sources->insert(source_pipe); } } - for (auto& target_pipe : targets_by_name.pipes) { - if (target_pipe.dest.match(filter)) { - targets->pipes.insert(target_pipe); + for (auto& entry : targets_by_name.pipe_map) { + auto& target_pipe = entry.second; + if (!filter_peer || + target_pipe.dest.match(*filter_peer)) { + targets->insert(target_pipe); } } } diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 7dcb757eccd9..46a5a58952be 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -132,42 +132,61 @@ public: } }; + /* + * pipe_rules: deal with a set of pipes that have common endpoints_pair + */ class pipe_rules { void resolve_prefix(rgw_sync_bucket_pipe *ppipe); public: - std::map pipe_map; /* id to pipe */ - std::multimap prefix_by_size; map tag_refs; map prefix_refs; - void insert(const rgw_sync_bucket_pipe& pipe); + void insert(rgw_sync_bucket_pipe *pipe); void finish_init(); }; + /* + * pipe_handler: extends endpoints_rule to point at the corresponding rules handler + */ + struct pipe_handler : public endpoints_pair { + pipe_rules *rules; + + pipe_handler() {} + pipe_handler(pipe_rules *_rules, + const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe), + rules(_rules) {} + bool specific() const { + return source.specific() && dest.specific(); + } + }; + struct pipe_set { std::map rules; - std::set pipes; + std::map pipe_map; - using iterator = std::set::iterator; + std::set handlers; + + using iterator = std::set::iterator; void clear() { - pipes.clear(); + rules.clear(); + pipe_map.clear(); + handlers.clear(); } - void insert(const rgw_sync_bucket_pipe& pipe) { - pipes.insert(pipe); - } + void insert(const rgw_sync_bucket_pipe& pipe); + void finish_init(); - iterator begin() { - return pipes.begin(); + iterator begin() const { + return handlers.begin(); } - iterator end() { - return pipes.end(); + iterator end() const { + return handlers.end(); } void dump(ceph::Formatter *f) const; @@ -210,6 +229,11 @@ public: }; +ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) { + os << e.dest << " -> " << e.source; + return os; +} + class RGWBucketSyncPolicyHandler { const RGWBucketSyncPolicyHandler *parent{nullptr}; RGWSI_Zone *zone_svc; @@ -290,8 +314,8 @@ public: *sources = &sources_by_name; *targets = &targets_by_name; } - void get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets, - std::optional filter_peer); + void get_pipes(std::set *sources, std::set *targets, + std::optional filter_peer); const std::set& get_source_hints() const { return source_hints; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 7659877d7210..dea3cec3cad6 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1124,20 +1124,20 @@ std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e) return out; } -struct rgw_sync_pipe_info { - rgw_sync_pipe_params params; +struct rgw_sync_pipe_handler_info { + RGWBucketSyncFlowManager::pipe_handler handler; rgw_sync_pipe_info_entity source; rgw_sync_pipe_info_entity target; - rgw_sync_pipe_info() {} - rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe, + rgw_sync_pipe_handler_info() {} + rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler, std::optional source_bucket_info, - std::optional target_bucket_info) : source(pipe.source, source_bucket_info), - target(pipe.dest, target_bucket_info) { - params = pipe.params; + std::optional target_bucket_info) : handler(_handler), + source(handler.source, source_bucket_info), + target(handler.dest, target_bucket_info) { } - bool operator<(const rgw_sync_pipe_info& p) const { + bool operator<(const rgw_sync_pipe_handler_info& p) const { if (source < p.source) { return true; } @@ -1153,37 +1153,37 @@ struct rgw_sync_pipe_info { } }; -std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info& p) { +std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_handler_info& p) { out << p.source << ">" << p.target; return out; } struct rgw_sync_pipe_info_set { - std::set pipes; + std::set handlers; - using iterator = std::set::iterator; + using iterator = std::set::iterator; void clear() { - pipes.clear(); + handlers.clear(); } - void insert(const rgw_sync_bucket_pipe& pipe, + void insert(const RGWBucketSyncFlowManager::pipe_handler& handler, std::optional& source_bucket_info, std::optional& target_bucket_info) { - rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info); - pipes.insert(p); + rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info); + handlers.insert(p); } iterator begin() { - return pipes.begin(); + return handlers.begin(); } iterator end() { - return pipes.end(); + return handlers.end(); } bool empty() const { - return pipes.empty(); + return handlers.empty(); } void update_empty_bucket_info(const std::map& buckets_info) { @@ -1191,14 +1191,14 @@ struct rgw_sync_pipe_info_set { return; } - std::set p; + std::set p; - for (auto pipe : pipes) { + for (auto pipe : handlers) { pipe.update_empty_bucket_info(buckets_info); p.insert(pipe); } - pipes = std::move(p); + handlers = std::move(p); } }; @@ -3646,17 +3646,17 @@ class RGWGetBucketPeersCR : public RGWCoroutine { << " all_sources.size()=" << all_sources.size() << dendl; auto iters = get_pipe_iters(all_sources, source_zone); for (auto i = iters.first; i != iters.second; ++i) { - for (auto& pipe : i->second.pipes) { - if (!pipe.specific()) { - ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl; + for (auto& handler : i->second) { + if (!handler.specific()) { + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl; continue; } if (source_bucket && - !source_bucket->match(*pipe.source.bucket)) { + !source_bucket->match(*handler.source.bucket)) { continue; } - ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl; - result->insert(pipe, source_bucket_info, target_bucket_info); + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl; + result->insert(handler, source_bucket_info, target_bucket_info); } } } @@ -3670,15 +3670,15 @@ class RGWGetBucketPeersCR : public RGWCoroutine { << " all_targets.size()=" << all_targets.size() << dendl; auto iters = get_pipe_iters(all_targets, target_zone); for (auto i = iters.first; i != iters.second; ++i) { - for (auto& pipe : i->second.pipes) { + for (auto& handler : i->second) { if (target_bucket && - pipe.dest.bucket && - !target_bucket->match(*pipe.dest.bucket)) { - ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl; + handler.dest.bucket && + !target_bucket->match(*handler.dest.bucket)) { + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl; continue; } - ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl; - result->insert(pipe, source_bucket_info, target_bucket_info); + ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl; + result->insert(handler, source_bucket_info, target_bucket_info); } } } @@ -3789,7 +3789,7 @@ int RGWRunBucketSourcesSyncCR::operate() } sync_pair.dest_bs.bucket = siter->target.get_bucket(); - sync_pair.params = siter->params; + sync_pair.handler = siter->handler; if (sync_pair.source_bs.shard_id >= 0) { num_shards = 1; diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 273da1b8624b..4ff43c0ed7cc 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -17,11 +17,13 @@ #include "rgw_sync_trace.h" #include "rgw_sync_policy.h" +#include "rgw_bucket_sync.h" + class JSONObj; struct rgw_sync_bucket_pipe; struct rgw_bucket_sync_pair_info { - rgw_sync_pipe_params params; + RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */ rgw_bucket_shard source_bs; rgw_bucket_shard dest_bs; }; @@ -33,10 +35,6 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) { out << p.source_bs; - if (p.params.filter.prefix) { - out << "/" << *p.params.filter.prefix + "*"; - } - out << "->" << p.dest_bs.bucket; return out;