git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: RGWSyncBucketCR holds lease over Init state too
author Casey Bodley <cbodley@redhat.com>
Tue, 10 Nov 2020 20:00:40 +0000 (15:00 -0500)
committer Casey Bodley <cbodley@redhat.com>
Thu, 4 Feb 2021 21:11:30 +0000 (16:11 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 8a23b363a8b72039a1cae498711005882136a877..5ab040b999e821246e6f3f60cab7b4dad4aa4d9e 100644 (file)
@@ -4956,6 +4956,8 @@ class RGWSyncBucketCR : public RGWCoroutine {
   rgw_bucket_sync_pipe sync_pipe;
   ceph::real_time* progress;
 
+  const std::string lock_name = "bucket sync";
+  const uint32_t lock_duration;
   const rgw_raw_obj status_obj;
   rgw_bucket_sync_status bucket_status;
   RGWObjVersionTracker objv;
@@ -4971,6 +4973,7 @@ public:
                   ceph::real_time* progress)
     : RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
       data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+      lock_duration(cct->_conf->rgw_sync_lease_period),
       status_obj(env->svc->zone->get_zone_params().log_pool,
                  RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
                                                                  sync_pair.source_bs.bucket,
@@ -5012,9 +5015,21 @@ int RGWSyncBucketCR::operate()
     sync_pipe.info = sync_pair;
 
     // read bucket sync status
-    using CR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
-    yield call(new CR(env->async_rados, env->svc->sysobj,
-                      status_obj, &bucket_status, true, &objv));
+    using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+    yield call(new ReadCR(env->async_rados, env->svc->sysobj,
+                          status_obj, &bucket_status, false, &objv));
+    if (retcode == -ENOENT) {
+      // use exclusive create to set state=Init
+      objv.generate_new_write_ver(cct);
+      using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+      yield call(new WriteCR(env->async_rados, env->svc->sysobj,
+                             status_obj, bucket_status, &objv, true));
+      if (retcode == -EEXIST) {
+        // raced with another create, read its status
+        yield call(new ReadCR(env->async_rados, env->svc->sysobj,
+                              status_obj, &bucket_status, false, &objv));
+      }
+    }
     if (retcode < 0) {
       return set_cr_error(retcode);
     }
@@ -5022,8 +5037,37 @@ int RGWSyncBucketCR::operate()
     do {
       tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state));
 
+      // if the state wasn't Incremental, take a bucket-wide lease to prevent
+      // different shards from duplicating the init and full sync
+      if (bucket_status.state != BucketSyncState::Incremental) {
+        assert(!bucket_lease_cr);
+        bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+                                                       lock_name, lock_duration, this));
+        yield spawn(bucket_lease_cr.get(), false);
+        while (!bucket_lease_cr->is_locked()) {
+          if (bucket_lease_cr->is_done()) {
+            tn->log(5, "failed to take bucket lease");
+            set_status("lease lock failed, early abort");
+            drain_all();
+            return set_cr_error(bucket_lease_cr->get_ret_status());
+          }
+          tn->log(5, "waiting on bucket lease");
+          yield set_sleeping(true);
+        }
+        // reread the status after acquiring the lock
+        yield call(new ReadCR(env->async_rados, env->svc->sysobj,
+                              status_obj, &bucket_status, false, &objv));
+        if (retcode < 0) {
+          bucket_lease_cr->go_down();
+          drain_all();
+          bucket_lease_cr.reset();
+          return set_cr_error(retcode);
+        }
+      }
+
       if (bucket_status.state == BucketSyncState::Init ||
           bucket_status.state == BucketSyncState::Stopped) {
+        assert(bucket_lease_cr);
         // init sync status
         yield {
           init_check_compat = objv.read_version.ver <= 1; // newly-created
@@ -5033,43 +5077,33 @@ int RGWSyncBucketCR::operate()
                                               init_check_compat));
         }
         if (retcode < 0) {
+          bucket_lease_cr->go_down();
+          drain_all();
+          bucket_lease_cr.reset();
           return set_cr_error(retcode);
         }
       }
 
       if (bucket_status.state == BucketSyncState::Full) {
-        // take a bucket-wide lease for full sync to prevent the bucket shards
-        // from duplicating the work
-        yield {
-          const std::string lock_name = "full sync";
-          const uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
-          bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
-                                                         lock_name, lock_duration, this));
-          spawn(bucket_lease_cr.get(), false);
-        }
-        while (!bucket_lease_cr->is_locked()) {
-          if (bucket_lease_cr->is_done()) {
-            tn->log(5, "failed to take bucket lease");
-            set_status("lease lock failed, early abort");
-            drain_all();
-            return set_cr_error(bucket_lease_cr->get_ret_status());
-          }
-          tn->log(5, "waiting on bucket lease");
-          yield set_sleeping(true);
-        }
-
+        assert(bucket_lease_cr);
         yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
                                            bucket_lease_cr, bucket_status,
                                            tn, objv));
-        bucket_lease_cr->go_down();
-        drain_all();
-        bucket_lease_cr.reset();
         if (retcode < 0) {
+          bucket_lease_cr->go_down();
+          drain_all();
+          bucket_lease_cr.reset();
           return set_cr_error(retcode);
         }
       }
 
       if (bucket_status.state == BucketSyncState::Incremental) {
+        // lease not required for incremental sync
+        if (bucket_lease_cr) {
+          bucket_lease_cr->go_down();
+          drain_all();
+          bucket_lease_cr.reset();
+        }
         yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
                                             sync_pipe, bucket_status.state,
                                             tn, progress));