From e286158d3e6ea5d8aee79dfaf5773a6e7355d796 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 7 Nov 2019 16:56:52 -0800 Subject: [PATCH] rgw: data sync: use hints when determining sync pipes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_common.h | 8 ++ src/rgw/rgw_data_sync.cc | 12 +- src/rgw/services/svc_bucket_sync_sobj.cc | 150 ++++++++++++++--------- src/rgw/services/svc_bucket_sync_sobj.h | 29 +---- 4 files changed, 109 insertions(+), 90 deletions(-) diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index ce206281146d1..0df1d90ec15b3 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -1178,6 +1178,14 @@ struct rgw_bucket { rgw_bucket(const rgw_bucket&) = default; rgw_bucket(rgw_bucket&&) = default; + bool match(const rgw_bucket& b) const { + return (tenant == b.tenant && + name == b.name && + (bucket_id == b.bucket_id || + bucket_id.empty() || + b.bucket_id.empty())); + } + void convert(cls_user_bucket *b) const { b->name = name; b->marker = marker; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 3b28f934b82e4..4455a48538bff 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3650,7 +3650,7 @@ class RGWGetBucketPeersCR : public RGWCoroutine { continue; } if (source_bucket && - *source_bucket != *pipe.source.bucket) { + !source_bucket->match(*pipe.source.bucket)) { continue; } ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl; @@ -3671,7 +3671,7 @@ class RGWGetBucketPeersCR : public RGWCoroutine { for (auto& pipe : i->second.pipes) { if (target_bucket && pipe.dest.bucket && - *target_bucket != *pipe.dest.bucket) { + !target_bucket->match(*pipe.dest.bucket)) { ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl; continue; } @@ -3726,12 +3726,6 @@ public: SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source=" << target_bucket.value_or(rgw_bucket()) << ":source_zone=" << source_zone.value_or("*")))) { - if (target_bucket) { - target_policy = make_shared(); - } - if (source_bucket) { - source_policy = make_shared(); - } } int operate() override; @@ -4001,6 +3995,7 @@ int RGWGetBucketPeersCR::operate() if (source_bucket && source_zone) { get_policy_params.zone = source_zone; get_policy_params.bucket = *source_bucket; + source_policy = make_shared(); yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, sync_env->store, get_policy_params, @@ -4032,6 +4027,7 @@ int RGWGetBucketPeersCR::operate() get_policy_params.zone = nullopt; get_policy_params.bucket = *hiter; + target_policy = make_shared(); yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, sync_env->store, get_policy_params, diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc index 0244b57c95b7e..0f488c1cc3e2a 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.cc +++ b/src/rgw/services/svc_bucket_sync_sobj.cc @@ -11,6 +11,37 @@ static string bucket_sync_sources_oid_prefix = "bucket.sync-source-hints"; static string bucket_sync_targets_oid_prefix = "bucket.sync-target-hints"; +class RGWSI_Bucket_Sync_SObj_HintIndexManager { + CephContext *cct; + + struct { + RGWSI_Zone *zone; + RGWSI_SysObj *sysobj; + } svc; + +public: + RGWSI_Bucket_Sync_SObj_HintIndexManager(RGWSI_Zone *_zone_svc, + RGWSI_SysObj *_sysobj_svc) { + svc.zone = _zone_svc; + svc.sysobj = _sysobj_svc; + + cct = svc.zone->ctx(); + } + + rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const; + rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const; + + template + int update_hints(const RGWBucketInfo& bucket_info, + C1& added_dests, + C2& removed_dests, + C1& added_sources, + C2& removed_sources, + optional_yield y); +}; + +RGWSI_Bucket_Sync_SObj::RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) { +} RGWSI_Bucket_Sync_SObj::~RGWSI_Bucket_Sync_SObj() { } @@ -24,8 +55,7 @@ void RGWSI_Bucket_Sync_SObj::init(RGWSI_Zone *_zone_svc, svc.cache = _cache_svc; svc.bucket_sobj = bucket_sobj_svc; - hint_index_mgr.init(svc.zone, - svc.sysobj); + hint_index_mgr.reset(new RGWSI_Bucket_Sync_SObj_HintIndexManager(svc.zone, svc.sysobj)); } int RGWSI_Bucket_Sync_SObj::do_start() @@ -117,11 +147,13 @@ static bool diff_sets(std::set& orig_set, ++niter; continue; } - while (*oiter < *niter) { + while (*oiter < *niter && + oiter != orig_set.end()) { removed->push_back(*oiter); ++oiter; } - while (*niter < *oiter) { + while (*niter < *oiter + && niter != new_set.end()) { added->push_back(*niter); ++niter; } @@ -319,17 +351,19 @@ public: svc.sysobj = _sysobj_svc; } + template int update(const rgw_bucket& entity, const RGWBucketInfo& info_source, - std::optional > add, - std::optional > remove, + C1 *add, + C2 *remove, optional_yield y); private: + template void update_entries(const rgw_bucket& info_source, const obj_version& info_source_ver, - std::optional > add, - std::optional > remove, + C1 *add, + C2 *remove, single_instance_info *instance); int read(optional_yield y); @@ -349,10 +383,11 @@ WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::bi_entry) WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::single_instance_info) WRITE_CLASS_ENCODER(RGWSI_BS_SObj_HintIndexObj::info_map) +template int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity, const RGWBucketInfo& info_source, - std::optional > add, - std::optional > remove, + C1 *add, + C2 *remove, optional_yield y) { int r = 0; @@ -396,10 +431,11 @@ int RGWSI_BS_SObj_HintIndexObj::update(const rgw_bucket& entity, return -EIO; } +template void RGWSI_BS_SObj_HintIndexObj::update_entries(const rgw_bucket& info_source, const obj_version& info_source_ver, - std::optional > add, - std::optional > remove, + C1 *add, + C2 *remove, single_instance_info *instance) { if (remove) { @@ -468,7 +504,7 @@ int RGWSI_BS_SObj_HintIndexObj::flush(optional_yield y) { return 0; } -rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const +rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_sources_obj(const rgw_bucket& bucket) const { rgw_bucket b = bucket; b.bucket_id.clear(); @@ -476,7 +512,7 @@ rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_sources_obj(const rgw_ bucket_sync_sources_oid_prefix + "." + b.get_key()); } -rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const +rgw_raw_obj RGWSI_Bucket_Sync_SObj_HintIndexManager::get_dests_obj(const rgw_bucket& bucket) const { rgw_bucket b = bucket; b.bucket_id.clear(); @@ -484,39 +520,39 @@ rgw_raw_obj RGWSI_Bucket_Sync_SObj::HintIndexManager::get_dests_obj(const rgw_bu bucket_sync_targets_oid_prefix + "." + b.get_key()); } -int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info, - std::vector& added_dests, - std::vector& removed_dests, - std::vector& added_sources, - std::vector& removed_sources, - optional_yield y) +template +int RGWSI_Bucket_Sync_SObj_HintIndexManager::update_hints(const RGWBucketInfo& bucket_info, + C1& added_dests, + C2& removed_dests, + C1& added_sources, + C2& removed_sources, + optional_yield y) { - std::vector self_entity; - self_entity.push_back(bucket_info.bucket); + C1 self_entity = { bucket_info.bucket }; if (!added_dests.empty() || !removed_dests.empty()) { /* update our dests */ RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, - hint_index_mgr.get_dests_obj(bucket_info.bucket)); + get_dests_obj(bucket_info.bucket)); int r = index.update(bucket_info.bucket, bucket_info, - added_dests, - removed_dests, + &added_dests, + &removed_dests, y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; return r; } - /* update added dest buckets */ + /* update dest buckets */ for (auto& dest_bucket : added_dests) { RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, - hint_index_mgr.get_sources_obj(dest_bucket)); + get_sources_obj(dest_bucket)); int r = dep_index.update(dest_bucket, bucket_info, - self_entity, - std::nullopt, + &self_entity, + static_cast(nullptr), y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; @@ -526,11 +562,11 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info, /* update removed dest buckets */ for (auto& dest_bucket : removed_dests) { RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, - hint_index_mgr.get_sources_obj(dest_bucket)); + get_sources_obj(dest_bucket)); int r = dep_index.update(dest_bucket, bucket_info, - std::nullopt, - self_entity, + static_cast(nullptr), + &self_entity, y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << dest_bucket << " r=" << r << dendl; @@ -539,15 +575,15 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info, } } - if (!added_dests.empty() || - !removed_dests.empty()) { + if (!added_sources.empty() || + !removed_sources.empty()) { RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, - hint_index_mgr.get_sources_obj(bucket_info.bucket)); + get_sources_obj(bucket_info.bucket)); /* update our sources */ int r = index.update(bucket_info.bucket, bucket_info, - added_sources, - removed_sources, + &added_sources, + &removed_sources, y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << bucket_info.bucket << " r=" << r << dendl; @@ -557,11 +593,11 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info, /* update added sources buckets */ for (auto& source_bucket : added_sources) { RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, - hint_index_mgr.get_dests_obj(source_bucket)); + get_dests_obj(source_bucket)); int r = dep_index.update(source_bucket, bucket_info, - self_entity, - std::nullopt, + &self_entity, + static_cast(nullptr), y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; @@ -571,11 +607,11 @@ int RGWSI_Bucket_Sync_SObj::do_update_hints(const RGWBucketInfo& bucket_info, /* update removed dest buckets */ for (auto& source_bucket : removed_sources) { RGWSI_BS_SObj_HintIndexObj dep_index(svc.sysobj, - hint_index_mgr.get_dests_obj(source_bucket)); + get_dests_obj(source_bucket)); int r = dep_index.update(source_bucket, bucket_info, - std::nullopt, - self_entity, + static_cast(nullptr), + &self_entity, y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to update targets index for bucket=" << source_bucket << " r=" << r << dendl; @@ -614,12 +650,12 @@ int RGWSI_Bucket_Sync_SObj::handle_bi_removal(const RGWBucketInfo& bucket_info, std::vector added_sources; std::vector added_dests; - return do_update_hints(bucket_info, - added_dests, - removed_dests, - added_sources, - removed_sources, - y); + return hint_index_mgr->update_hints(bucket_info, + added_dests, + removed_dests, + added_sources, + removed_sources, + y); } int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info, @@ -661,12 +697,12 @@ int RGWSI_Bucket_Sync_SObj::handle_bi_update(RGWBucketInfo& bucket_info, return 0; } - return do_update_hints(bucket_info, - added_dests, - removed_dests, - added_sources, - removed_sources, - y); + return hint_index_mgr->update_hints(bucket_info, + dests, /* set all dests, not just the ones that were added */ + removed_dests, + sources, /* set all sources, not just that the ones that were added */ + removed_sources, + y); } int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket, @@ -680,7 +716,7 @@ int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket, if (sources) { RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, - hint_index_mgr.get_sources_obj(bucket)); + hint_index_mgr->get_sources_obj(bucket)); int r = index.read(y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to update sources index for bucket=" << bucket << " r=" << r << dendl; @@ -698,7 +734,7 @@ int RGWSI_Bucket_Sync_SObj::get_bucket_sync_hints(const rgw_bucket& bucket, if (dests) { RGWSI_BS_SObj_HintIndexObj index(svc.sysobj, - hint_index_mgr.get_dests_obj(bucket)); + hint_index_mgr->get_dests_obj(bucket)); int r = index.read(y); if (r < 0) { ldout(cct, 0) << "ERROR: failed to read targets index for bucket=" << bucket << " r=" << r << dendl; diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h index a0305e12b4d93..02fa66b01c13c 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.h +++ b/src/rgw/services/svc_bucket_sync_sobj.h @@ -29,6 +29,8 @@ class RGWSI_Bucket_SObj; template class RGWChainedCacheImpl; +class RGWSI_Bucket_Sync_SObj_HintIndexManager; + class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync { struct bucket_sync_policy_cache_entry { @@ -38,33 +40,10 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync using RGWChainedCacheImpl_bucket_sync_policy_cache_entry = RGWChainedCacheImpl; unique_ptr sync_policy_cache; - class HintIndexManager { - struct { - RGWSI_Zone *zone; - RGWSI_SysObj *sysobj; - } svc; - - public: - HintIndexManager() {} - - void init(RGWSI_Zone *_zone_svc, - RGWSI_SysObj *_sysobj_svc) { - svc.zone = _zone_svc; - svc.sysobj = _sysobj_svc; - } - - rgw_raw_obj get_sources_obj(const rgw_bucket& bucket) const; - rgw_raw_obj get_dests_obj(const rgw_bucket& bucket) const; - } hint_index_mgr; + std::unique_ptr hint_index_mgr; int do_start() override; - int do_update_hints(const RGWBucketInfo& bucket_info, - std::vector& added_dests, - std::vector& removed_dests, - std::vector& added_sources, - std::vector& removed_sources, - optional_yield y); public: struct Svc { RGWSI_Zone *zone{nullptr}; @@ -73,7 +52,7 @@ public: RGWSI_Bucket_SObj *bucket_sobj{nullptr}; } svc; - RGWSI_Bucket_Sync_SObj(CephContext *cct) : RGWSI_Bucket_Sync(cct) {} + RGWSI_Bucket_Sync_SObj(CephContext *cct); ~RGWSI_Bucket_Sync_SObj(); void init(RGWSI_Zone *_zone_svc, -- 2.39.5