From: Casey Bodley Date: Tue, 31 Mar 2020 13:23:07 +0000 (-0400) Subject: rgw: track last timestamp written for bucket sync X-Git-Tag: v16.1.0~2586^2~14 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=dec531ad887de0d9cc470405fc5c4da4fcb97e11;p=ceph.git rgw: track last timestamp written for bucket sync bucket sync remembers the latest timestamp that it successfully wrote to the bucket sync status. data sync can use this to make future decisions without having to reread its sync status Signed-off-by: Casey Bodley --- diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 4d6200a00072..4b485599dcc9 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1025,6 +1025,7 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { rgw_bucket_sync_pipe sync_pipe; rgw_bucket_shard_sync_info sync_status; RGWMetaSyncEnv meta_sync_env; + ceph::real_time* progress; const std::string status_oid; @@ -1034,8 +1035,11 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWSyncTraceNodeRef tn; public: - RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, const RGWSyncTraceNodeRef& _tn_parent) - : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pair(_sync_pair), + RGWRunBucketSyncCoroutine(RGWDataSyncCtx *_sc, const rgw_bucket_sync_pair_info& _sync_pair, + const RGWSyncTraceNodeRef& _tn_parent, + ceph::real_time* progress) + : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), + sync_pair(_sync_pair), progress(progress), status_oid(RGWBucketPipeSyncStatusManager::status_oid(sc->source_zone, sync_pair)), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket", SSTR(bucket_shard_str{_sync_pair.dest_bs} << "<-" << bucket_shard_str{_sync_pair.source_bs} ))) { @@ -1250,8 +1254,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { boost::intrusive_ptr lease_stack; RGWSyncTraceNodeRef tn; - std::vector scs; - RGWDataSyncCtx *cur_sc{nullptr}; + ceph::real_time* progress; + std::vector shard_progress; + std::vector::iterator cur_shard_progress; RGWRESTConn *conn{nullptr}; rgw_zone_id last_zone; @@ -1269,7 +1274,8 @@ public: RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, std::optional _target_bs, std::optional _source_bs, - const RGWSyncTraceNodeRef& _tn_parent); + const RGWSyncTraceNodeRef& _tn_parent, + ceph::real_time* progress); ~RGWRunBucketSourcesSyncCR() override { if (lease_cr) { lease_cr->abort(); @@ -1288,6 +1294,7 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine { boost::intrusive_ptr error_repo; RGWSyncTraceNodeRef tn; + ceph::real_time progress; int sync_status = 0; public: RGWDataSyncSingleEntryCR(RGWDataSyncCtx *_sc, const rgw_bucket_shard& source_bs, @@ -1313,7 +1320,10 @@ public: yield call(new RGWRunBucketSourcesSyncCR(sc, std::nullopt, /* target_bs */ source_bs, - tn)); + tn, &progress)); + if (retcode == 0) { + tn->log(20, SSTR("RunBucketSources progress=" << progress)); + } } while (marker_tracker && marker_tracker->need_retry(obligation.key)); sync_status = retcode; @@ -3384,11 +3394,47 @@ public: } }; +// write the incremental sync status and update 'stable_timestamp' on success +class RGWWriteBucketShardIncSyncStatus : public RGWCoroutine { + RGWDataSyncEnv *sync_env; + rgw_raw_obj obj; + rgw_bucket_shard_inc_sync_marker sync_marker; + ceph::real_time* stable_timestamp; + RGWObjVersionTracker* objv_tracker; + std::map attrs; + public: + RGWWriteBucketShardIncSyncStatus(RGWDataSyncEnv *sync_env, + const rgw_raw_obj& obj, + const rgw_bucket_shard_inc_sync_marker& sync_marker, + ceph::real_time* stable_timestamp, + RGWObjVersionTracker* objv_tracker) + : RGWCoroutine(sync_env->cct), sync_env(sync_env), obj(obj), + sync_marker(sync_marker), stable_timestamp(stable_timestamp), + objv_tracker(objv_tracker) + {} + int operate() { + reenter(this) { + sync_marker.encode_attr(attrs); + + yield call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, sync_env->svc->sysobj, + obj, attrs, objv_tracker)); + if (retcode < 0) { + return set_cr_error(retcode); + } + if (stable_timestamp) { + *stable_timestamp = sync_marker.timestamp; + } + return set_cr_done(); + } + return 0; + } +}; + class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; - string marker_oid; + rgw_raw_obj obj; rgw_bucket_shard_inc_sync_marker sync_marker; map key_to_marker; @@ -3401,6 +3447,7 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack pending_olh; // object names with pending olh operations RGWSyncTraceNodeRef tn; + ceph::real_time* stable_timestamp; void handle_finish(const string& marker) override { auto iter = marker_to_op.find(marker); @@ -3419,11 +3466,13 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrackenv), - marker_oid(_marker_oid), - sync_marker(_marker) {} - + const rgw_bucket_shard_inc_sync_marker& _marker, + ceph::real_time* stable_timestamp) + : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW), + sc(_sc), sync_env(_sc->env), + obj(sync_env->svc->zone->get_zone_params().log_pool, _marker_oid), + sync_marker(_marker), stable_timestamp(stable_timestamp) + {} void set_tn(RGWSyncTraceNodeRef& _tn) { tn = _tn; } @@ -3432,14 +3481,9 @@ public: sync_marker.position = new_marker; sync_marker.timestamp = timestamp; - map attrs; - sync_marker.encode_attr(attrs); - - tn->log(20, SSTR("updating marker marker_oid=" << marker_oid << " marker=" << new_marker << " timestamp=" << timestamp)); - return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, - sync_env->svc->sysobj, - rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid), - attrs); + tn->log(20, SSTR("updating marker marker_oid=" << obj.oid << " marker=" << new_marker << " timestamp=" << timestamp)); + return new RGWWriteBucketShardIncSyncStatus(sync_env, obj, sync_marker, + stable_timestamp, nullptr); } /* @@ -3881,11 +3925,12 @@ public: const std::string& status_oid, RGWContinuousLeaseCR *lease_cr, rgw_bucket_shard_sync_info& sync_info, - RGWSyncTraceNodeRef& _tn_parent) + RGWSyncTraceNodeRef& _tn_parent, + ceph::real_time* stable_timestamp) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), sync_pipe(_sync_pipe), bs(_sync_pipe.info.source_bs), lease_cr(lease_cr), sync_info(sync_info), - marker_tracker(sc, status_oid, sync_info.inc_marker), + marker_tracker(sc, status_oid, sync_info.inc_marker, stable_timestamp), status_oid(status_oid), zone_id(sync_env->svc->zone->get_zone().id), tn(sync_env->sync_tracer->add_node(_tn_parent, "inc_sync", SSTR(bucket_shard_str{bs}))) @@ -4288,13 +4333,13 @@ std::ostream& operator<<(std::ostream& out, std::optional& bs) RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc, std::optional _target_bs, std::optional _source_bs, - const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sc->env->cct), - sc(_sc), - sync_env(_sc->env), - target_bs(_target_bs), - source_bs(_source_bs), - tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", - SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))) + const RGWSyncTraceNodeRef& _tn_parent, + ceph::real_time* progress) + : RGWCoroutine(_sc->env->cct), sc(_sc), sync_env(_sc->env), + target_bs(_target_bs), source_bs(_source_bs), + tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", + SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << sc->source_zone))), + progress(progress) { if (target_bs) { target_bucket = target_bs->bucket; @@ -4317,6 +4362,7 @@ int RGWRunBucketSourcesSyncCR::operate() if (pipes.empty()) { ldpp_dout(sync_env->dpp, 20) << __func__ << "(): no relevant sync pipes found" << dendl; + return set_cr_done(); } for (siter = pipes.begin(); siter != pipes.end(); ++siter) { @@ -4345,7 +4391,9 @@ int RGWRunBucketSourcesSyncCR::operate() ldpp_dout(sync_env->dpp, 20) << __func__ << "(): num shards=" << num_shards << " cur_shard=" << cur_shard << dendl; - for (; num_shards > 0; --num_shards, ++cur_shard) { + shard_progress.resize(num_shards); + cur_shard_progress = shard_progress.begin(); + for (; num_shards > 0; --num_shards, ++cur_shard, ++cur_shard_progress) { /* * use a negatvie shard_id for backward compatibility, * this affects the crafted status oid @@ -4359,7 +4407,8 @@ int RGWRunBucketSourcesSyncCR::operate() ldpp_dout(sync_env->dpp, 20) << __func__ << "(): sync_pair=" << sync_pair << dendl; - yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn), false); + yield spawn(new RGWRunBucketSyncCoroutine(sc, sync_pair, tn, + &*cur_shard_progress), false); while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { set_status() << "num_spawned() > spawn_window"; yield wait_for_child(); @@ -4388,6 +4437,9 @@ int RGWRunBucketSourcesSyncCR::operate() } } } + if (progress) { + *progress = *std::min_element(shard_progress.begin(), shard_progress.end()); + } return set_cr_done(); } @@ -4752,6 +4804,9 @@ int RGWRunBucketSyncCoroutine::operate() return set_cr_error(retcode); } } + if (progress) { + *progress = sync_status.inc_marker.timestamp; + } if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { yield call(new RGWBucketShardFullSyncCR(sc, sync_pipe, @@ -4768,7 +4823,8 @@ int RGWRunBucketSyncCoroutine::operate() if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { yield call(new RGWBucketShardIncrementalSyncCR(sc, sync_pipe, status_oid, lease_cr.get(), - sync_status, tn)); + sync_status, tn, + progress)); if (retcode < 0) { tn->log(5, SSTR("incremental sync on bucket failed, retcode=" << retcode)); lease_cr->go_down(); @@ -4793,7 +4849,7 @@ RGWCoroutine *RGWRemoteBucketManager::run_sync_cr(int num) return nullptr; } - return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node); + return new RGWRunBucketSyncCoroutine(&sc, sync_pairs[num], sync_env->sync_tracer->root_node, nullptr); } int RGWBucketPipeSyncStatusManager::init()