}
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
- static constexpr uint32_t lock_duration = 30;
- RGWDataSyncCtx *sc;
- RGWDataSyncEnv *sync_env;
- rgw::sal::RadosStore* driver; // RGWDataSyncEnv also has a pointer to driver
- const rgw_pool& pool;
+ static constexpr auto lock_name{ "sync_lock"sv };
+ RGWDataSyncCtx* const sc;
+ RGWDataSyncEnv* const sync_env{ sc->env };
const uint32_t num_shards;
+ rgw_data_sync_status* const status;
+ RGWSyncTraceNodeRef tn;
+ boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
- string sync_status_oid;
+ const rgw_pool& pool{ sync_env->svc->zone->get_zone_params().log_pool };
+ const string sync_status_oid{
+ RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) };
- string lock_name;
- string cookie;
- rgw_data_sync_status *status;
std::vector<RGWObjVersionTracker>& objvs;
map<int, RGWDataChangesLogInfo> shards_info;
- RGWSyncTraceNodeRef tn;
-public:
- RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
- uint64_t instance_id,
- RGWSyncTraceNodeRef& _tn_parent,
- rgw_data_sync_status *status,
- std::vector<RGWObjVersionTracker>& objvs)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), driver(sync_env->driver),
- pool(sync_env->svc->zone->get_zone_params().log_pool),
- num_shards(num_shards), status(status), objvs(objvs),
- tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
- lock_name = "sync_lock";
+public:
+ RGWInitDataSyncStatusCoroutine(
+ RGWDataSyncCtx* _sc, uint32_t num_shards, uint64_t instance_id,
+ const RGWSyncTraceNodeRef& tn_parent, rgw_data_sync_status* status,
+ boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
+ std::vector<RGWObjVersionTracker>& objvs)
+ : RGWCoroutine(_sc->cct), sc(_sc), num_shards(num_shards), status(status),
+ tn(sync_env->sync_tracer->add_node(tn_parent, "init_data_sync_status")),
+ lease_cr(std::move(lease_cr)), objvs(objvs) {
status->sync_info.instance_id = instance_id;
+ }
-#define COOKIE_LEN 16
- char buf[COOKIE_LEN + 1];
-
- gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
- cookie = buf;
-
- sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sc->source_zone);
-
+ static auto continuous_lease_cr(RGWDataSyncCtx* const sc,
+ RGWCoroutine* const caller) {
+ auto lock_duration = sc->cct->_conf->rgw_sync_lease_period;
+ return new RGWContinuousLeaseCR(
+ sc->env->async_rados, sc->env->driver,
+ { sc->env->svc->zone->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) },
+ string(lock_name), lock_duration, caller);
}
int operate(const DoutPrefixProvider *dpp) override {
int ret;
reenter(this) {
- using LockCR = RGWSimpleRadosLockCR;
- yield call(new LockCR(sync_env->async_rados, driver,
- rgw_raw_obj{pool, sync_status_oid},
- lock_name, cookie, lock_duration));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
- return set_cr_error(retcode);
+ if (!lease_cr->is_locked()) {
+ drain_all();
+ return set_cr_error(-ECANCELED);
}
+
using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj{pool, sync_status_oid},
return set_cr_error(retcode);
}
- /* take lock again, we just recreated the object */
- yield call(new LockCR(sync_env->async_rados, driver,
- rgw_raw_obj{pool, sync_status_oid},
- lock_name, cookie, lock_duration));
- if (retcode < 0) {
- tn->log(0, SSTR("ERROR: failed to take a lock on " << sync_status_oid));
- return set_cr_error(retcode);
- }
-
- tn->log(10, "took lease");
+ // In the original code we reacquired the lock. Since
+ // RGWSimpleRadosWriteCR doesn't appear to touch the attributes
+ // and cls_version works across it, this should be unnecessary.
+ // Putting a note here just in case. If we see ECANCELED where
+ // we expect EBUSY, we can revisit this.
/* fetch current position in logs */
yield {
tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
return set_cr_error(retcode);
}
- yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, driver,
- rgw_raw_obj{pool, sync_status_oid},
- lock_name, cookie));
return set_cr_done();
}
return 0;
return ret;
}
+namespace RGWRDL {
+
+// Acquires the data sync status lease (RGWContinuousLeaseCR), runs
+// RGWInitDataSyncStatusCoroutine under that lease, then releases it.
+// Errors from lease acquisition or from the init coroutine are
+// propagated to the caller via set_cr_error().
+class DataSyncInitCR : public RGWCoroutine {
+  RGWDataSyncCtx* const sc;
+  const uint32_t num_shards;
+  uint64_t instance_id;
+  const RGWSyncTraceNodeRef& tn;
+  rgw_data_sync_status* const sync_status;
+  std::vector<RGWObjVersionTracker>& objvs;
+
+  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+
+public:
+
+  DataSyncInitCR(RGWDataSyncCtx* sc, uint32_t num_shards, uint64_t instance_id,
+                 const RGWSyncTraceNodeRef& tn,
+                 rgw_data_sync_status* sync_status,
+                 std::vector<RGWObjVersionTracker>& objvs)
+    : RGWCoroutine(sc->cct), sc(sc), num_shards(num_shards),
+      instance_id(instance_id), tn(tn),
+      sync_status(sync_status), objvs(objvs) {}
+
+  ~DataSyncInitCR() override {
+    // If we are torn down before operate() releases the lease, make
+    // sure the lease coroutine stops renewing it.
+    if (lease_cr) {
+      lease_cr->abort();
+    }
+  }
+
+  int operate(const DoutPrefixProvider *dpp) override {
+    reenter(this) {
+      lease_cr.reset(
+        RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
+
+      // Spawn the lease coroutine and sleep until it either holds the
+      // lock or gives up; is_done() without is_locked() means failure.
+      yield spawn(lease_cr.get(), false);
+      while (!lease_cr->is_locked()) {
+        if (lease_cr->is_done()) {
+          tn->log(5, "ERROR: failed to take data sync status lease");
+          set_status("lease lock failed, early abort");
+          drain_all();
+          return set_cr_error(lease_cr->get_ret_status());
+        }
+        tn->log(5, "waiting on data sync status lease");
+        yield set_sleeping(true);
+      }
+      tn->log(5, "acquired data sync status lease");
+      yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id,
+						    tn, sync_status, lease_cr, objvs));
+      lease_cr->go_down();
+      lease_cr.reset();
+      drain_all();
+      if (retcode < 0) {
+        // Propagate the init failure. Previously the error state was
+        // set but not returned, so the fall-through set_cr_done()
+        // below overwrote it and the caller saw success.
+        return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+}
+
// Initialize the data sync status objects by running DataSyncInitCR
// (which takes and holds the sync-status lease) to completion on a
// coroutine manager, then stop the HTTP manager and return the result.
// NOTE(review): this diff hunk elides the declarations of ret, crs,
// sync_env_local, http_manager, tn and objvs — presumably they appear
// in the omitted context lines; confirm against the full file.
int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
{
  rgw_data_sync_status sync_status;
  // Random instance id tags this particular init run.
  auto instance_id = ceph::util::generate_random_number<uint64_t>();
  // Use a local sync context bound to the local sync environment.
  RGWDataSyncCtx sc_local = sc;
  sc_local.env = &sync_env_local;
-  ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status, objvs));
+  ret = crs.run(dpp, new RGWRDL::DataSyncInitCR(&sc_local, num_shards,
+						instance_id, tn, &sync_status, objvs));
  http_manager.stop();
  return ret;
}
RGWDataSyncModule *data_sync_module{nullptr};
RGWObjVersionTracker objv;
+
+ boost::intrusive_ptr<RGWContinuousLeaseCR> init_lease;
+ boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+
public:
RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
for (auto iter : shard_crs) {
iter.second->put();
}
+ if (init_lease) {
+ init_lease->abort();
+ }
}
int operate(const DoutPrefixProvider *dpp) override {
return set_cr_error(retcode);
}
+ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state !=
+ rgw_data_sync_info::StateSync) {
+ init_lease.reset(
+ RGWInitDataSyncStatusCoroutine::continuous_lease_cr(sc, this));
+ yield lease_stack.reset(spawn(init_lease.get(), false));
+
+ while (!init_lease->is_locked()) {
+ if (init_lease->is_done()) {
+ tn->log(5, "ERROR: failed to take data sync status lease");
+ set_status("lease lock failed, early abort");
+ drain_all();
+ return set_cr_error(init_lease->get_ret_status());
+ }
+ tn->log(5, "waiting on data sync status lease");
+ yield set_sleeping(true);
+ }
+ tn->log(5, "acquired data sync status lease");
+ }
+
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
tn->log(20, SSTR("init"));
sync_status.sync_info.num_shards = num_shards;
uint64_t instance_id;
instance_id = ceph::util::generate_random_number<uint64_t>();
- yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status, objvs));
+ yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn,
+ &sync_status, init_lease, objvs));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
+ init_lease->go_down();
+ drain_all();
return set_cr_error(retcode);
}
// sets state = StateBuildingFullSyncMaps
tn->log(0, SSTR("ERROR: sync module init_sync() failed, retcode=" << retcode));
return set_cr_error(retcode);
}
+
+ if (!init_lease->is_locked()) {
+ init_lease->go_down();
+ drain_all();
+ return set_cr_error(-ECANCELED);
+ }
/* state: building full sync maps */
yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
if (retcode < 0) {
}
sync_status.sync_info.state = rgw_data_sync_info::StateSync;
+ if (!init_lease->is_locked()) {
+ init_lease->go_down();
+ drain_all();
+ return set_cr_error(-ECANCELED);
+ }
/* update new state */
yield call(set_sync_info_cr());
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to start sync, retcode=" << retcode));
return set_cr_error(retcode);
}
-
- yield {
- if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+
+ if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
+ if (init_lease) {
+ init_lease->go_down();
+ drain_all();
+ init_lease.reset();
+ lease_stack.reset();
+ }
+ yield {
tn->log(10, SSTR("spawning " << num_shards << " shards sync"));
for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
iter != sync_status.sync_markers.end(); ++iter) {