std::map<std::string, bufferlist> entries;
std::map<std::string, bufferlist>::iterator iter;
string error_marker;
+ bool lost_lock = false;
+ bool lost_bid = false;
public:
entry_timestamp = sync_marker.timestamp; // time when full sync started
do {
if (!lease_cr->is_locked()) {
- drain_all();
tn->log(1, "lease is lost, abort");
- return set_cr_error(-ECANCELED);
+ lost_lock = true;
+ break;
+ }
+
+ if (!sc->env->bid_manager->is_highest_bidder(shard_id)) {
+ tn->log(1, "lost bid");
+ lost_bid = true;
+ break;
}
+
omapvals = std::make_shared<RGWRadosGetOmapValsCR::Result>();
yield call(new RGWRadosGetOmapValsCR(sc->env->driver,
rgw_raw_obj(pool, oid),
} while (omapvals->more);
omapvals.reset();
- drain_all();
-
tn->unset_flag(RGW_SNS_FLAG_ACTIVE);
- /* update marker to reflect we're done with full sync */
- sync_marker.state = rgw_data_sync_marker::IncrementalSync;
- sync_marker.marker = sync_marker.next_step_marker;
- sync_marker.next_step_marker.clear();
- yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
- sc->env->dpp, sc->env->driver,
- rgw_raw_obj(pool, status_oid), sync_marker, &objv));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
- return set_cr_error(retcode);
+ if (lost_bid) {
+ yield call(marker_tracker->flush());
+ } else if (!lost_lock) {
+ /* update marker to reflect we're done with full sync */
+ sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+ sync_marker.marker = sync_marker.next_step_marker;
+ sync_marker.next_step_marker.clear();
+ yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
+ sc->env->dpp, sc->env->driver,
+ rgw_raw_obj(pool, status_oid), sync_marker, &objv));
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
+ return set_cr_error(retcode);
+ }
+
+ // clean up full sync index, ignoring errors
+ yield call(new RGWRadosRemoveCR(sc->env->driver, {pool, oid}));
+
+ // transition to incremental sync
+ return set_cr_done();
}
- // clean up full sync index, ignoring errors
- yield call(new RGWRadosRemoveCR(sc->env->driver, {pool, oid}));
+ if (lost_lock || lost_bid) {
+ return set_cr_error(-EBUSY);
+ }
- // transition to incremental sync
- return set_cr_done();
- }
- return 0;
+ } return 0;
}
};
decltype(log_entries)::iterator log_iter;
bool truncated = false;
int cbret = 0;
+ bool lost_lock = false;
+ bool lost_bid = false;
utime_t get_idle_interval() const {
ceph::timespan interval = std::chrono::seconds(cct->_conf->rgw_data_sync_poll_interval);
marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
do {
if (!lease_cr->is_locked()) {
- drain_all();
+ lost_lock = true;
tn->log(1, "lease is lost, abort");
- return set_cr_error(-ECANCELED);
+ break;
+ }
+
+ if (!sc->env->bid_manager->is_highest_bidder(shard_id)) {
+ tn->log(1, "lost bid");
+ lost_bid = true;
+ break;
}
{
current_modified.clear();
modified_iter != current_modified.end();
++modified_iter) {
if (!lease_cr->is_locked()) {
- drain_all();
- yield call(marker_tracker->flush());
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
- return set_cr_error(retcode);
- }
- return set_cr_error(-ECANCELED);
+ tn->log(1, "lease is lost, abort");
+ lost_lock = true;
+ break;
}
retcode = parse_bucket_key(modified_iter->key, source_bs);
if (retcode < 0) {
iter = error_entries.begin();
for (; iter != error_entries.end(); ++iter) {
if (!lease_cr->is_locked()) {
- drain_all();
- yield call(marker_tracker->flush());
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
- return set_cr_error(retcode);
- }
- return set_cr_error(-ECANCELED);
+ tn->log(1, "lease is lost, abort");
+ lost_lock = true;
+ break;
}
error_marker = iter->first;
entry_timestamp = rgw::error_repo::decode_value(iter->second);
log_iter != log_entries.end();
++log_iter) {
if (!lease_cr->is_locked()) {
- drain_all();
- yield call(marker_tracker->flush());
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: data sync marker_tracker.flush() returned retcode=" << retcode));
- return set_cr_error(retcode);
- }
- return set_cr_error(-ECANCELED);
+ tn->log(1, "lease is lost, abort");
+ lost_lock = true;
+ break;
}
tn->log(20, SSTR("shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key));
yield wait(get_idle_interval());
}
} while (true);
+
+ if (lost_bid) {
+ return set_cr_error(-EBUSY);
+ } else if (lost_lock) {
+ drain_all();
+ yield marker_tracker->flush();
+ return set_cr_error(-ECANCELED);
+ }
+
}
return 0;
}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
+
+ if (!sc->env->bid_manager->is_highest_bidder(shard_id)) {
+ tn->log(10, "not the highest bidder");
+ return set_cr_error(-EBUSY);
+ }
+
yield init_lease_cr();
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
}
*reset_backoff = true;
tn->log(10, "took lease");
- /* Reread data sync status to fech latest marker and objv */
+ /* Reread data sync status to fetch latest marker and objv */
objv.clear();
yield call(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->driver,
rgw_raw_obj(pool, status_oid),
if (init_lease->is_done()) {
tn->log(5, "ERROR: failed to take data sync status lease");
set_status("lease lock failed, early abort");
- drain_all();
+ drain_all_but_stack(notify_stack.get());
return set_cr_error(init_lease->get_ret_status());
}
tn->log(5, "waiting on data sync status lease");
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
init_lease->go_down();
- drain_all();
+ drain_all_but_stack(notify_stack.get());
return set_cr_error(retcode);
}
// sets state = StateBuildingFullSyncMaps
if (!init_lease->is_locked()) {
init_lease->go_down();
- drain_all();
+ drain_all_but_stack(notify_stack.get());
return set_cr_error(-ECANCELED);
}
/* state: building full sync maps */
if (!init_lease->is_locked()) {
init_lease->go_down();
- drain_all();
+ drain_all_but_stack(notify_stack.get());
return set_cr_error(-ECANCELED);
}
/* update new state */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
if (init_lease) {
init_lease->go_down();
- drain_all();
+ drain_all_but_stack(notify_stack.get());
init_lease.reset();
lease_stack.reset();
}
}
}
+ notify_stack.get()->cancel();
+
return set_cr_done();
}
return 0;