RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_data_sync_status *sync_status;
+ RGWObjVersionTracker* objv_tracker;
std::vector<RGWObjVersionTracker>& objvs;
public:
RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
- rgw_data_sync_status *_status, std::vector<RGWObjVersionTracker>& objvs)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env),
- sync_status(_status), objvs(objvs)
+ rgw_data_sync_status *_status,
+ RGWObjVersionTracker* objv_tracker,
+ std::vector<RGWObjVersionTracker>& objvs)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status),
+ objv_tracker(objv_tracker), objvs(objvs)
{}
int operate(const DoutPrefixProvider *dpp) override;
};
bool empty_on_enoent = false; // fail on ENOENT
call(new ReadInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
- &sync_status->sync_info, empty_on_enoent));
+ &sync_status->sync_info, empty_on_enoent, objv_tracker));
}
if (retcode < 0) {
ldpp_dout(dpp, 4) << "failed to read sync status info with "
rgw_data_sync_status* const status;
RGWSyncTraceNodeRef tn;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+ RGWObjVersionTracker& objv_tracker;
+ std::vector<RGWObjVersionTracker>& objvs;
const rgw_pool& pool{ sync_env->svc->zone->get_zone_params().log_pool };
const string sync_status_oid{
RGWDataSyncStatusManager::sync_status_oid(sc->source_zone) };
- std::vector<RGWObjVersionTracker>& objvs;
map<int, RGWDataChangesLogInfo> shards_info;
RGWDataSyncCtx* _sc, uint32_t num_shards, uint64_t instance_id,
const RGWSyncTraceNodeRef& tn_parent, rgw_data_sync_status* status,
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr,
+ RGWObjVersionTracker& objv_tracker,
std::vector<RGWObjVersionTracker>& objvs)
: RGWCoroutine(_sc->cct), sc(_sc), num_shards(num_shards), status(status),
tn(sync_env->sync_tracer->add_node(tn_parent, "init_data_sync_status")),
- lease_cr(std::move(lease_cr)), objvs(objvs) {
+ lease_cr(std::move(lease_cr)), objv_tracker(objv_tracker), objvs(objvs) {
status->sync_info.instance_id = instance_id;
}
using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj{pool, sync_status_oid},
- status->sync_info));
+ status->sync_info, &objv_tracker));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
return set_cr_error(retcode);
status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
yield call(new WriteInfoCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj{pool, sync_status_oid},
- status->sync_info));
+ status->sync_info, &objv_tracker));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to write sync status info with " << retcode));
return set_cr_error(retcode);
int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status)
{
// cannot run concurrently with run_sync(), so run in a separate manager
- std::vector<RGWObjVersionTracker> objvs;
+ RGWObjVersionTracker objv;
+ std::vector<RGWObjVersionTracker> shard_objvs;
RGWCoroutinesManager crs(cct, cr_registry);
RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
int ret = http_manager.start();
RGWDataSyncCtx sc_local = sc;
sc_local.env = &sync_env_local;
- ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status, objvs));
+ ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status,
+ &objv, shard_objvs));
http_manager.stop();
return ret;
}
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
+ RGWObjVersionTracker objv_tracker;
+
public:
DataSyncInitCR(RGWDataSyncCtx* sc, uint32_t num_shards, uint64_t instance_id,
yield set_sleeping(true);
}
tn->log(5, "acquired data sync status lease");
+ objv_tracker.generate_new_write_ver(sc->cct);
yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id,
- tn, sync_status, lease_cr, objvs));
+ tn, sync_status, lease_cr,
+ objv_tracker, objvs));
lease_cr->go_down();
lease_cr.reset();
drain_all();
RGWSyncTraceNodeRef tn;
RGWDataSyncModule *data_sync_module{nullptr};
- RGWObjVersionTracker objv;
boost::intrusive_ptr<RGWContinuousLeaseCR> init_lease;
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
+ RGWObjVersionTracker obj_version;
public:
RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
reenter(this) {
/* read sync status */
- yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, objvs));
+ yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status,
+ &obj_version, objvs));
data_sync_module = sync_env->sync_module->get_data_handler();
tn->log(5, "acquired data sync status lease");
// Reread sync status now that we've acquired the lock!
- yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, objvs));
+ obj_version.clear();
+ yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, &obj_version, objvs));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to fetch sync status, retcode=" << retcode));
return set_cr_error(retcode);
uint64_t instance_id;
instance_id = ceph::util::generate_random_number<uint64_t>();
yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn,
- &sync_status, init_lease, objvs));
+ &sync_status, init_lease, obj_version, objvs));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
init_lease->go_down();
RGWCoroutine *set_sync_info_cr() {
return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sc->source_zone)),
- sync_status.sync_info);
+ sync_status.sync_info, &obj_version);
}
void wakeup(int shard_id, bc::flat_set<rgw_data_notify_entry>& entries) {