]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: define sync policy structures
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 12 Aug 2019 22:58:28 +0000 (15:58 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:36 +0000 (10:20 -0800)
Defines sync policy for both zones and buckets.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/common/ceph_json.h
src/rgw/rgw_bucket.h
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_common.cc
src/rgw/rgw_common.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_json_enc.cc
src/rgw/rgw_tools.cc
src/rgw/rgw_tools.h

index 05615c6dc71d0ecc3b0ce5c5cfc8dbacd81eaaa6..27d2dddd139b2bee1d581866309fe83a443668f2 100644 (file)
@@ -136,6 +136,9 @@ public:
   template<class T>
   static bool decode_json(const char *name, boost::optional<T>& val, JSONObj *obj, bool mandatory = false);
 
+  template<class T>
+  static bool decode_json(const char *name, std::optional<T>& val, JSONObj *obj, bool mandatory = false);
+
 };
 
 template<class T>
@@ -382,6 +385,32 @@ bool JSONDecoder::decode_json(const char *name, boost::optional<T>& val, JSONObj
   return true;
 }
 
+template<class T>
+bool JSONDecoder::decode_json(const char *name, std::optional<T>& val, JSONObj *obj, bool mandatory)
+{
+  JSONObjIter iter = obj->find_first(name);
+  if (iter.end()) {
+    if (mandatory) {
+      std::string s = "missing mandatory field " + std::string(name);
+      throw err(s);
+    }
+    val.reset();
+    return false;
+  }
+
+  try {
+    val.emplace();
+    decode_json_obj(*val, *iter);
+  } catch (const err& e) {
+    val.reset();
+    std::string s = std::string(name) + ": ";
+    s.append(e.what());
+    throw err(s);
+  }
+
+  return true;
+}
+
 template<class T>
 static void encode_json(const char *name, const T& val, ceph::Formatter *f)
 {
@@ -542,6 +571,16 @@ void encode_json_map(const char *name, const char *index_name, const char *value
   encode_json_map<K, V>(name, index_name, NULL, value_name, NULL, NULL, m, f);
 }
 
+template <class T>
+static void encode_json(const char *name, const std::optional<T>& o, ceph::Formatter *f)
+{
+  if (!o) {
+    return;
+  }
+  encode_json(name, *o, f);
+}
+
+
 class JSONFormattable : public ceph::JSONFormatter {
   JSONObj::data_val value;
   std::vector<JSONFormattable> arr;
index a8e782f07e16590213e6bd9488cc7e621e20fffc..9af062da16042043ed2fb6448d8f866bcff02d6b 100644 (file)
@@ -511,7 +511,7 @@ class RGWDataChangesLog {
   std::atomic<bool> down_flag = { false };
 
   struct ChangeStatus {
-    std::shared_ptr<const RGWBucketSyncPolicy> sync_policy;
+    std::shared_ptr<const rgw_sync_policy_info> sync_policy;
     real_time cur_expiration;
     real_time cur_sent;
     bool pending = false;
index d5651fad90d01f6a470001879a05cd79cbd9a7b7..ecdffaca5bed95e13f690bf8dd08b2c426f87e8e 100644 (file)
@@ -2,16 +2,79 @@
 
 #include "rgw_common.h"
 #include "rgw_bucket_sync.h"
+#include "rgw_data_sync.h"
+#include "rgw_zone.h"
 
+#include "services/svc_zone.h"
 
 
-void RGWBucketSyncPolicy::post_init()
+#if 0
+void RGWBucketSyncPolicyInfo::post_init()
 {
+  if (pipes) {
+    return;
+  }
+
+  for (auto& p : *pipes) {
+    auto& pipe = p.second;
+
+    source_zones.insert(pipe.source.zone_id());
+  }
+}
+#endif
+
+int RGWBucketSyncPolicyHandler::init()
+{
+  const auto& zone_id = zone_svc->get_zone().id;
+
+  if (!bucket_info.sync_policy) {
+    return 0;
+  }
+
+  auto& sync_policy = *bucket_info.sync_policy;
+
+  for (auto& entry : sync_policy.entries) {
+    if (!entry.bucket ||
+        !(*entry.bucket == bucket_info.bucket)) {
+      continue;
+    }
+
+
+  }
+
   source_zones.clear();
-  for (auto& t : targets) {
-    for (auto& r : t.second.rules) {
-      source_zones.insert(r.zone_id);
+
+#warning FIXME
+#if 0
+  if (!sync_policy ||
+      !sync_policy->pipes) {
+    return 0;
+  }
+
+  for (auto& p : *sync_policy->pipes) {
+    auto& pipe = p.second;
+
+    if (pipe.target.zone_id == zone_id) {
+      source_zones.insert(pipe.source.zone_id());
     }
   }
+#endif
+
+  return 0;
 }
 
+#if 0
+vector<rgw_bucket_sync_pipe> rgw_bucket_sync_target_info::build_pipes(const rgw_bucket& source_bs)
+{
+  vector<rgw_bucket_sync_pipe> pipes;
+
+  for (auto t : targets) {
+    rgw_bucket_sync_pipe pipe;
+    pipe.source_bs = source_bs;
+    pipe.source_prefix = t.source_prefix;
+    pipe.dest_prefix = t.dest_prefix;
+    pipes.push_back(std::move(pipe));
+  }
+  return pipes;
+}
+#endif
index fb08b6712610bce5d9125961e896729d3bc3ce43..90a3d2e27b53e6bf95a1af87fca1999718c717f6 100644 (file)
 
 #include "rgw_common.h"
 
-class JSONObj;
+class RGWSI_Zone;
 
-class RGWBucketSyncPolicy {
-public:
-  struct target;
 
-private:
-  rgw_bucket bucket; /* source bucket */
-  std::map<string, target> targets; /* map: target zone_id -> target rules */
+class RGWBucketSyncPolicyHandler {
+  RGWSI_Zone *zone_svc;
+  RGWBucketInfo bucket_info;
 
-  /* in-memory only */
   std::set<string> source_zones;
 
-  void post_init();
-
 public:
+  RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
+                             RGWBucketInfo& _bucket_info);
 
-  struct rule {
-    std::string zone_id;
-    std::string dest_bucket;
-    std::string source_obj_prefix;
-    std::string dest_obj_prefix;
-
-    void encode(bufferlist& bl) const {
-      ENCODE_START(1, 1, bl);
-      encode(zone_id, bl);
-      encode(dest_bucket, bl);
-      encode(source_obj_prefix, bl);
-      encode(dest_obj_prefix, bl);
-      ENCODE_FINISH(bl);
-    }
-
-    void decode(bufferlist::const_iterator& bl) {
-      DECODE_START(1, bl);
-      decode(zone_id, bl);
-      decode(dest_bucket, bl);
-      decode(source_obj_prefix, bl);
-      decode(dest_obj_prefix, bl);
-      DECODE_FINISH(bl);
-    }
-
-    void dump(ceph::Formatter *f) const;
-    void decode_json(JSONObj *obj);
-  };
-
-  struct target {
-    std::string target_zone_id;
-    std::vector<rule> rules;
-
-    void encode(bufferlist& bl) const {
-      ENCODE_START(1, 1, bl);
-      encode(target_zone_id, bl);
-      encode(rules, bl);
-      ENCODE_FINISH(bl);
-    }
-
-    void decode(bufferlist::const_iterator& bl) {
-      DECODE_START(1, bl);
-      decode(target_zone_id, bl);
-      decode(rules, bl);
-      DECODE_FINISH(bl);
-    }
-
-    void dump(ceph::Formatter *f) const;
-    void decode_json(JSONObj *obj);
-  };
-
-  void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
-    encode(bucket, bl);
-    encode(targets, bl);
-    ENCODE_FINISH(bl);
-  }
-
-  void decode(bufferlist::const_iterator& bl) {
-    DECODE_START(1, bl);
-    decode(bucket, bl);
-    decode(targets, bl);
-    post_init();
-    DECODE_FINISH(bl);
-  }
-
-  void dump(ceph::Formatter *f) const;
-  void decode_json(JSONObj *obj);
-
-  bool empty() const {
-    return targets.empty();
-  }
+  int init();
 
   bool zone_is_source(const string& zone_id) const {
     return source_zones.find(zone_id) != source_zones.end();
   }
 };
-WRITE_CLASS_ENCODER(RGWBucketSyncPolicy::rule)
-WRITE_CLASS_ENCODER(RGWBucketSyncPolicy::target)
-WRITE_CLASS_ENCODER(RGWBucketSyncPolicy)
+
index 3927900dbe8e1f5b123be76d5b8a86a5ca7224d1..9814a2779f517ea7e3f439c61df45082207c0e3c 100644 (file)
@@ -17,6 +17,7 @@
 #include "rgw_rados.h"
 #include "rgw_http_errors.h"
 #include "rgw_arn.h"
+#include "rgw_data_sync.h"
 
 #include "common/ceph_crypto.h"
 #include "common/armor.h"
@@ -1558,30 +1559,6 @@ string rgw_trim_quotes(const string& val)
   return s;
 }
 
-struct rgw_name_to_flag {
-  const char *type_name;
-  uint32_t flag;
-};
-
-static int parse_list_of_flags(struct rgw_name_to_flag *mapping,
-                               const string& str, uint32_t *perm)
-{
-  list<string> strs;
-  get_str_list(str, strs);
-  list<string>::iterator iter;
-  uint32_t v = 0;
-  for (iter = strs.begin(); iter != strs.end(); ++iter) {
-    string& s = *iter;
-    for (int i = 0; mapping[i].type_name; i++) {
-      if (s.compare(mapping[i].type_name) == 0)
-        v |= mapping[i].flag;
-    }
-  }
-
-  *perm = v;
-  return 0;
-}
-
 static struct rgw_name_to_flag cap_names[] = { {"*",     RGW_CAP_ALL},
                   {"read",  RGW_CAP_READ},
                  {"write", RGW_CAP_WRITE},
@@ -1589,7 +1566,7 @@ static struct rgw_name_to_flag cap_names[] = { {"*",     RGW_CAP_ALL},
 
 int RGWUserCaps::parse_cap_perm(const string& str, uint32_t *perm)
 {
-  return parse_list_of_flags(cap_names, str, perm);
+  return rgw_parse_list_of_flags(cap_names, str, perm);
 }
 
 int RGWUserCaps::get_cap(const string& cap, string& type, uint32_t *pperm)
@@ -1855,7 +1832,7 @@ static struct rgw_name_to_flag op_type_mapping[] = { {"*",  RGW_OP_TYPE_ALL},
 
 int rgw_parse_op_type_list(const string& str, uint32_t *perm)
 {
-  return parse_list_of_flags(op_type_mapping, str, perm);
+  return rgw_parse_list_of_flags(op_type_mapping, str, perm);
 }
 
 bool match_policy(boost::string_view pattern, boost::string_view input,
@@ -2061,9 +2038,9 @@ void RGWBucketInfo::decode(bufferlist::const_iterator& bl) {
     bool has_sync_policy;
     decode(has_sync_policy, bl);
     if (has_sync_policy) {
-      auto policy = make_shared<RGWBucketSyncPolicy>();
+      auto policy = make_shared<rgw_sync_policy_info>();
       decode(*policy, bl);
-      sync_policy = std::const_pointer_cast<const RGWBucketSyncPolicy>(policy);
+      sync_policy = std::const_pointer_cast<const rgw_sync_policy_info>(policy);
     } else {
       sync_policy.reset();
     }
@@ -2072,10 +2049,10 @@ void RGWBucketInfo::decode(bufferlist::const_iterator& bl) {
   DECODE_FINISH(bl);
 }
 
-void RGWBucketInfo::set_sync_policy(RGWBucketSyncPolicy&& policy)
+void RGWBucketInfo::set_sync_policy(rgw_sync_policy_info&& policy)
 {
-  auto shared_policy = make_shared<RGWBucketSyncPolicy>(policy);
-  sync_policy = std::const_pointer_cast<const RGWBucketSyncPolicy>(shared_policy);
+  auto shared_policy = make_shared<rgw_sync_policy_info>(policy);
+  sync_policy = std::const_pointer_cast<const rgw_sync_policy_info>(shared_policy);
 }
 
 bool RGWBucketInfo::empty_sync_policy() const
index 8c943a1658c7c60908d7830a65b70eaf10c7dbb1..12d6c369f0d0dc8bf8549d186c28037b12ba1294 100644 (file)
@@ -1400,7 +1400,7 @@ inline ostream& operator<<(ostream& out, const RGWBucketIndexType &index_type)
   }
 }
 
-struct RGWBucketSyncPolicy;
+struct rgw_sync_policy_info;
 class RGWSI_Zone;
 
 struct RGWBucketInfo {
@@ -1450,8 +1450,7 @@ struct RGWBucketInfo {
 
   RGWObjectLock obj_lock;
 
-  std::shared_ptr<const RGWBucketSyncPolicy> sync_policy;
-
+  std::shared_ptr<const rgw_sync_policy_info> sync_policy;
 
   void encode(bufferlist& bl) const;
   void decode(bufferlist::const_iterator& bl);
@@ -1473,7 +1472,7 @@ struct RGWBucketInfo {
     return swift_versioning && !versioned();
   }
 
-  void set_sync_policy(RGWBucketSyncPolicy&& policy);
+  void set_sync_policy(rgw_sync_policy_info&& policy);
 
   bool empty_sync_policy() const;
   bool bucket_is_sync_source(const string& zone_id) const;
index e7dee938792410dd1521963629b722b6fa1bacd6..e7a2c4b5019609ed936ce2ed1976e3f8c9d6ea02 100644 (file)
@@ -64,6 +64,22 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
+void rgw_sync_group_info::encode(bufferlist& bl) const
+{
+  ENCODE_START(1, 1, bl);
+  encode(id, bl);
+  encode(config, bl);
+  ENCODE_FINISH(bl);
+}
+
+void rgw_sync_group_info::decode(bufferlist::const_iterator& bl)
+{
+  DECODE_START(1, bl);
+  decode(id, bl);
+  decode(config, bl);
+  DECODE_FINISH(bl);
+}
+
 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
   static constexpr int MAX_CONCURRENT_SHARDS = 16;
 
@@ -3264,6 +3280,88 @@ int RGWBucketShardIncrementalSyncCR::operate()
   return 0;
 }
 
+class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket_shard bs;
+  rgw_bucket_source_sync_info info;
+  RGWMetaSyncEnv meta_sync_env;
+
+  const std::string status_oid;
+
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+
+  RGWSyncTraceNodeRef tn;
+
+public:
+  RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& _bs, const RGWSyncTraceNodeRef& _tn_parent)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(_bs),
+      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_source",
+                                         SSTR(bucket_shard_str{bs}))) {
+  }
+  ~RGWRunBucketSyncCoroutine() override {
+    if (lease_cr) {
+      lease_cr->abort();
+    }
+  }
+
+  int operate() override;
+};
+
+int RGWRunBucketSourcesSyncCR::operate()
+{
+  reenter(this) {
+    yield {
+      set_status("acquiring sync lock");
+      auto store = sync_env->store;
+      lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
+                                              rgw_raw_obj(store->svc.zone->get_zone_params().log_pool, status_oid),
+                                              "sync_lock",
+                                              cct->_conf->rgw_sync_lease_period,
+                                              this));
+      lease_stack.reset(spawn(lease_cr.get(), false));
+    }
+    while (!lease_cr->is_locked()) {
+      if (lease_cr->is_done()) {
+        tn->log(5, "failed to take lease");
+        set_status("lease lock failed, early abort");
+        return set_cr_error(lease_cr->get_ret_status());
+      }
+      set_sleeping(true);
+      yield;
+    }
+
+    tn->log(10, "took lease");
+    yield call(new RGWReadBucketSourcesInfoCR(sync_env, bs.bucket, &info));
+    if (retcode < 0 && retcode != -ENOENT) {
+      tn->log(0, "ERROR: failed to read sync status for bucket");
+      lease_cr->go_down();
+      drain_all();
+      return set_cr_error(retcode);
+    }
+
+    if (retcode == -ENOENT) {
+      rgw_bucket_sync_pipe sync_pipe;
+      sync_pipe.init_default(bs);
+      info.pipes.push_back(sync_pipe);
+    }
+
+    yield {
+      for (auto pipe : info.pipes) {
+        spawn(new RGWRunBucketSyncCoroutine(sync_env, pipe, &tn));
+      }
+    }
+
+    lease_cr->go_down();
+    drain_all();
+    return set_cr_done();
+  }
+
+  return 0;
+}
+#endif
+
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {
index f353714988d4e35ccbcfe2e4041bdd135a69a0f3..9ca1cdc7cf6df7cbad6a3970639e9e6ebb6d6ed6 100644 (file)
@@ -16,6 +16,8 @@
 #include "rgw_sync_module.h"
 #include "rgw_sync_trace.h"
 
+class JSONObj;
+
 struct rgw_bucket_sync_pipe {
   rgw_bucket_shard source_bs;
   RGWBucketInfo dest_bucket_info;
@@ -257,6 +259,136 @@ struct rgw_bucket_entry_owner {
   void decode_json(JSONObj *obj);
 };
 
+struct rgw_sync_group_info {
+  static constexpr int CONFIG_FLAG_NONE = 0x0;
+  static constexpr int CONFIG_FLAG_DR   = 0x1;
+
+  std::string id;
+
+  struct _config {
+    int flags{CONFIG_FLAG_NONE};
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      encode(flags, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::const_iterator& bl) {
+      DECODE_START(1, bl);
+      decode(flags, bl);
+      DECODE_FINISH(bl);
+    }
+
+    void dump(ceph::Formatter *f) const;
+    void decode_json(JSONObj *obj);
+  } config;
+
+  void encode(bufferlist& bl) const;
+  void decode(bufferlist::const_iterator& bl);
+
+  void dump(ceph::Formatter *f) const;
+  void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(rgw_sync_group_info::_config)
+WRITE_CLASS_ENCODER(rgw_sync_group_info)
+
+struct rgw_sync_instance_info {
+  std::string id;
+
+  std::optional<std::set<std::string> > sync_groups; /* name of groups entity belongs to */
+  std::optional<std::string> zone_id;
+  std::optional<rgw_bucket> bucket;
+  std::optional<std::string> obj_prefix;
+
+  struct sync_pipe {
+    string source;
+    string target_prefix;
+
+    bool operator<(const sync_pipe& rhs) const {
+      if (source == rhs.source) {
+        return (target_prefix < rhs.target_prefix);
+      }
+      return (source  < rhs.source);
+    }
+
+    void encode(bufferlist& bl) const {
+      ENCODE_START(1, 1, bl);
+      encode(source, bl);
+      encode(target_prefix, bl);
+      ENCODE_FINISH(bl);
+    }
+
+    void decode(bufferlist::const_iterator& bl) {
+      DECODE_START(1, bl);
+      decode(source, bl);
+      decode(target_prefix, bl);
+      DECODE_FINISH(bl);
+    }
+
+    void dump(ceph::Formatter *f) const;
+    void decode_json(JSONObj *obj);
+  };
+
+  std::optional<std::set<sync_pipe> > sync_from; /* optional group/entity ids */
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(id, bl);
+    encode(sync_groups, bl);
+    encode(zone_id, bl);
+    encode(bucket, bl);
+    encode(obj_prefix, bl);
+    encode(sync_from, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(id, bl);
+    decode(sync_groups, bl);
+    decode(zone_id, bl);
+    decode(bucket, bl);
+    decode(obj_prefix, bl);
+    decode(sync_from, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter *f) const;
+  void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(rgw_sync_instance_info::sync_pipe)
+WRITE_CLASS_ENCODER(rgw_sync_instance_info)
+
+struct rgw_sync_policy_info {
+  std::vector<rgw_sync_group_info> groups;
+  std::vector<rgw_sync_instance_info> entries;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(groups, bl);
+    encode(entries, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(groups, bl);
+    decode(entries, bl);
+    DECODE_FINISH(bl);
+  }
+
+  void dump(ceph::Formatter *f) const;
+  void decode_json(JSONObj *obj);
+
+  bool empty() const {
+    return groups.empty() &&
+           entries.empty();
+  }
+
+};
+WRITE_CLASS_ENCODER(rgw_sync_policy_info)
+
 class RGWSyncErrorLogger;
 class RGWRESTConn;
 
index 09c3c894bb8b8e3e83b1b4b8c11b2b146c954400..d08cc9704095a1f0099a979b988527b099f8be33 100644 (file)
@@ -16,6 +16,7 @@
 #include "rgw_sync.h"
 #include "rgw_orphan.h"
 #include "rgw_bucket_sync.h"
+#include "rgw_tools.h"
 
 #include "common/ceph_json.h"
 #include "common/Formatter.h"
@@ -357,7 +358,8 @@ static struct rgw_flags_desc rgw_perms[] = {
  { 0, NULL }
 };
 
-static void mask_to_str(rgw_flags_desc *mask_list, uint32_t mask, char *buf, int len)
+template <class T>
+static void mask_to_str(T *mask_list, uint32_t mask, char *buf, int len)
 {
   const char *sep = "";
   int pos = 0;
@@ -368,7 +370,7 @@ static void mask_to_str(rgw_flags_desc *mask_list, uint32_t mask, char *buf, int
   while (mask) {
     uint32_t orig_mask = mask;
     for (int i = 0; mask_list[i].mask; i++) {
-      struct rgw_flags_desc *desc = &mask_list[i];
+      T *desc = &mask_list[i];
       if ((mask & desc->mask) == desc->mask) {
         pos += snprintf(buf + pos, len - pos, "%s%s", sep, desc->str);
         if (pos == len)
@@ -830,44 +832,89 @@ void RGWBucketInfo::decode_json(JSONObj *obj) {
   reshard_status = (cls_rgw_reshard_status)rs;
 }
 
-void RGWBucketSyncPolicy::rule::dump(Formatter *f) const
+void rgw_sync_group_info::dump(Formatter *f) const
 {
-  encode_json("zone_id", zone_id, f);
-  encode_json("dest_bucket", dest_bucket, f);
-  encode_json("source_obj_prefix", source_obj_prefix, f);
-  encode_json("dest_obj_prefix", dest_obj_prefix, f);
+  encode_json("id", id, f);
+  encode_json("config", config, f);
 }
 
-void RGWBucketSyncPolicy::rule::decode_json(JSONObj *obj)
+void rgw_sync_group_info::decode_json(JSONObj *obj)
 {
-  JSONDecoder::decode_json("zone_id", zone_id, obj);
-  JSONDecoder::decode_json("dest_bucket", dest_bucket, obj);
-  JSONDecoder::decode_json("source_obj_prefix", source_obj_prefix, obj);
-  JSONDecoder::decode_json("dest_obj_prefix", dest_obj_prefix, obj);
+  JSONDecoder::decode_json("id", id, obj);
+  JSONDecoder::decode_json("config", config, obj);
 }
 
-void RGWBucketSyncPolicy::target::dump(Formatter *f) const
+static struct rgw_flags_desc sync_group_flags_desc[] = {
+ { rgw_sync_group_info::CONFIG_FLAG_DR, "mirror" },
+ { rgw_sync_group_info::CONFIG_FLAG_NONE, "none" },
+ { 0, NULL }
+};
+
+void rgw_sync_group_info::_config::dump(Formatter *f) const
 {
-  encode_json("target_zone_id", target_zone_id, f);
-  encode_json("rules", rules, f);
+  char buf[256];
+  mask_to_str(sync_group_flags_desc, flags, buf, sizeof(buf));
+  encode_json("flags", (const char *)buf, f);
 }
 
-void RGWBucketSyncPolicy::target::decode_json(JSONObj *obj)
+static struct rgw_name_to_flag sync_group_flags_mapping[] = {
+                  {"mirror",  rgw_sync_group_info::CONFIG_FLAG_DR},
+                 {"none", 0},
+                 {NULL, 0} };
+
+
+
+void rgw_sync_group_info::_config::decode_json(JSONObj *obj)
 {
-  JSONDecoder::decode_json("target_zone_id", target_zone_id, obj);
-  JSONDecoder::decode_json("rules", rules, obj);
+  string s;
+  JSONDecoder::decode_json("flags", s, obj);
+  uint32_t f = 0;
+  rgw_parse_list_of_flags(sync_group_flags_mapping, s, &f);
+  flags = f;
 }
 
-void RGWBucketSyncPolicy::dump(Formatter *f) const
+void rgw_sync_instance_info::dump(Formatter *f) const
 {
+  encode_json("id", id, f);
+  encode_json("sync_groups", sync_groups, f);
+  encode_json("zone_id", zone_id, f);
   encode_json("bucket", bucket, f);
-  encode_json("targets", targets, f);
+  encode_json("obj_prefix", obj_prefix, f);
+  encode_json("sync_from", sync_from, f);
 }
 
-void RGWBucketSyncPolicy::decode_json(JSONObj *obj)
+void rgw_sync_instance_info::decode_json(JSONObj *obj)
 {
+  JSONDecoder::decode_json("id", id, obj);
+  JSONDecoder::decode_json("sync_groups", sync_groups, obj);
+  JSONDecoder::decode_json("zone_id", zone_id, obj);
   JSONDecoder::decode_json("bucket", bucket, obj);
-  JSONDecoder::decode_json("targets", targets, obj);
+  JSONDecoder::decode_json("obj_prefix", obj_prefix, obj);
+  JSONDecoder::decode_json("sync_from", sync_from, obj);
+}
+
+void rgw_sync_instance_info::sync_pipe::dump(Formatter *f) const
+{
+  encode_json("source", source, f);
+  encode_json("target_prefix", target_prefix, f);
+}
+
+void rgw_sync_instance_info::sync_pipe::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("source", source, obj);
+  JSONDecoder::decode_json("target_prefix", target_prefix, obj);
+}
+
+void rgw_sync_policy_info::dump(Formatter *f) const
+{
+  encode_json("groups", groups, f);
+  encode_json("entries", entries, f);
+}
+
+void rgw_sync_policy_info::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("groups", groups, obj);
+  JSONDecoder::decode_json("entries", entries, obj);
 }
 
 void rgw_obj_key::dump(Formatter *f) const
@@ -1172,6 +1219,7 @@ void RGWZoneGroup::dump(Formatter *f) const
   encode_json_map("placement_targets", placement_targets, f); /* more friendly representation */
   encode_json("default_placement", default_placement, f);
   encode_json("realm_id", realm_id, f);
+  encode_json("sync_policy", sync_policy, f);
 }
 
 static void decode_zones(map<string, RGWZone>& zones, JSONObj *o)
index 948862a591fe4f146f3fbbad0ed222a14d3344f1..b05fe0a0cc307c519fcf257042f7aa8159c6f7bf 100644 (file)
@@ -134,6 +134,25 @@ void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
   name = prefix + buf;
 }
 
+int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
+                           const string& str, uint32_t *perm)
+{
+  list<string> strs;
+  get_str_list(str, strs);
+  list<string>::iterator iter;
+  uint32_t v = 0;
+  for (iter = strs.begin(); iter != strs.end(); ++iter) {
+    string& s = *iter;
+    for (int i = 0; mapping[i].type_name; i++) {
+      if (s.compare(mapping[i].type_name) == 0)
+        v |= mapping[i].flag;
+    }
+  }
+
+  *perm = v;
+  return 0;
+}
+
 int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
                        RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
 {
index ae85927689e5959c75664cfcbe9fea1d9290f370..8c0065465465ea6fea4f450f0d1eb089b683213e 100644 (file)
@@ -63,6 +63,14 @@ void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key
 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name);
 void rgw_shard_name(const string& prefix, unsigned shard_id, string& name);
 
+struct rgw_name_to_flag {
+  const char *type_name;
+  uint32_t flag;
+};
+
+int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
+                           const string& str, uint32_t *perm);
+
 int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
                        RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs = NULL);
 int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,