git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: Pull lock out of RGWInitDataSyncStatusCoroutine
authorAdam C. Emerson <aemerson@redhat.com>
Thu, 11 Aug 2022 21:54:58 +0000 (17:54 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 11 Jan 2023 06:32:21 +0000 (01:32 -0500)
RGWDataSyncCR manages the lock instead, holding it through StateInit
and StateBuildingFullSyncMaps, and releasing it before entering StateSync.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_data_sync.cc

index cb4d227a39d5a2a273d3980564a763d459033e97..d11838495bf86c4aed665cac0cac0e937d944678 100644 (file)
@@ -521,57 +521,52 @@ bool RGWListRemoteDataLogCR::spawn_next() {
 }
 
 class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
-  static constexpr uint32_t lock_duration = 30;
-  RGWDataSyncCtx *sc;
-  RGWDataSyncEnv *sync_env;
-  rgw::sal::RadosStore* driver; // RGWDataSyncEnv also has a pointer to driver
-  const rgw_pool& pool;
+  static constexpr auto lock_name{ "sync_lock"sv };
+  RGWDataSyncCtx* const sc;
+  RGWDataSyncEnv* const sync_env{ sc->env };
   const uint32_t num_shards;
+  rgw_data_sync_status* const status;
+  RGWSyncTraceNodeRef tn;
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
 
-  string sync_status_oid;
+  const rgw_pool& pool{ sync_env->svc->zone->get_zone_params().log_pool };
+  const string sync_status_oid{
+    RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) };
 
-  string lock_name;
-  string cookie;
-  rgw_data_sync_status *status;
   std::vector<RGWObjVersionTracker>& objvs;
   map<int, RGWDataChangesLogInfo> shards_info;
 
-  RGWSyncTraceNodeRef tn;
-public:
-  RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
-                                 uint64_t instance_id,
-                                 RGWSyncTraceNodeRef& _tn_parent,
-                                 rgw_data_sync_status *status,
-                                 std::vector<RGWObjVersionTracker>& objvs)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), driver(sync_env->driver),
-      pool(sync_env->svc->zone->get_zone_params().log_pool),
-      num_shards(num_shards), status(status), objvs(objvs),
-      tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
-    lock_name = "sync_lock";
 
+public:
+  RGWInitDataSyncStatusCoroutine(
+    RGWDataSyncCtx* _sc, uint32_t num_shards, uint64_t instance_id,
+    const RGWSyncTraceNodeRef& tn_parent, rgw_data_sync_status* status,
+    boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
+    std::vector<RGWObjVersionTracker>& objvs)
+    : RGWCoroutine(_sc->cct), sc(_sc), num_shards(num_shards), status(status),
+      tn(sync_env->sync_tracer->add_node(tn_parent, "init_data_sync_status")),
+      lease_cr(std::move(lease_cr)), objvs(objvs) {
     status->sync_info.instance_id = instance_id;
+  }
 
-#define COOKIE_LEN 16
-    char buf[COOKIE_LEN + 1];
-
-    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
-    cookie = buf;
-
-    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sc->source_zone);
-
+  static auto continuous_lease_cr(RGWDataSyncCtx* const sc,
+                                 RGWCoroutine* const caller) {
+    auto lock_duration = sc->cct->_conf->rgw_sync_lease_period;
+    return new RGWContinuousLeaseCR(
+      sc->env->async_rados, sc->env->driver,
+      { sc->env->svc->zone->get_zone_params().log_pool,
+       RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) },
+      string(lock_name), lock_duration, caller);
   }
 
   int operate(const DoutPrefixProvider *dpp) override {
     int ret;
     reenter(this) {
-      using LockCR = RGWSimpleRadosLockCR;
-      yield call(new LockCR(sync_env->async_rados, driver,
-                            rgw_raw_obj{pool, sync_status_oid},
-                            lock_name, cookie, lock_duration));
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
-        return set_cr_error(retcode);
+      if (!lease_cr->is_locked()) {
+       drain_all();
+       return set_cr_error(-ECANCELED);
       }
+
       using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
       yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                  rgw_raw_obj{pool, sync_status_oid},
@@ -581,16 +576,11 @@ public:
         return set_cr_error(retcode);
       }
 
-      /* take lock again, we just recreated the object */
-      yield call(new LockCR(sync_env->async_rados, driver,
-                            rgw_raw_obj{pool, sync_status_oid},
-                            lock_name, cookie, lock_duration));
-      if (retcode < 0) {
-        tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
-        return set_cr_error(retcode);
-      }
-
-      tn->log(10, "took lease");
+      // In the original code we reacquired the lock. Since
+      // RGWSimpleRadosWriteCR doesn't appear to touch the attributes
+      // and cls_version works across it, this should be unnecessary.
+      // Putting a note here just in case. If we see ECANCELED where
+      // we expect EBUSY, we can revisit this.
 
       /* fetch current position in logs */
       yield {
@@ -641,9 +631,6 @@ public:
         tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
         return set_cr_error(retcode);
       }
-      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, driver,
-                                            rgw_raw_obj{pool, sync_status_oid},
-                                            lock_name, cookie));
       return set_cr_done();
     }
     return 0;
@@ -781,6 +768,65 @@ int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, cons
   return ret;
 }
 
+namespace RGWRDL {
+class DataSyncInitCR : public RGWCoroutine {
+  RGWDataSyncCtx* const sc;
+  const uint32_t num_shards;
+  uint64_t instance_id;
+  const RGWSyncTraceNodeRef& tn;
+  rgw_data_sync_status* const sync_status;
+  std::vector<RGWObjVersionTracker>& objvs;
+
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+
+public:
+
+  DataSyncInitCR(RGWDataSyncCtx* sc, uint32_t num_shards, uint64_t instance_id,
+                const RGWSyncTraceNodeRef& tn,
+                rgw_data_sync_status* sync_status,
+                std::vector<RGWObjVersionTracker>& objvs)
+    : RGWCoroutine(sc->cct), sc(sc), num_shards(num_shards),
+      instance_id(instance_id), tn(tn),
+      sync_status(sync_status), objvs(objvs) {}
+
+  ~DataSyncInitCR() override {
+    if (lease_cr) {
+      lease_cr->abort();
+    }
+  }
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      lease_cr.reset(
+       RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
+
+      yield spawn(lease_cr.get(), false);
+      while (!lease_cr->is_locked()) {
+       if (lease_cr->is_done()) {
+         tn->log(5, "ERROR: failed to take data sync status lease");
+         set_status("lease lock failed, early abort");
+         drain_all();
+         return set_cr_error(lease_cr->get_ret_status());
+       }
+       tn->log(5, "waiting on data sync status lease");
+       yield set_sleeping(true);
+      }
+      tn->log(5, "acquired data sync status lease");
+      yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id,
+                                                   tn, sync_status, lease_cr, objvs));
+      lease_cr->go_down();
+      lease_cr.reset();
+      drain_all();
+      if (retcode < 0) {
+       set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+}
+
 int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
 {
   rgw_data_sync_status sync_status;
@@ -799,7 +845,8 @@ int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_sh
   auto instance_id = ceph::util::generate_random_number<uint64_t>();
   RGWDataSyncCtx sc_local = sc;
   sc_local.env = &sync_env_local;
-  ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status, objvs));
+  ret = crs.run(dpp, new RGWRDL::DataSyncInitCR(&sc_local, num_shards,
+                                               instance_id, tn, &sync_status, objvs));
   http_manager.stop();
   return ret;
 }
@@ -2192,6 +2239,10 @@ class RGWDataSyncCR : public RGWCoroutine {
 
   RGWDataSyncModule *data_sync_module{nullptr};
   RGWObjVersionTracker objv;
+
+  boost::intrusive_ptr<RGWContinuousLeaseCR> init_lease;
+  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+
 public:
   RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
                                                       sc(_sc), sync_env(_sc->env),
@@ -2204,6 +2255,9 @@ public:
     for (auto iter : shard_crs) {
       iter.second->put();
     }
+    if (init_lease) {
+      init_lease->abort();
+    }
   }
 
   int operate(const DoutPrefixProvider *dpp) override {
@@ -2219,15 +2273,37 @@ public:
         return set_cr_error(retcode);
       }
 
+      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state !=
+         rgw_data_sync_info::StateSync) {
+       init_lease.reset(
+         RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
+       yield lease_stack.reset(spawn(init_lease.get(), false));
+
+       while (!init_lease->is_locked()) {
+         if (init_lease->is_done()) {
+           tn->log(5, "ERROR: failed to take data sync status lease");
+           set_status("lease lock failed, early abort");
+           drain_all();
+           return set_cr_error(init_lease->get_ret_status());
+         }
+         tn->log(5, "waiting on data sync status lease");
+         yield set_sleeping(true);
+       }
+       tn->log(5, "acquired data sync status lease");
+      }
+
       /* state: init status */
       if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
         tn->log(20, SSTR("init"));
         sync_status.sync_info.num_shards = num_shards;
         uint64_t instance_id;
         instance_id = ceph::util::generate_random_number<uint64_t>();
-        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status, objvs));
+        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn,
+                                                      &sync_status, init_lease, objvs));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
+         init_lease->go_down();
+         drain_all();
           return set_cr_error(retcode);
         }
         // sets state = StateBuildingFullSyncMaps
@@ -2246,6 +2322,12 @@ public:
           tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
           return set_cr_error(retcode);
         }
+
+        if (!init_lease->is_locked()) {
+          init_lease->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
         /* state: building full sync maps */
         yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
         if (retcode < 0) {
@@ -2254,6 +2336,11 @@ public:
         }
         sync_status.sync_info.state = rgw_data_sync_info::StateSync;
 
+        if (!init_lease->is_locked()) {
+          init_lease->go_down();
+          drain_all();
+          return set_cr_error(-ECANCELED);
+        }
         /* update new state */
         yield call(set_sync_info_cr());
         if (retcode < 0) {
@@ -2269,9 +2356,15 @@ public:
         tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
         return set_cr_error(retcode);
       }
-      
-      yield {
-        if  ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+
+      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+        if (init_lease) {
+          init_lease->go_down();
+          drain_all();
+          init_lease.reset();
+          lease_stack.reset();
+        }
+        yield {
           tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
           for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
                iter != sync_status.sync_markers.end(); ++iter) {