]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: rework sync policy
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 16 Aug 2019 18:59:44 +0000 (11:59 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:36 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_json_enc.cc

index ecdffaca5bed95e13f690bf8dd08b2c426f87e8e..8732bce32f888cfcd1dcd5797e3d62b7c642c07c 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "services/svc_zone.h"
 
+#define dout_subsys ceph_subsys_rgw
 
 #if 0
 void RGWBucketSyncPolicyInfo::post_init()
@@ -23,6 +24,7 @@ void RGWBucketSyncPolicyInfo::post_init()
 }
 #endif
 
+
 int RGWBucketSyncPolicyHandler::init()
 {
   const auto& zone_id = zone_svc->get_zone().id;
@@ -33,32 +35,58 @@ int RGWBucketSyncPolicyHandler::init()
 
   auto& sync_policy = *bucket_info.sync_policy;
 
-  for (auto& entry : sync_policy.entries) {
-    if (!entry.bucket ||
-        !(*entry.bucket == bucket_info.bucket)) {
-      continue;
-    }
-
-
-  }
-
-  source_zones.clear();
-
-#warning FIXME
-#if 0
-  if (!sync_policy ||
-      !sync_policy->pipes) {
-    return 0;
-  }
-
-  for (auto& p : *sync_policy->pipes) {
-    auto& pipe = p.second;
-
-    if (pipe.target.zone_id == zone_id) {
-      source_zones.insert(pipe.source.zone_id());
+  if (sync_policy.targets) {
+    for (auto& target : *sync_policy.targets) {
+      if (!(target.bucket || *target.bucket == bucket_info.bucket)) {
+        continue;
+      }
+
+      if (!(target.type.empty() ||
+            target.type == "rgw")) {
+        ldout(zone_svc->ctx(), 20) << "unsuppported sync target: " << target.type << dendl;
+        continue;
+      }
+
+      if (target.zones.find("*") == target.zones.end() &&
+          target.zones.find(zone_id) == target.zones.end()) {
+        continue;
+      }
+
+      /* populate trivial peers */
+      for (auto& rule : target.flow_rules) {
+        set<string> source_zones;
+        set<string> target_zones;
+        rule.get_zone_peers(zone_id, &source_zones, &target_zones);
+
+        for (auto& sz : source_zones) {
+          peer_info sinfo;
+          sinfo.bucket = bucket_info.bucket;
+          sources[sz].insert(sinfo);
+        }
+
+        for (auto& tz : target_zones) {
+          peer_info tinfo;
+          tinfo.bucket = bucket_info.bucket;
+          targets[tz].insert(tinfo);
+        }
+      }
+
+      /* non trivial sources */
+      for (auto& source : target.sources) {
+        if (!source.bucket ||
+            *source.bucket == bucket_info.bucket) {
+          if ((source.type.empty() || source.type == "rgw") &&
+              source.zone &&
+              source.bucket) {
+            peer_info sinfo;
+            sinfo.type = source.type;
+            sinfo.bucket = *source.bucket;
+            sources[*source.zone].insert(sinfo);
+          }
+        }
+      }
     }
   }
-#endif
 
   return 0;
 }
index 90a3d2e27b53e6bf95a1af87fca1999718c717f6..ca104f0c4ce36c3953ddc54efa311a5ed3160694 100644 (file)
@@ -27,6 +27,22 @@ class RGWBucketSyncPolicyHandler {
 
   std::set<string> source_zones;
 
+  struct peer_info {
+    std::string type;
+    rgw_bucket bucket;
+    /* need to have config for other type of sources */
+
+    bool operator<(const peer_info& si) const {
+      if (type == si.type) {
+        return (bucket < si.bucket);
+      }
+      return (type < si.type);
+    }
+  };
+
+  std::map<string, std::set<peer_info> > sources;
+  std::map<string, std::set<peer_info> > targets;
+
 public:
   RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
                              RGWBucketInfo& _bucket_info);
@@ -34,7 +50,7 @@ public:
   int init();
 
   bool zone_is_source(const string& zone_id) const {
-    return source_zones.find(zone_id) != source_zones.end();
+    return sources.find(zone_id) != sources.end();
   }
 };
 
index e7a2c4b5019609ed936ce2ed1976e3f8c9d6ea02..81f5686db0e83236ea2144bf04f927b7cd58f57f 100644 (file)
@@ -64,20 +64,25 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
-void rgw_sync_group_info::encode(bufferlist& bl) const
+void rgw_sync_flow_rule::get_zone_peers(const string& zone_id,
+                                        std::set<string> *sources,
+                                        std::set<string> *targets) const
 {
-  ENCODE_START(1, 1, bl);
-  encode(id, bl);
-  encode(config, bl);
-  ENCODE_FINISH(bl);
-}
+  sources->clear();
+  targets->clear();
 
-void rgw_sync_group_info::decode(bufferlist::const_iterator& bl)
-{
-  DECODE_START(1, bl);
-  decode(id, bl);
-  decode(config, bl);
-  DECODE_FINISH(bl);
+  if (directional) {
+    if (directional->target_zone == zone_id) {
+      sources->insert(directional->source_zone);
+    } else if (directional->source_zone == zone_id) {
+      targets->insert(directional->target_zone);
+    }
+  } else if (symmetrical &&
+             symmetrical->find(zone_id) != symmetrical->end()) {
+    *sources = *symmetrical;
+    sources->erase(zone_id);
+    *targets = *sources;
+  }
 }
 
 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
index 9ca1cdc7cf6df7cbad6a3970639e9e6ebb6d6ed6..c67d3586e8ca15a33c1c7c9d5b03c2c908edf5d8 100644 (file)
@@ -259,122 +259,144 @@ struct rgw_bucket_entry_owner {
   void decode_json(JSONObj *obj);
 };
 
-struct rgw_sync_group_info {
-  static constexpr int CONFIG_FLAG_NONE = 0x0;
-  static constexpr int CONFIG_FLAG_DR   = 0x1;
+struct rgw_sync_flow_directional_rule {
+  string source_zone;
+  string target_zone;
 
-  std::string id;
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(source_zone, bl);
+    encode(target_zone, bl);
+    ENCODE_FINISH(bl);
+  }
 
-  struct _config {
-    int flags{CONFIG_FLAG_NONE};
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(source_zone, bl);
+    decode(target_zone, bl);
+    DECODE_FINISH(bl);
+  }
 
-    void encode(bufferlist& bl) const {
-      ENCODE_START(1, 1, bl);
-      encode(flags, bl);
-      ENCODE_FINISH(bl);
-    }
+  void dump(ceph::Formatter *f) const;
+  void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(rgw_sync_flow_directional_rule)
 
-    void decode(bufferlist::const_iterator& bl) {
-      DECODE_START(1, bl);
-      decode(flags, bl);
-      DECODE_FINISH(bl);
-    }
+struct rgw_sync_flow_rule {
+  string id;
+  std::optional<rgw_sync_flow_directional_rule> directional;
+  std::optional<std::set<string> > symmetrical;
 
-    void dump(ceph::Formatter *f) const;
-    void decode_json(JSONObj *obj);
-  } config;
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(directional, bl);
+    encode(symmetrical, bl);
+    ENCODE_FINISH(bl);
+  }
 
-  void encode(bufferlist& bl) const;
-  void decode(bufferlist::const_iterator& bl);
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    decode(directional, bl);
+    decode(symmetrical, bl);
+    DECODE_FINISH(bl);
+  }
 
   void dump(ceph::Formatter *f) const;
   void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(rgw_sync_group_info::_config)
-WRITE_CLASS_ENCODER(rgw_sync_group_info)
 
-struct rgw_sync_instance_info {
-  std::string id;
+  void get_zone_peers(const string& zone_id, std::set<string> *sources, std::set<string> *targets) const;
+};
+WRITE_CLASS_ENCODER(rgw_sync_flow_rule)
 
-  std::optional<std::set<std::string> > sync_groups; /* name of groups entity belongs to */
-  std::optional<std::string> zone_id;
+struct rgw_sync_source {
+  string id;
+  string type;
+  std::optional<string> zone;
   std::optional<rgw_bucket> bucket;
-  std::optional<std::string> obj_prefix;
-
-  struct sync_pipe {
-    string source;
-    string target_prefix;
-
-    bool operator<(const sync_pipe& rhs) const {
-      if (source == rhs.source) {
-        return (target_prefix < rhs.target_prefix);
-      }
-      return (source  < rhs.source);
-    }
-
-    void encode(bufferlist& bl) const {
-      ENCODE_START(1, 1, bl);
-      encode(source, bl);
-      encode(target_prefix, bl);
-      ENCODE_FINISH(bl);
-    }
+  /* FIXME: config */
 
-    void decode(bufferlist::const_iterator& bl) {
-      DECODE_START(1, bl);
-      decode(source, bl);
-      decode(target_prefix, bl);
-      DECODE_FINISH(bl);
-    }
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(type, bl);
+    encode(zone, bl);
+    encode(bucket, bl);
+    ENCODE_FINISH(bl);
+  }
 
-    void dump(ceph::Formatter *f) const;
-    void decode_json(JSONObj *obj);
-  };
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    decode(type, bl);
+    decode(zone, bl);
+    decode(bucket, bl);
+    DECODE_FINISH(bl);
+  }
 
-  std::optional<std::set<sync_pipe> > sync_from; /* optional group/entity ids */
+  void dump(ceph::Formatter *f) const;
+  void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(rgw_sync_source)
 
+struct rgw_sync_target {
+  string id;
+  string type;
+  std::vector<rgw_sync_flow_rule> flow_rules; /* flow rules for trivial sources */
+  std::set<string> zones;  /* target zones. Can be wildcard */
+  /* FIXME: add config */
+
+  std::vector<rgw_sync_source> sources; /* non-trivial sources */
+  std::optional<rgw_bucket> bucket; /* can be explicit, or not set. If not set then depending
+                                       on the context */
+  
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(id, bl);
-    encode(sync_groups, bl);
-    encode(zone_id, bl);
+    encode(type, bl);
+    encode(flow_rules, bl);
+    encode(zones, bl);
+    encode(sources, bl);
     encode(bucket, bl);
-    encode(obj_prefix, bl);
-    encode(sync_from, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(id, bl);
-    decode(sync_groups, bl);
-    decode(zone_id, bl);
+    decode(type, bl);
+    decode(flow_rules, bl);
+    decode(zones, bl);
+    decode(sources, bl);
     decode(bucket, bl);
-    decode(obj_prefix, bl);
-    decode(sync_from, bl);
     DECODE_FINISH(bl);
   }
 
   void dump(ceph::Formatter *f) const;
   void decode_json(JSONObj *obj);
 };
-WRITE_CLASS_ENCODER(rgw_sync_instance_info::sync_pipe)
-WRITE_CLASS_ENCODER(rgw_sync_instance_info)
+WRITE_CLASS_ENCODER(rgw_sync_target)
+
 
 struct rgw_sync_policy_info {
-  std::vector<rgw_sync_group_info> groups;
-  std::vector<rgw_sync_instance_info> entries;
+  std::optional<std::vector<rgw_sync_flow_rule> > flow_rules;
+  std::optional<std::vector<rgw_sync_source> > sources;
+  std::optional<std::vector<rgw_sync_target> > targets;
 
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    encode(groups, bl);
-    encode(entries, bl);
+    encode(flow_rules, bl);
+    encode(sources, bl);
+    encode(targets, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
-    decode(groups, bl);
-    decode(entries, bl);
+    decode(flow_rules, bl);
+    decode(sources, bl);
+    decode(targets, bl);
     DECODE_FINISH(bl);
   }
 
@@ -382,8 +404,8 @@ struct rgw_sync_policy_info {
   void decode_json(JSONObj *obj);
 
   bool empty() const {
-    return groups.empty() &&
-           entries.empty();
+    return (!flow_rules || flow_rules->empty()) &&
+           (!targets || targets->empty());
   }
 
 };
index d08cc9704095a1f0099a979b988527b099f8be33..c3740b233fe805062d0d96f3866cd4ec0f3f14b7 100644 (file)
@@ -832,89 +832,80 @@ void RGWBucketInfo::decode_json(JSONObj *obj) {
   reshard_status = (cls_rgw_reshard_status)rs;
 }
 
-void rgw_sync_group_info::dump(Formatter *f) const
+void rgw_sync_flow_directional_rule::dump(Formatter *f) const
 {
-  encode_json("id", id, f);
-  encode_json("config", config, f);
+  encode_json("source_zone", source_zone, f);
+  encode_json("target_zone", target_zone, f);
 }
 
-void rgw_sync_group_info::decode_json(JSONObj *obj)
+void rgw_sync_flow_directional_rule::decode_json(JSONObj *obj)
 {
-  JSONDecoder::decode_json("id", id, obj);
-  JSONDecoder::decode_json("config", config, obj);
+  JSONDecoder::decode_json("source_zone", source_zone, obj);
+  JSONDecoder::decode_json("target_zone", target_zone, obj);
 }
 
-static struct rgw_flags_desc sync_group_flags_desc[] = {
- { rgw_sync_group_info::CONFIG_FLAG_DR, "mirror" },
- { rgw_sync_group_info::CONFIG_FLAG_NONE, "none" },
- { 0, NULL }
-};
-
-void rgw_sync_group_info::_config::dump(Formatter *f) const
+void rgw_sync_flow_rule::dump(Formatter *f) const
 {
-  char buf[256];
-  mask_to_str(sync_group_flags_desc, flags, buf, sizeof(buf));
-  encode_json("flags", (const char *)buf, f);
+  encode_json("id", id, f);
+  encode_json("directional", directional, f);
+  encode_json("symmetrical", symmetrical, f);
 }
 
-static struct rgw_name_to_flag sync_group_flags_mapping[] = {
-                  {"mirror",  rgw_sync_group_info::CONFIG_FLAG_DR},
-                 {"none", 0},
-                 {NULL, 0} };
-
-
-
-void rgw_sync_group_info::_config::decode_json(JSONObj *obj)
+void rgw_sync_flow_rule::decode_json(JSONObj *obj)
 {
-  string s;
-  JSONDecoder::decode_json("flags", s, obj);
-  uint32_t f = 0;
-  rgw_parse_list_of_flags(sync_group_flags_mapping, s, &f);
-  flags = f;
+  JSONDecoder::decode_json("id", id, obj);
+  JSONDecoder::decode_json("directional", directional, obj);
+  JSONDecoder::decode_json("symmetrical", symmetrical, obj);
 }
 
-void rgw_sync_instance_info::dump(Formatter *f) const
+void rgw_sync_source::dump(Formatter *f) const
 {
   encode_json("id", id, f);
-  encode_json("sync_groups", sync_groups, f);
-  encode_json("zone_id", zone_id, f);
+  encode_json("type", type, f);
+  encode_json("zone", zone, f);
   encode_json("bucket", bucket, f);
-  encode_json("obj_prefix", obj_prefix, f);
-  encode_json("sync_from", sync_from, f);
 }
 
-void rgw_sync_instance_info::decode_json(JSONObj *obj)
+void rgw_sync_source::decode_json(JSONObj *obj)
 {
   JSONDecoder::decode_json("id", id, obj);
-  JSONDecoder::decode_json("sync_groups", sync_groups, obj);
-  JSONDecoder::decode_json("zone_id", zone_id, obj);
+  JSONDecoder::decode_json("type", type, obj);
+  JSONDecoder::decode_json("zone", zone, obj);
   JSONDecoder::decode_json("bucket", bucket, obj);
-  JSONDecoder::decode_json("obj_prefix", obj_prefix, obj);
-  JSONDecoder::decode_json("sync_from", sync_from, obj);
 }
 
-void rgw_sync_instance_info::sync_pipe::dump(Formatter *f) const
+void rgw_sync_target::dump(Formatter *f) const
 {
-  encode_json("source", source, f);
-  encode_json("target_prefix", target_prefix, f);
+  encode_json("id", id, f);
+  encode_json("type", type, f);
+  encode_json("flow_rules", flow_rules, f);
+  encode_json("zones", zones, f);
+  encode_json("sources", sources, f);
+  encode_json("bucket", bucket, f);
 }
 
-void rgw_sync_instance_info::sync_pipe::decode_json(JSONObj *obj)
+void rgw_sync_target::decode_json(JSONObj *obj)
 {
-  JSONDecoder::decode_json("source", source, obj);
-  JSONDecoder::decode_json("target_prefix", target_prefix, obj);
+  JSONDecoder::decode_json("id", id, obj);
+  JSONDecoder::decode_json("type", type, obj);
+  JSONDecoder::decode_json("flow_rules", flow_rules, obj);
+  JSONDecoder::decode_json("zones", zones, obj);
+  JSONDecoder::decode_json("sources", sources, obj);
+  JSONDecoder::decode_json("bucket", bucket, obj);
 }
 
 void rgw_sync_policy_info::dump(Formatter *f) const
 {
-  encode_json("groups", groups, f);
-  encode_json("entries", entries, f);
+  encode_json("flow_rules", flow_rules, f);
+  encode_json("sources", sources, f);
+  encode_json("targets", targets, f);
 }
 
 void rgw_sync_policy_info::decode_json(JSONObj *obj)
 {
-  JSONDecoder::decode_json("groups", groups, obj);
-  JSONDecoder::decode_json("entries", entries, obj);
+  JSONDecoder::decode_json("flow_rules", flow_rules, obj);
+  JSONDecoder::decode_json("sources", sources, obj);
+  JSONDecoder::decode_json("targets", targets, obj);
 }
 
 void rgw_obj_key::dump(Formatter *f) const