]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync: use hints when determining sync pipes
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 8 Nov 2019 00:56:52 +0000 (16:56 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_common.h
src/rgw/rgw_data_sync.cc
src/rgw/services/svc_bucket_sync_sobj.cc
src/rgw/services/svc_bucket_sync_sobj.h

index ce206281146d1babe72056632ae0c6cfd427f902..0df1d90ec15b318fe228356e8e4788a4bcb0eba6 100644 (file)
@@ -1178,6 +1178,14 @@ struct rgw_bucket {
   rgw_bucket(const rgw_bucket&) = default;
   rgw_bucket(rgw_bucket&&) = default;
 
+  bool match(const rgw_bucket& b) const {
+    return (tenant == b.tenant &&
+           name == b.name &&
+           (bucket_id == b.bucket_id ||
+            bucket_id.empty() ||
+            b.bucket_id.empty()));
+  }
+
   void convert(cls_user_bucket *b) const {
     b->name = name;
     b->marker = marker;
index 3b28f934b82e4e975a89169a149754b9efeb93cc..4455a48538bff0b7eccf6554686d1a303756e931 100644 (file)
@@ -3650,7 +3650,7 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
           continue;
         }
         if (source_bucket &&
-            *source_bucket != *pipe.source.bucket) {
+            !source_bucket->match(*pipe.source.bucket)) {
           continue;
         }
         ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
@@ -3671,7 +3671,7 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
       for (auto& pipe : i->second.pipes) {
         if (target_bucket &&
             pipe.dest.bucket &&
-            *target_bucket != *pipe.dest.bucket) {
+            !target_bucket->match(*pipe.dest.bucket)) {
           ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
           continue;
         }
@@ -3726,12 +3726,6 @@ public:
                                          SSTR( "target=" << target_bucket.value_or(rgw_bucket())
                                                << ":source=" << target_bucket.value_or(rgw_bucket())
                                                << ":source_zone=" << source_zone.value_or("*")))) {
-        if (target_bucket) {
-          target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
-        }
-        if (source_bucket) {
-          source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
-        }
       }
 
   int operate() override;
@@ -4001,6 +3995,7 @@ int RGWGetBucketPeersCR::operate()
     if (source_bucket && source_zone) {
       get_policy_params.zone = source_zone;
       get_policy_params.bucket = *source_bucket;
+      source_policy = make_shared<rgw_bucket_get_sync_policy_result>();
       yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                      sync_env->store,
                                                      get_policy_params,
@@ -4032,6 +4027,7 @@ int RGWGetBucketPeersCR::operate()
 
           get_policy_params.zone = nullopt;
           get_policy_params.bucket = *hiter;
+          target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
           yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                          sync_env->store,
                                                          get_policy_params,
index 0244b57c95b7e75672fe649c597bc24a5156453b..0f488c1cc3e2acf58174bc924e738fa2e73e9107 100644 (file)
 static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints";
 static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints";
 
+class RGWSI_Bucket_Sync_SObj_HintIndexManager {
+  CephContext *cct;
+
+  struct {
+    RGWSI_Zone *zone;
+    RGWSI_SysObj *sysobj;
+  } svc;
+
+public:
+  RGWSI_Bucket_Sync_SObj_HintIndexManager(RGWSI_Zone *_zone_svc,
+                                          RGWSI_SysObj *_sysobj_svc) {
+    svc.zone = _zone_svc;
+    svc.sysobj = _sysobj_svc;
+
+    cct = svc.zone->ctx();
+  }
+
+  rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const;
+  rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const;
+
+  template <typename C1, typename C2>
+  int update_hints(const RGWBucketInfo& bucket_info,
+                   C1& added_dests,
+                   C2& removed_dests,
+                   C1& added_sources,
+                   C2& removed_sources,
+                   optional_yield y);
+};
+
+RGWSI_Bucket_Sync_SObj::RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {
+}
 RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() {
 }
 
@@ -24,8 +55,7 @@ void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc,
   svc.cache = _cache_svc;
   svc.bucket_sobj = bucket_sobj_svc;
 
-  hint_index_mgr.init(svc.zone,
-                      svc.sysobj);
+  hint_index_mgr.reset(new RGWSI_Bucket_Sync_SObj_HintIndexManager(svc.zone, svc.sysobj));
 }
 
 int RGWSI_Bucket_Sync_SObj::do_start()
@@ -117,11 +147,13 @@ static bool diff_sets(std::set<rgw_bucket>& orig_set,
       ++niter;
       continue;
     }
-    while (*oiter < *niter) {
+    while (*oiter < *niter &&
+          oiter != orig_set.end()) {
       removed->push_back(*oiter);
       ++oiter;
     }
-    while (*niter < *oiter) {
+    while (*niter < *oiter
+          && niter != new_set.end()) {
       added->push_back(*niter);
       ++niter;
     }
@@ -319,17 +351,19 @@ public:
     svc.sysobj = _sysobj_svc;
   }
 
+  template <typename C1, typename C2>
   int update(const rgw_bucket& entity,
              const RGWBucketInfo& info_source,
-             std::optional<std::vector<rgw_bucket> > add,
-             std::optional<std::vector<rgw_bucket> > remove,
+             C1 *add,
+             C2 *remove,
              optional_yield y);
 
 private:
+  template <typename C1, typename C2>
   void update_entries(const rgw_bucket& info_source,
                       const obj_version& info_source_ver,
-                      std::optional<std::vector<rgw_bucket> > add,
-                      std::optional<std::vector<rgw_bucket> > remove,
+                      C1 *add,
+                      C2 *remove,
                       single_instance_info *instance);
 
   int read(optional_yield y);
@@ -349,10 +383,11 @@ WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry)
 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info)
 WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map)
 
+template <typename C1, typename C2>
 int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity,
                                        const RGWBucketInfo& info_source,
-                                       std::optional<std::vector<rgw_bucket> > add,
-                                       std::optional<std::vector<rgw_bucket> > remove,
+                                       C1 *add,
+                                       C2 *remove,
                                        optional_yield y)
 {
   int r = 0;
@@ -396,10 +431,11 @@ int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity,
   return -EIO;
 }
 
+template <typename C1, typename C2>
 void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source,
                                                 const obj_version& info_source_ver,
-                                                std::optional<std::vector<rgw_bucket> > add,
-                                                std::optional<std::vector<rgw_bucket> > remove,
+                                                C1 *add,
+                                                C2 *remove,
                                                 single_instance_info *instance)
 {
   if (remove) {
@@ -468,7 +504,7 @@ int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y) {
   return 0;
 }
 
-rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const
+rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const
 {
   rgw_bucket b = bucket;
   b.bucket_id.clear();
@@ -476,7 +512,7 @@ rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_sources_obj(const rgw_
                      bucket_sync_sources_oid_prefix + "." + b.get_key());
 }
 
-rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const
+rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const
 {
   rgw_bucket b = bucket;
   b.bucket_id.clear();
@@ -484,39 +520,39 @@ rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_dests_obj(const rgw_bu
                      bucket_sync_targets_oid_prefix + "." + b.get_key());
 }
 
-int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info,
-                                            std::vector<rgw_bucket>& added_dests,
-                                            std::vector<rgw_bucket>& removed_dests,
-                                            std::vector<rgw_bucket>& added_sources,
-                                            std::vector<rgw_bucket>& removed_sources,
-                                            optional_yield y)
+template <typename C1, typename C2>
+int RGWSI_Bucket_Sync_SObj_HintIndexManager::update_hints(const RGWBucketInfo& bucket_info,
+                                                          C1& added_dests,
+                                                          C2& removed_dests,
+                                                          C1& added_sources,
+                                                          C2& removed_sources,
+                                                          optional_yield y)
 {
-  std::vector<rgw_bucket> self_entity;
-  self_entity.push_back(bucket_info.bucket);
+  C1 self_entity = { bucket_info.bucket };
 
   if (!added_dests.empty() ||
       !removed_dests.empty()) {
     /* update our dests */
     RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
-                                     hint_index_mgr.get_dests_obj(bucket_info.bucket));
+                                     get_dests_obj(bucket_info.bucket));
     int r = index.update(bucket_info.bucket,
                          bucket_info,
-                         added_dests,
-                         removed_dests,
+                         &added_dests,
+                         &removed_dests,
                          y);
     if (r < 0) {
       ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
       return r;
     }
 
-    /* update added dest buckets */
+    /* update dest buckets */
     for (auto& dest_bucket : added_dests) {
       RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
-                                           hint_index_mgr.get_sources_obj(dest_bucket));
+                                           get_sources_obj(dest_bucket));
       int r = dep_index.update(dest_bucket,
                                bucket_info,
-                               self_entity,
-                               std::nullopt,
+                               &self_entity,
+                               static_cast<C2 *>(nullptr),
                                y);
       if (r < 0) {
         ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
@@ -526,11 +562,11 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info,
     /* update removed dest buckets */
     for (auto& dest_bucket : removed_dests) {
       RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
-                                           hint_index_mgr.get_sources_obj(dest_bucket));
+                                           get_sources_obj(dest_bucket));
       int r = dep_index.update(dest_bucket,
                                bucket_info,
-                               std::nullopt,
-                               self_entity,
+                               static_cast<C1 *>(nullptr),
+                               &self_entity,
                                y);
       if (r < 0) {
         ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl;
@@ -539,15 +575,15 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info,
     }
   }
 
-  if (!added_dests.empty() ||
-      !removed_dests.empty()) {
+  if (!added_sources.empty() ||
+      !removed_sources.empty()) {
     RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
-                                     hint_index_mgr.get_sources_obj(bucket_info.bucket));
+                                     get_sources_obj(bucket_info.bucket));
     /* update our sources */
     int r = index.update(bucket_info.bucket,
                          bucket_info,
-                         added_sources,
-                         removed_sources,
+                         &added_sources,
+                         &removed_sources,
                          y);
     if (r < 0) {
       ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl;
@@ -557,11 +593,11 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info,
     /* update added sources buckets */
     for (auto& source_bucket : added_sources) {
       RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
-                                           hint_index_mgr.get_dests_obj(source_bucket));
+                                           get_dests_obj(source_bucket));
       int r = dep_index.update(source_bucket,
                                bucket_info,
-                               self_entity,
-                               std::nullopt,
+                               &self_entity,
+                               static_cast<C2 *>(nullptr),
                                y);
       if (r < 0) {
         ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
@@ -571,11 +607,11 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info,
     /* update removed dest buckets */
     for (auto& source_bucket : removed_sources) {
       RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj,
-                                           hint_index_mgr.get_dests_obj(source_bucket));
+                                           get_dests_obj(source_bucket));
       int r = dep_index.update(source_bucket,
                                bucket_info,
-                               std::nullopt,
-                               self_entity,
+                               static_cast<C1 *>(nullptr),
+                               &self_entity,
                                y);
       if (r < 0) {
         ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl;
@@ -614,12 +650,12 @@ int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo& bucket_info,
   std::vector<rgw_bucket> added_sources;
   std::vector<rgw_bucket> added_dests;
 
-  return do_update_hints(bucket_info,
-                         added_dests,
-                         removed_dests,
-                         added_sources,
-                         removed_sources,
-                         y);
+  return hint_index_mgr->update_hints(bucket_info,
+                                      added_dests,
+                                      removed_dests,
+                                      added_sources,
+                                      removed_sources,
+                                      y);
 }
 
 int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
@@ -661,12 +697,12 @@ int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
     return 0;
   }
 
-  return do_update_hints(bucket_info,
-                         added_dests,
-                         removed_dests,
-                         added_sources,
-                         removed_sources,
-                         y);
+  return hint_index_mgr->update_hints(bucket_info,
+                                      dests, /* set all dests, not just the ones that were added */
+                                      removed_dests,
+                                      sources, /* set all sources, not just that the ones that were added */
+                                      removed_sources,
+                                      y);
 }
 
 int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket,
@@ -680,7 +716,7 @@ int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket,
 
   if (sources) {
     RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
-                                     hint_index_mgr.get_sources_obj(bucket));
+                                     hint_index_mgr->get_sources_obj(bucket));
     int r = index.read(y);
     if (r < 0) {
       ldout(cct, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl;
@@ -698,7 +734,7 @@ int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket,
 
   if (dests) {
     RGWSI_BS_SObj_HintIndexObj index(svc.sysobj,
-                                     hint_index_mgr.get_dests_obj(bucket));
+                                     hint_index_mgr->get_dests_obj(bucket));
     int r = index.read(y);
     if (r < 0) {
       ldout(cct, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl;
index a0305e12b4d93e7afeab2ec81c63ea6cd79a8d2c..02fa66b01c13cfc9f75b0983df22843fee3480a3 100644 (file)
@@ -29,6 +29,8 @@ class RGWSI_Bucket_SObj;
 template <class T>
 class RGWChainedCacheImpl;
 
+class RGWSI_Bucket_Sync_SObj_HintIndexManager;
+
 class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
 {
   struct bucket_sync_policy_cache_entry {
@@ -38,33 +40,10 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
   using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl<bucket_sync_policy_cache_entry>;
   unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache;
 
-  class HintIndexManager {
-    struct {
-      RGWSI_Zone *zone;
-      RGWSI_SysObj *sysobj;
-    } svc;
-
-  public:
-    HintIndexManager() {}
-
-    void init(RGWSI_Zone *_zone_svc,
-              RGWSI_SysObj *_sysobj_svc) {
-      svc.zone = _zone_svc;
-      svc.sysobj = _sysobj_svc;
-    }
-
-    rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const;
-    rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const;
-  } hint_index_mgr;
+  std::unique_ptr<RGWSI_Bucket_Sync_SObj_HintIndexManager> hint_index_mgr;
 
   int do_start() override;
 
-  int do_update_hints(const RGWBucketInfo& bucket_info,
-                      std::vector<rgw_bucket>& added_dests,
-                      std::vector<rgw_bucket>& removed_dests,
-                      std::vector<rgw_bucket>& added_sources,
-                      std::vector<rgw_bucket>& removed_sources,
-                      optional_yield y);
 public:
   struct Svc {
     RGWSI_Zone *zone{nullptr};
@@ -73,7 +52,7 @@ public:
     RGWSI_Bucket_SObj *bucket_sobj{nullptr};
   } svc;
 
-  RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {}
+  RGWSI_Bucket_Sync_SObj(CephContext *cct);
   ~RGWSI_Bucket_Sync_SObj();
 
   void init(RGWSI_Zone *_zone_svc,