class RGWMetaSyncShardCR : public RGWCoroutine {
RGWMetaSyncEnv *sync_env;
-
+ rgw_meta_sync_info sync_info;
const rgw_pool& pool;
const std::string& period; //< currently syncing period id
const epoch_t realm_epoch; //< realm_epoch of period
rgw::sal::RadosStore* store = sync_env->store;
lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this, nullptr, shard_id, sync_env->bid_manager));
+ lock_name, lock_duration, this, nullptr));
lease_stack.reset(spawn(lease_cr.get(), false));
lost_lock = false;
}
set_sleeping(true);
yield;
}
-
tn->log(10, "took lease");
/* sync! */
work_period_end = ceph::coarse_mono_clock::now() + work_duration;
do {
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- drain_all();
- tn->log(5, "failed to take lease");
- return lease_cr->get_ret_status();
+ if (!lease_cr->is_locked()) {
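+ // sync fairness: only take the shard lease while this gateway holds the
+ // highest bid for the shard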
+ if (sync_env->bid_manager->is_highest_bidder(shard_id, ceph::coarse_mono_clock::now())) {
+ yield {
+ uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
+ string lock_name = "sync_lock";
+ rgw::sal::RadosStore* store = sync_env->store;
+
+ lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
+ rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
+ lock_name, lock_duration, this, nullptr));
+ lease_stack.reset(spawn(lease_cr.get(), false));
+ lost_lock = false;
+ }
+ } else {
+ // lost the bid; exit gracefully.
+ lost_lock = true;
+ sync_marker.marker = marker;
+ ldpp_dout(sync_env->dpp, 5) << *this << ": saving marker " << marker << dendl;
+
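+ // persist the current position before stepping aside so the winning
+ // bidder can resume the shard from this marker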
+ using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
+ yield call(new WriteMarkerCR(sync_env->dpp, sync_env->store,
+ rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
+ sync_marker));
+
+ if (retcode < 0) {
+ ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+ yield lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+ yield {
+ set_status("writing sync info");
+ rgw::sal::RadosStore* store = sync_env->store;
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->dpp, store,
+ rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
+ sync_info));
+ }
+
+ if (retcode < 0) {
+ set_status("failed to write sync status");
+ ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ yield lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+
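+ // release the shard lease and exit this coroutine; the highest bidder
+ // will take over the shard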
+ yield lease_cr->go_down();
+
+ lease_cr.reset();
+
+ drain_all();
+ tn->log(5, "lost bid for shard lease; exiting gracefully");
+ return set_cr_done();
+ }
}
- set_sleeping(true);
- yield;
- }
omapkeys = std::make_shared<RGWRadosGetOmapKeysCR::Result>();
yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid),
* if we reached here, it means that lost_lock is true, otherwise the state
* change in the previous block will prevent us from reaching here
*/
+
yield lease_cr->go_down();
lease_cr.reset();
tn->log(10, "start incremental sync");
can_adjust_marker = true;
/* grab lock */
- if (!lease_cr) { /* could have had a lock lease from
- * previous state */
+ if (!lease_cr) { /* may still hold a lease from the previous state */
yield {
- tn->log(20, "creating RGWContinuousLeaseCR");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
rgw::sal::RadosStore* store = sync_env->store;
lease_cr.reset( new RGWContinuousLeaseCR(sync_env->async_rados, store,
rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this, nullptr, shard_id, sync_env->bid_manager));
+ lock_name, lock_duration, this, nullptr));
lease_stack.reset(spawn(lease_cr.get(), false));
-
lost_lock = false;
}
while (!lease_cr->is_locked()) {
if (lease_cr->is_done()) {
drain_all();
- tn->log(10, "failed to take lease");
+ tn->log(5, "failed to take lease");
return lease_cr->get_ret_status();
}
set_sleeping(true);
yield;
}
}
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- drain_all();
- tn->log(10, "failed to take lease");
- return lease_cr->get_ret_status();
- }
- tn->log(20, "do not yet have lock; will try again");
- set_sleeping(true);
- yield;
- }
+ tn->log(10, "took lease");
+ // if the period has advanced, we can't use the existing marker
/*
if (work_period_end_unset == work_period_end) {
work_period_end = ceph::coarse_mono_clock::now() + work_duration;
work_period_end = ceph::coarse_mono_clock::now() + work_duration;
do {
if (!lease_cr->is_locked()) {
- lost_lock = true;
- tn->log(10, "lost lease due to expiration");
- break;
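+ // sync fairness: only take the shard lease while this gateway holds the
+ // highest bid for the shard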
+ if (sync_env->bid_manager->is_highest_bidder(shard_id, ceph::coarse_mono_clock::now())) {
+ yield {
+ uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
+ string lock_name = "sync_lock";
+ rgw::sal::RadosStore* store = sync_env->store;
+
+ lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
+ rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
+ lock_name, lock_duration, this, nullptr));
+ lease_stack.reset(spawn(lease_cr.get(), false));
+ lost_lock = false;
+ }
+ } else {
+ // lost the bid; exit gracefully.
+ lost_lock = true;
+ sync_marker.marker = marker;
+ ldpp_dout(sync_env->dpp, 5) << *this << ": saving marker " << marker << dendl;
+
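+ // persist the current position before stepping aside so the winning
+ // bidder can resume the shard from this marker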
+ using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_meta_sync_marker>;
+ yield call(new WriteMarkerCR(sync_env->dpp, sync_env->store,
+ rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
+ sync_marker));
+
+ if (retcode < 0) {
+ ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+ yield lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+ yield {
+ set_status("writing sync info");
+ rgw::sal::RadosStore* store = sync_env->store;
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->dpp, store,
+ rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
+ sync_info));
+ }
+
+ if (retcode < 0) {
+ set_status("failed to write sync status");
+ ldpp_dout(sync_env->dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ yield lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+
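+ // release the shard lease and exit this coroutine; the highest bidder
+ // will take over the shard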
+ yield lease_cr->go_down();
+
+ lease_cr.reset();
+
+ drain_all();
+ tn->log(5, "lost bid for shard lease; exiting gracefully");
+ return set_cr_done();
+ }
}
#define INCREMENTAL_MAX_ENTRIES 100
ldpp_dout(sync_env->dpp, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << " truncated=" << truncated << dendl;
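+ // background cr that periodically broadcasts this gateway's sync-lock bids
+ // so peer gateways can determine the highest bidder for each shard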
class RGWMetaSyncShardNotifyCR : public RGWCoroutine {
RGWMetaSyncEnv *sync_env;
- const utime_t interval;
+ RGWSyncTraceNodeRef tn;
public:
- RGWMetaSyncShardNotifyCR(RGWMetaSyncEnv *_sync_env,
- const utime_t interval)
- : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- interval(interval) {}
+ RGWMetaSyncShardNotifyCR(RGWMetaSyncEnv *_sync_env, RGWSyncTraceNodeRef& _tn)
+ : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env), tn(_tn) {}
int operate(const DoutPrefixProvider* dpp) override
{
reenter(this) {
for (;;) {
- set_status('sync lock notification');
- yield call(sync_env.bid_manager->notify_cr());
+ set_status("sync lock notification");
+ yield call(sync_env->bid_manager->notify_cr());
if (retcode < 0) {
tn->log(5, SSTR("ERROR: failed to notify bidding information" << retcode));
return set_cr_error(retcode);
}
- set_status('sleeping');
- yield wait(interval);
+ set_status("sleeping");
+ yield wait(utime_t(cct->_conf->rgw_sync_lease_period, 0));
}
-/*
- if (sync_env->bid_manager->is_highest_bidder(shard_id, current_time)) {
- yield {
- uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
- string lock_name = "sync_lock";
- rgw::sal::RadosStore* store = sync_env->store;
-
- lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- rgw_raw_obj(pool, sync_env->shard_obj_name(shard_id)),
- lock_name, lock_duration, this, nullptr));
- lease_stack.reset(spawn(lease_cr.get(), false));
- lost_lock = false;
- }
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- drain_all();
- tn->log(5, "failed to take lease");
- return lease_cr->get_ret_status();
- }
- set_sleeping(true);
- yield;
- }
- } else {
- yield {
- set_status("writing sync status");
- rgw::sal::RadosStore* store = sync_env->store;
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(dpp, store,
- rgw_raw_obj(store->svc()->zone->get_zone_params().log_pool, sync_env->status_oid()),
- sync_status));
- }
-
- if (retcode < 0) {
- set_status("failed to write sync status");
- ldpp_dout(dpp, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
- yield lease_cr->go_down();
- return set_cr_error(retcode);
- }
-
- yield lease_cr->go_down();
-
- lease_cr.reset();
-
- drain_all();
-
- }
-*/
}
return 0;
}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
+ yield {
+ // create watch object for sync lock bids
+ const auto& control_pool = sync_env->store->svc()->zone->get_zone_params().control_pool;
+ auto control_obj = rgw_raw_obj{control_pool, meta_sync_bids_oid};
+
+ auto bid_manager = rgw::sync_fairness::create_rados_bid_manager(
+ sync_env->store, control_obj, sync_status.sync_info.num_shards);
+ retcode = bid_manager->start();
+ if (retcode < 0) {
+ tn->log(0, SSTR("ERROR: failed to start bidding manager: " << retcode));
+ return set_cr_error(retcode);
+ }
+
+ // sync_env->bid_manager only stores a raw pointer, so the bid manager
+ // itself must stay alive for the duration of the sync
+ sync_env->bid_manager = bid_manager.get();
+
+ // spawn a background cr that broadcasts sync lock bids to peer gateways
+ spawn(new RGWMetaSyncShardNotifyCR(sync_env, tn), false);
+ }
+
// loop through one period at a time
tn->log(1, "start");
for (;;) {
continue;
}
}
+
using ShardCR = RGWMetaSyncShardControlCR;
auto cr = new ShardCR(sync_env, pool, period_id, realm_epoch,
mdlog, shard_id, marker,
if (num_shards != mdlog_info.num_shards) {
ldpp_dout(dpp, -1) << "ERROR: can't sync, mismatch between num shards, master num_shards=" << mdlog_info.num_shards << " local num_shards=" << num_shards << dendl;
return -EINVAL;
- }
- const auto& control_pool = store->svc()->zone->get_zone_params().control_pool;
- auto control_obj = rgw_raw_obj{control_pool, meta_sync_bids_oid};
-
- auto bid_manager = rgw::sync_fairness::create_rados_bid_manager(
- store, control_obj, num_shards);
- r = bid_manager->start();
- if (r < 0) {
- tn->log(0, SSTR("ERROR: failed to start bidding manager " << r));
- return r;
- }
- sync_env.bid_manager = bid_manager.get();
- auto notifyrgws = sync_env.bid_manager->notify_cr();
- r = run(dpp, notifyrgws);
- if(r < 0) {
- tn->log(0, SSTR("ERROR: failed to sync bidding information " << r));
- return r;
}
RGWPeriodHistory::Cursor cursor;