]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw-admin: add 'sync info' command
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 2 Dec 2019 23:13:28 +0000 (15:13 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:37 +0000 (10:20 -0800)
Also move code into header so that flow manager could be used in radosgw-admin

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_json_enc.cc
src/rgw/rgw_sync_policy.h

index f8bb4e37dbf5dee15be587324dab7fa86aa534b2..8b5baed2a299836468fc422a0d7741e4c38c4f1a 100644 (file)
@@ -85,6 +85,23 @@ static const DoutPrefixProvider* dpp() {
   return &global_dpp;
 }
 
+#define CHECK_TRUE(x, msg, err) \
+  do { \
+    if (!x) { \
+      cerr << msg << std::endl; \
+      return err; \
+    } \
+  } while (0)
+
+#define CHECK_SUCCESS(x, msg) \
+  do { \
+    int _x_val = (x); \
+    if (_x_val < 0) { \
+      cerr << msg << ": " << cpp_strerror(-_x_val) << std::endl; \
+      return _x_val; \
+    } \
+  } while (0)
+
 void usage()
 {
   cout << "usage: radosgw-admin <cmd> [options...]" << std::endl;
@@ -691,6 +708,7 @@ enum class OPT {
   GLOBAL_QUOTA_SET,
   GLOBAL_QUOTA_ENABLE,
   GLOBAL_QUOTA_DISABLE,
+  SYNC_INFO,
   SYNC_STATUS,
   ROLE_CREATE,
   ROLE_DELETE,
@@ -896,6 +914,7 @@ static SimpleCmd::Commands all_cmds = {
   { "global quota set", OPT::GLOBAL_QUOTA_SET },
   { "global quota enable", OPT::GLOBAL_QUOTA_ENABLE },
   { "global quota disable", OPT::GLOBAL_QUOTA_DISABLE },
+  { "sync info", OPT::SYNC_INFO },
   { "sync status", OPT::SYNC_STATUS },
   { "role create", OPT::ROLE_CREATE },
   { "role delete", OPT::ROLE_DELETE },
@@ -1075,6 +1094,15 @@ static int init_bucket(const string& tenant_name, const string& bucket_name, con
   return 0;
 }
 
+static int init_bucket(const rgw_bucket& b,
+                       RGWBucketInfo& bucket_info,
+                       rgw_bucket& bucket,
+                       map<string, bufferlist> *pattrs = nullptr)
+{
+  return init_bucket(b.tenant, b.name, b.bucket_id,
+                     bucket_info, bucket, pattrs);
+}
+
 static int read_input(const string& infile, bufferlist& bl)
 {
   int fd = 0;
@@ -2292,6 +2320,77 @@ static int bucket_source_sync_status(rgw::sal::RGWRadosStore *store, const RGWZo
   return 0;
 }
 
+void encode_json(const char *name, const RGWBucketSyncFlowManager::flow_map_t& m, Formatter *f)
+{
+  Formatter::ObjectSection top_section(*f, name);
+  Formatter::ArraySection as(*f, "entries");
+
+  for (auto& entry : m) {
+    Formatter::ObjectSection os(*f, "entry");
+    auto& bucket = entry.first;
+    auto& pflow = entry.second;
+
+    encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+    {
+      Formatter::ArraySection fg(*f, "flow_groups");
+      for (auto& flow_group : pflow.flow_groups) {
+        encode_json("entry", *flow_group, f);
+      }
+    }
+    encode_json("pipes", pflow.pipe, f);
+  }
+}
+
+static int sync_info(std::optional<string> opt_target_zone, std::optional<rgw_bucket> opt_bucket, Formatter *formatter)
+{
+  const RGWRealm& realm = store->svc()->zone->get_realm();
+  const RGWZoneGroup& zonegroup = store->svc()->zone->get_zonegroup();
+  const RGWZone& zone = store->svc()->zone->get_zone();
+
+  string zone_name = opt_target_zone.value_or(store->svc()->zone->zone_name());
+
+  RGWBucketSyncFlowManager zone_flow(zone_name, nullopt, nullptr);
+
+  zone_flow.init(zonegroup.sync_policy);
+
+  RGWBucketSyncFlowManager *flow_mgr = &zone_flow;
+
+  std::optional<RGWBucketSyncFlowManager> bucket_flow;
+
+  if (opt_bucket) {
+    rgw_bucket bucket;
+    RGWBucketInfo bucket_info;
+
+    int ret = init_bucket(*opt_bucket, bucket_info, bucket);
+    if (ret < 0) {
+      cerr << "ERROR: init_bucket failed: " << cpp_strerror(-ret) << std::endl;
+      return ret;
+    }
+
+    if (ret >= 0 &&
+        bucket_info.sync_policy) {
+      bucket_flow.emplace(zone_name, opt_bucket, &zone_flow);
+
+      bucket_flow->init(*bucket_info.sync_policy);
+
+      flow_mgr = &(*bucket_flow);
+    }
+  }
+
+  auto& sources = flow_mgr->get_sources();
+  auto& dests = flow_mgr->get_dests();
+
+  {
+    Formatter::ObjectSection os(*formatter, "result");
+    encode_json("sources", sources, formatter);
+    encode_json("dests", dests, formatter);
+  }
+
+  formatter->flush(cout);
+
+  return 0;
+}
+
 static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& info,
                               std::ostream& out)
 {
@@ -2619,14 +2718,6 @@ static bool require_non_empty_opt(std::optional<T> opt, bool extra_check = true)
   return true;
 }
 
-#define CHECK_TRUE(x, msg, err) \
-  do { \
-    if (!x) { \
-      cerr << msg << std::endl; \
-      return err; \
-    } \
-  } while (0)
-
 template <class T>
 static void show_result(T& obj,
                         Formatter *formatter,
@@ -2684,7 +2775,7 @@ public:
       return 0;
     }
 
-    ret = init_bucket(bucket->tenant, bucket->name, bucket->bucket_id, bucket_info, *bucket, &bucket_attrs);
+    ret = init_bucket(*bucket, bucket_info, *bucket, &bucket_attrs);
     if (ret < 0) {
       cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
       return ret;
@@ -2926,6 +3017,7 @@ int main(int argc, const char **argv)
   std::optional<string> opt_dest_tenant;
   std::optional<string> opt_dest_bucket_name;
   std::optional<string> opt_dest_bucket_id;
+  std::optional<string> opt_effective_zone;
 
   rgw::notify::EventTypeList event_types;
 
@@ -3317,6 +3409,8 @@ int main(int argc, const char **argv)
       opt_dest_bucket_name = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--dest-bucket-id", (char*)NULL)) {
       opt_dest_bucket_id = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--effective-zone", (char*)NULL)) {
+      opt_effective_zone = val;
     } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
       // do nothing
     } else if (strncmp(*i, "-", 1) == 0) {
@@ -3515,6 +3609,7 @@ int main(int argc, const char **argv)
                         OPT::PERIOD_GET_CURRENT,
                         OPT::PERIOD_LIST,
                         OPT::GLOBAL_QUOTA_GET,
+                        OPT::SYNC_INFO,
                         OPT::SYNC_STATUS,
                         OPT::ROLE_GET,
                         OPT::ROLE_LIST,
@@ -7280,6 +7375,10 @@ next:
     }
   }
 
+  if (opt_cmd == OPT::SYNC_INFO) {
+    sync_info(opt_effective_zone, opt_bucket, formatter);
+  }
+
   if (opt_cmd == OPT::SYNC_STATUS) {
     sync_status(formatter);
   }
index 05cb9c88ab8ed48ed7119a39ec3dd67da18d0b3c..fda52b1bdeb38aae42454f014791f057dd5240bb 100644 (file)
@@ -93,6 +93,23 @@ void rgw_sync_bucket_entity::remove_bucket(std::optional<string> tenant,
   }
 }
 
+
+string rgw_sync_bucket_entity::bucket_key(std::optional<rgw_bucket> b)
+{
+  if (!b) {
+    return string("*");
+  }
+
+  rgw_bucket _b = *b;
+
+  if (_b.name.empty()) {
+    _b.name = "*";
+  }
+
+  return _b.get_key();
+}
+
+
 bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group)
 {
   if (!symmetrical) {
@@ -235,11 +252,9 @@ static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector
                                                                const string& dest_zone)
 {
   std::vector<rgw_sync_bucket_pipe> relevant_pipes;
-  for (auto& pipe : relevant_pipes) {
-    if (pipe.source.match_zone(source_zone)) {
-      relevant_pipes.push_back(pipe);
-    }
-    if (pipe.dest.match_zone(dest_zone)) {
+  for (auto& pipe : pipes) {
+    if (pipe.source.match_zone(source_zone) &&
+        pipe.dest.match_zone(dest_zone)) {
       relevant_pipes.push_back(pipe);
     }
   }
@@ -252,216 +267,229 @@ static bool is_wildcard_bucket(const rgw_bucket& bucket)
   return bucket.name.empty();
 }
 
-struct group_pipe_map {
-  string zone;
-  std::optional<rgw_bucket> bucket;
-
-  rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};
-
-  struct zone_bucket {
-    string zone; /* zone name */
-    rgw_bucket bucket; /* bucket, if empty then wildcard */
+void rgw_sync_group_pipe_map::zone_bucket::dump(ceph::Formatter *f) const
+{
+  encode_json("zone", zone, f);
+  encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+}
 
-    bool operator<(const zone_bucket& zb) const {
-      if (zone < zb.zone) {
-        return true;
-      }
-      if (zone > zb.zone) {
-        return false;
-      }
-      return (bucket < zb.bucket);
-    }
-  };
+void rgw_sync_group_pipe_map::dump(ceph::Formatter *f) const
+{
+  encode_json("zone", zone, f);
+  encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+  encode_json("sources", sources, f);
+  encode_json("dests", dests, f);
+}
 
-  using zb_pipe_map_t = std::multimap<zone_bucket, rgw_sync_bucket_pipe>;
+ostream& operator<<(ostream& os, const rgw_sync_bucket_entity& e) {
+  os << "{b=" << rgw_sync_bucket_entity::bucket_key(e.bucket) << ",z=" << e.zones.value_or(std::set<string>()) << "}";
+  return os;
+}
 
-  zb_pipe_map_t sources; /* all the pipes where zone is pulling from, by source_zone, s */
-  zb_pipe_map_t dests; /* all the pipes that pull from zone */
+ostream& operator<<(ostream& os, const rgw_sync_bucket_pipe& pipe) {
+  os << "{id=" << pipe.id << ",s=" << pipe.source << ",d=" << pipe.dest << "}";
+  return os;
+}
 
 
-  template <typename CB1, typename CB2>
-  void try_add_to_pipe_map(const string& source_zone,
-                           const string& dest_zone,
-                           const std::vector<rgw_sync_bucket_pipe>& pipes,
-                           zb_pipe_map_t *pipe_map,
-                           CB1 filter_cb,
-                           CB2 call_filter_cb) {
-    if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) {
-      return;
-    }
-    auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
+template <typename CB1, typename CB2>
+void rgw_sync_group_pipe_map::try_add_to_pipe_map(const string& source_zone,
+                                                  const string& dest_zone,
+                                                  const std::vector<rgw_sync_bucket_pipe>& pipes,
+                                                  zb_pipe_map_t *pipe_map,
+                                                  CB1 filter_cb,
+                                                  CB2 call_filter_cb)
+{
+  if (!filter_cb(source_zone, nullopt, dest_zone, nullopt)) {
+    return;
+  }
+  auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
 
-    for (auto& pipe : relevant_pipes) {
-      zone_bucket zb;
-      if (!call_filter_cb(pipe, &zb)) {
-        continue;
-      }
-      pipe_map->insert(make_pair(zb, pipe));
+  for (auto& pipe : relevant_pipes) {
+    zone_bucket zb;
+    if (!call_filter_cb(pipe, &zb)) {
+      continue;
     }
+    pipe_map->insert(make_pair(zb, pipe));
   }
+}
           
-  template <typename CB>
-  void try_add_source(const string& source_zone,
-                  const string& dest_zone,
-                  const std::vector<rgw_sync_bucket_pipe>& pipes,
-                  CB filter_cb)
-  {
-    return try_add_to_pipe_map(source_zone, dest_zone, pipes,
-                               &sources,
-                               filter_cb,
-                               [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
-        *zb = zone_bucket{source_zone, pipe.source.get_bucket()};
-        return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket());
-      });
-  }
-          
-  template <typename CB>
-  void try_add_dest(const string& source_zone,
+template <typename CB>
+void rgw_sync_group_pipe_map::try_add_source(const string& source_zone,
                   const string& dest_zone,
                   const std::vector<rgw_sync_bucket_pipe>& pipes,
                   CB filter_cb)
-  {
-    return try_add_to_pipe_map(source_zone, dest_zone, pipes,
-                               &dests,
-                               filter_cb,
-                               [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
-        *zb = zone_bucket{dest_zone, pipe.dest.get_bucket()};
-        return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket);
-      });
-  }
-          
-  pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m,
-                                                                                const string& zone,
-                                                                                std::optional<rgw_bucket> b) {
-    if (!b) {
-      return m.equal_range(zone_bucket{zone, rgw_bucket()});
-    }
+{
+  return try_add_to_pipe_map(source_zone, dest_zone, pipes,
+                             &sources,
+                             filter_cb,
+                             [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
+                             *zb = zone_bucket{source_zone, pipe.source.get_bucket()};
+                             return filter_cb(source_zone, zb->bucket, dest_zone, pipe.dest.get_bucket());
+                             });
+}
 
-    auto zb = zone_bucket{zone, *bucket};
+template <typename CB>
+void rgw_sync_group_pipe_map::try_add_dest(const string& source_zone,
+                                           const string& dest_zone,
+                                           const std::vector<rgw_sync_bucket_pipe>& pipes,
+                                           CB filter_cb)
+{
+  return try_add_to_pipe_map(source_zone, dest_zone, pipes,
+                             &dests,
+                             filter_cb,
+                             [&](const rgw_sync_bucket_pipe& pipe, zone_bucket *zb) {
+                             *zb = zone_bucket{dest_zone, pipe.dest.get_bucket()};
+                             return filter_cb(source_zone, pipe.source.get_bucket(), dest_zone, zb->bucket);
+                             });
+}
 
-    auto range = m.equal_range(zb);
-    if (range.first == range.second &&
-        !is_wildcard_bucket(*bucket)) {
-      /* couldn't find the specific bucket, try to find by wildcard */
-      zb.bucket = rgw_bucket();
-      range = m.equal_range(zb);
-    }
+using zb_pipe_map_t = rgw_sync_group_pipe_map::zb_pipe_map_t;
 
-    return range;
+pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> rgw_sync_group_pipe_map::find_pipes(const zb_pipe_map_t& m,
+                                                                                                       const string& zone,
+                                                                                                       std::optional<rgw_bucket> b)
+{
+  if (!b) {
+    return m.equal_range(zone_bucket{zone, rgw_bucket()});
   }
 
+  auto zb = zone_bucket{zone, *bucket};
 
-  template <typename CB>
-  void init(const string& _zone,
-            std::optional<rgw_bucket> _bucket,
-            const rgw_sync_policy_group& group,
-            CB filter_cb) {
-    zone = _zone;
-    bucket = _bucket;
+  auto range = m.equal_range(zb);
+  if (range.first == range.second &&
+      !is_wildcard_bucket(*bucket)) {
+    /* couldn't find the specific bucket, try to find by wildcard */
+    zb.bucket = rgw_bucket();
+    range = m.equal_range(zb);
+  }
 
-    status = group.status;
+  return range;
+}
 
-    std::vector<rgw_sync_bucket_pipe> zone_pipes;
 
-    /* only look at pipes that touch the specific zone and bucket */
-    for (auto& pipe : group.pipes) {
-      if (pipe.contains_zone(zone) &&
-          pipe.contains_bucket(bucket)) {
-        zone_pipes.push_back(pipe);
-      }
-    }
+template <typename CB>
+void rgw_sync_group_pipe_map::init(const string& _zone,
+                                   std::optional<rgw_bucket> _bucket,
+                                   const rgw_sync_policy_group& group,
+                                   CB filter_cb) {
+  zone = _zone;
+  bucket = _bucket;
 
-    if (group.data_flow.empty()) {
-      return;
+  zone_bucket zb(zone, bucket);
+
+  status = group.status;
+
+  std::vector<rgw_sync_bucket_pipe> zone_pipes;
+
+  /* only look at pipes that touch the specific zone and bucket */
+  for (auto& pipe : group.pipes) {
+    if (pipe.contains_zone_bucket(zone, bucket)) {
+      zone_pipes.push_back(pipe);
     }
+  }
 
-    auto& flow = group.data_flow;
-
-    /* symmetrical */
-    if (flow.symmetrical) {
-      for (auto& symmetrical_group : *flow.symmetrical) {
-        if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
-          for (auto& z : symmetrical_group.zones) {
-            if (z != zone) {
-              try_add_source(z, zone, zone_pipes, filter_cb);
-              try_add_dest(zone, z, zone_pipes, filter_cb);
-            }
+  if (group.data_flow.empty()) {
+    return;
+  }
+
+  auto& flow = group.data_flow;
+
+  /* symmetrical */
+  if (flow.symmetrical) {
+    for (auto& symmetrical_group : *flow.symmetrical) {
+      if (symmetrical_group.zones.find(zone) != symmetrical_group.zones.end()) {
+        for (auto& z : symmetrical_group.zones) {
+          if (z != zone) {
+            try_add_source(z, zone, zone_pipes, filter_cb);
+            try_add_dest(zone, z, zone_pipes, filter_cb);
           }
         }
       }
     }
+  }
 
-    /* directional */
-    if (flow.directional) {
-      for (auto& rule : *flow.directional) {
-        if (rule.source_zone == zone) {
-          try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
-        } else if (rule.dest_zone == zone) {
-          try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
-        }
+  /* directional */
+  if (flow.directional) {
+    for (auto& rule : *flow.directional) {
+      if (rule.source_zone == zone) {
+        try_add_dest(zone, rule.dest_zone, zone_pipes, filter_cb);
+      } else if (rule.dest_zone == zone) {
+        try_add_source(rule.source_zone, zone, zone_pipes, filter_cb);
       }
     }
   }
+}
 
-  /*
  * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
  */
-  vector<rgw_sync_bucket_pipe> find_source_pipes(const string& source_zone,
-                                                 std::optional<rgw_bucket> source_bucket,
-                                                 std::optional<rgw_bucket> dest_bucket) {
-    vector<rgw_sync_bucket_pipe> result;
+/*
+ * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
+ */
+vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_source_pipes(const string& source_zone,
+                                                                        std::optional<rgw_bucket> source_bucket,
+                                                                        std::optional<rgw_bucket> dest_bucket) {
+  vector<rgw_sync_bucket_pipe> result;
 
-    auto range = find_pipes(sources, source_zone, source_bucket);
+  auto range = find_pipes(sources, source_zone, source_bucket);
 
-    for (auto iter = range.first; iter != range.second; ++iter) {
-      auto pipe = iter->second;
-      if (pipe.dest.match_bucket(dest_bucket)) {
-        result.push_back(pipe);
-      }
+  for (auto iter = range.first; iter != range.second; ++iter) {
+    auto pipe = iter->second;
+    if (pipe.dest.match_bucket(dest_bucket)) {
+      result.push_back(pipe);
     }
-    return std::move(result);
   }
+  return std::move(result);
+}
 
-  /*
  * find all relevant pipes in other zones that pull from a specific
  * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
  */
-  vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket,
-                                                 const string& dest_zone,
-                                                 std::optional<rgw_bucket> dest_bucket) {
-    vector<rgw_sync_bucket_pipe> result;
+/*
+ * find all relevant pipes in other zones that pull from a specific
+ * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
+ */
+vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_dest_pipes(std::optional<rgw_bucket> source_bucket,
+                                                                      const string& dest_zone,
+                                                                      std::optional<rgw_bucket> dest_bucket) {
+  vector<rgw_sync_bucket_pipe> result;
 
-    auto range = find_pipes(dests, dest_zone, dest_bucket);
+  auto range = find_pipes(dests, dest_zone, dest_bucket);
 
-    for (auto iter = range.first; iter != range.second; ++iter) {
-      auto pipe = iter->second;
-      if (pipe.source.match_bucket(source_bucket)) {
-        result.push_back(pipe);
-      }
+  for (auto iter = range.first; iter != range.second; ++iter) {
+    auto pipe = iter->second;
+    if (pipe.source.match_bucket(source_bucket)) {
+      result.push_back(pipe);
     }
+  }
+
+  return std::move(result);
+}
 
-    return std::move(result);
+/*
+ * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
+ */
+vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const string& source_zone,
+                                                                 std::optional<rgw_bucket> source_bucket,
+                                                                 const string& dest_zone,
+                                                                 std::optional<rgw_bucket> dest_bucket) {
+  if (dest_zone == zone) {
+    return find_source_pipes(source_zone, source_bucket, dest_bucket);
   }
 
-  /*
-   * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
-   */
-  vector<rgw_sync_bucket_pipe> find_pipes(const string& source_zone,
-                                          std::optional<rgw_bucket> source_bucket,
-                                          const string& dest_zone,
-                                          std::optional<rgw_bucket> dest_bucket) {
-    if (dest_zone == zone) {
-      return find_source_pipes(source_zone, source_bucket, dest_bucket);
-    }
+  if (source_zone == zone) {
+    return find_dest_pipes(source_bucket, dest_zone, dest_bucket);
+  }
 
-    if (source_zone == zone) {
-      return find_dest_pipes(source_bucket, dest_zone, dest_bucket);
-    }
+  return vector<rgw_sync_bucket_pipe>();
+}
 
-    return vector<rgw_sync_bucket_pipe>();
+void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const
+{
+  {
+    Formatter::ArraySection os(*f, "flow_groups");
+    for (auto& g : flow_groups) {
+      encode_json("group", *g, f);
+    }
   }
-};
 
+  encode_json("pipe", pipe, f);
+}
 
 bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone,
                                                  std::optional<rgw_bucket> source_bucket,
@@ -519,10 +547,23 @@ RGWBucketSyncFlowManager::flow_map_t::iterator RGWBucketSyncFlowManager::find_bu
 
 
 void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe,
-                                                group_pipe_map *flow_group) {
+                                                rgw_sync_group_pipe_map *flow_group) {
   auto source_bucket = pipe.source.get_bucket();
   auto dest_bucket = pipe.dest.get_bucket();
 
+  if (!flow_group->sources.empty()) {
+    auto& by_source = flow_by_source[source_bucket];
+    by_source.flow_groups.push_back(flow_group);
+    by_source.pipe.push_back(pipe);
+  }
+
+  if (!flow_group->dests.empty()) {
+    auto& by_dest = flow_by_dest[dest_bucket];
+    by_dest.flow_groups.push_back(flow_group);
+    by_dest.pipe.push_back(pipe);
+  }
+
+#if 0
   if (!bucket ||
       *bucket != source_bucket) {
     auto& by_source = flow_by_source[source_bucket];
@@ -536,6 +577,7 @@ void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipe& pipe
     by_dest.flow_groups.push_back(flow_group);
     by_dest.pipe.push_back(pipe);
   }
+#endif
 }
 
 void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
@@ -543,13 +585,13 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
     auto& group = item.second;
     auto& flow_group_map = flow_groups[group.id];
 
-    flow_group_map.init(zone_svc->zone_name(), bucket, group,
+    flow_group_map.init(zone_name, bucket, group,
                         [&](const string& source_zone,
                             std::optional<rgw_bucket> source_bucket,
                             const string& dest_zone,
                             std::optional<rgw_bucket> dest_bucket) {
                         if (!parent) {
-                        return true;
+                          return true;
                         }
                         return parent->allowed_data_flow(source_zone,
                                                          source_bucket,
@@ -573,11 +615,13 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
 }
 
 
-RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc,
+RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(const string& _zone_name,
                                                    std::optional<rgw_bucket> _bucket,
-                                                   RGWBucketSyncFlowManager *_parent) : zone_svc(_zone_svc),
+                                                   RGWBucketSyncFlowManager *_parent) : zone_name(_zone_name),
                                                                                         bucket(_bucket),
                                                                                         parent(_parent) {}
+
+
 int RGWBucketSyncPolicyHandler::init()
 {
 #warning FIXME
index 6379227c7e9450cced39a9d110a97231529308e5..d9fcedbd2fd8eced62f70fead74d18ad8ba741db 100644 (file)
 #pragma once
 
 #include "rgw_common.h"
+#include "rgw_sync_policy.h"
 
 class RGWSI_Zone;
-struct group_pipe_map;
-struct rgw_sync_bucket_pipe;;
+struct rgw_sync_group_pipe_map;
+struct rgw_sync_bucket_pipe;
 struct rgw_sync_policy_info;
 
-class RGWBucketSyncFlowManager {
-  RGWSI_Zone *zone_svc;
+struct rgw_sync_group_pipe_map {
+  string zone;
   std::optional<rgw_bucket> bucket;
 
-  RGWBucketSyncFlowManager *parent{nullptr};
+  rgw_sync_policy_group::Status status{rgw_sync_policy_group::Status::FORBIDDEN};
+
+  struct zone_bucket {
+    string zone; /* zone name */
+    rgw_bucket bucket; /* bucket, if empty then wildcard */
+
+    zone_bucket() {}
+    zone_bucket(const string& _zone,
+                std::optional<rgw_bucket> _bucket) : zone(_zone),
+                                                     bucket(_bucket.value_or(rgw_bucket())) {}
 
-  map<string, group_pipe_map> flow_groups;
 
+    bool operator<(const zone_bucket& zb) const {
+      if (zone < zb.zone) {
+        return true;
+      }
+      if (zone > zb.zone) {
+        return false;
+      }
+      return (bucket < zb.bucket);
+    }
+    void dump(ceph::Formatter *f) const;
+  };
+
+  using zb_pipe_map_t = std::multimap<zone_bucket, rgw_sync_bucket_pipe>;
+
+  zb_pipe_map_t sources; /* all the pipes where zone is pulling from */
+  zb_pipe_map_t dests; /* all the pipes that pull from zone */
+
+  void dump(ceph::Formatter *f) const;
+
+  template <typename CB1, typename CB2>
+  void try_add_to_pipe_map(const string& source_zone,
+                           const string& dest_zone,
+                           const std::vector<rgw_sync_bucket_pipe>& pipes,
+                           zb_pipe_map_t *pipe_map,
+                           CB1 filter_cb,
+                           CB2 call_filter_cb);
+          
+  template <typename CB>
+  void try_add_source(const string& source_zone,
+                  const string& dest_zone,
+                  const std::vector<rgw_sync_bucket_pipe>& pipes,
+                  CB filter_cb);
+          
+  template <typename CB>
+  void try_add_dest(const string& source_zone,
+                  const string& dest_zone,
+                  const std::vector<rgw_sync_bucket_pipe>& pipes,
+                  CB filter_cb);
+          
+  pair<zb_pipe_map_t::const_iterator, zb_pipe_map_t::const_iterator> find_pipes(const zb_pipe_map_t& m,
+                                                                                const string& zone,
+                                                                                std::optional<rgw_bucket> b);
+
+  template <typename CB>
+  void init(const string& _zone,
+            std::optional<rgw_bucket> _bucket,
+            const rgw_sync_policy_group& group,
+            CB filter_cb);
+
+  /*
+   * find all relevant pipes in our zone that match {dest_bucket} <- {source_zone, source_bucket}
+   */
+  vector<rgw_sync_bucket_pipe> find_source_pipes(const string& source_zone,
+                                                 std::optional<rgw_bucket> source_bucket,
+                                                 std::optional<rgw_bucket> dest_bucket);
+
+  /*
+   * find all relevant pipes in other zones that pull from a specific
+   * source bucket in out zone {source_bucket} -> {dest_zone, dest_bucket}
+   */
+  vector<rgw_sync_bucket_pipe> find_dest_pipes(std::optional<rgw_bucket> source_bucket,
+                                                 const string& dest_zone,
+                                                 std::optional<rgw_bucket> dest_bucket);
+
+  /*
+   * find all relevant pipes from {source_zone, source_bucket} -> {dest_zone, dest_bucket}
+   */
+  vector<rgw_sync_bucket_pipe> find_pipes(const string& source_zone,
+                                          std::optional<rgw_bucket> source_bucket,
+                                          const string& dest_zone,
+                                          std::optional<rgw_bucket> dest_bucket);
+};
+
+class RGWBucketSyncFlowManager {
+public:
   struct pipe_flow {
-    vector<group_pipe_map *> flow_groups;
+    vector<rgw_sync_group_pipe_map *> flow_groups;
     vector<rgw_sync_bucket_pipe> pipe;
+
+    void dump(ceph::Formatter *f) const;
   };
 
+  using flow_map_t = map<rgw_bucket, pipe_flow>;
+
+private:
+
+  string zone_name;
+  std::optional<rgw_bucket> bucket;
+
+  RGWBucketSyncFlowManager *parent{nullptr};
+
+  map<string, rgw_sync_group_pipe_map> flow_groups;
+
   bool allowed_data_flow(const string& source_zone,
                          std::optional<rgw_bucket> source_bucket,
                          const string& dest_zone,
                          std::optional<rgw_bucket> dest_bucket,
                          bool check_activated);
 
-  using flow_map_t = map<rgw_bucket, pipe_flow>;
-
   flow_map_t flow_by_source;
   flow_map_t flow_by_dest;
 
@@ -53,15 +148,22 @@ class RGWBucketSyncFlowManager {
   flow_map_t::iterator find_bucket_flow(flow_map_t& m, std::optional<rgw_bucket> bucket);
 
   void update_flow_maps(const rgw_sync_bucket_pipe& pipe,
-                        group_pipe_map *flow_group);
+                        rgw_sync_group_pipe_map *flow_group);
 
 public:
 
-  RGWBucketSyncFlowManager(RGWSI_Zone *_zone_svc,
+  RGWBucketSyncFlowManager(const string& _zone_name,
                            std::optional<rgw_bucket> _bucket,
                            RGWBucketSyncFlowManager *_parent);
 
   void init(const rgw_sync_policy_info& sync_policy);
+
+  const flow_map_t& get_sources() {
+    return flow_by_source;
+  }
+  const flow_map_t& get_dests() {
+    return flow_by_dest;
+  }
 };
 
 class RGWBucketSyncPolicyHandler {
index c331bdf8f88050b99ea64a0b9c26ddbcca1822c0..fa7c00e12aee3e38fe9889c19c74c37457efc7cf 100644 (file)
@@ -867,17 +867,13 @@ void rgw_sync_symmetric_group::decode_json(JSONObj *obj)
 
 void rgw_sync_bucket_entity::dump(Formatter *f) const
 {
-  if (bucket) {
-    rgw_bucket b = *bucket;
-    if (b.name.empty()) {
-      b.name = "*";
-    }
-
-    encode_json("bucket", b.get_key(), f);
-  } else {
-    encode_json("bucket", "*", f);
+  encode_json("bucket", rgw_sync_bucket_entity::bucket_key(bucket), f);
+  if (zones) {
+    encode_json("zones", zones, f);
+  } else if (all_zones) {
+    set<string> z = { "*" };
+    encode_json("zones", z, f);
   }
-  encode_json("zones", zones, f);
 }
 
 void rgw_sync_bucket_entity::decode_json(JSONObj *obj)
index 37ded0181823d4dcf31d580b08b0f09084d10384..1093cef0c66d8e81a761f65249286cd5d6d2b2f3 100644 (file)
@@ -237,6 +237,7 @@ public:
     ENCODE_START(1, 1, bl);
     encode(bucket, bl);
     encode(zones, bl);
+    encode(all_zones, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -244,6 +245,7 @@ public:
     DECODE_START(1, bl);
     decode(bucket, bl);
     decode(zones, bl);
+    decode(all_zones, bl);
     DECODE_FINISH(bl);
   }
 
@@ -275,10 +277,8 @@ public:
 
   bool match_zone(const string& zone) const {
     if (all_zones) {
-      return  true;
-    }
-
-    if (!zones) { /* all zones */
+      return true;
+    } else if (!zones) {
       return false;
     }
 
@@ -288,6 +288,8 @@ public:
   rgw_bucket get_bucket() const {
     return bucket.value_or(rgw_bucket());
   }
+
+  static string bucket_key(std::optional<rgw_bucket> b);
 };
 WRITE_CLASS_ENCODER(rgw_sync_bucket_entity)
 
@@ -329,6 +331,11 @@ public:
     return (source.match_zone(zone) || dest.match_zone(zone));
   }
 
+  bool contains_zone_bucket(const string& zone, std::optional<rgw_bucket> b) const {
+    return ((source.match_zone(zone) && source.match_bucket(b)) ||
+            (dest.match_zone(zone) && dest.match_bucket(b)));
+  }
+
   void dump(ceph::Formatter *f) const;
   void decode_json(JSONObj *obj);