RGWBucketInfo *bucket_info;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
list<rgw_bi_log_entry> list_result;
- list<rgw_bi_log_entry>::iterator entries_iter;
+ list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
rgw_bucket_shard_sync_info& sync_info;
rgw_obj_key key;
bool updated_status{false};
const string& status_oid;
const string& zone_id;
- ceph::real_time sync_modify_time;
string cur_id;
set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
&list_result));
- if (retcode < 0 && retcode != -ENOENT ) {
+ if (retcode < 0 && retcode != -ENOENT) {
+ /* wait for all operations to complete */
drain_all();
- if (!syncstopped) {
- /* wait for all operations to complete */
- return set_cr_error(retcode);
- } else {
- /* no need to retry */
- break;
- }
+ return set_cr_error(retcode);
}
squash_map.clear();
- for (auto& e : list_result) {
- if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP && (sync_modify_time < e.timestamp)) {
- ldout(sync_env->cct, 20) << " syncstop on " << e.timestamp << dendl;
- sync_modify_time = e.timestamp;
+ entries_iter = list_result.begin();
+ entries_end = list_result.end();
+ for (; entries_iter != entries_end; ++entries_iter) {
+ auto e = *entries_iter;
+ if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
+ ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
syncstopped = true;
- continue;
+ entries_end = entries_iter; // dont sync past here
+ break;
}
- if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC && (sync_modify_time < e.timestamp)) {
- ldout(sync_env->cct, 20) << " resync on " << e.timestamp << dendl;
- sync_modify_time = e.timestamp;
- syncstopped = false;
+ if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
continue;
}
if (e.op == CLS_RGW_OP_CANCEL) {
}
entries_iter = list_result.begin();
- for (; entries_iter != list_result.end(); ++entries_iter) {
+ for (; entries_iter != entries_end; ++entries_iter) {
if (!lease_cr->is_locked()) {
drain_all();
return set_cr_error(-ECANCELED);
sync_info.inc_marker.position = cur_id;
if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
- ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << " , skipping entry" << dendl;
+ ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
}
}
}
- } while (!list_result.empty() && sync_status == 0);
-
- if (syncstopped) {
- drain_all();
-
- yield {
- const string& oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
- RGWRados *store = sync_env->store;
- call(new RGWRadosRemoveCR(store, rgw_raw_obj{store->get_zone_params().log_pool, oid}));
- }
- lease_cr->abort();
- return set_cr_done();
- }
+ } while (!list_result.empty() && sync_status == 0 && !syncstopped);
while (num_spawned()) {
yield wait_for_child();
/* not waiting for child here */
}
}
-
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
+
+ if (syncstopped) {
+ // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
+ // still disabled, we'll delete the sync status object. otherwise we'll
+ // restart full sync to catch any changes that happened while sync was
+ // disabled
+ sync_info.state = rgw_bucket_shard_sync_info::StateInit;
+ return set_cr_done();
+ }
+
yield call(marker_tracker.flush());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: marker_tracker.flush() returned retcode=" << retcode));
}
if (sync_status < 0) {
tn->log(0, SSTR("ERROR: failure in sync, backing out (sync_status=" << sync_status<< ")"));
- }
-
- /* wait for all operations to complete */
- drain_all();
-
- if (sync_status < 0) {
return set_cr_error(sync_status);
}
-
return set_cr_done();
}
return 0;
return set_cr_error(retcode);
}
- if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
- if (retcode == -ENOENT) {
- tn->log(0, "bucket sync disabled");
- lease_cr->go_down();
- drain_all();
- return set_cr_done();
- }
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
+ do {
+ if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
+ yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
+ if (retcode == -ENOENT) {
+ tn->log(0, "bucket sync disabled");
+ lease_cr->abort(); // deleted lease object, abort instead of unlock
+ drain_all();
+ return set_cr_done();
+ }
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: init sync on bucket failed, retcode=" << retcode));
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
}
- }
- if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
- yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
- status_oid, lease_cr.get(),
- sync_status, tn));
- if (retcode < 0) {
- tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
+ if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
+ yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
+ status_oid, lease_cr.get(),
+ sync_status, tn));
+ if (retcode < 0) {
+ tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
}
- }
- if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
- yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
- status_oid, lease_cr.get(),
- sync_status, tn));
- if (retcode < 0) {
- tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
- lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
+ if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
+ yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
+ status_oid, lease_cr.get(),
+ sync_status, tn));
+ if (retcode < 0) {
+ tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
}
- }
+ // loop back to previous states unless incremental sync returns normally
+ } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
lease_cr->go_down();
drain_all();