]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: bucket flow manager initialization changes
authorYehuda Sadeh <yehuda@redhat.com>
Sat, 19 Oct 2019 01:49:28 +0000 (18:49 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_sync_policy.h

index 8b5baed2a299836468fc422a0d7741e4c38c4f1a..2739b0c87f224ea9eb3ad05f7b6c6c8ae337f6bb 100644 (file)
@@ -2331,12 +2331,14 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m
     auto& pflow = entry.second;
 
     encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+#if 0
     {
       Formatter::ArraySection fg(*f, "flow_groups");
       for (auto& flow_group : pflow.flow_groups) {
         encode_json("entry", *flow_group, f);
       }
     }
+#endif
     encode_json("pipes", pflow.pipe, f);
   }
 }
index fda52b1bdeb38aae42454f014791f057dd5240bb..7cd007cf98bdd8382a18414818785b38db82c49e 100644 (file)
@@ -481,12 +481,15 @@ vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const string& s
 
 void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const
 {
+#warning cleanup
+#if 0
   {
     Formatter::ArraySection os(*f, "flow_groups");
     for (auto& g : flow_groups) {
       encode_json("group", *g, f);
     }
   }
+#endif
 
   encode_json("pipe", pipe, f);
 }
@@ -546,20 +549,17 @@ RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bu
 }
 
 
-void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe,
-                                                rgw_sync_group_pipe_map *flow_group) {
+void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe) {
   auto source_bucket = pipe.source.get_bucket();
   auto dest_bucket = pipe.dest.get_bucket();
 
-  if (!flow_group->sources.empty()) {
+  if (pipe.match_dest(zone_name, bucket)) { /* we're the dest */
     auto& by_source = flow_by_source[source_bucket];
-    by_source.flow_groups.push_back(flow_group);
     by_source.pipe.push_back(pipe);
   }
 
-  if (!flow_group->dests.empty()) {
+  if (pipe.match_source(zone_name, bucket)) { /* we're the source */
     auto& by_dest = flow_by_dest[dest_bucket];
-    by_dest.flow_groups.push_back(flow_group);
     by_dest.pipe.push_back(pipe);
   }
 
@@ -581,6 +581,10 @@ void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe
 }
 
 void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
+    rgw_sync_bucket_entity entity;
+    entity.zones = std::set<string>( { zone_name } );
+    entity.bucket =  bucket;
+
   for (auto& item : sync_policy.groups) {
     auto& group = item.second;
     auto& flow_group_map = flow_groups[group.id];
@@ -600,18 +604,43 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
                                                          false); /* just check that it's not disabled */
                         });
 
+    for (auto& entry : flow_group_map.sources) {
+      rgw_sync_bucket_pipe pipe;
+      rgw_sync_bucket_entity source;
+      pipe.source.zones = std::set<string>( { entry.first.zone } );
+      pipe.source.bucket = entry.first.bucket;
+      pipe.dest = entity;
+
+      auto& by_source = flow_by_source[pipe.source.get_bucket()];
+      by_source.pipe.push_back(pipe);
+    }
+
+    for (auto& entry : flow_group_map.dests) {
+      rgw_sync_bucket_pipe pipe;
+      rgw_sync_bucket_entity dest;
+      pipe.dest.zones = std::set<string>( { entry.first.zone } );
+      pipe.dest.bucket = entry.first.bucket;
+      pipe.source = entity;
+
+      auto& by_dest = flow_by_source[pipe.dest.get_bucket()];
+      by_dest.pipe.push_back(pipe);
+    }
+  }
+
+#if 0
     if (!group.pipes.empty()) {
       for (auto& pipe : group.pipes) {
-        if (!pipe.contains_bucket(bucket)) {
+        if (!pipe.contains_zone_bucket(zone_name, bucket)) {
           continue;
         }
 
-        update_flow_maps(pipe, &flow_group_map);
+        update_flow_maps(pipe, flow_group_map);
       }
     } else {
-      update_flow_maps(rgw_sync_bucket_pipe(), &flow_group_map);
+      update_flow_maps(rgw_sync_bucket_pipe(), flow_group_map);
     }
   }
+#endif
 }
 
 
index d9fcedbd2fd8eced62f70fead74d18ad8ba741db..b6d3f4400f2445f8a684f5659701fbd9b3067286 100644 (file)
@@ -116,8 +116,7 @@ struct rgw_sync_group_pipe_map {
 class RGWBucketSyncFlowManager {
 public:
   struct pipe_flow {
-    vector<rgw_sync_group_pipe_map *> flow_groups;
-    vector<rgw_sync_bucket_pipe> pipe;
+    std::vector<rgw_sync_bucket_pipe> pipe;
 
     void dump(ceph::Formatter *f) const;
   };
@@ -147,8 +146,7 @@ private:
    */
   flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional<rgw_bucket> bucket);
 
-  void update_flow_maps(const rgw_sync_bucket_pipe& pipe,
-                        rgw_sync_group_pipe_map *flow_group);
+  void update_flow_maps(const rgw_sync_bucket_pipe& pipe);
 
 public:
 
index 1093cef0c66d8e81a761f65249286cd5d6d2b2f3..fdb803fd75cda6a9f3c3f3ac2a17d7cc58d0a020 100644 (file)
@@ -331,9 +331,16 @@ public:
     return (source.match_zone(zone) || dest.match_zone(zone));
   }
 
+  bool match_source(const string& zone, std::optional<rgw_bucket> b) const {
+    return (source.match_zone(zone) && source.match_bucket(b));
+  }
+
+  bool match_dest(const string& zone, std::optional<rgw_bucket> b) const {
+    return (dest.match_zone(zone) && dest.match_bucket(b));
+  }
+
   bool contains_zone_bucket(const string& zone, std::optional<rgw_bucket> b) const {
-    return ((source.match_zone(zone) && source.match_bucket(b)) ||
-            (dest.match_zone(zone) && dest.match_bucket(b)));
+    return (match_source(zone, b) || match_dest(zone, b));
   }
 
   void dump(ceph::Formatter *f) const;