From 801244af85a471ea14dd6943a055843d0438e079 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 24 Oct 2019 14:13:38 -0700 Subject: [PATCH] rgw: sync flow: init zone level sync flow manager Hold the zone level sync flow manager in the zone svc. Convert old zonegroup sync config into new sync policy structure. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket_sync.cc | 116 +++++++++++++++++++---------------- src/rgw/rgw_bucket_sync.h | 18 ++++-- src/rgw/services/svc_zone.cc | 9 +++ src/rgw/services/svc_zone.h | 8 +++ 4 files changed, 92 insertions(+), 59 deletions(-) diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index f910b016c74..d74b33c16f9 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -343,7 +343,7 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { void RGWBucketSyncFlowManager::reflect(std::optional effective_bucket, RGWBucketSyncFlowManager::pipe_set *source_pipes, - RGWBucketSyncFlowManager::pipe_set *dest_pipes) + RGWBucketSyncFlowManager::pipe_set *dest_pipes) const { rgw_sync_bucket_entity entity; @@ -392,75 +392,82 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke RGWBucketSyncFlowManager::RGWBucketSyncFlowManager(const string& _zone_name, std::optional _bucket, - RGWBucketSyncFlowManager *_parent) : zone_name(_zone_name), - bucket(_bucket), - parent(_parent) {} + const RGWBucketSyncFlowManager *_parent) : zone_name(_zone_name), + bucket(_bucket), + parent(_parent) {} -int RGWBucketSyncPolicyHandler::init() +void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc, + RGWSI_SyncModules *sync_modules_svc, + rgw_sync_policy_info *ppolicy) { -#warning FIXME -#if 0 - const auto& zone_id = zone_svc->get_zone().id; - auto& zg = zone_svc->get_zonegroup(); + bool found = false; - if (!bucket_info.sync_policy) { - return 0; - } + rgw_sync_policy_info policy; - auto& sync_policy = *bucket_info.sync_policy; + auto& group = policy.groups["default"]; + auto& zonegroup = zone_svc->get_zonegroup(); + + for (const auto& ziter1 : zonegroup.zones) { + const string& id1 = ziter1.first; + const RGWZone& z1 = ziter1.second; + + for (const auto& ziter2 : zonegroup.zones) { + const string& id2 = ziter2.first; + const RGWZone& z2 = ziter2.second; - if (sync_policy.dests) { - for (auto& dest : *sync_policy.dests) { - if (!(dest.bucket || *dest.bucket == bucket_info.bucket)) { + if (id1 == id2) { continue; } - if (dest.zones.find("*") == dest.zones.end() && - dest.zones.find(zone_id) == dest.zones.end()) { - continue; + if (z1.syncs_from(z2.name)) { + found = true; + rgw_sync_directional_rule *rule; + group.data_flow.find_directional(z2.name, z1.name, true, &rule); } + } + } - if (dest.flow_rules) { - /* populate trivial peers */ - for (auto& rule : *dest.flow_rules) { - set source_zones; - set dest_zones; - rule.get_zone_peers(zone_id, &source_zones, &dest_zones); - - for (auto& sz : source_zones) { - peer_info sinfo; - sinfo.bucket = bucket_info.bucket; - sources[sz].insert(sinfo); - } + if (!found) { /* nothing syncs */ + return; + } - for (auto& tz : dest_zones) { - peer_info tinfo; - tinfo.bucket = bucket_info.bucket; - dests[tz].insert(tinfo); - } - } - } + rgw_sync_bucket_pipes pipes; + pipes.id = "all"; + pipes.source.all_zones = true; + pipes.dest.all_zones = true; - /* non trivial sources */ - for (auto& source : dest.sources) { - if (!source.bucket || - *source.bucket == bucket_info.bucket) { - if ((source.type.empty() || source.type == "rgw") && - source.zone && - source.bucket) { - peer_info sinfo; - sinfo.type = source.type; - sinfo.bucket = *source.bucket; - sources[*source.zone].insert(sinfo); - } - } - } - } + group.pipes.emplace_back(std::move(pipes)); + + + group.status = rgw_sync_policy_group::Status::ENABLED; + + *ppolicy = std::move(policy); +} + +RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, + RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc), + bucket_info(_bucket_info) { + flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->zone_name(), + bucket_info.bucket, + zone_svc->get_sync_flow_manager())); +} + +int RGWBucketSyncPolicyHandler::init() +{ +#warning FIXME +#if 0 + const auto& zone_id = zone_svc->get_zone().id; + auto& zg = zone_svc->get_zonegroup(); + + if (!bucket_info.sync_policy) { + return 0; } -#endif + + auto& sync_policy = *bucket_info.sync_policy; return 0; +#endif } bool RGWBucketSyncPolicyHandler::bucket_exports_data() const @@ -477,3 +484,4 @@ bool RGWBucketSyncPolicyHandler::bucket_imports_data() const { return bucket_is_sync_target(); } + diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 7e705c030d5..159cdbb2449 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -20,6 +20,7 @@ #include "rgw_sync_policy.h" class RGWSI_Zone; +class RGWSI_SyncModules; struct rgw_sync_group_pipe_map; struct rgw_sync_bucket_pipes; struct rgw_sync_policy_info; @@ -98,6 +99,13 @@ struct rgw_sync_group_pipe_map { std::optional dest_bucket) const; }; +class RGWSyncPolicyCompat { +public: + static void convert_old_sync_config(RGWSI_Zone *zone_svc, + RGWSI_SyncModules *sync_modules_svc, + rgw_sync_policy_info *ppolicy); +}; + class RGWBucketSyncFlowManager { public: struct pipe_set { @@ -111,7 +119,7 @@ private: string zone_name; std::optional bucket; - RGWBucketSyncFlowManager *parent{nullptr}; + const RGWBucketSyncFlowManager *parent{nullptr}; map flow_groups; @@ -132,18 +140,19 @@ public: RGWBucketSyncFlowManager(const string& _zone_name, std::optional _bucket, - RGWBucketSyncFlowManager *_parent); + const RGWBucketSyncFlowManager *_parent); void init(const rgw_sync_policy_info& sync_policy); void reflect(std::optional effective_bucket, pipe_set *flow_by_source, - pipe_set *flow_by_dest); + pipe_set *flow_by_dest) const; }; class RGWBucketSyncPolicyHandler { RGWSI_Zone *zone_svc; RGWBucketInfo bucket_info; + std::unique_ptr flow_mgr; std::set source_zones; @@ -180,8 +189,7 @@ private: public: RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, - RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc), - bucket_info(_bucket_info) {} + RGWBucketInfo& _bucket_info); int init(); std::map >& get_sources() { diff --git a/src/rgw/services/svc_zone.cc b/src/rgw/services/svc_zone.cc index 7cccd99cb26..15fea6b9968 100644 --- a/src/rgw/services/svc_zone.cc +++ b/src/rgw/services/svc_zone.cc @@ -8,6 +8,7 @@ #include "rgw/rgw_zone.h" #include "rgw/rgw_rest_conn.h" +#include "rgw/rgw_bucket_sync.h" #include "common/errno.h" #include "include/random.h" @@ -37,6 +38,7 @@ void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc, RGWSI_Zone::~RGWSI_Zone() { + delete sync_flow_mgr; delete realm; delete zonegroup; delete zone_public_config; @@ -151,6 +153,13 @@ int RGWSI_Zone::do_start() zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id()); + sync_flow_mgr = new RGWBucketSyncFlowManager(zone_params->get_name(), + nullopt, + nullptr); + + sync_flow_mgr->init(zonegroup->sync_policy); + + ret = sync_modules_svc->start(); if (ret < 0) { return ret; diff --git a/src/rgw/services/svc_zone.h b/src/rgw/services/svc_zone.h index 19ebd4f19da..168b9102e29 100644 --- a/src/rgw/services/svc_zone.h +++ b/src/rgw/services/svc_zone.h @@ -17,6 +17,8 @@ class RGWZoneParams; class RGWPeriod; class RGWZonePlacementInfo; +class RGWBucketSyncFlowManager; + class RGWRESTConn; class RGWSI_Zone : public RGWServiceInstance @@ -35,6 +37,8 @@ class RGWSI_Zone : public RGWServiceInstance uint32_t zone_short_id{0}; bool writeable_zone{false}; + RGWBucketSyncFlowManager *sync_flow_mgr{nullptr}; + RGWRESTConn *rest_master_conn{nullptr}; map zone_conn_map; std::vector data_sync_source_zones; @@ -67,6 +71,10 @@ public: int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const; const RGWZone& get_zone() const; + const RGWBucketSyncFlowManager *get_sync_flow_manager() const { + return sync_flow_mgr; + } + const string& zone_name() const; const string& zone_id() const; uint32_t get_zone_short_id() const; -- 2.39.5