static const string bucket_status_oid_prefix = "bucket.sync-status";
static const string object_status_oid_prefix = "bucket.sync-status";
+static const string data_sync_bids_oid = "data-sync-bids";
+
void rgw_datalog_info::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("num_objects", num_shards, obj);
}
}
};
+class RGWDataSyncShardNotifyCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWSyncTraceNodeRef tn;
+
+public:
+ RGWDataSyncShardNotifyCR(RGWDataSyncEnv *_sync_env, RGWSyncTraceNodeRef& _tn)
+ : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env), tn(_tn) {}
+
+ int operate(const DoutPrefixProvider* dpp) override
+ {
+ reenter(this) {
+ for (;;) {
+ set_status("sync lock notification");
+ yield call(sync_env->bid_manager->notify_cr());
+ if (retcode < 0) {
+ tn->log(5, SSTR("ERROR: failed to notify bidding information" << retcode));
+ return set_cr_error(retcode);
+ }
+
+ set_status("sleeping");
+ yield wait(utime_t(cct->_conf->rgw_sync_lease_period, 0));
+ }
+
+ }
+ return 0;
+ }
+};
+
class RGWDataSyncCR : public RGWCoroutine {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
boost::intrusive_ptr<RGWContinuousLeaseCR> init_lease;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+ boost::intrusive_ptr<RGWCoroutinesStack> notify_stack;
RGWObjVersionTracker obj_version;
public:
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
+ yield {
+ ldpp_dout(dpp, 10) << "broadcast sync lock notify" << dendl;
+ notify_stack.reset(spawn(new RGWDataSyncShardNotifyCR(sync_env, tn), false));
+ }
+
/* read sync status */
yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status,
&obj_version, objvs));
int RGWRemoteDataLog::run_sync(const DoutPrefixProvider *dpp, int num_shards)
{
+ // construct and start bid manager for data sync fairness
+ const auto& control_pool = sc.env->driver->svc()->zone->get_zone_params().control_pool;
+ auto control_obj = rgw_raw_obj{control_pool, data_sync_bids_oid};
+
+ auto bid_manager = rgw::sync_fairness::create_rados_bid_manager(
+ driver, control_obj, num_shards);
+ int r = bid_manager->start();
+ if (r < 0) {
+ return r;
+ }
+ sc.env->bid_manager = bid_manager.get();
+
lock.lock();
data_sync_cr = new RGWDataSyncControlCR(&sc, num_shards, tn);
data_sync_cr->get(); // run() will drop a ref, so take another
lock.unlock();
- int r = run(dpp, data_sync_cr);
+ r = run(dpp, data_sync_cr);
lock.lock();
data_sync_cr->put();