]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync flow handler: integrate with flow manager
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 24 Oct 2019 22:58:06 +0000 (15:58 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
Convert zone names to zone ids, adjust interfaces.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_json_enc.cc

index 8778b8c61d965cb3e70131c6885187231d0aada1..ff6083269162a01ffaaa96ce39d288f9d54a6665 100644 (file)
@@ -2453,8 +2453,8 @@ static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo&
   for (auto& m : sources) {
     auto& zone = m.first;
     out << indented{width, "source zone"} << zone << std::endl;
-    for (auto& s : m.second) {
-      out << indented{width, "bucket"} << s << std::endl;
+    for (auto& pipe : m.second.pipes) {
+      out << indented{width, "bucket"} << *pipe.source.bucket << std::endl;
     }
   }
 
index d74b33c16f910bb9bc840a8f369e5c0404e471f3..e1f6f8a77edc245fc6e52e8ade978e4c4b4a3d32 100644 (file)
@@ -455,19 +455,39 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
 
 int RGWBucketSyncPolicyHandler::init()
 {
-#warning FIXME
-#if 0
-  const auto& zone_id = zone_svc->get_zone().id;
-  auto& zg = zone_svc->get_zonegroup();
+  flow_mgr->init(*bucket_info.sync_policy);
 
-  if (!bucket_info.sync_policy) {
-    return 0;
-  }
+  RGWBucketSyncFlowManager::pipe_set sources_by_name;
+  RGWBucketSyncFlowManager::pipe_set targets_by_name;
+  flow_mgr->reflect(bucket_info.bucket, &sources_by_name, &targets_by_name);
+
+  /* convert to zone ids */
+
+  for (auto& pipe : sources_by_name.pipes) {
+    if (!pipe.source.zone) {
+      continue;
+    }
+    rgw_sync_bucket_pipe new_pipe = pipe;
+    string zone_id;
 
-  auto& sync_policy = *bucket_info.sync_policy;
+    if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) {
+      new_pipe.source.zone = zone_id;
+    }
+    sources[*new_pipe.source.zone].pipes.insert(new_pipe);
+  }
+  for (auto& pipe : targets_by_name.pipes) {
+    if (!pipe.dest.zone) {
+      continue;
+    }
+    rgw_sync_bucket_pipe new_pipe = pipe;
+    string zone_id;
+    if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) {
+      new_pipe.dest.zone = zone_id;
+    }
+    targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+  }
 
   return 0;
-#endif
 }
 
 bool RGWBucketSyncPolicyHandler::bucket_exports_data() const
index 159cdbb2449753468821d8fbd4d3eba93fc0ba2d..cdded5a9ddf10074d7d1d3eacb9bcc73fea7fd1c 100644 (file)
@@ -154,45 +154,23 @@ class RGWBucketSyncPolicyHandler {
   RGWBucketInfo bucket_info;
   std::unique_ptr<RGWBucketSyncFlowManager> flow_mgr;
 
-  std::set<string> source_zones;
+  map<string, RGWBucketSyncFlowManager::pipe_set> sources; /* source pipes by source zone id */
+  map<string, RGWBucketSyncFlowManager::pipe_set> targets; /* target pipes by target zone id */
 
-public:
-  struct peer_info {
-    std::string type;
-    rgw_bucket bucket;
-    /* need to have config for other type of sources */
-
-    bool operator<(const peer_info& si) const {
-      if (type == si.type) {
-        return (bucket < si.bucket);
-      }
-      return (type < si.type);
-    }
-
-    bool is_rgw() const {
-      return (type.empty() || type == "rgw");
-    }
-
-    string get_type() const {
-      if (!type.empty()) {
-        return type;
-      }
-      return "rgw";
-    }
-
-    void dump(Formatter *f) const;
-  };
+  bool bucket_is_sync_source() const {
+    return !targets.empty();
+  }
 
-private:
-  std::map<string, std::set<peer_info> > sources; /* peers by zone */
-  std::map<string, std::set<peer_info> > targets; /* peers by zone */
+  bool bucket_is_sync_target() const {
+    return !sources.empty();
+  }
 
 public:
   RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
                              RGWBucketInfo& _bucket_info);
   int init();
 
-  std::map<string, std::set<peer_info> >& get_sources() {
+  const  map<string, RGWBucketSyncFlowManager::pipe_set>& get_sources() {
     return sources;
   }
 
@@ -200,22 +178,7 @@ public:
     return bucket_info;
   }
 
-  bool zone_is_source(const string& zone_id) const {
-    return sources.find(zone_id) != sources.end();
-  }
-
-  bool bucket_is_sync_source() const {
-    return !targets.empty();
-  }
-
-  bool bucket_is_sync_target() const {
-    return !sources.empty();
-  }
-
   bool bucket_exports_data() const;
   bool bucket_imports_data() const;
 };
 
-inline ostream& operator<<(ostream& out, const RGWBucketSyncPolicyHandler::peer_info& pi) {
-  return out << pi.bucket << " (" << pi.get_type() << ")";
-}
index a732804f8a996b44db02d05dcd5e16fbdce731b9..43bfcefaf50726913e93486449a42478854684fc 100644 (file)
@@ -3410,7 +3410,7 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_bucket bucket;
 
-  map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *sources;
+  map<string, RGWBucketSyncFlowManager::pipe_set> *sources;
   RGWBucketInfo *pbucket_info;
 
   rgw_raw_obj sources_obj;
@@ -3426,7 +3426,7 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine {
 public:
   RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env,
                             const rgw_bucket& _bucket,
-                            map<string, set<RGWBucketSyncPolicyHandler::peer_info> > *_sources,
+                            map<string, RGWBucketSyncFlowManager::pipe_set> *_sources,
                             RGWBucketInfo *_pbucket_info,
                             const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sync_env->cct),
@@ -3486,9 +3486,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 
   rgw_raw_obj sources_obj;
 
-  map<string, set<RGWBucketSyncPolicyHandler::peer_info> > sources;
-  map<string, set<RGWBucketSyncPolicyHandler::peer_info> >::iterator siter;
-  set<RGWBucketSyncPolicyHandler::peer_info>::iterator piter;
+  map<string, RGWBucketSyncFlowManager::pipe_set> sources;
+  map<string, RGWBucketSyncFlowManager::pipe_set>::iterator siter;
+  set<rgw_sync_bucket_pipe>::iterator piter;
 
   rgw_bucket_sync_pair_info sync_pair;
 
@@ -3565,12 +3565,8 @@ int RGWRunBucketSourcesSyncCR::operate()
         }
         cur_sc->init(sync_env, conn, siter->first);
       }
-      for (piter = siter->second.begin(); piter != siter->second.end(); ++piter) {
-        if (!piter->is_rgw()) {
-          continue;
-        }
-
-        sync_pair.source_bs.bucket = piter->bucket;
+      for (piter = siter->second.pipes.begin(); piter != siter->second.pipes.end(); ++piter) {
+        sync_pair.source_bs.bucket = *piter->source.bucket;
         sync_pair.dest_bs.bucket = bucket_info.bucket;
 
         yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false);
index d045ae6487d4fd5b97d8d6985c676009ece6a9e4..f73019f3ec6c37b7517423f07d7dab7563cf6085 100644 (file)
@@ -1012,12 +1012,6 @@ void rgw_sync_policy_info::decode_json(JSONObj *obj)
   }
 }
 
-void RGWBucketSyncPolicyHandler::peer_info::dump(Formatter *f) const
-{
-  encode_json("type", get_type(), f);
-  encode_json("bucket", bucket, f);
-}
-
 void rgw_obj_key::dump(Formatter *f) const
 {
   encode_json("name", name, f);