From: Yehuda Sadeh Date: Thu, 24 Oct 2019 22:58:06 +0000 (-0700) Subject: rgw: sync flow handler: integrate with flow manager X-Git-Tag: v15.1.0~22^2~83 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=eb8df1dfaa9a391df791d5433c270f8333a9b618;p=ceph.git rgw: sync flow handler: integrate with flow manager Convert zone names to zone ids, adjust interfaces. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 8778b8c61d96..ff6083269162 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2453,8 +2453,8 @@ static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& for (auto& m : sources) { auto& zone = m.first; out << indented{width, "source zone"} << zone << std::endl; - for (auto& s : m.second) { - out << indented{width, "bucket"} << s << std::endl; + for (auto& pipe : m.second.pipes) { + out << indented{width, "bucket"} << *pipe.source.bucket << std::endl; } } diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index d74b33c16f91..e1f6f8a77edc 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -455,19 +455,39 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, int RGWBucketSyncPolicyHandler::init() { -#warning FIXME -#if 0 - const auto& zone_id = zone_svc->get_zone().id; - auto& zg = zone_svc->get_zonegroup(); + flow_mgr->init(*bucket_info.sync_policy); - if (!bucket_info.sync_policy) { - return 0; - } + RGWBucketSyncFlowManager::pipe_set sources_by_name; + RGWBucketSyncFlowManager::pipe_set targets_by_name; + flow_mgr->reflect(bucket_info.bucket, &sources_by_name, &targets_by_name); + + /* convert to zone ids */ + + for (auto& pipe : sources_by_name.pipes) { + if (!pipe.source.zone) { + continue; + } + rgw_sync_bucket_pipe new_pipe = pipe; + string zone_id; - auto& sync_policy = *bucket_info.sync_policy; + if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) { + new_pipe.source.zone = zone_id; + } + sources[*new_pipe.source.zone].pipes.insert(new_pipe); + } + for (auto& pipe : targets_by_name.pipes) { + if (!pipe.dest.zone) { + continue; + } + rgw_sync_bucket_pipe new_pipe = pipe; + string zone_id; + if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) { + new_pipe.dest.zone = zone_id; + } + targets[*new_pipe.dest.zone].pipes.insert(new_pipe); + } return 0; -#endif } bool RGWBucketSyncPolicyHandler::bucket_exports_data() const diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 159cdbb24497..cdded5a9ddf1 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -154,45 +154,23 @@ class RGWBucketSyncPolicyHandler { RGWBucketInfo bucket_info; std::unique_ptr flow_mgr; - std::set source_zones; + map sources; /* source pipes by source zone id */ + map targets; /* target pipes by target zone id */ -public: - struct peer_info { - std::string type; - rgw_bucket bucket; - /* need to have config for other type of sources */ - - bool operator<(const peer_info& si) const { - if (type == si.type) { - return (bucket < si.bucket); - } - return (type < si.type); - } - - bool is_rgw() const { - return (type.empty() || type == "rgw"); - } - - string get_type() const { - if (!type.empty()) { - return type; - } - return "rgw"; - } - - void dump(Formatter *f) const; - }; + bool bucket_is_sync_source() const { + return !targets.empty(); + } -private: - std::map > sources; /* peers by zone */ - std::map > targets; /* peers by zone */ + bool bucket_is_sync_target() const { + return !sources.empty(); + } public: RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, RGWBucketInfo& _bucket_info); int init(); - std::map >& get_sources() { + const map& get_sources() { return sources; } @@ -200,22 +178,7 @@ public: return bucket_info; } - bool zone_is_source(const string& zone_id) const { - return sources.find(zone_id) != sources.end(); - } - - bool bucket_is_sync_source() const { - return !targets.empty(); - } - - bool bucket_is_sync_target() const { - return !sources.empty(); - } - bool bucket_exports_data() const; bool bucket_imports_data() const; }; -inline ostream& operator<<(ostream& out, const RGWBucketSyncPolicyHandler::peer_info& pi) { - return out << pi.bucket << " (" << pi.get_type() << ")"; -} diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index a732804f8a99..43bfcefaf507 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3410,7 +3410,7 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; rgw_bucket bucket; - map > *sources; + map *sources; RGWBucketInfo *pbucket_info; rgw_raw_obj sources_obj; @@ -3426,7 +3426,7 @@ class RGWGetBucketSourcePeersCR : public RGWCoroutine { public: RGWGetBucketSourcePeersCR(RGWDataSyncEnv *_sync_env, const rgw_bucket& _bucket, - map > *_sources, + map *_sources, RGWBucketInfo *_pbucket_info, const RGWSyncTraceNodeRef& _tn_parent) : RGWCoroutine(_sync_env->cct), @@ -3486,9 +3486,9 @@ class RGWRunBucketSourcesSyncCR : public RGWCoroutine { rgw_raw_obj sources_obj; - map > sources; - map >::iterator siter; - set::iterator piter; + map sources; + map::iterator siter; + set::iterator piter; rgw_bucket_sync_pair_info sync_pair; @@ -3565,12 +3565,8 @@ int RGWRunBucketSourcesSyncCR::operate() } cur_sc->init(sync_env, conn, siter->first); } - for (piter = siter->second.begin(); piter != siter->second.end(); ++piter) { - if (!piter->is_rgw()) { - continue; - } - - sync_pair.source_bs.bucket = piter->bucket; + for (piter = siter->second.pipes.begin(); piter != siter->second.pipes.end(); ++piter) { + sync_pair.source_bs.bucket = *piter->source.bucket; sync_pair.dest_bs.bucket = bucket_info.bucket; yield spawn(new RGWRunBucketSyncCoroutine(cur_sc, sync_pair, tn), false); diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index d045ae6487d4..f73019f3ec6c 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -1012,12 +1012,6 @@ void rgw_sync_policy_info::decode_json(JSONObj *obj) } } -void RGWBucketSyncPolicyHandler::peer_info::dump(Formatter *f) const -{ - encode_json("type", get_type(), f); - encode_json("bucket", bucket, f); -} - void rgw_obj_key::dump(Formatter *f) const { encode_json("name", name, f);