]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: identify potential related (for sync) buckets on bucket update
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 4 Nov 2019 23:20:53 +0000 (15:20 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Generate a diff off old and new bucket infos.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
13 files changed:
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_service.cc
src/rgw/rgw_sync_module.cc
src/rgw/rgw_sync_module.h
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h
src/rgw/services/svc_bucket_sobj.cc
src/rgw/services/svc_bucket_sobj.h
src/rgw/services/svc_bucket_sync.h
src/rgw/services/svc_bucket_sync_sobj.cc
src/rgw/services/svc_bucket_sync_sobj.h

index ff2829b60696f9fa54e12d990bce07089c2c83e5..16adc9df3b4fff4956f90b02fb6b1750290b79ff 100644 (file)
@@ -3056,8 +3056,9 @@ int RGWMetadataHandlerPut_BucketInstance::put_post()
   objv_tracker = bci.info.objv_tracker;
 
   int ret = bihandler->svc.bi->init_index(bci.info);
-  if (ret < 0)
+  if (ret < 0) {
     return ret;
+  }
 
   return STATUS_APPLIED;
 }
@@ -3072,7 +3073,6 @@ public:
   }
 };
 
-
 RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
                            RGWSI_Bucket *bucket_svc,
                            RGWSI_Bucket_Sync *bucket_sync_svc,
index 520a1ad80ce8d5ae0e53c548f8cad020f91862c7..50120cd6ebe5d258b7f2cf5b727af5b6c26c3d47 100644 (file)
@@ -192,7 +192,6 @@ public:
   virtual void init(RGWSI_Zone *zone_svc,
                     RGWSI_Bucket *bucket_svc,
                     RGWSI_BucketIndex *bi_svc) = 0;
-
 };
 
 class RGWBucketMetaHandlerAllocator {
index d0011f848c25a6fcbccb5ce26b0de55a25cfefca..2bc6594e4c8051dee115e5308aef68d8970104f4 100644 (file)
@@ -2020,7 +2020,7 @@ public:
   RGWMetadataHandler *alloc_bucket_meta_handler() override {
     return RGWArchiveBucketMetaHandlerAllocator::alloc();
   }
-  RGWMetadataHandler *alloc_bucket_instance_meta_handler() override {
+  RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler() override {
     return RGWArchiveBucketInstanceMetaHandlerAllocator::alloc();
   }
 };
@@ -3638,7 +3638,7 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
   rgw_sync_pipe_info_set::iterator siter;
 
   rgw_bucket_sync_sources_local_info sources_local_info;
-  rgw_bucket_sync_sources_local_info expected_local_info;
+  rgw_bucket_sync_sources_local_info targets_local_info;
 
   rgw_bucket_get_sync_policy_params get_policy_params;
   std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
@@ -4009,7 +4009,7 @@ int RGWGetBucketPeersCR::operate()
       yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
                                                                               sync_env->svc->sysobj,
                                                                               RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket),
-                                                                              &sources_local_info));
+                                                                              &targets_local_info));
       if (retcode < 0 &&
           retcode != -ENOENT) {
         return set_cr_error(retcode);
index 0909188ae149fa7de50aa39daa59e0fc09188f84..c81817f546c2435f46fb02289ad42d73d7d2bd39 100644 (file)
@@ -81,7 +81,7 @@ int RGWServices_Def::init(CephContext *cct,
   bilog_rados->init(bi_rados.get());
   bucket_sobj->init(zone.get(), sysobj.get(), sysobj_cache.get(),
                     bi_rados.get(), meta.get(), meta_be_sobj.get(),
-                    sync_modules.get());
+                    sync_modules.get(), bucket_sync_sobj.get());
   bucket_sync_sobj->init(zone.get(), sysobj_cache.get(),
                          bucket_sobj.get());
   cls->init(zone.get(), rados.get());
index 2693166168184c14629a913a05ea2500b0f50ae9..7100646098f9b64b61fcc78f43a9cb1f071d5307 100644 (file)
@@ -22,7 +22,7 @@ RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_meta_handler()
   return RGWBucketMetaHandlerAllocator::alloc();
 }
 
-RGWMetadataHandler *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler()
+RGWBucketInstanceMetadataHandlerBase *RGWSyncModuleInstance::alloc_bucket_instance_meta_handler()
 {
   return RGWBucketInstanceMetaHandlerAllocator::alloc();
 }
index a82ba21d1c98b283487d4612b95628f1e85b2088..dfab8ee0e426fb5372c7449b4a338c2d4c67ab12 100644 (file)
@@ -39,6 +39,7 @@ public:
 
 class RGWRESTMgr;
 class RGWMetadataHandler;
+class RGWBucketInstanceMetadataHandlerBase;
 
 class RGWSyncModuleInstance {
 public:
@@ -52,7 +53,7 @@ public:
     return false;
   }
   virtual RGWMetadataHandler *alloc_bucket_meta_handler();
-  virtual RGWMetadataHandler *alloc_bucket_instance_meta_handler();
+  virtual RGWBucketInstanceMetadataHandlerBase *alloc_bucket_instance_meta_handler();
 
   // indication whether the sync module start with full sync (default behavior)
   // incremental sync would follow anyway
index d2485aa25625bf1597a8673aaf98e6b196a9e92b..5e8d24f6f8fbfccca89976bc1ba127a68aeb2fd0 100644 (file)
@@ -168,6 +168,27 @@ std::vector<rgw_sync_bucket_pipe> rgw_sync_bucket_pipes::expand() const
 }
 
 
+void rgw_sync_bucket_pipes::get_potential_related_buckets(const rgw_bucket& bucket,
+                                                          std::set<rgw_bucket> *sources,
+                                                          std::set<rgw_bucket> *dests) const
+{
+  if (dest.match_bucket(bucket)) {
+    auto expanded_sources = source.expand();
+
+    for (auto& s : expanded_sources) {
+      sources->insert(*s.bucket);
+    }
+  }
+
+  if (source.match_bucket(bucket)) {
+    auto expanded_dests = dest.expand();
+
+    for (auto& d : expanded_dests) {
+      dests->insert(*d.bucket);
+    }
+  }
+}
+
 bool rgw_sync_data_flow_group::find_symmetrical(const string& flow_id, bool create, rgw_sync_symmetric_group **flow_group)
 {
   if (!symmetrical) {
@@ -319,3 +340,22 @@ void rgw_sync_policy_group::remove_pipe(const string& pipe_id)
     }
   }
 }
+
+void rgw_sync_policy_group::get_potential_related_buckets(const rgw_bucket& bucket,
+                                                          std::set<rgw_bucket> *sources,
+                                                          std::set<rgw_bucket> *dests) const
+{
+  for (auto& pipe : pipes) {
+    pipe.get_potential_related_buckets(bucket, sources, dests);
+  }
+}
+
+void rgw_sync_policy_info::get_potential_related_buckets(const rgw_bucket& bucket,
+                                                         std::set<rgw_bucket> *sources,
+                                                         std::set<rgw_bucket> *dests) const
+{
+  for (auto& entry : groups) {
+    auto& group = entry.second;
+    group.get_potential_related_buckets(bucket, sources, dests);
+  }
+}
index 979785deb30f5936b284382e80848eb1de918ec7..1233084e2ff6780f7c5e366c0a178dfd03232aab 100644 (file)
@@ -309,6 +309,10 @@ struct rgw_sync_bucket_pipes {
   void decode_json(JSONObj *obj);
 
   std::vector<rgw_sync_bucket_pipe> expand() const;
+
+  void get_potential_related_buckets(const rgw_bucket& bucket,
+                                     std::set<rgw_bucket> *sources,
+                                     std::set<rgw_bucket> *dests) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_bucket_pipes)
 
@@ -412,6 +416,11 @@ struct rgw_sync_policy_group {
 
   bool find_pipe(const string& pipe_id, bool create, rgw_sync_bucket_pipes **pipe);
   void remove_pipe(const string& pipe_id);
+
+  void get_potential_related_buckets(const rgw_bucket& bucket,
+                                     std::set<rgw_bucket> *sources,
+                                     std::set<rgw_bucket> *dests) const;
+
 };
 WRITE_CLASS_ENCODER(rgw_sync_policy_group)
 
@@ -436,6 +445,10 @@ struct rgw_sync_policy_info {
   bool empty() const {
     return groups.empty();
   }
+
+  void get_potential_related_buckets(const rgw_bucket& bucket,
+                                     std::set<rgw_bucket> *sources,
+                                     std::set<rgw_bucket> *dests) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_policy_info)
 
index e59ecd60184b91c99b4d3f4b8c3479de0a4e805d..0b87485fdebf50ec4d21f2c2d4a6cd3034a43d08 100644 (file)
@@ -145,7 +145,8 @@ RGWSI_Bucket_SObj::~RGWSI_Bucket_SObj() {
 void RGWSI_Bucket_SObj::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc,
                              RGWSI_SysObj_Cache *_cache_svc, RGWSI_BucketIndex *_bi,
                              RGWSI_Meta *_meta_svc, RGWSI_MetaBackend *_meta_be_svc,
-                             RGWSI_SyncModules *_sync_modules_svc)
+                             RGWSI_SyncModules *_sync_modules_svc,
+                             RGWSI_Bucket_Sync *_bucket_sync_svc)
 {
   svc.bucket = this;
   svc.zone = _zone_svc;
@@ -155,6 +156,7 @@ void RGWSI_Bucket_SObj::init(RGWSI_Zone *_zone_svc, RGWSI_SysObj *_sysobj_svc,
   svc.meta = _meta_svc;
   svc.meta_be = _meta_be_svc;
   svc.sync_modules = _sync_modules_svc;
+  svc.bucket_sync = _bucket_sync_svc;
 }
 
 int RGWSI_Bucket_SObj::do_start()
@@ -298,6 +300,12 @@ int RGWSI_Bucket_SObj::read_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
                                   &ci, refresh_version, y);
   *info = e.info;
 
+#warning FIXME: use unique_ptr and implement RGWBucketInfo copy constructor, or other better solution
+  if (info->sync_policy) { /* fork policy off cache */
+    auto policy = make_shared<rgw_sync_policy_info>(*info->sync_policy);
+    info->sync_policy = std::const_pointer_cast<const rgw_sync_policy_info>(policy);
+  }
+
   if (ret < 0) {
     if (ret != -ENOENT) {
       lderr(cct) << "ERROR: do_read_bucket_instance_info failed: " << ret << dendl;
@@ -522,7 +530,14 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
   RGWSI_MBSObj_PutParams params(bl, pattrs, mtime, exclusive);
 
   int ret = svc.meta_be->put(ctx.get(), key, params, &info.objv_tracker, y);
-  if (ret == -EEXIST) {
+
+  if (ret >= 0) {
+    int r = svc.bucket_sync->handle_bi_update(info,
+                                              orig_info.value_or(nullptr));
+    if (r < 0) {
+      return r;
+    }
+  } else if (ret == -EEXIST) {
     /* well, if it's exclusive we shouldn't overwrite it, because we might race with another
      * bucket operation on this specific bucket (e.g., being synced from the master), but
      * since bucket instace meta object is unique for this specific bucket instace, we don't
index 868fa537f1c0999547f6753ff24fb448466b60c3..80695ba2f7384ce767b39abf2f12a6f54fa2fd70 100644 (file)
@@ -28,6 +28,7 @@ class RGWSI_SysObj;
 class RGWSI_SysObj_Cache;
 class RGWSI_Meta;
 class RGWSI_SyncModules;
+class RGWSI_Bucket_Sync;
 
 struct rgw_cache_entry_info;
 
@@ -75,6 +76,7 @@ public:
     RGWSI_Meta *meta{nullptr};
     RGWSI_MetaBackend *meta_be{nullptr};
     RGWSI_SyncModules *sync_modules{nullptr};
+    RGWSI_Bucket_Sync *bucket_sync{nullptr};
   } svc;
 
   RGWSI_Bucket_SObj(CephContext *cct);
@@ -94,7 +96,8 @@ public:
             RGWSI_BucketIndex *_bi,
             RGWSI_Meta *_meta_svc,
             RGWSI_MetaBackend *_meta_be_svc,
-           RGWSI_SyncModules *_sync_modules);
+           RGWSI_SyncModules *_sync_modules_svc,
+           RGWSI_Bucket_Sync *_bucket_sync_svc);
 
 
   int read_bucket_entrypoint_info(RGWSI_Bucket_EP_Ctx& ctx,
index 1f2a93d2c5380009c7970f64abaa2df318876922..ed709620276ab1b47575b86a17a89b52ea5f590d 100644 (file)
@@ -35,6 +35,8 @@ public:
                                  std::optional<rgw_bucket> bucket,
                                  RGWBucketSyncPolicyHandlerRef *handler,
                                  optional_yield y) = 0;
+  virtual int handle_bi_update(RGWBucketInfo& bucket_info,
+                               RGWBucketInfo *orig_bucket_info) = 0;
 };
 
 
index 71e364be35a2f80f690501473877ba99d7a0260f..935e0ea776a5f72ca56450a4f753003f307d76f3 100644 (file)
@@ -28,6 +28,7 @@ int RGWSI_Bucket_Sync_SObj::do_start()
   return 0;
 }
 
+
 int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
                                                std::optional<string> zone,
                                                std::optional<rgw_bucket> _bucket,
@@ -87,3 +88,71 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx,
   return 0;
 }
 
+static void diff_sets(std::set<rgw_bucket>& orig_set,
+                      std::set<rgw_bucket>& new_set,
+                      vector<rgw_bucket> *added,
+                      vector<rgw_bucket> *removed)
+{
+  auto oiter = orig_set.begin();
+  auto niter = new_set.begin();
+
+  while (oiter != orig_set.end() &&
+         niter != new_set.end()) {
+    if (*oiter == *niter) {
+      ++oiter;
+      ++niter;
+      continue;
+    }
+    while (*oiter < *niter) {
+      removed->push_back(*oiter);
+      ++oiter;
+    }
+    while (*niter < *oiter) {
+      added->push_back(*niter);
+      ++niter;
+    }
+  }
+  for (; oiter != orig_set.end(); ++oiter) {
+    removed->push_back(*oiter);
+  }
+  for (; niter != new_set.end(); ++niter) {
+    added->push_back(*niter);
+  }
+}
+
+int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info,
+                                             RGWBucketInfo *orig_bucket_info)
+{
+  std::set<rgw_bucket> orig_sources;
+  std::set<rgw_bucket> orig_dests;
+
+  if (orig_bucket_info &&
+      orig_bucket_info->sync_policy) {
+    orig_bucket_info->sync_policy->get_potential_related_buckets(bucket_info.bucket,
+                                                                &orig_sources,
+                                                                &orig_dests);
+  }
+
+  std::set<rgw_bucket> sources;
+  std::set<rgw_bucket> dests;
+  if (bucket_info.sync_policy) {
+    bucket_info.sync_policy->get_potential_related_buckets(bucket_info.bucket,
+                                                           &sources,
+                                                           &dests);
+  }
+
+  std::vector<rgw_bucket> removed_sources;
+  std::vector<rgw_bucket> added_sources;
+  diff_sets(orig_sources, sources, &added_sources, &removed_sources);
+  ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_sources=" << orig_sources << " new_sources=" << sources << dendl;
+  ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ":  potential sources added=" << added_sources << " removed=" << removed_sources << dendl;
+  
+  std::vector<rgw_bucket> removed_dests;
+  std::vector<rgw_bucket> added_dests;
+  diff_sets(orig_dests, dests, &added_dests, &removed_dests);
+  ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ": orig_dests=" << orig_dests << " new_dests=" << dests << dendl;
+  ldout(cct, 20) << __func__ << "(): bucket=" << bucket_info.bucket << ":  potential dests added=" << added_dests << " removed=" << removed_dests << dendl;
+
+  return 0;
+
+}
index 967cb987708036866d845c76ba9a86db25a00212..0271d9faff66b25aee96062ffed32f6a7ba0b0af 100644 (file)
@@ -39,7 +39,6 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync
   unique_ptr<RGWChainedCacheImpl_bucket_sync_policy_cache_entry> sync_policy_cache;
 
   int do_start() override;
-
 public:
   struct Svc {
     RGWSI_Zone *zone{nullptr};
@@ -60,5 +59,8 @@ public:
                          std::optional<rgw_bucket> bucket,
                          RGWBucketSyncPolicyHandlerRef *handler,
                          optional_yield y) override;
+
+  int handle_bi_update(RGWBucketInfo& bucket_info,
+                       RGWBucketInfo *orig_bucket_info) override;
 };