From: Shilpa Jagannath
Date: Thu, 18 Aug 2022 16:55:24 +0000 (-0400)
Subject: rgw/multisite: add cls versioning for tracking data sync per shard object and store...
X-Git-Tag: v18.1.0~499^2~25
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9aeb2aa38dba3edd7b96caca024956676b68dc73;p=ceph.git

rgw/multisite: add cls versioning for tracking data sync per shard object and store it in a vector

Signed-off-by: Shilpa Jagannath
---

diff --git a/src/rgw/driver/rados/rgw_data_sync.cc b/src/rgw/driver/rados/rgw_data_sync.cc
index 65f4d6bb6bf..cbbcf2cb3dd 100644
--- a/src/rgw/driver/rados/rgw_data_sync.cc
+++ b/src/rgw/driver/rados/rgw_data_sync.cc
@@ -90,6 +90,7 @@ class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
   int shard_id{0};;
 
   map<uint32_t, rgw_data_sync_marker>& markers;
+  std::vector<RGWObjVersionTracker>& objvs;
 
   int handle_result(int r) override {
     if (r == -ENOENT) { // ENOENT is not a fatal error
@@ -103,9 +104,10 @@ class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
   }
  public:
   RGWReadDataSyncStatusMarkersCR(RGWDataSyncCtx *sc, int num_shards,
-                                 map<uint32_t, rgw_data_sync_marker>& markers)
+                                 map<uint32_t, rgw_data_sync_marker>& markers,
+                                 std::vector<RGWObjVersionTracker>& objvs)
     : RGWShardCollectCR(sc->cct, MAX_CONCURRENT_SHARDS),
-      sc(sc), env(sc->env), num_shards(num_shards), markers(markers)
+      sc(sc), env(sc->env), num_shards(num_shards), markers(markers), objvs(objvs)
   {}
   bool spawn_next() override;
 };
@@ -118,7 +120,7 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next()
   using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
   spawn(new CR(env->dpp, env->async_rados, env->svc->sysobj,
                rgw_raw_obj(env->svc->zone->get_zone_params().log_pool,
                            RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
-               &markers[shard_id]),
+               &markers[shard_id], true, &objvs[shard_id]),
         false);
   shard_id++;
   return true;
@@ -175,11 +177,13 @@ class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_data_sync_status *sync_status;
+  std::vector<RGWObjVersionTracker>& objvs;
 
 public:
   RGWReadDataSyncStatusCoroutine(RGWDataSyncCtx *_sc,
-                                 rgw_data_sync_status *_status)
-    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env), sync_status(_status)
+                                 rgw_data_sync_status *_status, std::vector<RGWObjVersionTracker>& objvs)
+    : RGWCoroutine(_sc->cct), sc(_sc), sync_env(sc->env),
+      sync_status(_status), objvs(objvs)
   {}
   int operate(const DoutPrefixProvider *dpp) override;
 };
@@ -201,9 +205,10 @@ int RGWReadDataSyncStatusCoroutine::operate(const DoutPrefixProvider *dpp)
       return set_cr_error(retcode);
     }
     // read shard markers
+    objvs.resize(sync_status->sync_info.num_shards);
     using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
     yield call(new ReadMarkersCR(sc, sync_status->sync_info.num_shards,
-                                 sync_status->sync_markers));
+                                 sync_status->sync_markers, objvs));
     if (retcode < 0) {
       ldpp_dout(dpp, 4) << "failed to read sync status markers with "
           << cpp_strerror(retcode) << dendl;
@@ -528,6 +533,7 @@ class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
   string lock_name;
   string cookie;
   rgw_data_sync_status *status;
+  std::vector<RGWObjVersionTracker>& objvs;
   map<int, RGWDataChangesLogInfo> shards_info;
 
   RGWSyncTraceNodeRef tn;
@@ -535,10 +541,11 @@ public:
   RGWInitDataSyncStatusCoroutine(RGWDataSyncCtx *_sc, uint32_t num_shards,
                                  uint64_t instance_id,
                                  RGWSyncTraceNodeRef& _tn_parent,
-                                 rgw_data_sync_status *status)
+                                 rgw_data_sync_status *status,
+                                 std::vector<RGWObjVersionTracker>& objvs)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env), driver(sync_env->driver),
       pool(sync_env->svc->zone->get_zone_params().log_pool),
-      num_shards(num_shards), status(status),
+      num_shards(num_shards), status(status), objvs(objvs),
       tn(sync_env->sync_tracer->add_node(_tn_parent,
"init_data_sync_status")) { lock_name = "sync_lock"; @@ -604,15 +611,18 @@ public: yield; } yield { + objvs.resize(num_shards); for (uint32_t i = 0; i < num_shards; i++) { RGWDataChangesLogInfo& info = shards_info[i]; auto& marker = status->sync_markers[i]; marker.next_step_marker = info.marker; marker.timestamp = info.last_update; const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, i); + auto& objv = objvs[i]; + objv.generate_new_write_ver(cct); using WriteMarkerCR = RGWSimpleRadosWriteCR; spawn(new WriteMarkerCR(dpp, sync_env->async_rados, sync_env->svc->sysobj, - rgw_raw_obj{pool, oid}, marker), true); + rgw_raw_obj{pool, oid}, marker, &objv), true); } } while (collect(&ret, NULL)) { @@ -718,6 +728,7 @@ void RGWRemoteDataLog::finish() int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_sync_status *sync_status) { // cannot run concurrently with run_sync(), so run in a separate manager + std::vector objvs; RGWCoroutinesManager crs(cct, cr_registry); RGWHTTPManager http_manager(cct, crs.get_completion_mgr()); int ret = http_manager.start(); @@ -731,7 +742,7 @@ int RGWRemoteDataLog::read_sync_status(const DoutPrefixProvider *dpp, rgw_data_s RGWDataSyncCtx sc_local = sc; sc_local.env = &sync_env_local; - ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status)); + ret = crs.run(dpp, new RGWReadDataSyncStatusCoroutine(&sc_local, sync_status, objvs)); http_manager.stop(); return ret; } @@ -773,6 +784,7 @@ int RGWRemoteDataLog::read_recovering_shards(const DoutPrefixProvider *dpp, cons int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_shards) { rgw_data_sync_status sync_status; + std::vector objvs; sync_status.sync_info.num_shards = num_shards; RGWCoroutinesManager crs(cct, cr_registry); @@ -787,7 +799,7 @@ int RGWRemoteDataLog::init_sync_status(const DoutPrefixProvider *dpp, int num_sh auto instance_id = ceph::util::generate_random_number(); RGWDataSyncCtx sc_local = sc; sc_local.env = &sync_env_local; - ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status)); + ret = crs.run(dpp, new RGWInitDataSyncStatusCoroutine(&sc_local, num_shards, instance_id, tn, &sync_status, objvs)); http_manager.stop(); return ret; } @@ -874,6 +886,7 @@ class RGWListBucketIndexesCR : public RGWCoroutine { rgw::sal::RadosStore* driver = sync_env->driver; rgw_data_sync_status *sync_status; + std::vector& objvs; int req_ret = 0; int ret = 0; @@ -894,8 +907,8 @@ class RGWListBucketIndexesCR : public RGWCoroutine { public: RGWListBucketIndexesCR(RGWDataSyncCtx* sc, - rgw_data_sync_status* sync_status) - : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status) {} + rgw_data_sync_status* sync_status, std::vector& objvs) + : RGWCoroutine(sc->cct), sc(sc), sync_status(sync_status), objvs(objvs) {} ~RGWListBucketIndexesCR() override { } int operate(const DoutPrefixProvider *dpp) override { @@ -975,7 +988,7 @@ public: rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name( sc->source_zone, shard_id)), - marker), + marker, &objvs[shard_id]), true); } } else { @@ -1011,16 +1024,17 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrackenv), marker_oid(_marker_oid), sync_marker(_marker), - tn(_tn) {} + tn(_tn), objv(objv) {} RGWCoroutine* store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override { sync_marker.marker = new_marker; @@ -1031,7 +1045,7 @@ public: return new 
                                                            rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, marker_oid),
-                                                           sync_marker);
+                                                           sync_marker, &objv);
   }
 
   RGWOrderCallCR *allocate_order_control_cr() override {
@@ -1631,6 +1645,7 @@ protected:
   const rgw_raw_obj& error_repo;
   boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr;
   const rgw_data_sync_status& sync_status;
+  RGWObjVersionTracker& objv;
   boost::intrusive_ptr<rgw::bucket_sync::Cache> bucket_shard_cache;
 
   std::optional<RGWDataSyncShardMarkerTrack> marker_tracker;
@@ -1648,11 +1663,13 @@ protected:
                          const string& status_oid, const rgw_raw_obj& error_repo,
                          boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
                          const rgw_data_sync_status& sync_status,
+                         RGWObjVersionTracker& objv,
                          const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
     : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
       sync_marker(sync_marker), tn(tn), status_oid(status_oid),
       error_repo(error_repo), lease_cr(std::move(lease_cr)),
-      sync_status(sync_status), bucket_shard_cache(bucket_shard_cache) {}
+      sync_status(sync_status), objv(objv),
+      bucket_shard_cache(bucket_shard_cache) {}
 };
 
 class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
@@ -1672,17 +1689,17 @@ public:
                          rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
                          const string& status_oid, const rgw_raw_obj& error_repo,
                          boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
-                         const rgw_data_sync_status& sync_status,
+                         const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
                          const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache)
     : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
                              status_oid, error_repo, std::move(lease_cr),
-                             sync_status, bucket_shard_cache) {}
+                             sync_status, objv, bucket_shard_cache) {}
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
       tn->log(10, "start full sync");
       oid = full_data_sync_index_shard_oid(sc->source_zone, shard_id);
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
       total_entries = sync_marker.pos;
       entry_timestamp = sync_marker.timestamp; // time when full sync started
       do {
@@ -1744,7 +1761,7 @@ public:
         sync_marker.next_step_marker.clear();
         yield call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(
           sc->env->dpp,sc->env->async_rados, sc->env->svc->sysobj,
-          rgw_raw_obj(pool, status_oid), sync_marker));
+          rgw_raw_obj(pool, status_oid), sync_marker, &objv));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to set sync marker: retcode=" << retcode));
           return set_cr_error(retcode);
         }
@@ -1805,19 +1822,19 @@ public:
                         rgw_data_sync_marker& sync_marker, RGWSyncTraceNodeRef tn,
                         const string& status_oid, const rgw_raw_obj& error_repo,
                         boost::intrusive_ptr<const RGWContinuousLeaseCR> lease_cr,
-                        const rgw_data_sync_status& sync_status,
+                        const rgw_data_sync_status& sync_status, RGWObjVersionTracker& objv,
                         const boost::intrusive_ptr<rgw::bucket_sync::Cache>& bucket_shard_cache,
                         ceph::mutex& inc_lock,
                         bc::flat_set<rgw_data_notify_entry>& modified_shards)
     : RGWDataBaseSyncShardCR(sc, pool, shard_id, sync_marker, tn,
                              status_oid, error_repo, std::move(lease_cr),
-                             sync_status, bucket_shard_cache),
+                             sync_status, objv, bucket_shard_cache),
       inc_lock(inc_lock), modified_shards(modified_shards) {}
 
   int operate(const DoutPrefixProvider *dpp) override {
     reenter(this) {
       tn->log(10, "start incremental sync");
-      marker_tracker.emplace(sc, status_oid, sync_marker, tn);
+      marker_tracker.emplace(sc, status_oid, sync_marker, tn, objv);
       do {
         if (!lease_cr->is_locked()) {
           drain_all();
@@ -1976,6 +1993,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   rgw_data_sync_marker& sync_marker;
   rgw_data_sync_status sync_status;
   const RGWSyncTraceNodeRef tn;
+  RGWObjVersionTracker& objv;
   bool *reset_backoff;
 
   ceph::mutex inc_lock = ceph::make_mutex("RGWDataSyncShardCR::inc_lock");
@@ -2001,10 +2019,10 @@ public:
   RGWDataSyncShardCR(RGWDataSyncCtx* const _sc, const rgw_pool& pool,
                      const uint32_t shard_id, rgw_data_sync_marker& marker,
                      const rgw_data_sync_status& sync_status,
-                     RGWSyncTraceNodeRef& tn, bool *reset_backoff)
+                     RGWSyncTraceNodeRef& tn, RGWObjVersionTracker& objv, bool *reset_backoff)
     : RGWCoroutine(_sc->cct), sc(_sc), pool(pool), shard_id(shard_id),
       sync_marker(marker), sync_status(sync_status), tn(tn),
-      reset_backoff(reset_backoff) {
+      objv(objv), reset_backoff(reset_backoff) {
     set_description() << "data sync shard source_zone=" << sc->source_zone
                       << " shard_id=" << shard_id;
   }
@@ -2042,7 +2060,7 @@ public:
                                             sync_marker, tn,
                                             status_oid, error_repo,
                                             lease_cr, sync_status,
-                                            bucket_shard_cache));
+                                            objv, bucket_shard_cache));
         if (retcode < 0) {
           if (retcode != -EBUSY) {
             tn->log(10, SSTR("full sync failed (retcode=" << retcode << ")"));
@@ -2056,7 +2074,7 @@ public:
                                            sync_marker, tn,
                                            status_oid, error_repo,
                                            lease_cr, sync_status,
-                                           bucket_shard_cache,
+                                           objv, bucket_shard_cache,
                                            inc_lock, modified_shards));
         if (retcode < 0) {
           if (retcode != -EBUSY) {
@@ -2103,25 +2121,29 @@ class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
   rgw_data_sync_status sync_status;
 
   RGWSyncTraceNodeRef tn;
+  RGWObjVersionTracker& objv;
 public:
   RGWDataSyncShardControlCR(RGWDataSyncCtx *_sc, const rgw_pool& _pool,
-                     uint32_t _shard_id, rgw_data_sync_marker& _marker, const rgw_data_sync_status& sync_status,
-                     RGWSyncTraceNodeRef& _tn_parent) : RGWBackoffControlCR(_sc->cct, false),
-                                                        sc(_sc), sync_env(_sc->env),
-                                                        pool(_pool),
-                                                        shard_id(_shard_id),
-                                                        sync_marker(_marker) {
+                            uint32_t _shard_id, rgw_data_sync_marker& _marker,
+                            const rgw_data_sync_status& sync_status,
+                            RGWObjVersionTracker& objv,
+                            RGWSyncTraceNodeRef& _tn_parent)
+    : RGWBackoffControlCR(_sc->cct, false),
+      sc(_sc), sync_env(_sc->env),
+      pool(_pool),
+      shard_id(_shard_id),
+      sync_marker(_marker), objv(objv) {
     tn = sync_env->sync_tracer->add_node(_tn_parent, "shard",
                                          std::to_string(shard_id));
   }
 
   RGWCoroutine *alloc_cr() override {
-    return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, backoff_ptr());
+    return new RGWDataSyncShardCR(sc, pool, shard_id, sync_marker, sync_status, tn, objv, backoff_ptr());
   }
 
   RGWCoroutine *alloc_finisher_cr() override {
     return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->dpp, sync_env->async_rados, sync_env->svc->sysobj,
                                                           rgw_raw_obj(sync_env->svc->zone->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sc->source_zone, shard_id)),
-                                                          &sync_marker);
+                                                          &sync_marker, true, &objv);
   }
 
   void append_modified_shards(bc::flat_set<rgw_data_notify_entry>& keys) {
@@ -2142,6 +2164,7 @@ class RGWDataSyncCR : public RGWCoroutine {
   uint32_t num_shards;
 
   rgw_data_sync_status sync_status;
+  std::vector<RGWObjVersionTracker> objvs;
 
   ceph::mutex shard_crs_lock =
     ceph::make_mutex("RGWDataSyncCR::shard_crs_lock");
@@ -2152,6 +2175,7 @@ class RGWDataSyncCR : public RGWCoroutine {
   RGWSyncTraceNodeRef tn;
 
   RGWDataSyncModule *data_sync_module{nullptr};
+  RGWObjVersionTracker objv;
 public:
   RGWDataSyncCR(RGWDataSyncCtx *_sc, uint32_t _num_shards, RGWSyncTraceNodeRef& _tn, bool *_reset_backoff)
     : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
@@ -2170,7 +2194,7 @@ public:
     reenter(this) {
 
       /* read sync status */
-      yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status));
+      yield call(new RGWReadDataSyncStatusCoroutine(sc, &sync_status, objvs));
 
       data_sync_module = sync_env->sync_module->get_data_handler();
@@ -2185,7 +2209,7 @@ public:
         sync_status.sync_info.num_shards = num_shards;
         uint64_t instance_id;
         instance_id = ceph::util::generate_random_number();
-        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status));
+        yield call(new RGWInitDataSyncStatusCoroutine(sc, num_shards, instance_id, tn, &sync_status, objvs));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to init sync, retcode=" << retcode));
           return set_cr_error(retcode);
         }
@@ -2207,7 +2231,7 @@ public:
           return set_cr_error(retcode);
         }
         /* state: building full sync maps */
-        yield call(new RGWListBucketIndexesCR(sc, &sync_status));
+        yield call(new RGWListBucketIndexesCR(sc, &sync_status, objvs));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to build full sync maps, retcode=" << retcode));
           return set_cr_error(retcode);
         }
@@ -2236,7 +2260,7 @@ public:
         for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
              iter != sync_status.sync_markers.end(); ++iter) {
           RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sc, sync_env->svc->zone->get_zone_params().log_pool,
-                                                                        iter->first, iter->second, sync_status, tn);
+                                                                        iter->first, iter->second, sync_status, objvs[iter->first], tn);
           cr->get();
           shard_crs_lock.lock();
           shard_crs[iter->first] = cr;
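
Note on the pattern (not part of the patch): each per-shard sync-status object now carries a version that is read back alongside the marker and asserted on every subsequent write, so a stale writer fails instead of silently clobbering a newer marker. The sketch below is a minimal, self-contained illustration of that guarded-write flow under that assumption; ShardStatusStore and VersionedValue are hypothetical stand-ins, not RGW or librados API.

    // Sketch: conceptual model of versioned per-shard status writes.
    // Hypothetical types for illustration only; not RGW/librados API.
    #include <cstdint>
    #include <map>
    #include <optional>
    #include <string>
    #include <system_error>

    struct VersionedValue {
      std::string data;     // stands in for the encoded rgw_data_sync_marker
      uint64_t version = 0; // stands in for the cls-managed object version
    };

    class ShardStatusStore {
      std::map<uint32_t, VersionedValue> objects_; // one status object per shard
    public:
      // Read returns the stored value together with the version observed,
      // the way the read coroutine fills one tracker per shard.
      std::optional<VersionedValue> read(uint32_t shard) const {
        auto it = objects_.find(shard);
        if (it == objects_.end()) return std::nullopt;
        return it->second;
      }

      // Conditional write: succeeds only if the object still has the version
      // the caller last read; otherwise the caller must re-read and retry,
      // analogous to a versioning check rejecting a racing writer.
      std::error_code write(uint32_t shard, const std::string& data,
                            uint64_t expected_version) {
        auto& obj = objects_[shard];
        if (obj.version != expected_version) {
          return std::make_error_code(std::errc::operation_canceled);
        }
        obj.data = data;
        ++obj.version; // a successful write bumps the version for the next reader
        return {};
      }
    };

Keeping one tracker per shard in a vector sized to num_shards mirrors sync_markers, so the full-sync and incremental-sync coroutines for shard i always assert against the version they last read for that shard's status object.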