]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: data sync: resolve pipes by bucket sync hints
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 7 Nov 2019 19:00:18 +0000 (11:00 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc

index e040624b98ec2ffe963b4cb83d9c1b999a642daf..63109879cb20a3725c301e4cd1b5244560d52d2f 100644 (file)
@@ -226,6 +226,73 @@ class RGWSimpleAsyncCR : public RGWSimpleCoroutine {
   }
 };
 
+class RGWGenericAsyncCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  rgw::sal::RGWRadosStore *store;
+
+
+public:
+  class Action {
+  public:
+    virtual ~Action() {}
+    virtual int operate() = 0;
+  };
+
+private:
+  std::shared_ptr<Action> action;
+
+  class Request : public RGWAsyncRadosRequest {
+    std::shared_ptr<Action> action;
+  protected:
+    int _send_request() override {
+      if (!action) {
+       return 0;
+      }
+      return action->operate();
+    }
+  public:
+    Request(RGWCoroutine *caller,
+            RGWAioCompletionNotifier *cn,
+            std::shared_ptr<Action>& _action) : RGWAsyncRadosRequest(caller, cn),
+                                           action(_action) {}
+  } *req{nullptr};
+
+ public:
+  RGWGenericAsyncCR(CephContext *_cct,
+                   RGWAsyncRadosProcessor *_async_rados,
+                   std::shared_ptr<Action>& _action) : RGWSimpleCoroutine(_cct),
+                                                  async_rados(_async_rados),
+                                                  action(_action) {}
+  template<typename T>
+  RGWGenericAsyncCR(CephContext *_cct,
+                   RGWAsyncRadosProcessor *_async_rados,
+                   std::shared_ptr<T>& _action) : RGWSimpleCoroutine(_cct),
+                                                  async_rados(_async_rados),
+                                                  action(std::static_pointer_cast<Action>(_action)) {}
+
+  ~RGWGenericAsyncCR() override {
+    request_cleanup();
+  }
+  void request_cleanup() override {
+    if (req) {
+      req->finish();
+      req = NULL;
+    }
+  }
+
+  int send_request() override {
+    req = new Request(this,
+                      stack->create_completion_notifier(),
+                      action);
+
+    async_rados->queue(req);
+    return 0;
+  }
+  int request_complete() override {
+    return req->get_ret_status();
+  }
+};
+
 
 class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest {
   RGWSysObjectCtx obj_ctx;
index 2bc6594e4c8051dee115e5308aef68d8970104f4..3b28f934b82e4e975a89169a149754b9efeb93cc 100644 (file)
@@ -47,8 +47,6 @@ static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
 static string bucket_status_oid_prefix = "bucket.sync-status";
 static string object_status_oid_prefix = "bucket.sync-status";
-static string bucket_sync_sources_oid_prefix = "bucket.sync-sources";
-static string bucket_sync_targets_oid_prefix = "bucket.sync-targets";
 
 
 void rgw_datalog_info::decode_json(JSONObj *obj) {
@@ -1231,8 +1229,6 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   std::optional<rgw_bucket> target_bucket;
   std::optional<rgw_bucket> source_bucket;
 
-  rgw_raw_obj sources_obj;
-
   rgw_sync_pipe_info_set pipes;
   rgw_sync_pipe_info_set::iterator siter;
 
@@ -3549,24 +3545,6 @@ int RGWBucketShardIncrementalSyncCR::operate()
   return 0;
 }
 
-class RGWBucketSyncPeersManager {
-public:
-  static string sync_sources_oid(const rgw_bucket& bucket) {
-    return bucket_sync_sources_oid_prefix + "." + bucket.get_key();
-  }
-
-  static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { 
-    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket));
-  }
-  static string sync_targets_oid(const string& source_zone, const rgw_bucket& source_bucket) {
-    return bucket_sync_targets_oid_prefix + "." + source_zone + "." + source_bucket.get_key();
-  }
-
-  static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const string& source_zone, const rgw_bucket& source_bucket) { 
-    return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(source_zone, source_bucket));
-  }
-};
-
 struct rgw_bucket_sync_source_local_info {
   string id;
   string type;
@@ -3637,9 +3615,6 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
 
   rgw_sync_pipe_info_set::iterator siter;
 
-  rgw_bucket_sync_sources_local_info sources_local_info;
-  rgw_bucket_sync_sources_local_info targets_local_info;
-
   rgw_bucket_get_sync_policy_params get_policy_params;
   std::shared_ptr<rgw_bucket_get_sync_policy_result> source_policy;
   std::shared_ptr<rgw_bucket_get_sync_policy_result> target_policy;
@@ -3709,6 +3684,31 @@ class RGWGetBucketPeersCR : public RGWCoroutine {
   void update_from_target_bucket_policy();
   void update_from_source_bucket_policy();
 
+  struct GetHintTargets : public RGWGenericAsyncCR::Action {
+    RGWDataSyncEnv *sync_env;
+    rgw_bucket source_bucket;
+    std::set<rgw_bucket> targets;
+    
+    GetHintTargets(RGWDataSyncEnv *_sync_env,
+                   const rgw_bucket& _source_bucket) : sync_env(_sync_env),
+                                                       source_bucket(_source_bucket) {}
+    int operate() override {
+      int r = sync_env->svc->bucket_sync->get_bucket_sync_hints(source_bucket,
+                                                                nullptr,
+                                                                &targets,
+                                                                null_yield);
+      if (r < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): failed to fetch bucket sync hints for bucket=" << source_bucket << dendl;
+        return r;
+      }
+
+      return 0;
+    }
+  };
+
+  std::shared_ptr<GetHintTargets> get_hint_targets_action;
+  std::set<rgw_bucket>::iterator hiter;
+
 public:
   RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env,
                       std::optional<rgw_bucket> _target_bucket,
@@ -3759,7 +3759,6 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
 {
   if (target_bs) {
     target_bucket = target_bs->bucket;
-    sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket);
   }
   if (source_bs) {
     source_bucket = source_bs->bucket;
@@ -3984,17 +3983,9 @@ int RGWGetBucketPeersCR::operate()
       pipes->clear();
     }
     if (target_bucket) {
-      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
-                                                                              sync_env->svc->sysobj,
-                                                                              RGWBucketSyncPeersManager::sync_sources_obj(sync_env->svc->zone, *target_bucket),
-                                                                              &sources_local_info));
-      if (retcode < 0 &&
-          retcode != -ENOENT) {
-        return set_cr_error(retcode);
-      }
-
       get_policy_params.zone = nullopt;
       get_policy_params.bucket = *target_bucket;
+      target_policy = make_shared<rgw_bucket_get_sync_policy_result>();
       yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
                                                      sync_env->store,
                                                      get_policy_params,
@@ -4003,18 +3994,11 @@ int RGWGetBucketPeersCR::operate()
           retcode != -ENOENT) {
         return set_cr_error(retcode);
       }
+
+      update_from_target_bucket_policy();
     }
 
     if (source_bucket && source_zone) {
-      yield call(new RGWSimpleRadosReadCR<rgw_bucket_sync_sources_local_info>(sync_env->async_rados,
-                                                                              sync_env->svc->sysobj,
-                                                                              RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket),
-                                                                              &targets_local_info));
-      if (retcode < 0 &&
-          retcode != -ENOENT) {
-        return set_cr_error(retcode);
-      }
-
       get_policy_params.zone = source_zone;
       get_policy_params.bucket = *source_bucket;
       yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
@@ -4027,10 +4011,40 @@ int RGWGetBucketPeersCR::operate()
       }
 
       source_bucket_info = source_policy->policy_handler->get_bucket_info();
+
+      if (!target_bucket) {
+        get_hint_targets_action = make_shared<GetHintTargets>(sync_env, *source_bucket);
+
+        yield call(new RGWGenericAsyncCR(cct, sync_env->async_rados,
+                                         get_hint_targets_action));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+
+        /* hints might have incomplete bucket ids,
+         * in which case we need to figure out the current
+         * bucket_id
+         */
+        for (hiter = get_hint_targets_action->targets.begin();
+             hiter != get_hint_targets_action->targets.end();
+             ++hiter) {
+          ldpp_dout(sync_env->dpp, 20) << "Got sync hint for bucket=" << *source_bucket << ": " << hiter->get_key() << dendl;
+
+          get_policy_params.zone = nullopt;
+          get_policy_params.bucket = *hiter;
+          yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados,
+                                                         sync_env->store,
+                                                         get_policy_params,
+                                                         target_policy));
+          if (retcode < 0 &&
+              retcode != -ENOENT) {
+            return set_cr_error(retcode);
+          }
+          update_from_target_bucket_policy();
+        }
+      }
     }
-#warning update local copy of sources and act on changes
 
-    update_from_target_bucket_policy();
     update_from_source_bucket_policy();
 
     for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) {