]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync: resolve hints when initializing policy handler
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 14 Nov 2019 03:26:33 +0000 (19:26 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket_sync.h
src/rgw/services/svc_bucket_sync.h
src/rgw/services/svc_bucket_sync_sobj.cc
src/rgw/services/svc_bucket_sync_sobj.h

index 6dcb75bf32c97d189b626413be2e032c8ce1a188..a2d4580cbaf46f2f7f03baa42b77755ff73d0865 100644 (file)
@@ -64,10 +64,7 @@ bool rgw_zone_set::exists(const string& zone, std::optional<string> location_key
 
 void encode_json(const char *name, const rgw_zone_set& zs, ceph::Formatter *f)
 {
-  Formatter::ArraySection as(*f, name);
-  for (auto& e : zs.entries) {
-    encode_json("entry", e, f);
-  }
+  encode_json(name, zs.entries, f);
 }
 
 void decode_json_obj(rgw_zone_set& zs, JSONObj *obj)
index 8cc5c823bfc014a8d78ea46bca988dc3ff940286..db8a3bcd18a6f0f949f03f6029ee95f5f73d44b4 100644 (file)
@@ -67,7 +67,6 @@ struct rgw_zone_set {
     /* no DECODE_START, DECODE_END for backward compatibility */
     ceph::decode(entries, bl);
   }
-  void decode_json(JSONObj *obj);
 
   void insert(const string& zone, std::optional<string> location_key);
   bool exists(const string& zone, std::optional<string> location_key) const;
index a433b30ba18f0eedf6673b07872b5e86a9e772f1..4086f0c858b4ab67ed6ddeaef317b8c88ce37720 100644 (file)
@@ -3669,7 +3669,7 @@ int RGWBucketCtl::get_sync_policy_handler(std::optional<string> zone,
                                           optional_yield y)
 {
   int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
-    return svc.bucket_sync->get_policy_handler(ctx.bi, zone, bucket, phandler, y);
+    return svc.bucket_sync->get_policy_handler(ctx, zone, bucket, phandler, y);
   });
   if (r < 0) {
     ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl;
index 23446bffadda50b61b72be9a9da04436f6a2c94c..8f8ea1cb32d2527c51293411ac934dbf75b7af34 100644 (file)
@@ -277,6 +277,9 @@ class RGWBucketSyncPolicyHandler {
 
   std::set<rgw_bucket> source_hints;
   std::set<rgw_bucket> target_hints;
+  std::set<rgw_sync_bucket_pipe> resolved_sources;
+  std::set<rgw_sync_bucket_pipe> resolved_dests;
+
 
   bool bucket_is_sync_source() const {
     return !targets.empty();
@@ -312,6 +315,20 @@ public:
                std::set<string> *ptarget_zones,
                bool only_enabled) const;
 
+  void set_resolved_hints(std::set<rgw_sync_bucket_pipe>&& _resolved_sources,
+                          std::set<rgw_sync_bucket_pipe>&& _resolved_dests) {
+    resolved_sources = std::move(_resolved_sources);
+    resolved_dests = std::move(_resolved_dests);
+  }
+
+  const std::set<rgw_sync_bucket_pipe>& get_resolved_source_hints() {
+    return resolved_sources;
+  }
+
+  const std::set<rgw_sync_bucket_pipe>& get_resolved_dest_hints() {
+    return resolved_dests;
+  }
+
   const std::set<string>& get_source_zones() const {
     return source_zones;
   }
index 250e874f0068501d3ad9ee8bf273e79cb5c8300e..c0d6bdd9a8a35253bde0b1b5cb9ada7aa423698c 100644 (file)
@@ -30,7 +30,7 @@ class RGWSI_Bucket_Sync : public RGWServiceInstance
 public:
   RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {}
 
-  virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+  virtual int get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
                                  std::optional<string> zone,
                                  std::optional<rgw_bucket> bucket,
                                  RGWBucketSyncPolicyHandlerRef *handler,
index 0f488c1cc3e2acf58174bc924e738fa2e73e9107..31fa3ca1539527bdcd5b1df82ad85dec19ef0c9c 100644 (file)
@@ -5,6 +5,7 @@
 
 #include "rgw/rgw_bucket_sync.h"
 #include "rgw/rgw_zone.h"
+#include "rgw/rgw_sync_policy.h"
 
 #define dout_subsys ceph_subsys_rgw
 
@@ -66,12 +67,97 @@ int RGWSI_Bucket_Sync_SObj::do_start()
   return 0;
 }
 
-
-int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
-                                               std::optional<string> zone,
-                                               std::optional<rgw_bucket> _bucket,
-                                               RGWBucketSyncPolicyHandlerRef *handler,
+void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx& ctx,
+                                               const std::set<string>& zone_names,
+                                               const std::set<rgw_bucket>& buckets,
+                                               std::set<rgw_sync_bucket_entity> *hint_entities,
                                                optional_yield y)
+{
+  for (auto& zone : zone_names) {
+    for (auto& b : buckets) {
+      string zid;
+      if (!svc.zone->find_zone_id_by_name(zone, &zid)) {
+       cerr << "WARNING: cannot find zone id for zone=" << zone << ", skippping" << std::endl;
+       continue;
+      }
+
+      RGWBucketInfo hint_bucket_info;
+      int ret = svc.bucket_sobj->read_bucket_info(ctx, b, &hint_bucket_info,
+                                                  nullptr, nullptr, boost::none,
+                                                  y);
+      if (ret < 0) {
+       ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl;
+       continue;
+      }
+
+      hint_entities->insert(rgw_sync_bucket_entity(zid, hint_bucket_info.bucket));
+    }
+  }
+}
+
+int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx,
+                                                 rgw_sync_bucket_entity& self_entity,
+                                                 RGWBucketSyncPolicyHandlerRef& handler,
+                                                 RGWBucketSyncPolicyHandlerRef& zone_policy_handler,
+                                                 std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+                                                 optional_yield y)
+{
+  set<string> source_zones;
+  set<string> target_zones;
+
+  zone_policy_handler->reflect(nullptr, nullptr,
+                               nullptr, nullptr,
+                               &source_zones,
+                               &target_zones,
+                               false); /* relaxed: also get all zones that we allow to sync to/from */
+
+  std::set<rgw_sync_bucket_entity> hint_entities;
+
+  get_hint_entities(ctx, source_zones, handler->get_source_hints(), &hint_entities, y);
+  get_hint_entities(ctx, target_zones, handler->get_target_hints(), &hint_entities, y);
+
+  std::set<rgw_sync_bucket_pipe> resolved_sources;
+  std::set<rgw_sync_bucket_pipe> resolved_dests;
+
+  for (auto& hint_entity : hint_entities) {
+    if (!hint_entity.zone ||
+       !hint_entity.bucket) {
+      continue; /* shouldn't really happen */
+    }
+
+    auto& zid = *hint_entity.zone;
+    auto& hint_bucket = *hint_entity.bucket;
+
+    RGWBucketSyncPolicyHandlerRef hint_bucket_handler;
+
+    auto iter = temp_map.find(optional_zone_bucket(zid, hint_bucket));
+    if (iter != temp_map.end()) {
+      hint_bucket_handler = iter->second;
+    } else {
+      int r = do_get_policy_handler(ctx, zid, hint_bucket, temp_map, &hint_bucket_handler, y);
+      if (r < 0) {
+        ldout(cct, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl;
+        continue;
+      }
+    }
+
+    hint_bucket_handler->get_pipes(&resolved_dests,
+                                   &resolved_sources,
+                                   self_entity); /* flipping resolved dests and sources as these are
+                                                    relative to the remote entity */
+  }
+
+  handler->set_resolved_hints(std::move(resolved_sources), std::move(resolved_dests));
+
+  return 0;
+}
+
+int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
+                                                  std::optional<string> zone,
+                                                  std::optional<rgw_bucket> _bucket,
+                                                  std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+                                                  RGWBucketSyncPolicyHandlerRef *handler,
+                                                  optional_yield y)
 {
   if (!_bucket) {
     *handler = svc.zone->get_sync_policy_handler(zone);
@@ -101,7 +187,7 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
 
   RGWBucketInfo bucket_info;
 
-  int r = svc.bucket_sobj->read_bucket_instance_info(ctx,
+  int r = svc.bucket_sobj->read_bucket_instance_info(ctx.bi,
                                                      bucket_key,
                                                      &bucket_info,
                                                      nullptr,
@@ -115,7 +201,9 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
     return r;
   }
 
-  e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info));
+  auto zone_policy_handler = svc.zone->get_sync_policy_handler(zone);
+
+  e.handler.reset(zone_policy_handler->alloc_child(bucket_info));
 
   r = e.handler->init(y);
   if (r < 0) {
@@ -123,6 +211,19 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
     return r;
   }
 
+  temp_map.emplace(optional_zone_bucket{zone, bucket}, e.handler);
+
+  rgw_sync_bucket_entity self_entity(zone.value_or(svc.zone->zone_id()), bucket);
+
+  r = resolve_policy_hints(ctx, self_entity,
+                           e.handler,
+                           zone_policy_handler,
+                           temp_map, y);
+  if (r < 0) {
+    ldout(cct, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key << ", r=" << r << dendl;
+    return r;
+  }
+
   if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) {
     ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl;
   }
@@ -132,6 +233,16 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
   return 0;
 }
 
+int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
+                                               std::optional<string> zone,
+                                               std::optional<rgw_bucket> _bucket,
+                                               RGWBucketSyncPolicyHandlerRef *handler,
+                                               optional_yield y)
+{
+  std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef> temp_map;
+  return do_get_policy_handler(ctx, zone, _bucket, temp_map, handler, y);
+}
+
 static bool diff_sets(std::set<rgw_bucket>& orig_set,
                       std::set<rgw_bucket>& new_set,
                       vector<rgw_bucket> *added,
index 02fa66b01c13cfc9f75b0983df22843fee3480a3..ae497f3b580c8e61b60a9ccf71e31138975722fd 100644 (file)
@@ -31,6 +31,8 @@ class RGWChainedCacheImpl;
 
 class RGWSI_Bucket_Sync_SObj_HintIndexManager;
 
+struct rgw_sync_bucket_entity;
+
 class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
 {
   struct bucket_sync_policy_cache_entry {
@@ -44,6 +46,41 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
 
   int do_start() override;
 
+  struct optional_zone_bucket {
+    optional<string> zone;
+    optional<rgw_bucket> bucket;
+
+    optional_zone_bucket(const optional<string>& _zone,
+                         const optional<rgw_bucket>& _bucket) : zone(_zone), bucket(_bucket) {}
+
+    bool operator<(const optional_zone_bucket& ozb) const {
+      if (zone < ozb.zone) {
+        return true;
+      }
+      if (zone > ozb.zone) {
+        return false;
+      }
+      return bucket < ozb.bucket;
+    }
+  };
+
+  void get_hint_entities(RGWSI_Bucket_X_Ctx& ctx,
+                         const std::set<string>& zone_names,
+                         const std::set<rgw_bucket>& buckets,
+                         std::set<rgw_sync_bucket_entity> *hint_entities,
+                         optional_yield y);
+  int resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx,
+                           rgw_sync_bucket_entity& self_entity,
+                           RGWBucketSyncPolicyHandlerRef& handler,
+                           RGWBucketSyncPolicyHandlerRef& zone_policy_handler,
+                           std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+                           optional_yield y);
+  int do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
+                            std::optional<string> zone,
+                            std::optional<rgw_bucket> _bucket,
+                            std::map<optional_zone_bucket, RGWBucketSyncPolicyHandlerRef>& temp_map,
+                            RGWBucketSyncPolicyHandlerRef *handler,
+                            optional_yield y);
 public:
   struct Svc {
     RGWSI_Zone *zone{nullptr};
@@ -61,11 +98,11 @@ public:
             RGWSI_Bucket_SObj *_bucket_sobj_svc);
 
 
-  int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
+  int get_policy_handler(RGWSI_Bucket_X_Ctx& ctx,
                          std::optional<string> zone,
                          std::optional<rgw_bucket> bucket,
                          RGWBucketSyncPolicyHandlerRef *handler,
-                         optional_yield y) override;
+                         optional_yield y);
 
   int handle_bi_update(RGWBucketInfo& bucket_info,
                        RGWBucketInfo *orig_bucket_info,