}
int operate(const DoutPrefixProvider *dpp) override {
- int ret;
+ int ret = 0;
reenter(this) {
if (!lease_cr->is_locked()) {
drain_all();
sync_status(sync_status), tn(std::move(tn)), objv_tracker(objv_tracker)
{}
-
// Record full-sync progress in the in-memory status and return a coroutine
// that persists it.
//
// new_marker: stored into sync_status.full.position.
// index_pos:  stored into sync_status.full.count.
// timestamp:  not used in this body.
//
// Logs the status object's oid and the new marker at level 20, then returns a
// newly allocated RGWSimpleRadosWriteCR<rgw_bucket_sync_status> constructed
// with status_obj, sync_status and &objv_tracker.
// NOTE(review): the coroutine is returned as a raw `new`; presumably the
// caller takes ownership of it — confirm against the marker-tracking base class.
RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_status.full.position = new_marker;
sync_status.full.count = index_pos;
-
tn->log(20, SSTR("updating marker oid=" << status_obj.oid << " marker=" << new_marker));
return new RGWSimpleRadosWriteCR<rgw_bucket_sync_status>(
sync_env->dpp, sync_env->driver,
- status_obj, sync_status, &objv_tracker);
+ status_obj, sync_status, &objv_tracker);
}
RGWOrderCallCR *allocate_order_control_cr() override {
int RGWBucketShardIncrementalSyncCR::operate(const DoutPrefixProvider *dpp)
{
- int ret;
+ int ret = 0;
reenter(this) {
do {
if (lease_cr && !lease_cr->is_locked()) {
const rgw_bucket_sync_pair_info& sync_pair,
std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& tn,
- ceph::real_time* progress);
+ ceph::real_time* progress,
+ bool no_lease = false);
RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
ldpp_dout(dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
yield_spawn_window(sync_bucket_shard_cr(sc, lease_cr, sync_pair,
- gen, tn, &*cur_shard_progress),
+ gen, tn, &*cur_shard_progress,
+ false),
sc->lcc.adj_concurrency(cct->_conf->rgw_bucket_sync_spawn_window),
[&](uint64_t stack_id, int ret) {
if (ret < 0) {
bool bucket_stopped = false;
RGWObjVersionTracker objv;
bool init_check_compat = false;
+ bool no_lease{false};
rgw_bucket_index_marker_info info;
rgw_raw_obj error_repo;
rgw_bucket_shard source_bs;
const rgw_bucket_sync_pair_info& _sync_pair,
std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& _tn_parent,
- ceph::real_time* progress)
+ ceph::real_time* progress,
+ bool no_lease = false)
: RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair),
gen(gen), progress(progress),
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
sync_pair.source_bs.bucket,
sync_pair.dest_bucket)),
+ no_lease(no_lease),
tn(env->sync_tracer->add_node(_tn_parent, "bucket",
SSTR(bucket_str{_sync_pair.dest_bucket} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
}
const rgw_bucket_sync_pair_info& sync_pair,
std::optional<uint64_t> gen,
const RGWSyncTraceNodeRef& tn,
- ceph::real_time* progress)
+ ceph::real_time* progress,
+ bool no_lease)
{
return new RGWSyncBucketCR(sc, std::move(lease), sync_pair,
- gen, tn, progress);
+ gen, tn, progress, no_lease);
}
#define RELEASE_LOCK(cr) \
if (bucket_status.state != BucketSyncState::Incremental ||
bucket_stopped) {
- if (!bucket_lease_cr) {
+ if (!no_lease && !bucket_lease_cr) {
bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
lock_name, lock_duration, this, &sc->lcc));
yield spawn(bucket_lease_cr.get(), false);
if (bucket_status.state != BucketSyncState::Incremental) {
// if the state wasn't Incremental, take a bucket-wide lease to prevent
- // different shards from duplicating the init and full sync
- if (!bucket_lease_cr) {
+ // different shards from duplicating the init and full sync.
+ // when no_lease is true (e.g. 'bucket sync run'), skip lease acquisition
+ // so the command is never blocked by a background sync process holding the lock.
+ if (!no_lease && !bucket_lease_cr) {
bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->driver, status_obj,
lock_name, lock_duration, this, &sc->lcc));
yield spawn(bucket_lease_cr.get(), false);
bucket_status.state == BucketSyncState::Full);
if (bucket_status.state == BucketSyncState::Full) {
- assert(bucket_lease_cr);
yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
bucket_lease_cr, bucket_status,
- tn, objv));
+ tn, objv, no_lease));
if (retcode < 0) {
tn->log(20, SSTR("ERROR: full sync failed. error: " << retcode));
RELEASE_LOCK(bucket_lease_cr);
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
- // Since all errors (except ECANCELED) are considered retryable,
- // retry other errors so long as we're making progress.
+ // all errors are retryable so long as we're making progress.
for (retries = 0u, retcode = -EDOM;
(retries < allowed_retries) && (retcode != 0);
++retries) {
<< dendl;
yield call(sync_bucket_shard_cr(&sc, nullptr, pair, gen,
sc.env->sync_tracer->root_node,
- &progress));
+ &progress,
+ true /* no_lease: bucket sync run skips
+ lock acquisition so it is never
+ blocked by a background sync process*/));
- if (retcode == -ECANCELED) {
- ldpp_dout(dpp, -1) << "ERROR: Got -ECANCELED for "
- << pair.source_bs << dendl;
- drain_all();
- return set_cr_error(retcode);
- } else if (retcode < 0) {
- ldpp_dout(dpp, 5) << "WARNING: Got error, retcode=" << retcode << " for "
- << pair.source_bs << "on retry "
+ if (retcode < 0) {
+ ldpp_dout(dpp, 5) << "WARNING: Got retcode=" << retcode << " for "
+ << pair.source_bs << " on retry "
<< retries + 1 << " of " << allowed_retries
<< " allowed" << dendl;
- // Reset the retry counter if we made any progress
- if (progress != prev_progress) {
+ if (retcode == -ECANCELED) {
+ // -ECANCELED means a background sync process updated
+ // the sync status. re-read the status and resume from the new marker.
+ retries = 0;
+ } else if (progress != prev_progress) {
+ // Reset the retry counter if we made any progress
retries = 0;
}
prev_progress = progress;
}
}
+
if (retcode < 0) {
ldpp_dout(dpp, -1) << "ERROR: Exhausted retries for "
<< pair.source_bs << " retcode="