From e4940631f33cee3d407559df81c49cc71175caef Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 30 Oct 2019 18:14:54 -0700 Subject: [PATCH] rgw: data sync: rework RGWGetBucketPeersCR so that it returns pipes with bucket info Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket_sync.h | 8 + src/rgw/rgw_data_sync.cc | 419 +++++++++++++++++++++++++++----------- src/rgw/rgw_sync_policy.h | 8 + 3 files changed, 312 insertions(+), 123 deletions(-) diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 2368ffb097d..17cd4289e98 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -115,6 +115,10 @@ public: using iterator = std::set::iterator; + void clear() { + pipes.clear(); + } + void insert(const rgw_sync_bucket_pipe& pipe) { pipes.insert(pipe); } @@ -221,6 +225,10 @@ public: return sources; } + const map& get_targets() { + return targets; + } + const std::optional& get_bucket_info() const { return bucket_info; } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 2e0e3ad5f2a..e0a902725eb 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3351,19 +3351,19 @@ int RGWBucketShardIncrementalSyncCR::operate() class RGWBucketSyncPeersManager { public: - static string sync_sources_oid(const rgw_bucket bucket) { + static string sync_sources_oid(const rgw_bucket& bucket) { return bucket_sync_sources_oid_prefix + "." + bucket.get_key(); } static rgw_raw_obj sync_sources_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_sources_oid(bucket)); } - static string sync_targets_oid(const rgw_bucket bucket) { - return bucket_sync_targets_oid_prefix + "." + bucket.get_key(); + static string sync_targets_oid(const string& source_zone, const rgw_bucket& source_bucket) { + return bucket_sync_targets_oid_prefix + "." + source_zone + "." + source_bucket.get_key(); } - static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const rgw_bucket& bucket) { - return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(bucket)); + static rgw_raw_obj sync_targets_obj(RGWSI_Zone *zone_svc, const string& source_zone, const rgw_bucket& source_bucket) { + return rgw_raw_obj(zone_svc->get_zone_params().log_pool, sync_targets_oid(source_zone, source_bucket)); } }; @@ -3421,6 +3421,142 @@ struct rgw_bucket_sync_sources_local_info { }; WRITE_CLASS_ENCODER(rgw_bucket_sync_sources_local_info) + +struct rgw_sync_pipe_info_entity +{ +private: + RGWBucketInfo bucket_info; + bool _has_bucket_info{false}; + +public: + string zone; + + rgw_sync_pipe_info_entity() {} + rgw_sync_pipe_info_entity(const rgw_sync_bucket_entity& e, + std::optional& binfo) { + if (e.zone) { + zone = *e.zone; + } + if (!e.bucket) { + return; + } + if (!binfo || + binfo->bucket != *e.bucket) { + bucket_info.bucket = *e.bucket; + } + set_bucket_info(*binfo); + } + + void update_empty_bucket_info(const std::map& buckets_info) { + if (_has_bucket_info) { + return; + } + if (bucket_info.bucket.name.empty()) { + return; + } + + auto iter = buckets_info.find(bucket_info.bucket); + if (iter == buckets_info.end()) { + return; + } + + set_bucket_info(iter->second); + } + + bool has_bucket_info() const { + return _has_bucket_info; + } + + void set_bucket_info(const RGWBucketInfo& _bucket_info) { + bucket_info = _bucket_info; + _has_bucket_info = true; + } + + const RGWBucketInfo& get_bucket_info() const { + return bucket_info; + } + + const rgw_bucket& get_bucket() const { + return bucket_info.bucket; + } + + bool operator<(const rgw_sync_pipe_info_entity& e) const { + if (zone < e.zone) { + return false; + } + if (zone > e.zone) { + return true; + } + return (bucket_info.bucket < e.bucket_info.bucket); + } +}; + +struct rgw_sync_pipe_info { + rgw_sync_pipe_info_entity source; + rgw_sync_pipe_info_entity target; + + rgw_sync_pipe_info() {} + rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe, + std::optional source_bucket_info, + std::optional target_bucket_info) : source(pipe.source, source_bucket_info), + target(pipe.dest, target_bucket_info) {} + + bool operator<(const rgw_sync_pipe_info& p) const { + if (source < p.source) { + return true; + } + if (p.source < source) { + return false; + } + return (target < p.target); + } + + void update_empty_bucket_info(const std::map& buckets_info) { + source.update_empty_bucket_info(buckets_info); + target.update_empty_bucket_info(buckets_info); + } +}; + +struct rgw_sync_pipe_info_set { + std::set pipes; + + using iterator = std::set::iterator; + + void clear() { + pipes.clear(); + } + + void insert(const rgw_sync_bucket_pipe& pipe, + std::optional& source_bucket_info, + std::optional& target_bucket_info) { + rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info); + pipes.insert(p); + } + + iterator begin() { + return pipes.begin(); + } + + iterator end() { + return pipes.end(); + } + + void update_empty_bucket_info(const std::map& buckets_info) { + if (buckets_info.empty()) { + return; + } + + std::set p; + + for (auto pipe : pipes) { + pipe.update_empty_bucket_info(buckets_info); + p.insert(pipe); + } + + pipes = std::move(p); + } +}; + class RGWGetBucketPeersCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; @@ -3428,18 +3564,20 @@ class RGWGetBucketPeersCR : public RGWCoroutine { std::optional source_zone; std::optional source_bucket; - RGWBucketSyncFlowManager::pipe_set *sources; - map *sources_info; + rgw_sync_pipe_info_set *pipes; + map buckets_info; map::iterator siiter; - RGWBucketInfo *pbucket_info; + std::optional target_bucket_info; + std::optional source_bucket_info; - RGWBucketSyncFlowManager::pipe_set::iterator siter; + rgw_sync_pipe_info_set::iterator siter; rgw_bucket_sync_sources_local_info sources_local_info; rgw_bucket_sync_sources_local_info expected_local_info; rgw_bucket_get_sync_policy_params get_policy_params; - std::shared_ptr policy; + std::shared_ptr source_policy; + std::shared_ptr target_policy; RGWSyncTraceNodeRef tn; @@ -3457,81 +3595,86 @@ class RGWGetBucketPeersCR : public RGWCoroutine { return { b, std::next(b) }; } - static RGWBucketSyncFlowManager::pipe_set filter_sources(std::optional source_zone, - std::optional source_bucket, - const map& all_sources) { - RGWBucketSyncFlowManager::pipe_set result; - + void filter_sources(std::optional source_zone, + std::optional source_bucket, + const map& all_sources, + rgw_sync_pipe_info_set *result) { auto iters = get_pipe_iters(all_sources, source_zone); for (auto i = iters.first; i != iters.second; ++i) { - for (auto& peer : i->second.pipes) { + for (auto& pipe : i->second.pipes) { + if (!pipe.specific()) { + continue; + } if (source_bucket && - peer.source.bucket && - *source_bucket != *peer.source.bucket) { + *source_bucket != *pipe.source.bucket) { continue; } - result.insert(peer); + result->insert(pipe, source_bucket_info, target_bucket_info); } } - return result; } - static RGWBucketSyncFlowManager::pipe_set filter_targets(std::optional target_zone, - std::optional target_bucket, - const map& all_targets) { - RGWBucketSyncFlowManager::pipe_set result; - + void filter_targets(std::optional target_zone, + std::optional target_bucket, + const map& all_targets, + rgw_sync_pipe_info_set *result) { auto iters = get_pipe_iters(all_targets, target_zone); for (auto i = iters.first; i != iters.second; ++i) { - for (auto& peer : i->second.pipes) { + for (auto& pipe : i->second.pipes) { if (target_bucket && - peer.dest.bucket && - *target_bucket != *peer.dest.bucket) { + pipe.dest.bucket && + *target_bucket != *pipe.dest.bucket) { continue; } - result.insert(peer); + result->insert(pipe, source_bucket_info, target_bucket_info); } } - return result; } + void update_from_target_bucket_policy(); + void update_from_source_bucket_policy(); + public: RGWGetBucketPeersCR(RGWDataSyncEnv *_sync_env, std::optional _target_bucket, std::optional _source_zone, std::optional _source_bucket, - RGWBucketSyncFlowManager::pipe_set *_sources, - map *_sources_info, - RGWBucketInfo *_pbucket_info, + rgw_sync_pipe_info_set *_pipes, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), target_bucket(_target_bucket), source_zone(_source_zone), - sources(_sources), - sources_info(_sources_info), - pbucket_info(_pbucket_info), - policy(make_shared()), + pipes(_pipes), tn(sync_env->sync_tracer->add_node(_tn_parent, "get_bucket_peers", SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source=" << target_bucket.value_or(rgw_bucket()) - << ":source_zone=" << source_zone.value_or("*")))) {} + << ":source_zone=" << source_zone.value_or("*")))) { + if (target_bucket) { + target_policy = make_shared(); + } + if (source_bucket) { + source_policy = make_shared(); + } + } int operate() override; }; class RGWRunBucketSourcesSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - rgw_bucket bucket; - RGWBucketInfo bucket_info; + std::optional target_bs; std::optional source_zone; + std::optional source_bs; + + std::optional target_bucket; + std::optional source_bucket; rgw_raw_obj sources_obj; - RGWBucketSyncFlowManager::pipe_set sources; - map sources_info; - RGWBucketSyncFlowManager::pipe_set::iterator siter; + rgw_sync_pipe_info_set pipes; + rgw_sync_pipe_info_set::iterator siter; rgw_bucket_sync_pair_info sync_pair; @@ -3549,16 +3692,24 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { public: RGWRunBucketSourcesSyncCR(RGWDataSyncEnv *_sync_env, - const rgw_bucket& _bucket, - optional _source_zone, + std::optional _target_bs, + std::optional _source_zone, + std::optional _source_bs, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket(_bucket), + target_bs(_target_bs), source_zone(_source_zone), - sources_obj(RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, bucket)), + source_bs(_source_bs), tn(sync_env->sync_tracer->add_node(_tn_parent, "bucket_sync_sources", - SSTR(bucket))) { + SSTR( "target=" << target_bucket.value_or(rgw_bucket()) << ":source_bucket=" << source_bucket.value_or(rgw_bucket()) << ":source_zone=" << source_zone.value_or("*")))) { + if (target_bs) { + target_bucket = target_bs->bucket; + sources_obj = RGWBucketSyncPeersManager::sync_sources_obj(_sync_env->svc->zone, *target_bucket); + } + if (source_bs) { + source_bucket = source_bs->bucket; + } } ~RGWRunBucketSourcesSyncCR() override { if (lease_cr) { @@ -3593,7 +3744,7 @@ int RGWRunBucketSourcesSyncCR::operate() } tn->log(10, "took lease"); - yield call(new RGWGetBucketPeersCR(sync_env, bucket, source_zone, std::nullopt, &sources, &sources_info, &bucket_info, tn)); + yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, source_zone, source_bucket, &pipes, tn)); if (retcode < 0 && retcode != -ENOENT) { tn->log(0, "ERROR: failed to read sync status for bucket"); lease_cr->go_down(); @@ -3601,16 +3752,12 @@ int RGWRunBucketSourcesSyncCR::operate() return set_cr_error(retcode); } - for (siter = sources.begin(); siter != sources.end(); ++siter) { + for (siter = pipes.begin(); siter != pipes.end(); ++siter) { scs.emplace_back(); cur_sc = &scs.back(); - if (!siter->source.zone || - !siter->source.bucket) { - continue; - } { - auto& szone = *siter->source.zone; + auto& szone = siter->source.zone; if (last_zone != szone) { conn = sync_env->svc->zone->get_zone_conn_by_id(szone); if (!conn) { @@ -3622,8 +3769,9 @@ int RGWRunBucketSourcesSyncCR::operate() cur_sc->init(sync_env, conn, szone); } - sync_pair.source_bs.bucket = *siter->source.bucket; - sync_pair.dest_bs.bucket = bucket_info.bucket; + sync_pair.source_bs.bucket = siter->source.get_bucket(); + sync_pair.dest_bs.bucket = siter->target.get_bucket(); +#warning TODO iterate over shards yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false); while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) { @@ -3710,9 +3858,58 @@ int RGWSyncGetBucketInfoCR::operate() return 0; } +void RGWGetBucketPeersCR::update_from_target_bucket_policy() +{ + auto handler = target_policy->policy_handler.get(); + + if (!pipes) { + return; + } + + filter_sources(source_zone, + source_bucket, + handler->get_sources(), + pipes); + + for (siter = pipes->begin(); siter != pipes->end(); ++siter) { + if (!siter->source.has_bucket_info()) { + buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo()); + } + if (!siter->target.has_bucket_info()) { + buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo()); + } + } +} + +void RGWGetBucketPeersCR::update_from_source_bucket_policy() +{ + auto handler = source_policy->policy_handler.get(); + + if (!pipes) { + return; + } + + filter_targets(sync_env->svc->zone->get_zone().id, + target_bucket, + handler->get_targets(), + pipes); + + for (siter = pipes->begin(); siter != pipes->end(); ++siter) { + if (!siter->source.has_bucket_info()) { + buckets_info.emplace(siter->source.get_bucket(), RGWBucketInfo()); + } + if (!siter->target.has_bucket_info()) { + buckets_info.emplace(siter->target.get_bucket(), RGWBucketInfo()); + } + } +} + int RGWGetBucketPeersCR::operate() { reenter(this) { + if (pipes) { + pipes->clear(); + } if (target_bucket) { yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, sync_env->svc->sysobj, @@ -3722,55 +3919,53 @@ int RGWGetBucketPeersCR::operate() retcode != -ENOENT) { return set_cr_error(retcode); } - } - get_policy_params.bucket = *target_bucket; - yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, - sync_env->store, - get_policy_params, - policy)); - if (retcode < 0 && - retcode != -ENOENT) { - return set_cr_error(retcode); + get_policy_params.bucket = *target_bucket; + yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, + sync_env->store, + get_policy_params, + target_policy)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); + } } -#warning update local copy of sources and act on changes - { - auto& handler = policy->policy_handler; - - if (sources) { - *sources = filter_sources(source_zone, - source_bucket, - handler->get_sources()); + if (source_bucket && source_zone) { + yield call(new RGWSimpleRadosReadCR(sync_env->async_rados, + sync_env->svc->sysobj, + RGWBucketSyncPeersManager::sync_targets_obj(sync_env->svc->zone, *source_zone, *source_bucket), + &sources_local_info)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); } - auto& binfo = handler->get_bucket_info(); - if (binfo) { - *pbucket_info = *binfo; - if (sources_info) { - (*sources_info)[binfo->bucket] = *binfo; - } + get_policy_params.bucket = *source_bucket; + yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, + sync_env->store, + get_policy_params, + source_policy)); + if (retcode < 0 && + retcode != -ENOENT) { + return set_cr_error(retcode); } + + source_bucket_info = source_policy->policy_handler->get_bucket_info(); } +#warning update local copy of sources and act on changes - if (sources && sources_info) { - for (siter = sources->begin(); siter != sources->end(); ++siter) { - source_bucket = siter->source.bucket; - if (!source_bucket) { - continue; - } - if (sources_info->find(*source_bucket) != sources_info->end()) { - continue; - } + update_from_target_bucket_policy(); + update_from_source_bucket_policy(); - sources_info->emplace(*source_bucket, RGWBucketInfo()); /* reserve space for it, will fetch it later when map cannot change */ + for (siiter = buckets_info.begin(); siiter != buckets_info.end(); ++siiter) { + if (siiter->second.bucket.name.empty()) { + yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn)); } + } - for (siiter = sources_info->begin(); siiter != sources_info->end(); ++siiter) { - if (siiter->second.bucket.name.empty()) { - yield call(new RGWSyncGetBucketInfoCR(sync_env, siiter->first, &siiter->second, tn)); - } - } + if (pipes) { + pipes->update_empty_bucket_info(buckets_info); } return set_cr_done(); @@ -3921,17 +4116,13 @@ int RGWBucketPipeSyncStatusManager::init() error_logger, store->getRados()->get_sync_tracer(), sync_module, nullptr); - RGWBucketSyncFlowManager::pipe_set sources; - map sources_info; - RGWBucketInfo dest_bucket_info; + rgw_sync_pipe_info_set pipes; ret = cr_mgr.run(new RGWGetBucketPeersCR(&sync_env, dest_bucket, source_zone, std::nullopt, - &sources, - &sources_info, - &dest_bucket_info, + &pipes, sync_env.sync_tracer->root_node)); if (ret < 0) { ldpp_dout(this, 0) << "failed to get bucket source peers info: (ret=" << ret << "): " << cpp_strerror(-ret) << dendl; @@ -3940,12 +4131,8 @@ int RGWBucketPipeSyncStatusManager::init() string last_zone; - for (auto& peer : sources) { - if (!peer.source.zone) { - continue; - } - - auto szone = *peer.source.zone; + for (auto& pipe : pipes) { + auto& szone = pipe.source.zone; if (last_zone != szone) { conn = store->svc()->zone->get_zone_conn_by_id(szone); @@ -3956,24 +4143,10 @@ int RGWBucketPipeSyncStatusManager::init() last_zone = szone; } - auto& source_bucket = peer.source.bucket; - - if (!source_bucket) { - continue; - } - - auto iter = sources_info.find(*source_bucket); - if (iter == sources_info.end()) { - ldpp_dout(this, 0) << "ERROR: " << __func__ << "(): failed to find bucket info for bucket=" << *source_bucket << ". Likely a bug" << dendl; - return -EIO; - } - - auto& source_bucket_info = iter->second; - source_mgrs.push_back(new RGWRemoteBucketManager(this, &sync_env, szone, conn, - source_bucket_info, - dest_bucket_info.bucket)); + pipe.source.get_bucket_info(), + pipe.target.get_bucket())); } return 0; diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 57081a62aae..b38df78e46f 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -86,6 +86,10 @@ struct rgw_sync_bucket_entity { std::optional _bucket) : zone(_zone), bucket(_bucket.value_or(rgw_bucket())) {} + bool specific() const { + return zone && bucket; + } + void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(all_zones, bl); @@ -166,6 +170,10 @@ public: rgw_sync_bucket_entity source; rgw_sync_bucket_entity dest; + bool specific() const { + return source.specific() && dest.specific(); + } + void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(source, bl); -- 2.39.5