]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: remove per-bucket-shard sync leases
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:23:27 +0000 (09:23 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 18:08:19 +0000 (14:08 -0400)
bucket sync now gets a const pointer to the DataSyncShard's lease to
check whether the lease has expired

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_data_sync.cc

index 918395f65c860ab5074f9f98ee5dcfb6370d0a8d..a8ecbcecf28a04cf01cd85835cfaaba24304b9d3 100644 (file)
@@ -990,6 +990,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
 class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   rgw_bucket_sync_pair_info sync_pair;
   rgw_bucket_sync_pipe sync_pipe;
   rgw_bucket_shard_sync_info sync_status;
@@ -999,26 +1000,20 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
 
   const std::string status_oid;
 
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
-  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-
   RGWSyncTraceNodeRef tn;
 
 public:
-  RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair,
+  RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc,
+                            boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+                            const rgw_bucket_sync_pair_info& _sync_pair,
                             const RGWSyncTraceNodeRef& _tn_parent,
                             ceph::real_time* progress)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
-      sync_pair(_sync_pair), progress(progress),
+      lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
       status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
                                          SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
   }
-  ~RGWRunBucketSyncCoroutine() override {
-    if (lease_cr) {
-      lease_cr->abort();
-    }
-  }
 
   int operate() override;
 };
@@ -1208,6 +1203,7 @@ public:
 class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
 
   std::optional<rgw_bucket_shard> target_bs;
   std::optional<rgw_bucket_shard> source_bs;
@@ -1220,9 +1216,6 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 
   rgw_bucket_sync_pair_info sync_pair;
 
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
-  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-
   RGWSyncTraceNodeRef tn;
   ceph::real_time* progress;
   std::vector<ceph::real_time> shard_progress;
@@ -1242,15 +1235,11 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
 
 public:
   RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+                            boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                             std::optional<rgw_bucket_shard> _target_bs,
                             std::optional<rgw_bucket_shard> _source_bs,
                             const RGWSyncTraceNodeRef& _tn_parent,
                             ceph::real_time* progress);
-  ~RGWRunBucketSourcesSyncCR() override {
-    if (lease_cr) {
-      lease_cr->abort();
-    }
-  }
 
   int operate() override;
 };
@@ -1264,6 +1253,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   uint32_t obligation_counter = 0;
   RGWDataSyncShardMarkerTrack *marker_tracker;
   boost::intrusive_ptr<RGWOmapAppend> error_repo;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   RGWSyncTraceNodeRef tn;
 
   ceph::real_time progress;
@@ -1272,10 +1262,13 @@ public:
   RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
                            rgw_data_sync_obligation obligation,
                            RGWDataSyncShardMarkerTrack *_marker_tracker,
-                           RGWOmapAppend *_error_repo, const RGWSyncTraceNodeRef& _tn_parent)
+                           RGWOmapAppend *_error_repo,
+                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+                           const RGWSyncTraceNodeRef& _tn_parent)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       state(std::move(state)), obligation(std::move(obligation)),
-      marker_tracker(_marker_tracker), error_repo(_error_repo) {
+      marker_tracker(_marker_tracker), error_repo(_error_repo),
+      lease_cr(std::move(lease_cr)) {
     set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
     tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
   }
@@ -1310,7 +1303,8 @@ public:
 
           ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
               << ' ' << *state->obligation << dendl;
-          yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */
+          yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
+                                                   std::nullopt, /* target_bs */
                                                    state->key, tn, &progress));
           if (retcode < 0) {
             break;
@@ -1455,7 +1449,8 @@ class RGWDataSyncShardCR : public RGWCoroutine {
     auto state = bucket_shard_cache->get(src);
     auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
     return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
-                                        &*marker_tracker, error_repo, tn);
+                                        &*marker_tracker, error_repo,
+                                        lease_cr.get(), tn);
   }
 public:
   RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
@@ -3683,7 +3678,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_bucket_sync_pipe& sync_pipe;
   rgw_bucket_shard& bs;
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   bucket_list_result list_result;
   list<bucket_list_entry>::iterator entries_iter;
   rgw_bucket_shard_sync_info& sync_info;
@@ -3749,13 +3744,13 @@ public:
   RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc,
                            rgw_bucket_sync_pipe& _sync_pipe,
                            const std::string& status_oid,
-                           RGWContinuousLeaseCR *lease_cr,
+                           boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                            rgw_bucket_shard_sync_info& sync_info,
                            RGWSyncTraceNodeRef tn_parent,
                            RGWObjVersionTracker& objv_tracker)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
-      lease_cr(lease_cr), sync_info(sync_info),
+      lease_cr(std::move(lease_cr)), sync_info(sync_info),
       status_oid(status_oid),
       tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
                                          SSTR(bucket_shard_str{bs}))),
@@ -3776,7 +3771,7 @@ int RGWBucketShardFullSyncCR::operate()
 
     total_entries = sync_info.full_marker.count;
     do {
-      if (!lease_cr->is_locked()) {
+      if (lease_cr && !lease_cr->is_locked()) {
         drain_all();
         return set_cr_error(-ECANCELED);
       }
@@ -3801,7 +3796,7 @@ int RGWBucketShardFullSyncCR::operate()
       }
       entries_iter = list_result.entries.begin();
       for (; entries_iter != list_result.entries.end(); ++entries_iter) {
-        if (!lease_cr->is_locked()) {
+        if (lease_cr && !lease_cr->is_locked()) {
           drain_all();
           return set_cr_error(-ECANCELED);
         }
@@ -3855,7 +3850,7 @@ int RGWBucketShardFullSyncCR::operate()
       }
     }
     tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
-    if (!lease_cr->is_locked()) {
+    if (lease_cr && !lease_cr->is_locked()) {
       return set_cr_error(-ECANCELED);
     }
     /* update sync state to incremental */
@@ -3894,7 +3889,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   rgw_bucket_sync_pipe& sync_pipe;
   RGWBucketSyncFlowManager::pipe_rules_ref rules;
   rgw_bucket_shard& bs;
-  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   list<rgw_bi_log_entry> list_result;
   list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
   map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
@@ -3917,14 +3912,14 @@ public:
   RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
                                   rgw_bucket_sync_pipe& _sync_pipe,
                                   const std::string& status_oid,
-                                  RGWContinuousLeaseCR *lease_cr,
+                                  boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                   rgw_bucket_shard_sync_info& sync_info,
                                   RGWSyncTraceNodeRef& _tn_parent,
                                   RGWObjVersionTracker& objv_tracker,
                                   ceph::real_time* stable_timestamp)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
       sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
-      lease_cr(lease_cr), sync_info(sync_info),
+      lease_cr(std::move(lease_cr)), sync_info(sync_info),
       zone_id(sync_env->svc->zone->get_zone().id),
       tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
                                          SSTR(bucket_shard_str{bs}))),
@@ -3957,7 +3952,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
   int ret;
   reenter(this) {
     do {
-      if (!lease_cr->is_locked()) {
+      if (lease_cr && !lease_cr->is_locked()) {
         drain_all();
         tn->log(0, "ERROR: lease is not taken, abort");
         return set_cr_error(-ECANCELED);
@@ -4006,7 +4001,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
 
       entries_iter = list_result.begin();
       for (; entries_iter != entries_end; ++entries_iter) {
-        if (!lease_cr->is_locked()) {
+        if (lease_cr && !lease_cr->is_locked()) {
           drain_all();
           return set_cr_error(-ECANCELED);
         }
@@ -4326,12 +4321,13 @@ std::ostream& operator<<(std::ostream& out, std::optional<rgw_bucket_shard>& bs)
 }
 
 RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+                                                     boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                                                      std::optional<rgw_bucket_shard> _target_bs,
                                                      std::optional<rgw_bucket_shard> _source_bs,
                                                      const RGWSyncTraceNodeRef& _tn_parent,
                                                      ceph::real_time* progress)
   : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
-    target_bs(_target_bs), source_bs(_source_bs),
+    lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
     tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
                                        SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))),
     progress(progress)
@@ -4402,7 +4398,7 @@ int RGWRunBucketSourcesSyncCR::operate()
 
         ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
 
-        yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn,
+        yield spawn(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
                                                   &*cur_shard_progress), false);
         while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
           set_status() << "num_spawned() > spawn_window";
@@ -4728,32 +4724,9 @@ int RGWRunBucketsSyncBySourceCR::operate()
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {
-    yield {
-      set_status("acquiring sync lock");
-      auto store = sync_env->store;
-      lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                              rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
-                                              "sync_lock",
-                                              cct->_conf->rgw_sync_lease_period,
-                                              this));
-      lease_stack.reset(spawn(lease_cr.get(), false));
-    }
-    while (!lease_cr->is_locked()) {
-      if (lease_cr->is_done()) {
-        tn->log(5, "failed to take lease");
-        set_status("lease lock failed, early abort");
-        drain_all();
-        return set_cr_error(lease_cr->get_ret_status());
-      }
-      set_sleeping(true);
-      yield;
-    }
-
-    tn->log(10, "took lease");
     yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
-      lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
     }
@@ -4764,7 +4737,6 @@ int RGWRunBucketSyncCoroutine::operate()
                                           &sync_pipe.source_bucket_attrs, tn));
     if (retcode < 0) {
       tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
-      lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
     }
@@ -4773,7 +4745,6 @@ int RGWRunBucketSyncCoroutine::operate()
                                           &sync_pipe.dest_bucket_attrs, tn));
     if (retcode < 0) {
       tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
-      lease_cr->go_down();
       drain_all();
       return set_cr_error(retcode);
     }
@@ -4786,15 +4757,11 @@ int RGWRunBucketSyncCoroutine::operate()
         yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
         if (retcode == -ENOENT) {
           tn->log(0, "bucket sync disabled");
-          lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
-          lease_cr->wakeup();
-          lease_cr.reset();
           drain_all();
           return set_cr_done();
         }
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
-          lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
@@ -4805,11 +4772,10 @@ int RGWRunBucketSyncCoroutine::operate()
 
       if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
         yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
-                                                status_oid, lease_cr.get(),
+                                                status_oid, lease_cr,
                                                 sync_status, tn, objv_tracker));
         if (retcode < 0) {
           tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
-          lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
@@ -4817,12 +4783,11 @@ int RGWRunBucketSyncCoroutine::operate()
 
       if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
         yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
-                                                       status_oid, lease_cr.get(),
+                                                       status_oid, lease_cr,
                                                        sync_status, tn,
                                                        objv_tracker, progress));
         if (retcode < 0) {
           tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
-          lease_cr->go_down();
           drain_all();
           return set_cr_error(retcode);
         }
@@ -4830,7 +4795,6 @@ int RGWRunBucketSyncCoroutine::operate()
       // loop back to previous states unless incremental sync returns normally
     } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
 
-    lease_cr->go_down();
     drain_all();
     return set_cr_done();
   }
@@ -4844,7 +4808,7 @@ RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num)
     return nullptr;
   }
 
-  return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
+  return new RGWRunBucketSyncCoroutine(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
 }
 
 int RGWBucketPipeSyncStatusManager::init()