rgw_bucket_sync_pipe sync_pipe;
rgw_bucket_shard_sync_info sync_status;
RGWMetaSyncEnv meta_sync_env;
+ ceph::real_time* progress;
const std::string status_oid;
RGWSyncTraceNodeRef tn;
public:
- RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent)
- : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair),
+ RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair,
+ const RGWSyncTraceNodeRef& _tn_parent,
+ ceph::real_time* progress)
+ : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+ sync_pair(_sync_pair), progress(progress),
status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)),
tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket",
SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) {
boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
RGWSyncTraceNodeRef tn;
- std::vector<RGWDataSyncCtx> scs;
- RGWDataSyncCtx *cur_sc{nullptr};
+ ceph::real_time* progress;
+ std::vector<ceph::real_time> shard_progress;
+ std::vector<ceph::real_time>::iterator cur_shard_progress;
RGWRESTConn *conn{nullptr};
rgw_zone_id last_zone;
RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
- const RGWSyncTraceNodeRef& _tn_parent);
+ const RGWSyncTraceNodeRef& _tn_parent,
+ ceph::real_time* progress);
~RGWRunBucketSourcesSyncCR() override {
if (lease_cr) {
lease_cr->abort();
boost::intrusive_ptr<RGWOmapAppend> error_repo;
RGWSyncTraceNodeRef tn;
+ ceph::real_time progress;
int sync_status = 0;
public:
RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs,
yield call(new RGWRunBucketSourcesSyncCR(sc,
std::nullopt, /* target_bs */
source_bs,
- tn));
+ tn, &progress));
+ if (retcode == 0) {
+ tn->log(20, SSTR("RunBucketSources progress=" << progress));
+ }
} while (marker_tracker && marker_tracker->need_retry(obligation.key));
sync_status = retcode;
}
};
+// write the incremental sync status and update 'stable_timestamp' on success
+class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+ rgw_raw_obj obj;
+ rgw_bucket_shard_inc_sync_marker sync_marker;
+ ceph::real_time* stable_timestamp;
+ RGWObjVersionTracker* objv_tracker;
+ std::map<std::string, bufferlist> attrs;
+ public:
+ RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env,
+ const rgw_raw_obj& obj,
+ const rgw_bucket_shard_inc_sync_marker& sync_marker,
+ ceph::real_time* stable_timestamp,
+ RGWObjVersionTracker* objv_tracker)
+ : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj),
+ sync_marker(sync_marker), stable_timestamp(stable_timestamp),
+ objv_tracker(objv_tracker)
+ {}
+ int operate() {
+ reenter(this) {
+ sync_marker.encode_attr(attrs);
+
+ yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj,
+ obj, attrs, objv_tracker));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ if (stable_timestamp) {
+ *stable_timestamp = sync_marker.timestamp;
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
- string marker_oid;
+ rgw_raw_obj obj;
rgw_bucket_shard_inc_sync_marker sync_marker;
map<rgw_obj_key, string> key_to_marker;
std::set<std::string> pending_olh; // object names with pending olh operations
RGWSyncTraceNodeRef tn;
+ ceph::real_time* stable_timestamp;
void handle_finish(const string& marker) override {
auto iter = marker_to_op.find(marker);
public:
RGWBucketIncSyncShardMarkerTrack(RGWDataSyncCtx *_sc,
const string& _marker_oid,
- const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
- sc(_sc), sync_env(_sc->env),
- marker_oid(_marker_oid),
- sync_marker(_marker) {}
-
+ const rgw_bucket_shard_inc_sync_marker& _marker,
+ ceph::real_time* stable_timestamp)
+ : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
+ sc(_sc), sync_env(_sc->env),
+ obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid),
+ sync_marker(_marker), stable_timestamp(stable_timestamp)
+ {}
void set_tn(RGWSyncTraceNodeRef& _tn) {
tn = _tn;
}
sync_marker.position = new_marker;
sync_marker.timestamp = timestamp;
- map<string, bufferlist> attrs;
- sync_marker.encode_attr(attrs);
-
- tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " timestamp=" << timestamp));
- return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
- sync_env->svc->sysobj,
- rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
- attrs);
+ tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp));
+ return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker,
+ stable_timestamp, nullptr);
}
/*
const std::string& status_oid,
RGWContinuousLeaseCR *lease_cr,
rgw_bucket_shard_sync_info& sync_info,
- RGWSyncTraceNodeRef& _tn_parent)
+ RGWSyncTraceNodeRef& _tn_parent,
+ ceph::real_time* stable_timestamp)
: RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs),
lease_cr(lease_cr), sync_info(sync_info),
- marker_tracker(sc, status_oid, sync_info.inc_marker),
+ marker_tracker(sc, status_oid, sync_info.inc_marker, stable_timestamp),
status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id),
tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync",
SSTR(bucket_shard_str{bs})))
RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
std::optional<rgw_bucket_shard> _target_bs,
std::optional<rgw_bucket_shard> _source_bs,
- const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->env->cct),
- sc(_sc),
- sync_env(_sc->env),
- target_bs(_target_bs),
- source_bs(_source_bs),
- tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
- SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone)))
+ const RGWSyncTraceNodeRef& _tn_parent,
+ ceph::real_time* progress)
+ : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env),
+ target_bs(_target_bs), source_bs(_source_bs),
+ tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources",
+ SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))),
+ progress(progress)
{
if (target_bs) {
target_bucket = target_bs->bucket;
if (pipes.empty()) {
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl;
+ return set_cr_done();
}
for (siter = pipes.begin(); siter != pipes.end(); ++siter) {
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl;
- for (; num_shards > 0; --num_shards, ++cur_shard) {
+ shard_progress.resize(num_shards);
+ cur_shard_progress = shard_progress.begin();
+ for (; num_shards > 0; --num_shards, ++cur_shard, ++cur_shard_progress) {
/*
* use a negatvie shard_id for backward compatibility,
* this affects the crafted status oid
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl;
- yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn), false);
+ yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn,
+ &*cur_shard_progress), false);
while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
}
}
}
+ if (progress) {
+ *progress = *std::min_element(shard_progress.begin(), shard_progress.end());
+ }
return set_cr_done();
}
return set_cr_error(retcode);
}
}
+ if (progress) {
+ *progress = sync_status.inc_marker.timestamp;
+ }
if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe,
if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe,
status_oid, lease_cr.get(),
- sync_status, tn));
+ sync_status, tn,
+ progress));
if (retcode < 0) {
tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode));
lease_cr->go_down();
return nullptr;
}
- return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node);
+ return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr);
}
int RGWBucketPipeSyncStatusManager::init()