]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync policy mgr: more fixes
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 23 Oct 2019 01:34:46 +0000 (18:34 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h

index 1f9ca3471603d673715dc80d16c838cc87cee0b0..8778b8c61d965cb3e70131c6885187231d0aada1 100644 (file)
@@ -2320,28 +2320,13 @@ static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZo
   return 0;
 }
 
-void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m, Formatter *f)
+void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pset, Formatter *f)
 {
   Formatter::ObjectSection top_section(*f, name);
   Formatter::ArraySection as(*f, "entries");
 
-  for (auto& entry : m) {
-    Formatter::ObjectSection os(*f, "entry");
-    auto& bucket = entry.first;
-    auto& pflow = entry.second;
-
-#if 0
-    encode_json("buckets", rgw_sync_bucket_entities::bucket_key(bucket), f);
-    {
-      Formatter::ArraySection fg(*f, "flow_groups");
-      for (auto& flow_group : pflow.flow_groups) {
-        encode_json("entry", *flow_group, f);
-      }
-    }
-#endif
-    for (auto& pipe : pflow.pipes) {
-      encode_json("pipe", pipe, f);
-    }
+  for (auto& pipe : pset.pipes) {
+    encode_json("pipe", pipe, f);
   }
 }
 
@@ -2394,8 +2379,8 @@ static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bu
     }
   }
 
-  RGWBucketSyncFlowManager::flow_map_t sources;
-  RGWBucketSyncFlowManager::flow_map_t dests;
+  RGWBucketSyncFlowManager::pipe_set sources;
+  RGWBucketSyncFlowManager::pipe_set dests;
 
   flow_mgr->reflect(eff_bucket, &sources, &dests);
 
@@ -7885,7 +7870,7 @@ next:
 
     if (opt_cmd == OPT::SYNC_GROUP_MODIFY) {
       auto iter = sync_policy.groups.find(*opt_group_id);
-      if (iter != sync_policy.groups.end()) {
+      if (iter == sync_policy.groups.end()) {
         cerr << "ERROR: could not find group '" << *opt_group_id << "'" << std::endl;
         return ENOENT;
       }
index 87592392f5cdde8d3512f11147392aa2bdfe841e..f910b016c74fcdeec19f69ef428fbf8f0249fcbd 100644 (file)
 #define dout_subsys ceph_subsys_rgw
 
 
+ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
+  os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or("") << ",az=" << (int)e.all_zones << "}";
+  return os;
+}
+
+ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
+  os << "{s=" << pipe.source << ",d=" << pipe.dest << "}";
+  return os;
+}
+
+ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) {
+  os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<string>()) << "}";
+  return os;
+}
+
+ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) {
+  os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
+  return os;
+}
+
 static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes,
                                                                const string& source_zone,
                                                                const string& dest_zone)
@@ -42,26 +62,6 @@ void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const
   encode_json("dests", dests, f);
 }
 
-ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
-  os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zone.value_or("") << "}";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
-  os << "{s=" << pipe.source << ",d=" << pipe.dest << "}";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const rgw_sync_bucket_entities& e) {
-  os << "{b=" << rgw_sync_bucket_entities::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<string>()) << "}";
-  return os;
-}
-
-ostream& operator<<(ostream& os, const rgw_sync_bucket_pipes& pipe) {
-  os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
-  return os;
-}
-
 
 template <typename CB1, typename CB2>
 void rgw_sync_group_pipe_map::try_add_to_pipe_map(const string& source_zone,
@@ -143,9 +143,13 @@ template <typename CB>
 void rgw_sync_group_pipe_map::init(const string& _zone,
                                    std::optional<rgw_bucket> _bucket,
                                    const rgw_sync_policy_group& group,
+                                   rgw_sync_data_flow_group *_default_flow,
+                                   std::set<std::string> *_pall_zones,
                                    CB filter_cb) {
   zone = _zone;
   bucket = _bucket;
+  default_flow = _default_flow;
+  pall_zones = _pall_zones;
 
   rgw_sync_bucket_entity zb(zone, bucket);
 
@@ -160,11 +164,20 @@ void rgw_sync_group_pipe_map::init(const string& _zone,
     }
   }
 
-  if (group.data_flow.empty()) {
-    return;
+  const rgw_sync_data_flow_group *pflow;
+
+  if (!group.data_flow.empty()) {
+    pflow = &group.data_flow;
+  } else {
+    if (!default_flow) {
+      return;
+    }
+    pflow = default_flow;
   }
 
-  auto& flow = group.data_flow;
+  auto& flow = *pflow;
+
+  pall_zones->insert(zone);
 
   /* symmetrical */
   if (flow.symmetrical) {
@@ -172,6 +185,7 @@ void rgw_sync_group_pipe_map::init(const string& _zone,
       if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
         for (auto& z : symmetrical_group.zones) {
           if (z != zone) {
+            pall_zones->insert(z);
             try_add_source(z, zone, zone_pipes, filter_cb);
             try_add_dest(zone, z, zone_pipes, filter_cb);
           }
@@ -184,8 +198,10 @@ void rgw_sync_group_pipe_map::init(const string& _zone,
   if (flow.directional) {
     for (auto& rule : *flow.directional) {
       if (rule.source_zone == zone) {
+        pall_zones->insert(rule.dest_zone);
         try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
       } else if (rule.dest_zone == zone) {
+        pall_zones->insert(rule.source_zone);
         try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
       }
     }
@@ -250,7 +266,7 @@ vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const string& s
   return vector<rgw_sync_bucket_pipe>();
 }
 
-void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const
+void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
 {
   encode_json("pipes", pipes, f);
 }
@@ -295,27 +311,20 @@ bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone,
   return found;
 }
 
-/*
- * find all the matching flows om a flow map for a specific bucket
- */
-RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bucket_flow(RGWBucketSyncFlowManager::flow_map_t& m, std::optional<rgw_bucket> bucket) {
-  if (bucket) {
-    auto iter = m.find(*bucket);
-
-    if (iter != m.end()) {
-      return iter;
-    }
+void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
+  std::optional<rgw_sync_data_flow_group> default_flow;
+  if (parent) {
+    default_flow.emplace();
+    default_flow->init_default(parent->all_zones);
   }
 
-  return m.find(rgw_bucket());
-}
-
-void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
   for (auto& item : sync_policy.groups) {
     auto& group = item.second;
     auto& flow_group_map = flow_groups[group.id];
 
     flow_group_map.init(zone_name, bucket, group,
+                        (default_flow ? &(*default_flow) : nullptr),
+                        &all_zones,
                         [&](const string& source_zone,
                             std::optional<rgw_bucket> source_bucket,
                             const string& dest_zone,
@@ -333,8 +342,8 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
 }
 
 void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucket,
-                                       flow_map_t *flow_by_source,
-                                       flow_map_t *flow_by_dest)
+                                       RGWBucketSyncFlowManager::pipe_set *source_pipes,
+                                       RGWBucketSyncFlowManager::pipe_set *dest_pipes)
 
 {
   rgw_sync_bucket_entity entity;
@@ -342,11 +351,17 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
   entity.bucket = effective_bucket.value_or(rgw_bucket());
 
   if (parent) {
-    parent->reflect(effective_bucket, flow_by_source, flow_by_dest);
+    parent->reflect(effective_bucket, source_pipes, dest_pipes);
   }
 
   for (auto& item : flow_groups) {
     auto& flow_group_map = item.second;
+
+    /* only return enabled groups */
+    if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED) {
+      continue;
+    }
+
     for (auto& entry : flow_group_map.sources) {
       rgw_sync_bucket_pipe pipe = entry.second;
       if (!pipe.dest.match_bucket(effective_bucket)) {
@@ -356,8 +371,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
       pipe.source.apply_bucket(effective_bucket);
       pipe.dest.apply_bucket(effective_bucket);
 
-      auto& by_source = (*flow_by_source)[pipe.source.get_bucket()];
-      by_source.pipes.insert(pipe);
+      source_pipes->pipes.insert(pipe);
     }
 
     for (auto& entry : flow_group_map.dests) {
@@ -370,8 +384,7 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
       pipe.source.apply_bucket(effective_bucket);
       pipe.dest.apply_bucket(effective_bucket);
 
-      auto& by_dest = (*flow_by_dest)[pipe.dest.get_bucket()];
-      by_dest.pipes.insert(pipe);
+      dest_pipes->pipes.insert(pipe);
     }
   }
 }
index f13d438bc7a46f4374d5eeb8e031ceabfdc4dabe..7e705c030d55891688fcbea3f439395c09f7eac5 100644 (file)
@@ -35,6 +35,11 @@ struct rgw_sync_group_pipe_map {
   zb_pipe_map_t sources; /* all the pipes where zone is pulling from */
   zb_pipe_map_t dests; /* all the pipes that pull from zone */
 
+  std::set<string> *pall_zones{nullptr};
+  rgw_sync_data_flow_group *default_flow{nullptr}; /* flow to use if policy doesn't define it,
+                                                      used in the case of bucket sync policy, not at the
+                                                      zonegroup level */
+
   void dump(ceph::Formatter *f) const;
 
   template <typename CB1, typename CB2>
@@ -65,6 +70,8 @@ struct rgw_sync_group_pipe_map {
   void init(const string& _zone,
             std::optional<rgw_bucket> _bucket,
             const rgw_sync_policy_group& group,
+            rgw_sync_data_flow_group *_default_flow,
+            std::set<std::string> *_pall_zones,
             CB filter_cb);
 
   /*
@@ -93,14 +100,12 @@ struct rgw_sync_group_pipe_map {
 
 class RGWBucketSyncFlowManager {
 public:
-  struct pipe_flow {
+  struct pipe_set {
     std::set<rgw_sync_bucket_pipe> pipes;
 
     void dump(ceph::Formatter *f) const;
   };
 
-  using flow_map_t = map<rgw_bucket, pipe_flow>;
-
 private:
 
   string zone_name;
@@ -110,6 +115,8 @@ private:
 
   map<string, rgw_sync_group_pipe_map> flow_groups;
 
+  std::set<std::string> all_zones;
+
   bool allowed_data_flow(const string& source_zone,
                          std::optional<rgw_bucket> source_bucket,
                          const string& dest_zone,
@@ -119,8 +126,6 @@ private:
   /*
    * find all the matching flows om a flow map for a specific bucket
    */
-  flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional<rgw_bucket> bucket);
-
   void update_flow_maps(const rgw_sync_bucket_pipes& pipe);
 
 public:
@@ -131,8 +136,8 @@ public:
 
   void init(const rgw_sync_policy_info& sync_policy);
   void reflect(std::optional<rgw_bucket> effective_bucket,
-               flow_map_t *flow_by_source,
-               flow_map_t *flow_by_dest);
+               pipe_set *flow_by_source,
+               pipe_set *flow_by_dest);
 
 };
 
index 1cbf79179a1784f6b348d8bb9b522acd8697678a..d2485aa25625bf1597a8673aaf98e6b196a9e92b 100644 (file)
@@ -158,8 +158,8 @@ std::vector<rgw_sync_bucket_pipe> rgw_sync_bucket_pipes::expand() const
   for (auto& s : sources) {
     for (auto& d : dests) {
       rgw_sync_bucket_pipe pipe;
-      pipe.source = std::move(s);
-      pipe.dest = std::move(d);
+      pipe.source = s;
+      pipe.dest = d;
       result.push_back(pipe);
     }
   }
@@ -208,6 +208,9 @@ void rgw_sync_data_flow_group::remove_symmetrical(const string& flow_id, std::op
     if (iter->id == flow_id) {
       if (!zones) {
         groups.erase(iter);
+        if (groups.empty()) {
+          symmetrical.reset();
+        }
         return;
       }
       break;
@@ -227,6 +230,9 @@ void rgw_sync_data_flow_group::remove_symmetrical(const string& flow_id, std::op
   if (flow_group.zones.empty()) {
     groups.erase(iter);
   }
+  if (groups.empty()) {
+    symmetrical.reset();
+  }
 }
 
 bool rgw_sync_data_flow_group::find_directional(const string& source_zone, const string& dest_zone, bool create, rgw_sync_directional_rule **flow_group)
@@ -270,11 +276,19 @@ void rgw_sync_data_flow_group::remove_directional(const string& source_zone, con
     if (source_zone == rule.source_zone &&
         dest_zone == rule.dest_zone) {
       directional->erase(iter);
+      if (directional->empty()) {
+        directional.reset();
+      }
       return;
     }
   }
 }
 
+void rgw_sync_data_flow_group::init_default(const std::set<string>& zones)
+{
+  symmetrical.emplace();
+  symmetrical->push_back(rgw_sync_symmetric_group("default", zones));
+}
 
 bool rgw_sync_policy_group::find_pipe(const string& pipe_id, bool create, rgw_sync_bucket_pipes **pipe)
 {
index 9313789b8ef59ef3b80f82a88342a0d6d54c8154..57081a62aae8b0d556bf2d929499f7eba7c3740c 100644 (file)
@@ -22,6 +22,11 @@ struct rgw_sync_symmetric_group {
   string id;
   std::set<string> zones;
 
+  rgw_sync_symmetric_group() {}
+  rgw_sync_symmetric_group(const string& _id,
+                           const std::set<string> _zones) : id(_id), zones(_zones) {}
+
+
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(id, bl);
@@ -137,13 +142,17 @@ struct rgw_sync_bucket_entity {
   }
 
   const bool operator<(const rgw_sync_bucket_entity& e) const {
-    if (all_zones != e.all_zones) {
-      if (zone < e.zone) {
-        return true;
-      }
-      if (zone > e.zone) {
-        return false;
-      }
+    if (all_zones && !e.all_zones) {
+      return false;
+    }
+    if (!all_zones && e.all_zones) {
+      return true;
+    }
+    if (zone < e.zone) {
+      return true;
+    }
+    if (zone > e.zone) {
+      return false;
     }
     return (bucket < e.bucket);
   }
@@ -329,6 +338,8 @@ struct rgw_sync_data_flow_group {
   void remove_symmetrical(const string& flow_id, std::optional<std::vector<string> > zones);
   bool find_directional(const string& source_zone, const string& dest_zone, bool create, rgw_sync_directional_rule **flow_group);
   void remove_directional(const string& source_zone, const string& dest_zone);
+
+  void init_default(const std::set<string>& zones);
 };
 WRITE_CLASS_ENCODER(rgw_sync_data_flow_group)