]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: split SyncBucket from SyncBucketShard
authorCasey Bodley <cbodley@redhat.com>
Tue, 6 Oct 2020 21:59:29 +0000 (17:59 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 13 Sep 2021 16:27:48 +0000 (12:27 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 9c2790244f06d7ff8d3aaddb17c483fc6591aae6..bdedc70ee063536683613f240a57d55943ab29ba 100644 (file)
@@ -4900,13 +4900,12 @@ class RGWSyncBucketShardCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   rgw_bucket_sync_pair_info sync_pair;
-  rgw_bucket_sync_pipe sync_pipe;
-  rgw_bucket_shard_sync_info sync_status;
-  RGWMetaSyncEnv meta_sync_env;
-  RGWObjVersionTracker objv_tracker;
+  rgw_bucket_sync_pipe& sync_pipe;
   ceph::real_time* progress;
 
   const std::string status_oid;
+  rgw_bucket_shard_sync_info sync_status;
+  RGWObjVersionTracker objv_tracker;
 
   RGWSyncTraceNodeRef tn;
 
@@ -4914,27 +4913,18 @@ public:
   RGWSyncBucketShardCR(RGWDataSyncCtx *_sc,
                        boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                        const rgw_bucket_sync_pair_info& _sync_pair,
-                       const RGWSyncTraceNodeRef& _tn_parent,
+                       rgw_bucket_sync_pipe& sync_pipe,
+                       const RGWSyncTraceNodeRef& tn,
                        ceph::real_time* progress)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), sync_pipe(sync_pipe), progress(progress),
       status_oid(RGWBucketPipeSyncStatusManager::inc_status_oid(sc->source_zone, sync_pair)),
-      tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
-                                         SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
+      tn(tn) {
   }
 
   int operate(const DoutPrefixProvider *dpp) override;
 };
 
-static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
-                                          boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
-                                          const rgw_bucket_sync_pair_info& sync_pair,
-                                          const RGWSyncTraceNodeRef& tn,
-                                          ceph::real_time* progress)
-{
-  return new RGWSyncBucketShardCR(sc, std::move(lease), sync_pair, tn, progress);
-}
-
 int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
 {
   reenter(this) {
@@ -4947,24 +4937,6 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
 
     tn->log(20, SSTR("sync status for source bucket: " << sync_status.state));
 
-    yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
-                                          &sync_pipe.source_bucket_attrs, tn));
-    if (retcode < 0) {
-      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
-      drain_all();
-      return set_cr_error(retcode);
-    }
-
-    yield call(new RGWSyncGetBucketInfoCR(sync_env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info, 
-                                          &sync_pipe.dest_bucket_attrs, tn));
-    if (retcode < 0) {
-      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
-      drain_all();
-      return set_cr_error(retcode);
-    }
-
-    sync_pipe.info = sync_pair;
-
     do {
       if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
           sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
@@ -5016,6 +4988,110 @@ int RGWSyncBucketShardCR::operate(const DoutPrefixProvider *dpp)
   return 0;
 }
 
+class RGWSyncBucketCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *env;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
+  rgw_bucket_sync_pair_info sync_pair;
+  rgw_bucket_sync_pipe sync_pipe;
+  ceph::real_time* progress;
+
+  const rgw_raw_obj status_obj;
+  rgw_bucket_sync_status bucket_status;
+  RGWObjVersionTracker objv;
+
+  RGWSyncTraceNodeRef tn;
+
+public:
+  RGWSyncBucketCR(RGWDataSyncCtx *_sc,
+                  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+                  const rgw_bucket_sync_pair_info& _sync_pair,
+                  const RGWSyncTraceNodeRef& _tn_parent,
+                  ceph::real_time* progress)
+    : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
+      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+      status_obj(env->svc->zone->get_zone_params().log_pool,
+                 RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
+                                                                 sync_pair.source_bs.bucket,
+                                                                 sync_pair.dest_bs.bucket)),
+      tn(env->sync_tracer->add_node(_tn_parent, "bucket",
+                                    SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
+  }
+
+  int operate(const DoutPrefixProvider *dpp) override;
+};
+
+static RGWCoroutine* sync_bucket_shard_cr(RGWDataSyncCtx* sc,
+                                          boost::intrusive_ptr<const RGWContinuousLeaseCR> lease,
+                                          const rgw_bucket_sync_pair_info& sync_pair,
+                                          const RGWSyncTraceNodeRef& tn,
+                                          ceph::real_time* progress)
+{
+  return new RGWSyncBucketCR(sc, std::move(lease), sync_pair, tn, progress);
+}
+
+int RGWSyncBucketCR::operate(const DoutPrefixProvider *dpp)
+{
+  reenter(this) {
+    // read source/destination bucket info
+    yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.source_bs.bucket, &sync_pipe.source_bucket_info,
+                                          &sync_pipe.source_bucket_attrs, tn));
+    if (retcode < 0) {
+      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
+      return set_cr_error(retcode);
+    }
+
+    yield call(new RGWSyncGetBucketInfoCR(env, sync_pair.dest_bs.bucket, &sync_pipe.dest_bucket_info,
+                                          &sync_pipe.dest_bucket_attrs, tn));
+    if (retcode < 0) {
+      tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
+      return set_cr_error(retcode);
+    }
+
+    sync_pipe.info = sync_pair;
+
+    // read bucket sync status
+    using CR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+    yield call(new CR(dpp, env->async_rados, env->svc->sysobj,
+                      status_obj, &bucket_status, true, &objv));
+    if (retcode < 0) {
+      return set_cr_error(retcode);
+    }
+
+    if (bucket_status.state == BucketSyncState::Init) {
+      // init sync status
+      yield {
+       // use exclusive create if it didn't exist. if we lose the race to
+       // create it, we'll fail with EEXIST and RGWSyncBucketCR() will come
+       // back later and read the new status
+       const bool exclusive = objv.version_for_check() == nullptr;
+        int num_shards = sync_pipe.dest_bucket_info.layout.current_index.layout.normal.num_shards;
+        call(new InitBucketFullSyncStatusCR(sc, sync_pair, status_obj,
+                                            bucket_status, objv, num_shards,
+                                            exclusive));
+      }
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+    }
+
+    if (bucket_status.state == BucketSyncState::Full) {
+      // TODO: full sync
+    }
+
+    if (bucket_status.state == BucketSyncState::Incremental) {
+      yield call(new RGWSyncBucketShardCR(sc, lease_cr, sync_pair,
+                                          sync_pipe, tn, progress));
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+    }
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
 RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
 {
   if ((size_t)num >= sync_pairs.size()) {