rgw_bucket_sync_pipe sync_pipe;
ceph::real_time* progress;
+ const std::string lock_name = "bucket sync";
+ const uint32_t lock_duration;
const rgw_raw_obj status_obj;
rgw_bucket_sync_status bucket_status;
RGWObjVersionTracker objv;
ceph::real_time* progress)
: RGWCoroutine(_sc->cct), sc(_sc), env(_sc->env),
data_lease_cr(std::move(lease_cr)), sync_pair(_sync_pair), progress(progress),
+ lock_duration(cct->_conf->rgw_sync_lease_period),
status_obj(env->svc->zone->get_zone_params().log_pool,
RGWBucketPipeSyncStatusManager::full_status_oid(sc->source_zone,
sync_pair.source_bs.bucket,
sync_pipe.info = sync_pair;
// read bucket sync status
- using CR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
- yield call(new CR(env->async_rados, env->svc->sysobj,
- status_obj, &bucket_status, true, &objv));
+ using ReadCR = RGWSimpleRadosReadCR<rgw_bucket_sync_status>;
+ yield call(new ReadCR(env->async_rados, env->svc->sysobj,
+ status_obj, &bucket_status, false, &objv));
+ if (retcode == -ENOENT) {
+ // use exclusive create to set state=Init
+ objv.generate_new_write_ver(cct);
+ using WriteCR = RGWSimpleRadosWriteCR<rgw_bucket_sync_status>;
+ yield call(new WriteCR(env->async_rados, env->svc->sysobj,
+ status_obj, bucket_status, &objv, true));
+ if (retcode == -EEXIST) {
+ // raced with another create, read its status
+ yield call(new ReadCR(env->async_rados, env->svc->sysobj,
+ status_obj, &bucket_status, false, &objv));
+ }
+ }
if (retcode < 0) {
return set_cr_error(retcode);
}
do {
tn->log(20, SSTR("sync status for source bucket: " << bucket_status.state));
+ // if the state wasn't Incremental, take a bucket-wide lease to prevent
+ // different shards from duplicating the init and full sync
+ if (bucket_status.state != BucketSyncState::Incremental) {
+ assert(!bucket_lease_cr);
+ bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
+ lock_name, lock_duration, this));
+ yield spawn(bucket_lease_cr.get(), false);
+ while (!bucket_lease_cr->is_locked()) {
+ if (bucket_lease_cr->is_done()) {
+ tn->log(5, "failed to take bucket lease");
+ set_status("lease lock failed, early abort");
+ drain_all();
+ return set_cr_error(bucket_lease_cr->get_ret_status());
+ }
+ tn->log(5, "waiting on bucket lease");
+ yield set_sleeping(true);
+ }
+ // reread the status after acquiring the lock
+ yield call(new ReadCR(env->async_rados, env->svc->sysobj,
+ status_obj, &bucket_status, false, &objv));
+ if (retcode < 0) {
+ bucket_lease_cr->go_down();
+ drain_all();
+ bucket_lease_cr.reset();
+ return set_cr_error(retcode);
+ }
+ }
+
if (bucket_status.state == BucketSyncState::Init ||
bucket_status.state == BucketSyncState::Stopped) {
+ assert(bucket_lease_cr);
// init sync status
yield {
init_check_compat = objv.read_version.ver <= 1; // newly-created
init_check_compat));
}
if (retcode < 0) {
+ bucket_lease_cr->go_down();
+ drain_all();
+ bucket_lease_cr.reset();
return set_cr_error(retcode);
}
}
if (bucket_status.state == BucketSyncState::Full) {
- // take a bucket-wide lease for full sync to prevent the bucket shards
- // from duplicating the work
- yield {
- const std::string lock_name = "full sync";
- const uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
- bucket_lease_cr.reset(new RGWContinuousLeaseCR(env->async_rados, env->store, status_obj,
- lock_name, lock_duration, this));
- spawn(bucket_lease_cr.get(), false);
- }
- while (!bucket_lease_cr->is_locked()) {
- if (bucket_lease_cr->is_done()) {
- tn->log(5, "failed to take bucket lease");
- set_status("lease lock failed, early abort");
- drain_all();
- return set_cr_error(bucket_lease_cr->get_ret_status());
- }
- tn->log(5, "waiting on bucket lease");
- yield set_sleeping(true);
- }
-
+ assert(bucket_lease_cr);
yield call(new RGWBucketFullSyncCR(sc, sync_pipe, status_obj,
bucket_lease_cr, bucket_status,
tn, objv));
- bucket_lease_cr->go_down();
- drain_all();
- bucket_lease_cr.reset();
if (retcode < 0) {
+ bucket_lease_cr->go_down();
+ drain_all();
+ bucket_lease_cr.reset();
return set_cr_error(retcode);
}
}
if (bucket_status.state == BucketSyncState::Incremental) {
+ // lease not required for incremental sync
+ if (bucket_lease_cr) {
+ bucket_lease_cr->go_down();
+ drain_all();
+ bucket_lease_cr.reset();
+ }
yield call(new RGWSyncBucketShardCR(sc, data_lease_cr, sync_pair,
sync_pipe, bucket_status.state,
tn, progress));