class RGWRunBucketSyncCoroutine : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
rgw_bucket_sync_pair_info sync_pair;
rgw_bucket_sync_pipe sync_pipe;
rgw_bucket_shard_sync_info sync_status;
const std::string status_oid;
- boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
- boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-
RGWSyncTraceNodeRef tn;
public:
- RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair,
+ RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+ const rgw_bucket_sync_pair_info& _sync_pair,
const RGWSyncTraceNodeRef& _tn_parent,
ceph::real_time* progress)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
- sync_pair(_sync_pair), progress(progress),
+ lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
}
- ~RGWRunBucketSyncCoroutine() override {
- if (lease_cr) {
- lease_cr->abort();
- }
- }
int operate() override;
};
class RGWRunBucketSourcesSyncCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
std::optional<rgw_bucket_shard> target_bs;
std::optional<rgw_bucket_shard> source_bs;
rgw_bucket_sync_pair_info sync_pair;
- boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
- boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
-
RGWSyncTraceNodeRef tn;
ceph::real_time* progress;
std::vector<ceph::real_time> shard_progress;
public:
RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
ceph::real_time* progress);
- ~RGWRunBucketSourcesSyncCR() override {
- if (lease_cr) {
- lease_cr->abort();
- }
- }
int operate() override;
};
uint32_t obligation_counter = 0;
RGWDataSyncShardMarkerTrack *marker_tracker;
boost::intrusive_ptr<RGWOmapAppend> error_repo;
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
RGWSyncTraceNodeRef tn;
ceph::real_time progress;
RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, rgw::bucket_sync::Handle state,
rgw_data_sync_obligation obligation,
RGWDataSyncShardMarkerTrack *_marker_tracker,
- RGWOmapAppend *_error_repo, const RGWSyncTraceNodeRef& _tn_parent)
+ RGWOmapAppend *_error_repo,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
+ const RGWSyncTraceNodeRef& _tn_parent)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
state(std::move(state)), obligation(std::move(obligation)),
- marker_tracker(_marker_tracker), error_repo(_error_repo) {
+ marker_tracker(_marker_tracker), error_repo(_error_repo),
+ lease_cr(std::move(lease_cr)) {
set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
}
ldout(cct, 4) << "starting sync on " << bucket_shard_str{state->key}
<< ' ' << *state->obligation << dendl;
- yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */
+ yield call(new RGWRunBucketSourcesSyncCR(sc, lease_cr,
+ std::nullopt, /* target_bs */
state->key, tn, &progress));
if (retcode < 0) {
break;
auto state = bucket_shard_cache->get(src);
auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
- &*marker_tracker, error_repo, tn);
+ &*marker_tracker, error_repo,
+ lease_cr.get(), tn);
}
public:
RGWDataSyncShardCR(RGWDataSyncCtx *_sc, rgw_pool& _pool,
RGWDataSyncEnv *sync_env;
rgw_bucket_sync_pipe& sync_pipe;
rgw_bucket_shard& bs;
- boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
bucket_list_result list_result;
list<bucket_list_entry>::iterator entries_iter;
rgw_bucket_shard_sync_info& sync_info;
RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
const std::string& status_oid,
- RGWContinuousLeaseCR *lease_cr,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef tn_parent,
RGWObjVersionTracker& objv_tracker)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
- lease_cr(lease_cr), sync_info(sync_info),
+ lease_cr(std::move(lease_cr)), sync_info(sync_info),
status_oid(status_oid),
tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
SSTR(bucket_shard_str{bs}))),
total_entries = sync_info.full_marker.count;
do {
- if (!lease_cr->is_locked()) {
+ if (lease_cr && !lease_cr->is_locked()) {
drain_all();
return set_cr_error(-ECANCELED);
}
}
entries_iter = list_result.entries.begin();
for (; entries_iter != list_result.entries.end(); ++entries_iter) {
- if (!lease_cr->is_locked()) {
+ if (lease_cr && !lease_cr->is_locked()) {
drain_all();
return set_cr_error(-ECANCELED);
}
}
}
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
- if (!lease_cr->is_locked()) {
+ if (lease_cr && !lease_cr->is_locked()) {
return set_cr_error(-ECANCELED);
}
/* update sync state to incremental */
rgw_bucket_sync_pipe& sync_pipe;
RGWBucketSyncFlowManager::pipe_rules_ref rules;
rgw_bucket_shard& bs;
- boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
list<rgw_bi_log_entry> list_result;
list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
const std::string& status_oid,
- RGWContinuousLeaseCR *lease_cr,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef& _tn_parent,
RGWObjVersionTracker& objv_tracker,
ceph::real_time* stable_timestamp)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
- lease_cr(lease_cr), sync_info(sync_info),
+ lease_cr(std::move(lease_cr)), sync_info(sync_info),
zone_id(sync_env->svc->zone->get_zone().id),
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
SSTR(bucket_shard_str{bs}))),
int ret;
reenter(this) {
do {
- if (!lease_cr->is_locked()) {
+ if (lease_cr && !lease_cr->is_locked()) {
drain_all();
tn->log(0, "ERROR: lease is not taken, abort");
return set_cr_error(-ECANCELED);
entries_iter = list_result.begin();
for (; entries_iter != entries_end; ++entries_iter) {
- if (!lease_cr->is_locked()) {
+ if (lease_cr && !lease_cr->is_locked()) {
drain_all();
return set_cr_error(-ECANCELED);
}
}
RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
+ boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
const RGWSyncTraceNodeRef& _tn_parent,
ceph::real_time* progress)
: RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
- target_bs(_target_bs), source_bs(_source_bs),
+ lease_cr(std::move(lease_cr)), target_bs(_target_bs), source_bs(_source_bs),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))),
progress(progress)
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
- yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn,
+ yield spawn(new RGWRunBucketSyncCoroutine(sc, lease_cr, sync_pair, tn,
&*cur_shard_progress), false);
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
set_status() << "num_spawned() > spawn_window";
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {
- yield {
- set_status("acquiring sync lock");
- auto store = sync_env->store;
- lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, status_oid),
- "sync_lock",
- cct->_conf->rgw_sync_lease_period,
- this));
- lease_stack.reset(spawn(lease_cr.get(), false));
- }
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- tn->log(5, "failed to take lease");
- set_status("lease lock failed, early abort");
- drain_all();
- return set_cr_error(lease_cr->get_ret_status());
- }
- set_sleeping(true);
- yield;
- }
-
- tn->log(10, "took lease");
yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
- lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
&sync_pipe.source_bucket_attrs, tn));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
- lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
&sync_pipe.dest_bucket_attrs, tn));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to retrieve bucket info for bucket=" << bucket_str{sync_pair.source_bs.bucket}));
- lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
if (retcode == -ENOENT) {
tn->log(0, "bucket sync disabled");
- lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
- lease_cr->wakeup();
- lease_cr.reset();
drain_all();
return set_cr_done();
}
if (retcode < 0) {
tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
- lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
- status_oid, lease_cr.get(),
+ status_oid, lease_cr,
sync_status, tn, objv_tracker));
if (retcode < 0) {
tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
- lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
- status_oid, lease_cr.get(),
+ status_oid, lease_cr,
sync_status, tn,
objv_tracker, progress));
if (retcode < 0) {
tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
- lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
// loop back to previous states unless incremental sync returns normally
} while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
- lease_cr->go_down();
drain_all();
return set_cr_done();
}
return nullptr;
}
- return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
+ return new RGWRunBucketSyncCoroutine(&sc, nullptr, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
}
int RGWBucketPipeSyncStatusManager::init()