From: Casey Bodley Date: Tue, 31 Mar 2020 13:23:27 +0000 (-0400) Subject: rgw: remove per-bucket-shard sync leases X-Git-Tag: v16.1.0~2586^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=52fa773270d1398b72ace58ec89367bb1077d0a2;p=ceph.git rgw: remove per-bucket-shard sync leases bucket sync now gets a const pointer to the DataSyncShard's lease to check whether the lease has expired Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 918395f65c86..a8ecbcecf28a 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -990,6 +990,7 @@ std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) { class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; + boost::intrusive_ptr lease_cr; rgw_bucket_sync_pair_info sync_pair; rgw_bucket_sync_pipe sync_pipe; rgw_bucket_shard_sync_info sync_status; @@ -999,26 +1000,20 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { const std::string status_oid; - boost::intrusive_ptr lease_cr; - boost::intrusive_ptr lease_stack; - RGWSyncTraceNodeRef tn; public: - RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, + RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, + boost::intrusive_ptr lease_cr, + const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent, ceph::real_time* progress) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), - sync_pair(_sync_pair), progress(progress), + lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress), status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket", SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) { } - ~RGWRunBucketSyncCoroutine() override { - if (lease_cr) { - lease_cr->abort(); - } - } int operate() override; }; @@ -1208,6 +1203,7 @@ public: class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; + boost::intrusive_ptr lease_cr; std::optional target_bs; std::optional source_bs; @@ -1220,9 +1216,6 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { rgw_bucket_sync_pair_info sync_pair; - boost::intrusive_ptr lease_cr; - boost::intrusive_ptr lease_stack; - RGWSyncTraceNodeRef tn; ceph::real_time* progress; std::vector shard_progress; @@ -1242,15 +1235,11 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { public: RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, + boost::intrusive_ptr lease_cr, std::optional _target_bs, std::optional _source_bs, const RGWSyncTraceNodeRef& _tn_parent, ceph::real_time* progress); - ~RGWRunBucketSourcesSyncCR() override { - if (lease_cr) { - lease_cr->abort(); - } - } int operate() override; }; @@ -1264,6 +1253,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { uint32_t obligation_counter = 0; RGWDataSyncShardMarkerTrack *marker_tracker; boost::intrusive_ptr error_repo; + boost::intrusive_ptr lease_cr; RGWSyncTraceNodeRef tn; ceph::real_time progress; @@ -1272,10 +1262,13 @@ public: RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state, rgw_data_sync_obligation obligation, RGWDataSyncShardMarkerTrack *_marker_tracker, - RGWOmapAppend *_error_repo, const RGWSyncTraceNodeRef& _tn_parent) + RGWOmapAppend *_error_repo, + boost::intrusive_ptr lease_cr, + const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), state(std::move(state)), obligation(std::move(obligation)), - marker_tracker(_marker_tracker), error_repo(_error_repo) { + marker_tracker(_marker_tracker), error_repo(_error_repo), + lease_cr(std::move(lease_cr)) { set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation; tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key); } @@ -1310,7 +1303,8 @@ public: ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key} << ' ' << *state->obligation << dendl; - yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */ + yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr, + std::nullopt, /* target_bs */ state->key, tn, &progress)); if (retcode < 0) { break; @@ -1455,7 +1449,8 @@ class RGWDataSyncShardCR : public RGWCoroutine { auto state = bucket_shard_cache->get(src); auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry}; return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation), - &*marker_tracker, error_repo, tn); + &*marker_tracker, error_repo, + lease_cr.get(), tn); } public: RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool, @@ -3683,7 +3678,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe& sync_pipe; rgw_bucket_shard& bs; - boost::intrusive_ptr lease_cr; + boost::intrusive_ptr lease_cr; bucket_list_result list_result; list::iterator entries_iter; rgw_bucket_shard_sync_info& sync_info; @@ -3749,13 +3744,13 @@ public: RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, const std::string& status_oid, - RGWContinuousLeaseCR *lease_cr, + boost::intrusive_ptr lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef tn_parent, RGWObjVersionTracker& objv_tracker) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), - lease_cr(lease_cr), sync_info(sync_info), + lease_cr(std::move(lease_cr)), sync_info(sync_info), status_oid(status_oid), tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync", SSTR(bucket_shard_str{bs}))), @@ -3776,7 +3771,7 @@ int RGWBucketShardFullSyncCR::operate() total_entries = sync_info.full_marker.count; do { - if (!lease_cr->is_locked()) { + if (lease_cr && !lease_cr->is_locked()) { drain_all(); return set_cr_error(-ECANCELED); } @@ -3801,7 +3796,7 @@ int RGWBucketShardFullSyncCR::operate() } entries_iter = list_result.entries.begin(); for (; entries_iter != list_result.entries.end(); ++entries_iter) { - if (!lease_cr->is_locked()) { + if (lease_cr && !lease_cr->is_locked()) { drain_all(); return set_cr_error(-ECANCELED); } @@ -3855,7 +3850,7 @@ int RGWBucketShardFullSyncCR::operate() } } tn->unset_flag(RGW_SNS_FLAG_ACTIVE); - if (!lease_cr->is_locked()) { + if (lease_cr && !lease_cr->is_locked()) { return set_cr_error(-ECANCELED); } /* update sync state to incremental */ @@ -3894,7 +3889,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { rgw_bucket_sync_pipe& sync_pipe; RGWBucketSyncFlowManager::pipe_rules_ref rules; rgw_bucket_shard& bs; - boost::intrusive_ptr lease_cr; + boost::intrusive_ptr lease_cr; list list_result; list::iterator entries_iter, entries_end; map, pair > squash_map; @@ -3917,14 +3912,14 @@ public: RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, const std::string& status_oid, - RGWContinuousLeaseCR *lease_cr, + boost::intrusive_ptr lease_cr, rgw_bucket_shard_sync_info& sync_info, RGWSyncTraceNodeRef& _tn_parent, RGWObjVersionTracker& objv_tracker, ceph::real_time* stable_timestamp) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), - lease_cr(lease_cr), sync_info(sync_info), + lease_cr(std::move(lease_cr)), sync_info(sync_info), zone_id(sync_env->svc->zone->get_zone().id), tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync", SSTR(bucket_shard_str{bs}))), @@ -3957,7 +3952,7 @@ int RGWBucketShardIncrementalSyncCR::operate() int ret; reenter(this) { do { - if (!lease_cr->is_locked()) { + if (lease_cr && !lease_cr->is_locked()) { drain_all(); tn->log(0, "ERROR: lease is not taken, abort"); return set_cr_error(-ECANCELED); @@ -4006,7 +4001,7 @@ int RGWBucketShardIncrementalSyncCR::operate() entries_iter = list_result.begin(); for (; entries_iter != entries_end; ++entries_iter) { - if (!lease_cr->is_locked()) { + if (lease_cr && !lease_cr->is_locked()) { drain_all(); return set_cr_error(-ECANCELED); } @@ -4326,12 +4321,13 @@ std::ostream& operator<<(std::ostream& out, std::optional& bs) } RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, + boost::intrusive_ptr lease_cr, std::optional _target_bs, std::optional _source_bs, const RGWSyncTraceNodeRef& _tn_parent, ceph::real_time* progress) : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env), - target_bs(_target_bs), source_bs(_source_bs), + lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))), progress(progress) @@ -4402,7 +4398,7 @@ int RGWRunBucketSourcesSyncCR::operate() ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl; - yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn, + yield spawn(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn, &*cur_shard_progress), false); while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { set_status() << "num_spawned() > spawn_window"; @@ -4728,32 +4724,9 @@ int RGWRunBucketsSyncBySourceCR::operate() int RGWRunBucketSyncCoroutine::operate() { reenter(this) { - yield { - set_status("acquiring sync lock"); - auto store = sync_env->store; - lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store, - rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid), - "sync_lock", - cct->_conf->rgw_sync_lease_period, - this)); - lease_stack.reset(spawn(lease_cr.get(), false)); - } - while (!lease_cr->is_locked()) { - if (lease_cr->is_done()) { - tn->log(5, "failed to take lease"); - set_status("lease lock failed, early abort"); - drain_all(); - return set_cr_error(lease_cr->get_ret_status()); - } - set_sleeping(true); - yield; - } - - tn->log(10, "took lease"); yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); - lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -4764,7 +4737,6 @@ int RGWRunBucketSyncCoroutine::operate() &sync_pipe.source_bucket_attrs, tn)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket})); - lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -4773,7 +4745,6 @@ int RGWRunBucketSyncCoroutine::operate() &sync_pipe.dest_bucket_attrs, tn)); if (retcode < 0) { tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket})); - lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -4786,15 +4757,11 @@ int RGWRunBucketSyncCoroutine::operate() yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker)); if (retcode == -ENOENT) { tn->log(0, "bucket sync disabled"); - lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock - lease_cr->wakeup(); - lease_cr.reset(); drain_all(); return set_cr_done(); } if (retcode < 0) { tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode)); - lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -4805,11 +4772,10 @@ int RGWRunBucketSyncCoroutine::operate() if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe, - status_oid, lease_cr.get(), + status_oid, lease_cr, sync_status, tn, objv_tracker)); if (retcode < 0) { tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode)); - lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -4817,12 +4783,11 @@ int RGWRunBucketSyncCoroutine::operate() if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe, - status_oid, lease_cr.get(), + status_oid, lease_cr, sync_status, tn, objv_tracker, progress)); if (retcode < 0) { tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode)); - lease_cr->go_down(); drain_all(); return set_cr_error(retcode); } @@ -4830,7 +4795,6 @@ int RGWRunBucketSyncCoroutine::operate() // loop back to previous states unless incremental sync returns normally } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync); - lease_cr->go_down(); drain_all(); return set_cr_done(); } @@ -4844,7 +4808,7 @@ RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num) return nullptr; } - return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr); + return new RGWRunBucketSyncCoroutine(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr); } int RGWBucketPipeSyncStatusManager::init()