rgw_bucket_sync_pipe sync_pipe;
rgw_bucket_shard_sync_info sync_status;
RGWMetaSyncEnv meta_sync_env;
+ RGWObjVersionTracker objv_tracker;
ceph::real_time* progress;
const std::string status_oid;
const string sync_status_oid;
rgw_bucket_shard_sync_info& status;
-
+ RGWObjVersionTracker& objv_tracker;
rgw_bucket_index_marker_info info;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pair_info& _sync_pair,
- rgw_bucket_shard_sync_info& _status)
+ rgw_bucket_shard_sync_info& _status,
+ RGWObjVersionTracker& objv_tracker)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pair(_sync_pair),
sync_status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, _sync_pair)),
- status(_status)
+ status(_status), objv_tracker(objv_tracker)
{}
int operate() override {
if (write_status) {
map<string, bufferlist> attrs;
status.encode_all_attrs(attrs);
- call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, obj, attrs));
+ call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, obj, attrs, &objv_tracker));
} else {
- call(new RGWRadosRemoveCR(store, obj));
+ call(new RGWRadosRemoveCR(store, obj, &objv_tracker));
}
}
if (info.syncstopped) {
sc.init(sync_env, conn, source_zone);
}
-RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num)
+RGWCoroutine *RGWRemoteBucketManager::init_sync_status_cr(int num, RGWObjVersionTracker& objv_tracker)
{
if ((size_t)num >= sync_pairs.size()) {
return nullptr;
}
- return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status);
+ return new RGWInitBucketShardSyncStatusCoroutine(&sc, sync_pairs[num], init_status, objv_tracker);
}
#define BUCKET_SYNC_ATTR_PREFIX RGW_ATTR_PREFIX "bucket-sync."
RGWDataSyncEnv *sync_env;
string oid;
rgw_bucket_shard_sync_info *status;
-
+ RGWObjVersionTracker* objv_tracker;
map<string, bufferlist> attrs;
public:
RGWReadBucketPipeSyncStatusCoroutine(RGWDataSyncCtx *_sc,
const rgw_bucket_sync_pair_info& sync_pair,
- rgw_bucket_shard_sync_info *_status)
+ rgw_bucket_shard_sync_info *_status,
+ RGWObjVersionTracker* objv_tracker)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
- status(_status) {}
+ status(_status), objv_tracker(objv_tracker)
+ {}
int operate() override;
};
reenter(this) {
yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, oid),
- &attrs, true));
+ &attrs, true, objv_tracker));
if (retcode == -ENOENT) {
*status = rgw_bucket_shard_sync_info();
return set_cr_done();
return nullptr;
}
- return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status);
+ return new RGWReadBucketPipeSyncStatusCoroutine(&sc, sync_pairs[num], sync_status, nullptr);
}
RGWBucketPipeSyncStatusManager::RGWBucketPipeSyncStatusManager(rgw::sal::RGWRadosStore *_store,
string marker_oid;
rgw_bucket_shard_full_sync_marker sync_marker;
-
RGWSyncTraceNodeRef tn;
+ RGWObjVersionTracker& objv_tracker;
public:
RGWBucketFullSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
- const string& _marker_oid,
- const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
- sc(_sc), sync_env(_sc->env),
- marker_oid(_marker_oid),
- sync_marker(_marker) {}
-
- void set_tn(RGWSyncTraceNodeRef& _tn) {
- tn = _tn;
- }
+ const string& _marker_oid,
+ const rgw_bucket_shard_full_sync_marker& _marker,
+ RGWSyncTraceNodeRef tn,
+ RGWObjVersionTracker& objv_tracker)
+ : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
+ sc(_sc), sync_env(_sc->env), marker_oid(_marker_oid),
+ sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker)
+ {}
RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.position = new_marker;
tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
- attrs);
+ attrs, &objv_tracker);
}
RGWOrderCallCR *allocate_order_control_cr() override {
rgw_raw_obj obj;
rgw_bucket_shard_inc_sync_marker sync_marker;
ceph::real_time* stable_timestamp;
- RGWObjVersionTracker* objv_tracker;
+ RGWObjVersionTracker& objv_tracker;
std::map<std::string, bufferlist> attrs;
public:
RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
const rgw_raw_obj& obj,
const rgw_bucket_shard_inc_sync_marker& sync_marker,
ceph::real_time* stable_timestamp,
- RGWObjVersionTracker* objv_tracker)
+ RGWObjVersionTracker& objv_tracker)
: RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
sync_marker(sync_marker), stable_timestamp(stable_timestamp),
objv_tracker(objv_tracker)
sync_marker.encode_attr(attrs);
yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
- obj, attrs, objv_tracker));
+ obj, attrs, &objv_tracker));
if (retcode < 0) {
return set_cr_error(retcode);
}
std::set<std::string> pending_olh; // object names with pending olh operations
RGWSyncTraceNodeRef tn;
+ RGWObjVersionTracker& objv_tracker;
ceph::real_time* stable_timestamp;
void handle_finish(const string& marker) override {
RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
const string& _marker_oid,
const rgw_bucket_shard_inc_sync_marker& _marker,
+ RGWSyncTraceNodeRef tn,
+ RGWObjVersionTracker& objv_tracker,
ceph::real_time* stable_timestamp)
: RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
sc(_sc), sync_env(_sc->env),
obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
- sync_marker(_marker), stable_timestamp(stable_timestamp)
+ sync_marker(_marker), tn(std::move(tn)), objv_tracker(objv_tracker),
+ stable_timestamp(stable_timestamp)
{}
- void set_tn(RGWSyncTraceNodeRef& _tn) {
- tn = _tn;
- }
RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
sync_marker.position = new_marker;
tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
- stable_timestamp, nullptr);
+ stable_timestamp, objv_tracker);
}
/*
bucket_list_result list_result;
list<bucket_list_entry>::iterator entries_iter;
rgw_bucket_shard_sync_info& sync_info;
- RGWBucketFullSyncShardMarkerTrack marker_tracker;
rgw_obj_key list_marker;
bucket_list_entry *entry{nullptr};
rgw_zone_set zones_trace;
RGWSyncTraceNodeRef tn;
+ RGWBucketFullSyncShardMarkerTrack marker_tracker;
struct _prefix_handler {
RGWBucketSyncFlowManager::pipe_rules_ref rules;
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
rgw_bucket_shard_sync_info& sync_info,
- RGWSyncTraceNodeRef tn_parent)
+ RGWSyncTraceNodeRef tn_parent,
+ RGWObjVersionTracker& objv_tracker)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
lease_cr(lease_cr), sync_info(sync_info),
- marker_tracker(sc, status_oid, sync_info.full_marker),
status_oid(status_oid),
tn(sync_env->sync_tracer->add_node(tn_parent, "full_sync",
- SSTR(bucket_shard_str{bs}))) {
+ SSTR(bucket_shard_str{bs}))),
+ marker_tracker(sc, status_oid, sync_info.full_marker, tn, objv_tracker)
+ {
zones_trace.insert(sc->source_zone.id, sync_pipe.info.dest_bs.bucket.get_key());
- marker_tracker.set_tn(tn);
prefix_handler.set_rules(sync_pipe.get_rules());
}
rgw_bucket_shard_sync_info& sync_info;
rgw_obj_key key;
rgw_bi_log_entry *entry{nullptr};
- RGWBucketIncSyncShardMarkerTrack marker_tracker;
bool updated_status{false};
- const string& status_oid;
rgw_zone_id zone_id;
string target_location_key;
bool syncstopped{false};
RGWSyncTraceNodeRef tn;
+ RGWBucketIncSyncShardMarkerTrack marker_tracker;
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
RGWContinuousLeaseCR *lease_cr,
rgw_bucket_shard_sync_info& sync_info,
RGWSyncTraceNodeRef& _tn_parent,
+ RGWObjVersionTracker& objv_tracker,
ceph::real_time* stable_timestamp)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
lease_cr(lease_cr), sync_info(sync_info),
- marker_tracker(sc, status_oid, sync_info.inc_marker, stable_timestamp),
- status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id),
+ zone_id(sync_env->svc->zone->get_zone().id),
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
- SSTR(bucket_shard_str{bs})))
+ SSTR(bucket_shard_str{bs}))),
+ marker_tracker(sc, status_oid, sync_info.inc_marker, tn,
+ objv_tracker, stable_timestamp)
{
set_description() << "bucket shard incremental sync bucket="
<< bucket_shard_str{bs};
set_status("init");
- marker_tracker.set_tn(tn);
rules = sync_pipe.get_rules();
target_location_key = sync_pipe.info.dest_bs.bucket.get_key();
}
}
tn->log(10, "took lease");
- yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status));
+ yield call(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &sync_status, &objv_tracker));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
lease_cr->go_down();
do {
if (sync_status.state == rgw_bucket_shard_sync_info::StateInit ||
sync_status.state == rgw_bucket_shard_sync_info::StateStopped) {
- yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status));
+ yield call(new RGWInitBucketShardSyncStatusCoroutine(sc, sync_pair, sync_status, objv_tracker));
if (retcode == -ENOENT) {
tn->log(0, "bucket sync disabled");
lease_cr->abort(); // deleted lease object, abort/wakeup instead of unlock
if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
status_oid, lease_cr.get(),
- sync_status, tn));
+ sync_status, tn, objv_tracker));
if (retcode < 0) {
tn->log(5, SSTR("full sync on bucket failed, retcode=" << retcode));
lease_cr->go_down();
yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
status_oid, lease_cr.get(),
sync_status, tn,
- progress));
+ objv_tracker, progress));
if (retcode < 0) {
tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
lease_cr->go_down();
int RGWBucketPipeSyncStatusManager::init_sync_status()
{
list<RGWCoroutinesStack *> stacks;
+ // pass an empty objv tracker to each so that the version gets incremented
+ std::list<RGWObjVersionTracker> objvs;
for (auto& mgr : source_mgrs) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
for (int i = 0; i < mgr->num_pipes(); ++i) {
- stack->call(mgr->init_sync_status_cr(i));
+ objvs.emplace_back();
+ stack->call(mgr->init_sync_status_cr(i, objvs.back()));
}
stacks.push_back(stack);
}
sync_pair.source_bs = source_bs;
sync_pair.dest_bs = dest_bs;
- spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i), false);
+ spawn(new RGWReadBucketPipeSyncStatusCoroutine(sc, sync_pair, &*i, nullptr), false);
++i;
++source_bs.shard_id;
if (shard_to_shard_sync) {