]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: use pipe handlers instead of pipes off pipe_set
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 12 Nov 2019 01:50:17 +0000 (17:50 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h

index cfbf4892112a8e9231262ce4a05971c1b9409a1d..bed629ee5c1832f2d52ef13d2d6f94961e96dfcb 100644 (file)
@@ -2328,8 +2328,10 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pse
   Formatter::ObjectSection top_section(*f, name);
   Formatter::ArraySection as(*f, "entries");
 
-  for (auto& pipe : pset.pipes) {
-    encode_json("pipe", pipe, f);
+  for (auto& pipe_handler : pset) {
+    Formatter::ObjectSection hs(*f, "handler");
+    encode_json("source", pipe_handler.source, f);
+    encode_json("dest", pipe_handler.dest, f);
   }
 }
 
@@ -2418,16 +2420,16 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
     handler = bucket_handler;
   }
 
-  RGWBucketSyncFlowManager::pipe_set *sources;
-  RGWBucketSyncFlowManager::pipe_set *dests;
+  std::set<rgw_sync_bucket_pipe> sources;
+  std::set<rgw_sync_bucket_pipe> dests;
 
-  handler->get_pipes(&sources, &dests);
+  handler->get_pipes(&sources, &dests, std::nullopt);
 
   auto source_hints_vec = convert_bucket_set_to_str_vec(handler->get_source_hints());
   auto target_hints_vec = convert_bucket_set_to_str_vec(handler->get_target_hints());
 
-  RGWBucketSyncFlowManager::pipe_set resolved_sources;
-  RGWBucketSyncFlowManager::pipe_set resolved_dests;
+  std::set<rgw_sync_bucket_pipe> resolved_sources;
+  std::set<rgw_sync_bucket_pipe> resolved_dests;
 
   rgw_sync_bucket_entity self_entity(zone_name, opt_bucket);
 
@@ -2469,8 +2471,8 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
 
   {
     Formatter::ObjectSection os(*formatter, "result");
-    encode_json("sources", *sources, formatter);
-    encode_json("dests", *dests, formatter);
+    encode_json("sources", sources, formatter);
+    encode_json("dests", dests, formatter);
     {
       Formatter::ObjectSection hints_section(*formatter, "hints");
       encode_json("sources", source_hints_vec, formatter);
@@ -2519,8 +2521,8 @@ static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo&
   for (auto& m : sources) {
     auto& zone = m.first;
     out << indented{width, "source zone"} << zone << std::endl;
-    for (auto& pipe : m.second.pipes) {
-      out << indented{width, "bucket"} << *pipe.source.bucket << std::endl;
+    for (auto& pipe_handler : m.second) {
+      out << indented{width, "bucket"} << *pipe_handler.source.bucket << std::endl;
     }
   }
 
@@ -2593,7 +2595,8 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf
     }
 
     for (auto& m : sources) {
-      for (auto& pipe : m.second.pipes) {
+      for (auto& entry : m.second.pipe_map) {
+        auto& pipe = entry.second;
         if (pipe.source.zone.value_or("") == z->second.id) {
           bucket_source_sync_status(store, zone, z->second,
                                     c->second,
index 79a3f65ed0b255547acbf34572d6c2c2bbc31d9f..144cd640b7a5759a60bc75b9096f1e57612fc0bb 100644 (file)
@@ -267,18 +267,16 @@ vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const string& s
   return vector<rgw_sync_bucket_pipe>();
 }
 
-void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
+void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe)
 {
-  auto ppipe = &pipe_map[pipe.id];
-
-  auto prefix = pipe.params.filter.prefix.value_or(string());
+  auto prefix = ppipe->params.filter.prefix.value_or(string());
 
   prefix_by_size.insert(make_pair(prefix.size(), ppipe));
 
-  for (auto& tag : pipe.params.filter.tags) {
+  for (auto& tag : ppipe->params.filter.tags) {
     auto titer = tag_refs.find(tag);
     if (titer != tag_refs.end() &&
-        pipe.params.priority > titer->second->params.priority) {
+        ppipe->params.priority > titer->second->params.priority) {
       titer->second = ppipe;
     } else {
       tag_refs[tag] = ppipe;
@@ -286,6 +284,13 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pi
   }
 }
 
+void RGWBucketSyncFlowManager::pipe_set::finish_init()
+{
+  for (auto& entry : rules) {
+    entry.second.finish_init();
+  }
+}
+
 void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe)
 {
   auto prefix = ppipe->params.filter.prefix.value_or(string());
@@ -319,9 +324,22 @@ void RGWBucketSyncFlowManager::pipe_rules::finish_init()
   }
 }
 
+void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
+  auto ppipe = &pipe_map[pipe.id];
+  *ppipe = pipe;
+
+  auto prules = &rules[endpoints_pair(*ppipe)];
+
+  prules->insert(ppipe);
+
+  pipe_handler h(prules, pipe);
+
+  handlers.insert(h);
+}
+
 void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
 {
-  encode_json("pipes", pipes, f);
+  encode_json("pipes", pipe_map, f);
 }
 
 bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone,
@@ -426,7 +444,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
       pipe.source.apply_bucket(effective_bucket);
       pipe.dest.apply_bucket(effective_bucket);
 
-      source_pipes->pipes.insert(pipe);
+      source_pipes->insert(pipe);
     }
 
     for (auto& entry : flow_group_map.dests) {
@@ -439,9 +457,12 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
       pipe.source.apply_bucket(effective_bucket);
       pipe.dest.apply_bucket(effective_bucket);
 
-      dest_pipes->pipes.insert(pipe);
+      dest_pipes->insert(pipe);
     }
   }
+
+  source_pipes->finish_init();
+  dest_pipes->finish_init();
 }
 
 
@@ -597,7 +618,8 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
 
   flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled);
 
-  for (auto& pipe : _sources_by_name.pipes) {
+  for (auto& entry : _sources_by_name.pipe_map) {
+    auto& pipe = entry.second;
     if (!pipe.source.zone) {
       continue;
     }
@@ -608,9 +630,14 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
     if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) {
       new_pipe.source.zone = zone_id;
     }
-    _sources[*new_pipe.source.zone].pipes.insert(new_pipe);
+    _sources[*new_pipe.source.zone].insert(new_pipe);
+  }
+  for (auto& s : _sources) {
+    s.second.finish_init();
   }
-  for (auto& pipe : _targets_by_name.pipes) {
+
+  for (auto& entry : _targets_by_name.pipe_map) {
+    auto& pipe = entry.second;
     if (!pipe.dest.zone) {
       continue;
     }
@@ -620,7 +647,10 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
     if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) {
       new_pipe.dest.zone = zone_id;
     }
-    _targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+    _targets[*new_pipe.dest.zone].insert(new_pipe);
+  }
+  for (auto& t : _targets) {
+    t.second.finish_init();
   }
 
   if (psources_by_name) {
@@ -643,25 +673,21 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
   }
 }
 
-void RGWBucketSyncPolicyHandler::get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets,
+void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets,
                                           std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes (with zone name) */
-  if (!filter_peer) {
-    *sources = sources_by_name;
-    *targets = targets_by_name;
-    return;
-  }
-
-  auto& filter = *filter_peer;
-
-  for (auto& source_pipe : sources_by_name.pipes) {
-    if (source_pipe.source.match(filter)) {
-      sources->pipes.insert(source_pipe);
+  for (auto& entry : sources_by_name.pipe_map) {
+    auto& source_pipe = entry.second;
+    if (!filter_peer ||
+        source_pipe.source.match(*filter_peer)) {
+      sources->insert(source_pipe);
     }
   }
 
-  for (auto& target_pipe : targets_by_name.pipes) {
-    if (target_pipe.dest.match(filter)) {
-      targets->pipes.insert(target_pipe);
+  for (auto& entry : targets_by_name.pipe_map) {
+    auto& target_pipe = entry.second;
+    if (!filter_peer ||
+        target_pipe.dest.match(*filter_peer)) {
+      targets->insert(target_pipe);
     }
   }
 }
index 7dcb757eccd9f1e43675ed94a87528fc7f6e4c8e..46a5a58952be286a6f06342d7a3b63e06411da6e 100644 (file)
@@ -132,42 +132,61 @@ public:
     }
   };
 
+  /*
+   * pipe_rules: deal with a set of pipes that have common endpoints_pair
+   */
   class pipe_rules {
     void resolve_prefix(rgw_sync_bucket_pipe *ppipe);
 
   public:
-    std::map<string, rgw_sync_bucket_pipe> pipe_map; /* id to pipe */
-
     std::multimap<size_t, rgw_sync_bucket_pipe *> prefix_by_size;
 
     map<rgw_sync_pipe_filter_tag, rgw_sync_bucket_pipe *> tag_refs;
     map<string, rgw_sync_bucket_pipe *> prefix_refs;
 
-    void insert(const rgw_sync_bucket_pipe& pipe);
+    void insert(rgw_sync_bucket_pipe *pipe);
 
     void finish_init();
   };
 
+  /*
+   * pipe_handler: extends endpoints_rule to point at the corresponding rules handler
+   */
+  struct pipe_handler : public endpoints_pair {
+    pipe_rules *rules;
+
+    pipe_handler() {}
+    pipe_handler(pipe_rules *_rules,
+                 const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe),
+                                                      rules(_rules) {}
+    bool specific() const {
+      return source.specific() && dest.specific();
+    }
+  };
+
   struct pipe_set {
     std::map<endpoints_pair, pipe_rules> rules;
-    std::set<rgw_sync_bucket_pipe> pipes;
+    std::map<string, rgw_sync_bucket_pipe> pipe_map;
 
-    using iterator = std::set<rgw_sync_bucket_pipe>::iterator;
+    std::set<pipe_handler> handlers;
+
+    using iterator = std::set<pipe_handler>::iterator;
 
     void clear() {
-      pipes.clear();
+      rules.clear();
+      pipe_map.clear();
+      handlers.clear();
     }
 
-    void insert(const rgw_sync_bucket_pipe& pipe) {
-      pipes.insert(pipe);
-    }
+    void insert(const rgw_sync_bucket_pipe& pipe);
+    void finish_init();
 
-    iterator begin() {
-      return pipes.begin();
+    iterator begin() const {
+      return handlers.begin();
     }
 
-    iterator end() {
-      return pipes.end();
+    iterator end() const {
+      return handlers.end();
     }
 
     void dump(ceph::Formatter *f) const;
@@ -210,6 +229,11 @@ public:
 
 };
 
+ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
+  os << e.dest << " -> " << e.source;
+  return os;
+}
+
 class RGWBucketSyncPolicyHandler {
   const RGWBucketSyncPolicyHandler *parent{nullptr};
   RGWSI_Zone *zone_svc;
@@ -290,8 +314,8 @@ public:
     *sources = &sources_by_name;
     *targets = &targets_by_name;
   }
-  void get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets,
-                std::optional<rgw_sync_bucket_entity> filter_peer);
+  void get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets,
+                 std::optional<rgw_sync_bucket_entity> filter_peer);
 
   const std::set<rgw_bucket>& get_source_hints() const {
     return source_hints;
index 7659877d7210f6a5af9d3de12c987076e19a60cb..dea3cec3cad60b6e77db3f905daff75d77a19cfc 100644 (file)
@@ -1124,20 +1124,20 @@ std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info_entity& e)
   return out;
 }
 
-struct rgw_sync_pipe_info {
-  rgw_sync_pipe_params params;
+struct rgw_sync_pipe_handler_info {
+  RGWBucketSyncFlowManager::pipe_handler handler;
   rgw_sync_pipe_info_entity source;
   rgw_sync_pipe_info_entity target;
 
-  rgw_sync_pipe_info() {}
-  rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
+  rgw_sync_pipe_handler_info() {}
+  rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler,
                      std::optional<RGWBucketInfo> source_bucket_info,
-                     std::optional<RGWBucketInfo> target_bucket_info) : source(pipe.source, source_bucket_info),
-                                                                        target(pipe.dest, target_bucket_info) {
-    params = pipe.params;
+                     std::optional<RGWBucketInfo> target_bucket_info) : handler(_handler),
+                                                                        source(handler.source, source_bucket_info),
+                                                                        target(handler.dest, target_bucket_info) {
   }
 
-  bool operator<(const rgw_sync_pipe_info& p) const {
+  bool operator<(const rgw_sync_pipe_handler_info& p) const {
     if (source < p.source) {
       return true;
     }
@@ -1153,37 +1153,37 @@ struct rgw_sync_pipe_info {
   }
 };
 
-std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info& p) {
+std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_handler_info& p) {
   out << p.source << ">" << p.target;
   return out;
 }
 
 struct rgw_sync_pipe_info_set {
-  std::set<rgw_sync_pipe_info> pipes;
+  std::set<rgw_sync_pipe_handler_info> handlers;
 
-  using iterator = std::set<rgw_sync_pipe_info>::iterator;
+  using iterator = std::set<rgw_sync_pipe_handler_info>::iterator;
 
   void clear() {
-    pipes.clear();
+    handlers.clear();
   }
 
-  void insert(const rgw_sync_bucket_pipe& pipe,
+  void insert(const RGWBucketSyncFlowManager::pipe_handler& handler,
               std::optional<RGWBucketInfo>& source_bucket_info,
               std::optional<RGWBucketInfo>& target_bucket_info) {
-    rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info);
-    pipes.insert(p);
+    rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info);
+    handlers.insert(p);
   }
 
   iterator begin() {
-    return pipes.begin();
+    return handlers.begin();
   }
 
   iterator end() {
-    return pipes.end();
+    return handlers.end();
   }
 
   bool empty() const {
-    return pipes.empty();
+    return handlers.empty();
   }
 
   void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
@@ -1191,14 +1191,14 @@ struct rgw_sync_pipe_info_set {
       return;
     }
 
-    std::set<rgw_sync_pipe_info> p;
+    std::set<rgw_sync_pipe_handler_info> p;
 
-    for (auto pipe : pipes) {
+    for (auto pipe : handlers) {
       pipe.update_empty_bucket_info(buckets_info);
       p.insert(pipe);
     }
 
-    pipes = std::move(p);
+    handlers = std::move(p);
   }
 };
 
@@ -3646,17 +3646,17 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
                                 << " all_sources.size()=" << all_sources.size() << dendl;
     auto iters = get_pipe_iters(all_sources, source_zone);
     for (auto i = iters.first; i != iters.second; ++i) {
-      for (auto& pipe : i->second.pipes) {
-        if (!pipe.specific()) {
-          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
+      for (auto& handler : i->second) {
+        if (!handler.specific()) {
+          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
           continue;
         }
         if (source_bucket &&
-            !source_bucket->match(*pipe.source.bucket)) {
+            !source_bucket->match(*handler.source.bucket)) {
           continue;
         }
-        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
-        result->insert(pipe, source_bucket_info, target_bucket_info);
+        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
+        result->insert(handler, source_bucket_info, target_bucket_info);
       }
     }
   }
@@ -3670,15 +3670,15 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
                                 << " all_targets.size()=" << all_targets.size() << dendl;
     auto iters = get_pipe_iters(all_targets, target_zone);
     for (auto i = iters.first; i != iters.second; ++i) {
-      for (auto& pipe : i->second.pipes) {
+      for (auto& handler : i->second) {
         if (target_bucket &&
-            pipe.dest.bucket &&
-            !target_bucket->match(*pipe.dest.bucket)) {
-          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
+            handler.dest.bucket &&
+            !target_bucket->match(*handler.dest.bucket)) {
+          ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
           continue;
         }
-        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
-        result->insert(pipe, source_bucket_info, target_bucket_info);
+        ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
+        result->insert(handler, source_bucket_info, target_bucket_info);
       }
     }
   }
@@ -3789,7 +3789,7 @@ int RGWRunBucketSourcesSyncCR::operate()
         }
         sync_pair.dest_bs.bucket = siter->target.get_bucket();
 
-        sync_pair.params = siter->params;
+        sync_pair.handler = siter->handler;
 
         if (sync_pair.source_bs.shard_id >= 0) {
           num_shards = 1;
index 273da1b8624bb81c131ba61560cc303132e2ec34..4ff43c0ed7ccabc661b7d36218e2535a98bd9c27 100644 (file)
 #include "rgw_sync_trace.h"
 #include "rgw_sync_policy.h"
 
+#include "rgw_bucket_sync.h"
+
 class JSONObj;
 struct rgw_sync_bucket_pipe;
 
 struct rgw_bucket_sync_pair_info {
-  rgw_sync_pipe_params params;
+  RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */
   rgw_bucket_shard source_bs;
   rgw_bucket_shard dest_bs;
 };
@@ -33,10 +35,6 @@ inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pair_info& p) {
 
   out << p.source_bs;
 
-  if (p.params.filter.prefix) {
-    out << "/" << *p.params.filter.prefix + "*";
-  }
-
   out << "->" << p.dest_bs.bucket;
 
   return out;