int shard_id{0};
map<uint32_t, rgw_data_sync_marker>& markers;
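+ // one version tracker per shard status object, filled in by the per-shard
+ // reads below so that later marker writes can detect racing updates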
+ std::vector<RGWObjVersionTracker>& objvs;
int handle_result(int r) override {
if (r == -ENOENT) { // ENOENT is not a fatal error
return 0;
}
public:
RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards,
- map<uint32_t, rgw_data_sync_marker>& markers)
+ map<uint32_t, rgw_data_sync_marker>& markers,
+ std::vector<RGWObjVersionTracker>& objvs)
: RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS),
- sc(sc), env(sc->env), num_shards(num_shards), markers(markers)
+ sc(sc), env(sc->env), num_shards(num_shards), markers(markers), objvs(objvs)
{}
bool spawn_next() override;
};
using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
spawn(new CR(env->dpp, env->async_rados, env->svc->sysobj,
rgw_raw_obj(env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
- &markers[shard_id]),
+ &markers[shard_id], true, &objvs[shard_id]),
false);
shard_id++;
return true;
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_data_sync_status *sync_status;
+ std::vector<RGWObjVersionTracker>& objvs;
public:
RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
- rgw_data_sync_status *_status)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status)
+ rgw_data_sync_status *_status, std::vector<RGWObjVersionTracker>& objvs)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env),
+ sync_status(_status), objvs(objvs)
{}
int operate(const DoutPrefixProvider *dpp) override;
};
return set_cr_error(retcode);
}
// read shard markers
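+ // size the tracker vector to the shard count; each read records the
+ // version of its shard status object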
+ objvs.resize(sync_status->sync_info.num_shards);
using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards,
- sync_status->sync_markers));
+ sync_status->sync_markers, objvs));
if (retcode < 0) {
ldpp_dout(dpp, 4) << "failed to read sync status markers with "
<< cpp_strerror(retcode) << dendl;
string lock_name;
string cookie;
rgw_data_sync_status *status;
+ std::vector<RGWObjVersionTracker>& objvs;
map<int, RGWDataChangesLogInfo> shards_info;
RGWSyncTraceNodeRef tn;
RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
uint64_t instance_id,
RGWSyncTraceNodeRef& _tn_parent,
- rgw_data_sync_status *status)
+ rgw_data_sync_status *status,
+ std::vector<RGWObjVersionTracker>& objvs)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), driver(sync_env->driver),
pool(sync_env->svc->zone->get_zone_params().log_pool),
- num_shards(num_shards), status(status),
+ num_shards(num_shards), status(status), objvs(objvs),
tn(sync_env->sync_tracer->add_node(_tn_parent, "init_data_sync_status")) {
lock_name = "sync_lock";
yield;
}
yield {
+ objvs.resize(num_shards);
for (uint32_t i = 0; i < num_shards; i++) {
RGWDataChangesLogInfo& info = shards_info[i];
auto& marker = status->sync_markers[i];
marker.next_step_marker = info.marker;
marker.timestamp = info.last_update;
const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i);
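+ // stamp a fresh version on the initial write of each shard status object;
+ // subsequent updates assert this version so concurrent writers are detected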
+ auto& objv = objvs[i];
+ objv.generate_new_write_ver(cct);
using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj,
- rgw_raw_obj{pool, oid}, marker), true);
+ rgw_raw_obj{pool, oid}, marker, &objv), true);
}
}
while (collect(&ret, NULL)) {
int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status)
{
// cannot run concurrently with run_sync(), so run in a separate manager
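+ // version trackers for the per-shard status reads; only needed for this call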
+ std::vector<RGWObjVersionTracker> objvs;
RGWCoroutinesManager crs(cct, cr_registry);
RGWHTTPManager http_manager(cct, crs.get_completion_mgr());
int ret = http_manager.start();
RGWDataSyncCtx sc_local = sc;
sc_local.env = &sync_env_local;
- ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status));
+ ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status, objvs));
http_manager.stop();
return ret;
}
int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards)
{
rgw_data_sync_status sync_status;
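+ // per-shard version trackers populated by the init coroutine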
+ std::vector<RGWObjVersionTracker> objvs;
sync_status.sync_info.num_shards = num_shards;
RGWCoroutinesManager crs(cct, cr_registry);
auto instance_id = ceph::util::generate_random_number<uint64_t>();
RGWDataSyncCtx sc_local = sc;
sc_local.env = &sync_env_local;
- ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status));
+ ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status, objvs));
http_manager.stop();
return ret;
}
rgw::sal::RadosStore* driver = sync_env->driver;
rgw_data_sync_status *sync_status;
+ std::vector<RGWObjVersionTracker>& objvs;
int req_ret = 0;
int ret = 0;
public:
RGWListBucketIndexesCR(RGWDataSyncCtx* sc,
- rgw_data_sync_status* sync_status)
- : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status) {}
+ rgw_data_sync_status* sync_status, std::vector<RGWObjVersionTracker>& objvs)
+ : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status), objvs(objvs) {}
~RGWListBucketIndexesCR() override { }
int operate(const DoutPrefixProvider *dpp) override {
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool,
RGWDataSyncStatusManager::shard_obj_name(
sc->source_zone, shard_id)),
- marker),
+ marker, &objvs[shard_id]),
true);
}
} else {
string marker_oid;
rgw_data_sync_marker sync_marker;
RGWSyncTraceNodeRef tn;
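+ // version tracker for this shard's status object, shared by reference with the shard CR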
+ RGWObjVersionTracker& objv;
public:
RGWDataSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
const string& _marker_oid,
const rgw_data_sync_marker& _marker,
- RGWSyncTraceNodeRef& _tn) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
+ RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
sc(_sc), sync_env(_sc->env),
marker_oid(_marker_oid),
sync_marker(_marker),
- tn(_tn) {}
+ tn(_tn), objv(objv) {}
RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.marker = new_marker;
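+ // persist the marker under the shard's version tracker so a racing
+ // writer fails the update instead of silently overwriting it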
return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
- sync_marker);
+ sync_marker, &objv);
}
RGWOrderCallCR *allocate_order_control_cr() override {
const rgw_raw_obj& error_repo;
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
const rgw_data_sync_status& sync_status;
+ RGWObjVersionTracker& objv;
boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
const string& status_oid, const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
const rgw_data_sync_status& sync_status,
+ RGWObjVersionTracker& objv,
const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
: RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
sync_marker(sync_marker), tn(tn), status_oid(status_oid),
error_repo(error_repo), lease_cr(std::move(lease_cr)),
- sync_status(sync_status), bucket_shard_cache(bucket_shard_cache) {}
+ sync_status(sync_status), objv(objv),
+ bucket_shard_cache(bucket_shard_cache) {}
};
class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
const string& status_oid, const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
- const rgw_data_sync_status& sync_status,
+ const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
: RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
status_oid, error_repo, std::move(lease_cr),
- sync_status, bucket_shard_cache) {}
+ sync_status, objv, bucket_shard_cache) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
tn->log(10, "start full sync");
oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
- marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
total_entries = sync_marker.pos;
entry_timestamp = sync_marker.timestamp; // time when full sync started
do {
sync_marker.next_step_marker.clear();
yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
sc->env->dpp, sc->env->async_rados, sc->env->svc->sysobj,
- rgw_raw_obj(pool, status_oid), sync_marker));
+ rgw_raw_obj(pool, status_oid), sync_marker, &objv));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
return set_cr_error(retcode);
rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
const string& status_oid, const rgw_raw_obj& error_repo,
boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
- const rgw_data_sync_status& sync_status,
+ const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
ceph::mutex& inc_lock,
bc::flat_set<rgw_data_notify_entry>& modified_shards)
: RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
status_oid, error_repo, std::move(lease_cr),
- sync_status, bucket_shard_cache),
+ sync_status, objv, bucket_shard_cache),
inc_lock(inc_lock), modified_shards(modified_shards) {}
int operate(const DoutPrefixProvider *dpp) override {
reenter(this) {
tn->log(10, "start incremental sync");
- marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+ marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
do {
if (!lease_cr->is_locked()) {
drain_all();
rgw_data_sync_marker& sync_marker;
rgw_data_sync_status sync_status;
const RGWSyncTraceNodeRef tn;
+ RGWObjVersionTracker& objv;
bool *reset_backoff;
ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
const uint32_t shard_id, rgw_data_sync_marker& marker,
const rgw_data_sync_status& sync_status,
- RGWSyncTraceNodeRef& tn, bool *reset_backoff)
+ RGWSyncTraceNodeRef& tn, RGWObjVersionTracker& objv, bool *reset_backoff)
: RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
sync_marker(marker), sync_status(sync_status), tn(tn),
- reset_backoff(reset_backoff) {
+ objv(objv), reset_backoff(reset_backoff) {
set_description() << "data sync shard source_zone=" << sc->source_zone
<< " shard_id=" << shard_id;
}
sync_marker, tn,
status_oid, error_repo,
lease_cr, sync_status,
- bucket_shard_cache));
+ objv, bucket_shard_cache));
if (retcode < 0) {
if (retcode != -EBUSY) {
tn->log(10, SSTR("full sync failed (retcode=" << retcode << ")"));
sync_marker, tn,
status_oid, error_repo,
lease_cr, sync_status,
- bucket_shard_cache,
+ objv, bucket_shard_cache,
inc_lock, modified_shards));
if (retcode < 0) {
if (retcode != -EBUSY) {
rgw_data_sync_status sync_status;
RGWSyncTraceNodeRef tn;
+ RGWObjVersionTracker& objv;
public:
RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool,
- uint32_t _shard_id, rgw_data_sync_marker& _marker, const rgw_data_sync_status& sync_status,
- RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, false),
- sc(_sc), sync_env(_sc->env),
- pool(_pool),
- shard_id(_shard_id),
- sync_marker(_marker) {
+ uint32_t _shard_id, rgw_data_sync_marker& _marker,
+ const rgw_data_sync_status& sync_status,
+ RGWObjVersionTracker& objv,
+ RGWSyncTraceNodeRef& _tn_parent)
+ : RGWBackoffControlCR(_sc->cct, false),
+ sc(_sc), sync_env(_sc->env),
+ pool(_pool),
+ shard_id(_shard_id),
+ sync_marker(_marker), objv(objv) {
tn = sync_env->sync_tracer->add_node(_tn_parent, "shard", std::to_string(shard_id));
}
RGWCoroutine *alloc_cr() override {
- return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, backoff_ptr());
+ return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, objv, backoff_ptr());
}
RGWCoroutine *alloc_finisher_cr() override {
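+ // re-read the shard marker along with its current version into the tracker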
return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
- &sync_marker);
+ &sync_marker, true, &objv);
}
void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
uint32_t num_shards;
rgw_data_sync_status sync_status;
+ std::vector<RGWObjVersionTracker> objvs;
ceph::mutex shard_crs_lock =
ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
RGWSyncTraceNodeRef tn;
RGWDataSyncModule *data_sync_module{nullptr};
+ RGWObjVersionTracker objv;
public:
RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff) : RGWCoroutine(_sc->cct),
sc(_sc), sync_env(_sc->env),
reenter(this) {
/* read sync status */
- yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status));
+ yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, objvs));
data_sync_module = sync_env->sync_module->get_data_handler();
sync_status.sync_info.num_shards = num_shards;
uint64_t instance_id;
instance_id = ceph::util::generate_random_number<uint64_t>();
- yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status));
+ yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status, objvs));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
return set_cr_error(retcode);
return set_cr_error(retcode);
}
/* state: building full sync maps */
- yield call(new RGWListBucketIndexesCR(sc, &sync_status));
+ yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
if (retcode < 0) {
tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
return set_cr_error(retcode);
for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
iter != sync_status.sync_markers.end(); ++iter) {
RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool,
- iter->first, iter->second, sync_status, tn);
+ iter->first, iter->second, sync_status, objvs[iter->first], tn);
cr->get();
shard_crs_lock.lock();
shard_crs[iter->first] = cr;