From: Yehuda Sadeh Date: Fri, 25 Oct 2019 22:47:57 +0000 (-0700) Subject: rgw: modify sync policy handler api X-Git-Tag: v15.1.0~22^2~80 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cf1f6e427f7dd48fa3961a3dc126f6816753465b;p=ceph.git rgw: modify sync policy handler api Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index ecfa33f49162..6d8378db37ed 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2339,16 +2339,16 @@ static int sync_info(std::optional opt_target_zone, std::optionalsvc()->zone->zone_name()); - RGWBucketSyncFlowManager zone_flow(zone_name, nullopt, nullptr); + RGWBucketSyncPolicyHandler zone_policy_handler(RGWBucketSyncPolicyHandler(store->svc()->zone, + store->svc()->sync_modules, + opt_target_zone)); - zone_flow.init(zonegroup.sync_policy); - - RGWBucketSyncFlowManager *flow_mgr = &zone_flow; - - std::optional bucket_flow; + std::unique_ptr bucket_handler; std::optional eff_bucket = opt_bucket; + auto handler = &zone_policy_handler; + if (eff_bucket) { rgw_bucket bucket; RGWBucketInfo bucket_info; @@ -2359,36 +2359,25 @@ static int sync_info(std::optional opt_target_zone, std::optional= 0) { + bucket_handler.reset(handler->alloc_child(bucket_info)); } else { - eff_bucket = bucket_info.bucket; + cerr << "WARNING: bucket not found, simulating result" << std::endl; + bucket_handler.reset(handler->alloc_child(*eff_bucket, nullopt)); } - if (bucket_info.sync_policy) { - policy = (rgw_sync_policy_info *)bucket_info.sync_policy.get(); - bucket_flow.emplace(zone_name, bucket, &zone_flow); - - bucket_flow->init(*policy); - - flow_mgr = &(*bucket_flow); - } + handler = bucket_handler.get(); } - RGWBucketSyncFlowManager::pipe_set sources; - RGWBucketSyncFlowManager::pipe_set dests; + RGWBucketSyncFlowManager::pipe_set *sources; + RGWBucketSyncFlowManager::pipe_set *dests; - flow_mgr->reflect(eff_bucket, &sources, &dests); + handler->get_pipes(&sources, &dests); { Formatter::ObjectSection os(*formatter, "result"); - encode_json("sources", sources, formatter); - encode_json("dests", dests, formatter); + encode_json("sources", *sources, formatter); + encode_json("dests", *dests, formatter); } formatter->flush(cout); diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index e1f6f8a77edc..dfc764d2c0ef 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -446,20 +446,65 @@ void RGWSyncPolicyCompat::convert_old_sync_config(RGWSI_Zone *zone_svc, } RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, - RGWBucketInfo& _bucket_info) : zone_svc(_zone_svc), - bucket_info(_bucket_info) { - flow_mgr.reset(new RGWBucketSyncFlowManager(zone_svc->zone_name(), - bucket_info.bucket, - zone_svc->get_sync_flow_manager())); + RGWSI_SyncModules *sync_modules_svc, + std::optional effective_zone) : zone_svc(_zone_svc) { + zone_name = effective_zone.value_or(zone_svc->zone_name()); + flow_mgr.reset(new RGWBucketSyncFlowManager(zone_name, + nullopt, + nullptr)); + sync_policy = zone_svc->get_zonegroup().sync_policy; + + if (sync_policy.empty()) { + RGWSyncPolicyCompat::convert_old_sync_config(zone_svc, sync_modules_svc, &sync_policy); + } + + init(); +} + +RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, + const RGWBucketInfo& _bucket_info) : parent(_parent), + bucket_info(_bucket_info) { + if (_bucket_info.sync_policy) { + sync_policy = *_bucket_info.sync_policy; + } + bucket = _bucket_info.bucket; + zone_svc = parent->zone_svc; + flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name, + _bucket_info.bucket, + parent->flow_mgr.get())); + init(); +} + +RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, + const rgw_bucket& _bucket, + std::optional _sync_policy) : parent(_parent) { + if (_sync_policy) { + sync_policy = *_sync_policy; + } + bucket = _bucket; + zone_svc = parent->zone_svc; + flow_mgr.reset(new RGWBucketSyncFlowManager(parent->zone_name, + _bucket, + parent->flow_mgr.get())); + init(); } -int RGWBucketSyncPolicyHandler::init() +RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const RGWBucketInfo& bucket_info) const { - flow_mgr->init(*bucket_info.sync_policy); + return new RGWBucketSyncPolicyHandler(this, bucket_info); +} - RGWBucketSyncFlowManager::pipe_set sources_by_name; - RGWBucketSyncFlowManager::pipe_set targets_by_name; - flow_mgr->reflect(bucket_info.bucket, &sources_by_name, &targets_by_name); +RGWBucketSyncPolicyHandler *RGWBucketSyncPolicyHandler::alloc_child(const rgw_bucket& bucket, + std::optional sync_policy) const +{ + return new RGWBucketSyncPolicyHandler(this, bucket, sync_policy); +} + +void RGWBucketSyncPolicyHandler::init() +{ + flow_mgr->init(sync_policy); + + flow_mgr->reflect(bucket, &sources_by_name, &targets_by_name); /* convert to zone ids */ @@ -467,6 +512,7 @@ int RGWBucketSyncPolicyHandler::init() if (!pipe.source.zone) { continue; } + source_zones.insert(*pipe.source.zone); rgw_sync_bucket_pipe new_pipe = pipe; string zone_id; @@ -479,6 +525,7 @@ int RGWBucketSyncPolicyHandler::init() if (!pipe.dest.zone) { continue; } + target_zones.insert(*pipe.dest.zone); rgw_sync_bucket_pipe new_pipe = pipe; string zone_id; if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) { @@ -486,18 +533,20 @@ int RGWBucketSyncPolicyHandler::init() } targets[*new_pipe.dest.zone].pipes.insert(new_pipe); } - - return 0; } bool RGWBucketSyncPolicyHandler::bucket_exports_data() const { + if (!bucket) { + return false; + } + if (bucket_is_sync_source()) { return true; } return (zone_svc->need_to_log_data() && - bucket_info.datasync_flag_enabled()); + bucket_info->datasync_flag_enabled()); } bool RGWBucketSyncPolicyHandler::bucket_imports_data() const diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index cdded5a9ddf1..c62f4cf2edb4 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -21,6 +21,7 @@ class RGWSI_Zone; class RGWSI_SyncModules; + struct rgw_sync_group_pipe_map; struct rgw_sync_bucket_pipes; struct rgw_sync_policy_info; @@ -107,6 +108,7 @@ public: }; class RGWBucketSyncFlowManager { + friend class RGWBucketSyncPolicyHandler; public: struct pipe_set { std::set pipes; @@ -136,13 +138,14 @@ private: */ void update_flow_maps(const rgw_sync_bucket_pipes& pipe); + void init(const rgw_sync_policy_info& sync_policy); + public: RGWBucketSyncFlowManager(const string& _zone_name, std::optional _bucket, const RGWBucketSyncFlowManager *_parent); - void init(const rgw_sync_policy_info& sync_policy); void reflect(std::optional effective_bucket, pipe_set *flow_by_source, pipe_set *flow_by_dest) const; @@ -150,13 +153,23 @@ public: }; class RGWBucketSyncPolicyHandler { + const RGWBucketSyncPolicyHandler *parent{nullptr}; RGWSI_Zone *zone_svc; - RGWBucketInfo bucket_info; + string zone_name; + std::optional bucket_info; + std::optional bucket; std::unique_ptr flow_mgr; + rgw_sync_policy_info sync_policy; + + RGWBucketSyncFlowManager::pipe_set sources_by_name; + RGWBucketSyncFlowManager::pipe_set targets_by_name; map sources; /* source pipes by source zone id */ map targets; /* target pipes by target zone id */ + std::set source_zones; /* source zones by name */ + std::set target_zones; /* target zones by name */ + bool bucket_is_sync_source() const { return !targets.empty(); } @@ -165,19 +178,44 @@ class RGWBucketSyncPolicyHandler { return !sources.empty(); } + void init(); + + RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, + const RGWBucketInfo& _bucket_info); + + RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicyHandler *_parent, + const rgw_bucket& _bucket, + std::optional _sync_policy); public: RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, - RGWBucketInfo& _bucket_info); - int init(); + RGWSI_SyncModules *sync_modules_svc, + std::optional effective_zone = std::nullopt); + + RGWBucketSyncPolicyHandler *alloc_child(const RGWBucketInfo& bucket_info) const; + RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket, + std::optional sync_policy) const; + + const std::set& get_source_zones() const { + return source_zones; + } + + const std::set& get_target_zones() const { + return target_zones; + } const map& get_sources() { return sources; } - const RGWBucketInfo& get_bucket_info() const { + const std::optional& get_bucket_info() const { return bucket_info; } + void get_pipes(RGWBucketSyncFlowManager::pipe_set **sources, RGWBucketSyncFlowManager::pipe_set **targets) { /* return raw pipes (with zone name) */ + *sources = &sources_by_name; + *targets = &targets_by_name; + } + bool bucket_exports_data() const; bool bucket_imports_data() const; }; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 4bba4bdac4b8..730b9a342eb5 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3470,7 +3470,10 @@ int RGWGetBucketSourcePeersCR::operate() auto& handler = policy->policy_handler; *sources = handler->get_sources(); - *pbucket_info = handler->get_bucket_info(); + auto& binfo = handler->get_bucket_info(); + if (binfo) { + *pbucket_info = *binfo; + } } return set_cr_done(); diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc index c7c2604c8774..550fa83e2e48 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.cc +++ b/src/rgw/services/svc_bucket_sync_sobj.cc @@ -62,13 +62,7 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, } bucket_sync_policy_cache_entry e; - e.handler.reset(new RGWBucketSyncPolicyHandler(svc.zone, bucket_info)); - - r = e.handler->init(); - if (r < 0) { - ldout(cct, 0) << "ERROR: RGWBucketSyncPolicyHandler::init() returned r=" << r << dendl; - return r; - } + e.handler.reset(svc.zone->get_sync_policy_handler()->alloc_child(bucket_info)); if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) { ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl; diff --git a/src/rgw/services/svc_zone.cc b/src/rgw/services/svc_zone.cc index 5af307c9caff..aba47b27ec72 100644 --- a/src/rgw/services/svc_zone.cc +++ b/src/rgw/services/svc_zone.cc @@ -38,7 +38,7 @@ void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc, RGWSI_Zone::~RGWSI_Zone() { - delete sync_flow_mgr; + delete sync_policy_handler; delete realm; delete zonegroup; delete zone_public_config; @@ -153,36 +153,17 @@ int RGWSI_Zone::do_start() zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id()); - sync_flow_mgr = new RGWBucketSyncFlowManager(zone_params->get_name(), - nullopt, - nullptr); - - rgw_sync_policy_info sync_policy = zonegroup->sync_policy; - - if (sync_policy.empty()) { - RGWSyncPolicyCompat::convert_old_sync_config(this, sync_modules_svc, &sync_policy); - } - - sync_flow_mgr->init(sync_policy); - - RGWBucketSyncFlowManager::pipe_set zone_sources; - RGWBucketSyncFlowManager::pipe_set zone_targets; - - sync_flow_mgr->reflect(nullopt, &zone_sources, &zone_targets); + sync_policy_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc); set source_zones_by_name; set target_zones_by_name; - for (auto& pipe : zone_sources.pipes) { - if (pipe.source.zone) { - source_zones_by_name.insert(*pipe.source.zone); - } + for (auto& zone_name : sync_policy_handler->get_source_zones()) { + source_zones_by_name.insert(zone_name); } - for (auto& pipe : zone_targets.pipes) { - if (pipe.dest.zone) { - target_zones_by_name.insert(*pipe.dest.zone); - } + for (auto& zone_name : sync_policy_handler->get_target_zones()) { + target_zones_by_name.insert(zone_name); } ret = sync_modules_svc->start(); diff --git a/src/rgw/services/svc_zone.h b/src/rgw/services/svc_zone.h index 168b9102e296..b4f53e868361 100644 --- a/src/rgw/services/svc_zone.h +++ b/src/rgw/services/svc_zone.h @@ -17,10 +17,12 @@ class RGWZoneParams; class RGWPeriod; class RGWZonePlacementInfo; -class RGWBucketSyncFlowManager; +class RGWBucketSyncPolicyHandler; class RGWRESTConn; +struct rgw_sync_policy_info; + class RGWSI_Zone : public RGWServiceInstance { friend struct RGWServices_Def; @@ -37,7 +39,7 @@ class RGWSI_Zone : public RGWServiceInstance uint32_t zone_short_id{0}; bool writeable_zone{false}; - RGWBucketSyncFlowManager *sync_flow_mgr{nullptr}; + RGWBucketSyncPolicyHandler *sync_policy_handler{nullptr}; RGWRESTConn *rest_master_conn{nullptr}; map zone_conn_map; @@ -48,6 +50,8 @@ class RGWSI_Zone : public RGWServiceInstance map zone_id_by_name; map zone_by_id; + std::unique_ptr sync_policy; + void init(RGWSI_SysObj *_sysobj_svc, RGWSI_RADOS *_rados_svc, RGWSI_SyncModules *_sync_modules_svc); @@ -71,8 +75,8 @@ public: int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const; const RGWZone& get_zone() const; - const RGWBucketSyncFlowManager *get_sync_flow_manager() const { - return sync_flow_mgr; + const RGWBucketSyncPolicyHandler *get_sync_policy_handler() const { + return sync_policy_handler; } const string& zone_name() const;