boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
bool lost_lock = false;
+ bool lost_bid = false;
bool *reset_backoff;
oid = full_sync_index_shard_oid(shard_id);
can_adjust_marker = true;
/* grab lock */
+
+ if (!sync_env->bid_manager->is_highest_bidder(shard_id)) {
+ tn->log(10, "not the highest bidder");
+ return -EBUSY;
+ }
+
yield {
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
/* sync! */
do {
if (!lease_cr->is_locked()) {
- if (sync_env->bid_manager->is_highest_bidder(shard_id, ceph::coarse_mono_clock::now())) {
- yield {
- uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
- string lock_name = "sync_lock";
- rgw::sal::RadosStore* store = sync_env->store;
-
- lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this, nullptr));
- lease_stack.reset(spawn(lease_cr.get(), false));
- lost_lock = false;
- }
- } else {
- //lost the bid. exit gracefully.
- lost_lock = true;
- sync_marker.marker = marker;
- ldpp_dout(sync_env->dpp, 5) << *this << ": saving marker " << marker << dendl;
-
- using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
- yield call(new WriteMarkerCR(sync_env->dpp, sync_env->store,
- rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- sync_marker));
-
- if (retcode < 0) {
- ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
- yield lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
- yield {
- set_status("writing sync info");
- rgw::sal::RadosStore* store = sync_env->store;
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->dpp, store,
- rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
- sync_info));
- }
-
- if (retcode < 0) {
- set_status("failed to write sync status");
- ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
- yield lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
-
- yield lease_cr->go_down();
-
- lease_cr.reset();
-
- drain_all();
- tn->log(5, "failed to take lease");
- return lease_cr->get_ret_status();
- set_sleeping(true);
- yield;
+ tn->log(1, "lease is lost, abort");
+ lost_lock = true;
+ break;
}
+
+ if (!sync_env->bid_manager->is_highest_bidder(shard_id)) {
+ tn->log(1, "lost bid");
+ lost_bid = true;
+ break;
}
omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
collect_children();
}
- if (!lost_lock) {
+ if (lost_bid) {
+ yield call(marker_tracker->flush());
+ } else if (!lost_lock) {
/* update marker to reflect we're done with full sync */
if (can_adjust_marker) {
// apply updates to a temporary marker, or operate() will send us
return -EAGAIN;
}
- if (lost_lock) {
+ if (lost_lock || lost_bid) {
return -EBUSY;
}
tn->log(10, "start incremental sync");
can_adjust_marker = true;
/* grab lock */
+
+ if (!sync_env->bid_manager->is_highest_bidder(shard_id)) {
+ tn->log(10, "not the highest bidder");
+ return -EBUSY;
+ }
+
if (!lease_cr) { /* could have had a lease_cr lock from previous state */
yield {
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
/* inc sync */
do {
if (!lease_cr->is_locked()) {
- if (sync_env->bid_manager->is_highest_bidder(shard_id, ceph::coarse_mono_clock::now())) {
- yield {
- uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
- string lock_name = "sync_lock";
- rgw::sal::RadosStore* store = sync_env->store;
-
- lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this, nullptr));
- lease_stack.reset(spawn(lease_cr.get(), false));
- lost_lock = false;
- }
- } else {
- //lost the bid. exit gracefully.
- lost_lock = true;
- sync_marker.marker = marker;
- ldpp_dout(sync_env->dpp, 5) << *this << ": saving marker " << marker << dendl;
-
- using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
- yield call(new WriteMarkerCR(sync_env->dpp, sync_env->store,
- rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- sync_marker));
-
- if (retcode < 0) {
- ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
- yield lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
- yield {
- set_status("writing sync info");
- rgw::sal::RadosStore* store = sync_env->store;
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->dpp, store,
- rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
- sync_info));
- }
-
- if (retcode < 0) {
- set_status("failed to write sync status");
- ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
- yield lease_cr->go_down();
- drain_all();
- return set_cr_error(retcode);
- }
-
- yield lease_cr->go_down();
-
- lease_cr.reset();
+ lost_lock = true;
+ tn->log(1, "lease is lost, abort");
+ break;
+ }
- drain_all();
- tn->log(5, "failed to take lease");
- return lease_cr->get_ret_status();
- set_sleeping(true);
- yield;
- }
+ if (!sync_env->bid_manager->is_highest_bidder(shard_id)) {
+ tn->log(1, "lost bid");
+ lost_bid = true;
+ break;
}
+
#define INCREMENTAL_MAX_ENTRIES 100
ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << " truncated=" << truncated << dendl;
if (!period_marker.empty() && period_marker <= mdlog_marker) {
drain_all();
- if (lost_lock) {
+ if (lost_lock || lost_bid) {
return -EBUSY;
}
using ControlCRRef = boost::intrusive_ptr<RGWMetaSyncShardControlCR>;
using StackRef = boost::intrusive_ptr<RGWCoroutinesStack>;
using RefPair = std::pair<ControlCRRef, StackRef>;
+ RGWCoroutinesStack *notify_stack;
map<int, RefPair> shard_crs;
int ret{0};
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
yield {
- //create watch object for sync lock bids
- const auto& control_pool = sync_env->store->svc()->zone->get_zone_params().control_pool;
- auto control_obj = rgw_raw_obj{control_pool, meta_sync_bids_oid};
-
- auto bid_manager = rgw::sync_fairness::create_rados_bid_manager(
- sync_env->store, control_obj, sync_status.sync_info.num_shards);
- ret = bid_manager->start();
- if (ret < 0) {
- tn->log(0, SSTR("ERROR: failed to start bidding manager " << ret));
- return ret;
- }
-
- //cr for broadcasting sync lock notifications in the background
- sync_env->bid_manager = bid_manager.get();
- call(new RGWMetaSyncShardNotifyCR(sync_env, tn));
- if (retcode < 0) {
- tn->log(5, SSTR("ERROR: failed to notify bidding information" << retcode));
- continue;
- }
+ ldpp_dout(dpp, 10) << "broadcast sync lock notify" << dendl;
+ notify_stack = spawn(new RGWMetaSyncShardNotifyCR(sync_env, tn), false);
}
// loop through one period at a time
}
}
// wait for each shard to complete
- while (ret == 0 && num_spawned() > 0) {
+ while (ret == 0 && num_spawned() > 1) {
yield wait_for_child();
collect(&ret, nullptr);
}
- drain_all();
+ drain_all_but_stack(notify_stack);
{
// drop shard cr refs under lock
std::lock_guard<std::mutex> lock(mutex);
rgw_raw_obj(pool, sync_env->status_oid()),
sync_status.sync_info));
}
+ notify_stack->cancel();
+
+ drain_all();
}
return 0;
}
if (num_shards != mdlog_info.num_shards) {
ldpp_dout(dpp, -1) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
return -EINVAL;
- }
+ }
+
+ // construct and start the bid manager for sync fairness
+ const auto& control_pool = store->svc()->zone->get_zone_params().control_pool;
+ auto control_obj = rgw_raw_obj{control_pool, meta_sync_bids_oid};
+
+ auto bid_manager = rgw::sync_fairness::create_rados_bid_manager(
+ store, control_obj, num_shards);
+ r = bid_manager->start();
+ if (r < 0) {
+ return r;
+ }
+ sync_env.bid_manager = bid_manager.get();
RGWPeriodHistory::Cursor cursor;
do {