From c185ec8c20470bd769d38a5fb9c1f2fb19f1e058 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 31 Oct 2019 19:19:58 -0700 Subject: [PATCH] rgw: svc.zone: keep policy handler for all zones So that we can use the correct policy handler when checking source bucket. Also build list of source zones for data sync out of all potential zones, and not just the ones that are enabled, because it can be enabled at the bucket level. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 27 +++++----- src/rgw/rgw_bucket.cc | 9 ++-- src/rgw/rgw_bucket.h | 3 +- src/rgw/rgw_bucket_sync.cc | 65 ++++++++++++++++++++---- src/rgw/rgw_bucket_sync.h | 11 +++- src/rgw/rgw_cr_tools.cc | 3 +- src/rgw/rgw_cr_tools.h | 3 +- src/rgw/rgw_data_sync.cc | 6 ++- src/rgw/services/svc_bucket_sync.h | 3 +- src/rgw/services/svc_bucket_sync_sobj.cc | 32 ++++++++---- src/rgw/services/svc_bucket_sync_sobj.h | 3 +- src/rgw/services/svc_zone.cc | 30 +++++++---- src/rgw/services/svc_zone.h | 7 ++- 13 files changed, 146 insertions(+), 56 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 7771b0a85a8b8..1c5d5b574b8df 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2333,21 +2333,24 @@ void encode_json(const char *name, const RGWBucketSyncFlowManager::pipe_set& pse static int sync_info(std::optional opt_target_zone, std::optional opt_bucket, Formatter *formatter) { - const RGWRealm& realm = store->svc()->zone->get_realm(); - const RGWZoneGroup& zonegroup = store->svc()->zone->get_zonegroup(); - const RGWZone& zone = store->svc()->zone->get_zone(); + std::optional zone_id; - string zone_name = opt_target_zone.value_or(store->svc()->zone->zone_name()); + if (opt_target_zone) { + string zid; + if (!store->svc()->zone->find_zone_id_by_name(*opt_target_zone, &zid)) { + cerr << "WARNING: cannot find zone id for zone=" << *opt_target_zone << std::endl; + return -ENOENT; + } + zone_id = zid; + } - RGWBucketSyncPolicyHandler zone_policy_handler(RGWBucketSyncPolicyHandler(store->svc()->zone, - store->svc()->sync_modules, - opt_target_zone)); + auto zone_policy_handler = store->svc()->zone->get_sync_policy_handler(zone_id); - std::unique_ptr bucket_handler; + RGWBucketSyncPolicyHandlerRef bucket_handler; std::optional eff_bucket = opt_bucket; - auto handler = &zone_policy_handler; + auto handler = zone_policy_handler; if (eff_bucket) { rgw_bucket bucket; @@ -2366,7 +2369,7 @@ static int sync_info(std::optional opt_target_zone, std::optionalalloc_child(*eff_bucket, nullopt)); } - handler = bucket_handler.get(); + handler = bucket_handler; } RGWBucketSyncFlowManager::pipe_set *sources; @@ -2405,7 +2408,7 @@ static int bucket_sync_info(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& RGWBucketSyncPolicyHandlerRef handler; - int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield); + int r = store->ctl()->bucket->get_sync_policy_handler(std::nullopt, info.bucket, &handler, null_yield); if (r < 0) { lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl; return r; @@ -2445,7 +2448,7 @@ static int bucket_sync_status(rgw::sal::RGWRadosStore *store, const RGWBucketInf RGWBucketSyncPolicyHandlerRef handler; - int r = store->ctl()->bucket->get_sync_policy_handler(info.bucket, &handler, null_yield); + int r = store->ctl()->bucket->get_sync_policy_handler(std::nullopt, info.bucket, &handler, null_yield); if (r < 0) { lderr(store->ctx()) << "ERROR: failed to get policy handler for bucket (" << info.bucket << "): r=" << r << ": " << cpp_strerror(-r) << dendl; return r; diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index cbbf015ee1dad..ff2829b60696f 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -3662,12 +3662,13 @@ int RGWBucketCtl::sync_user_stats(const rgw_user& user_id, return ctl.user->flush_bucket_stats(user_id, *pent); } -int RGWBucketCtl::get_sync_policy_handler(const rgw_bucket& bucket, +int RGWBucketCtl::get_sync_policy_handler(std::optional zone, + std::optional bucket, RGWBucketSyncPolicyHandlerRef *phandler, optional_yield y) { int r = call([&](RGWSI_Bucket_X_Ctx& ctx) { - return svc.bucket_sync->get_policy_handler(ctx.bi, bucket, phandler, y); + return svc.bucket_sync->get_policy_handler(ctx.bi, zone, bucket, phandler, y); }); if (r < 0) { ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl; @@ -3682,7 +3683,7 @@ int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket, RGWBucketSyncPolicyHandlerRef handler; - int r = get_sync_policy_handler(bucket, &handler, y); + int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y); if (r < 0) { return r; } @@ -3696,7 +3697,7 @@ int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket, RGWBucketSyncPolicyHandlerRef handler; - int r = get_sync_policy_handler(bucket, &handler, y); + int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y); if (r < 0) { return r; } diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index 6c22a9b413b1e..520a1ad80ce8d 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -870,7 +870,8 @@ public: RGWBucketEnt* pent = nullptr); /* bucket sync */ - int get_sync_policy_handler(const rgw_bucket& bucket, + int get_sync_policy_handler(std::optional zone, + std::optional bucket, RGWBucketSyncPolicyHandlerRef *phandler, optional_yield y); int bucket_exports_data(const rgw_bucket& bucket, diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index dfc764d2c0efb..184a31becff06 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -343,7 +343,8 @@ void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) { void RGWBucketSyncFlowManager::reflect(std::optional effective_bucket, RGWBucketSyncFlowManager::pipe_set *source_pipes, - RGWBucketSyncFlowManager::pipe_set *dest_pipes) const + RGWBucketSyncFlowManager::pipe_set *dest_pipes, + bool only_enabled) const { rgw_sync_bucket_entity entity; @@ -351,14 +352,15 @@ void RGWBucketSyncFlowManager::reflect(std::optional effective_bucke entity.bucket = effective_bucket.value_or(rgw_bucket()); if (parent) { - parent->reflect(effective_bucket, source_pipes, dest_pipes); + parent->reflect(effective_bucket, source_pipes, dest_pipes, only_enabled); } for (auto& item : flow_groups) { auto& flow_group_map = item.second; /* only return enabled groups */ - if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED) { + if (flow_group_map.status != rgw_sync_policy_group::Status::ENABLED && + (only_enabled || flow_group_map.status != rgw_sync_policy_group::Status::ALLOWED)) { continue; } @@ -504,34 +506,75 @@ void RGWBucketSyncPolicyHandler::init() { flow_mgr->init(sync_policy); - flow_mgr->reflect(bucket, &sources_by_name, &targets_by_name); + reflect(&sources_by_name, + &targets_by_name, + &sources, + &targets, + &source_zones, + &target_zones, + true); +} + +void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name, + RGWBucketSyncFlowManager::pipe_set *ptargets_by_name, + map *psources, + map *ptargets, + std::set *psource_zones, + std::set *ptarget_zones, + bool only_enabled) const +{ + RGWBucketSyncFlowManager::pipe_set _sources_by_name; + RGWBucketSyncFlowManager::pipe_set _targets_by_name; + map _sources; + map _targets; + std::set _source_zones; + std::set _target_zones; - /* convert to zone ids */ + flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled); - for (auto& pipe : sources_by_name.pipes) { + for (auto& pipe : _sources_by_name.pipes) { if (!pipe.source.zone) { continue; } - source_zones.insert(*pipe.source.zone); + _source_zones.insert(*pipe.source.zone); rgw_sync_bucket_pipe new_pipe = pipe; string zone_id; if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) { new_pipe.source.zone = zone_id; } - sources[*new_pipe.source.zone].pipes.insert(new_pipe); + _sources[*new_pipe.source.zone].pipes.insert(new_pipe); } - for (auto& pipe : targets_by_name.pipes) { + for (auto& pipe : _targets_by_name.pipes) { if (!pipe.dest.zone) { continue; } - target_zones.insert(*pipe.dest.zone); + _target_zones.insert(*pipe.dest.zone); rgw_sync_bucket_pipe new_pipe = pipe; string zone_id; if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) { new_pipe.dest.zone = zone_id; } - targets[*new_pipe.dest.zone].pipes.insert(new_pipe); + _targets[*new_pipe.dest.zone].pipes.insert(new_pipe); + } + + if (psources_by_name) { + *psources_by_name = std::move(_sources_by_name); + } + if (ptargets_by_name) { + *ptargets_by_name = std::move(_targets_by_name); + } + if (psources) { + *psources = std::move(_sources); + } + if (ptargets) { + *ptargets = std::move(_targets); + } + if (psource_zones) { + *psource_zones = std::move(_source_zones); + } + if (ptarget_zones) { + *ptarget_zones = std::move(_target_zones); } } diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 17cd4289e98e8..9a0c447ace1c8 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -166,7 +166,8 @@ public: void reflect(std::optional effective_bucket, pipe_set *flow_by_source, - pipe_set *flow_by_dest) const; + pipe_set *flow_by_dest, + bool only_enabled) const; }; @@ -213,6 +214,14 @@ public: RGWBucketSyncPolicyHandler *alloc_child(const rgw_bucket& bucket, std::optional sync_policy) const; + void reflect(RGWBucketSyncFlowManager::pipe_set *psources_by_name, + RGWBucketSyncFlowManager::pipe_set *ptargets_by_name, + map *psources, + map *ptargets, + std::set *psource_zones, + std::set *ptarget_zones, + bool only_enabled) const; + const std::set& get_source_zones() const { return source_zones; } diff --git a/src/rgw/rgw_cr_tools.cc b/src/rgw/rgw_cr_tools.cc index fe9e355bc3949..9ea45c3712787 100644 --- a/src/rgw/rgw_cr_tools.cc +++ b/src/rgw/rgw_cr_tools.cc @@ -279,7 +279,8 @@ int RGWBucketGetSyncPolicyHandlerCR::Request::_send_request() { CephContext *cct = store->ctx(); - int r = store->ctl()->bucket->get_sync_policy_handler(params.bucket, + int r = store->ctl()->bucket->get_sync_policy_handler(params.zone, + params.bucket, &result->policy_handler, null_yield); if (r < 0) { diff --git a/src/rgw/rgw_cr_tools.h b/src/rgw/rgw_cr_tools.h index 33cc27b7a0615..ccbd4a4d0ffd2 100644 --- a/src/rgw/rgw_cr_tools.h +++ b/src/rgw/rgw_cr_tools.h @@ -76,7 +76,8 @@ struct rgw_bucket_lifecycle_config_params { using RGWBucketLifecycleConfigCR = RGWSimpleWriteOnlyAsyncCR; struct rgw_bucket_get_sync_policy_params { - rgw_bucket bucket; + std::optional zone; + std::optional bucket; }; struct rgw_bucket_get_sync_policy_result { diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index dce0c03c60053..d0011f848c25a 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1070,8 +1070,9 @@ public: if (!binfo || binfo->bucket != *e.bucket) { bucket_info.bucket = *e.bucket; + } else { + set_bucket_info(*binfo); } - set_bucket_info(*binfo); } void update_empty_bucket_info(const std::map& buckets_info) { @@ -3817,6 +3818,7 @@ int RGWRunBucketSourcesSyncCR::operate() } else { sync_pair.source_bs.bucket = siter->source.get_bucket(); } + sync_pair.dest_bs.bucket = siter->target.get_bucket(); if (sync_pair.source_bs.shard_id >= 0) { num_shards = 1; @@ -3991,6 +3993,7 @@ int RGWGetBucketPeersCR::operate() return set_cr_error(retcode); } + get_policy_params.zone = nullopt; get_policy_params.bucket = *target_bucket; yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, sync_env->store, @@ -4012,6 +4015,7 @@ int RGWGetBucketPeersCR::operate() return set_cr_error(retcode); } + get_policy_params.zone = source_zone; get_policy_params.bucket = *source_bucket; yield call(new RGWBucketGetSyncPolicyHandlerCR(sync_env->async_rados, sync_env->store, diff --git a/src/rgw/services/svc_bucket_sync.h b/src/rgw/services/svc_bucket_sync.h index 51c88789a247f..1f2a93d2c5380 100644 --- a/src/rgw/services/svc_bucket_sync.h +++ b/src/rgw/services/svc_bucket_sync.h @@ -31,7 +31,8 @@ public: RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {} virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, - const rgw_bucket& bucket, + std::optional zone, + std::optional bucket, RGWBucketSyncPolicyHandlerRef *handler, optional_yield y) = 0; }; diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc index 550fa83e2e480..71e364be35a2f 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.cc +++ b/src/rgw/services/svc_bucket_sync_sobj.cc @@ -29,26 +29,41 @@ int RGWSI_Bucket_Sync_SObj::do_start() } int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, - const rgw_bucket& bucket, + std::optional zone, + std::optional _bucket, RGWBucketSyncPolicyHandlerRef *handler, optional_yield y) { - string key = RGWSI_Bucket::get_bi_meta_key(bucket); - string cache_key("bi/"); - cache_key.append(key); + if (!_bucket) { + *handler = svc.zone->get_sync_policy_handler(zone); + return 0; + } + + auto& bucket = *_bucket; + + string zone_key; + string bucket_key; + + if (zone && *zone != svc.zone->zone_id()) { + zone_key = *zone; + } + + bucket_key = RGWSI_Bucket::get_bi_meta_key(bucket); + + string cache_key("bi/" + zone_key + "/" + bucket_key); if (auto e = sync_policy_cache->find(cache_key)) { *handler = e->handler; return 0; } - + bucket_sync_policy_cache_entry e; rgw_cache_entry_info cache_info; RGWBucketInfo bucket_info; int r = svc.bucket_sobj->read_bucket_instance_info(ctx, - key, + bucket_key, &bucket_info, nullptr, nullptr, @@ -56,13 +71,12 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, &cache_info); if (r < 0) { if (r != -ENOENT) { - ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << key << ") returned r=" << r << dendl; + ldout(cct, 0) << "ERROR: svc.bucket->read_bucket_instance_info(key=" << bucket_key << ") returned r=" << r << dendl; } return r; } - bucket_sync_policy_cache_entry e; - e.handler.reset(svc.zone->get_sync_policy_handler()->alloc_child(bucket_info)); + e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info)); if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) { ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl; diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h index 5742168d5cbeb..967cb98770803 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.h +++ b/src/rgw/services/svc_bucket_sync_sobj.h @@ -56,7 +56,8 @@ public: int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, - const rgw_bucket& bucket, + std::optional zone, + std::optional bucket, RGWBucketSyncPolicyHandlerRef *handler, optional_yield y) override; }; diff --git a/src/rgw/services/svc_zone.cc b/src/rgw/services/svc_zone.cc index aba47b27ec727..c86d28c591f6c 100644 --- a/src/rgw/services/svc_zone.cc +++ b/src/rgw/services/svc_zone.cc @@ -38,7 +38,6 @@ void RGWSI_Zone::init(RGWSI_SysObj *_sysobj_svc, RGWSI_Zone::~RGWSI_Zone() { - delete sync_policy_handler; delete realm; delete zonegroup; delete zone_public_config; @@ -46,6 +45,17 @@ RGWSI_Zone::~RGWSI_Zone() delete current_period; } +std::shared_ptr RGWSI_Zone::get_sync_policy_handler(std::optional zone) const { + if (!zone || *zone == zone_id()) { + return sync_policy_handler; + } + auto iter = sync_policy_handlers.find(*zone); + if (iter == sync_policy_handlers.end()) { + return std::shared_ptr(); + } + return iter->second; +} + bool RGWSI_Zone::zone_syncs_from(const RGWZone& target_zone, const RGWZone& source_zone) const { return target_zone.syncs_from(source_zone.name) && @@ -153,18 +163,20 @@ int RGWSI_Zone::do_start() zone_short_id = current_period->get_map().get_zone_short_id(zone_params->get_id()); - sync_policy_handler = new RGWBucketSyncPolicyHandler(this, sync_modules_svc); + for (auto ziter : zonegroup->zones) { + sync_policy_handlers[ziter.second.id].reset(new RGWBucketSyncPolicyHandler(this, sync_modules_svc, ziter.second.name)); + } + + sync_policy_handler = sync_policy_handlers[zone_id()]; /* we made sure earlier that zonegroup->zones has our zone */ set source_zones_by_name; set target_zones_by_name; - for (auto& zone_name : sync_policy_handler->get_source_zones()) { - source_zones_by_name.insert(zone_name); - } - - for (auto& zone_name : sync_policy_handler->get_target_zones()) { - target_zones_by_name.insert(zone_name); - } + sync_policy_handler->reflect(nullptr, nullptr, + nullptr, nullptr, + &source_zones_by_name, + &target_zones_by_name, + false); /* relaxed: also get all zones that we allow to sync to/from */ ret = sync_modules_svc->start(); if (ret < 0) { diff --git a/src/rgw/services/svc_zone.h b/src/rgw/services/svc_zone.h index b4f53e868361c..50403343d36fa 100644 --- a/src/rgw/services/svc_zone.h +++ b/src/rgw/services/svc_zone.h @@ -39,7 +39,8 @@ class RGWSI_Zone : public RGWServiceInstance uint32_t zone_short_id{0}; bool writeable_zone{false}; - RGWBucketSyncPolicyHandler *sync_policy_handler{nullptr}; + std::shared_ptr sync_policy_handler; + std::map > sync_policy_handlers; RGWRESTConn *rest_master_conn{nullptr}; map zone_conn_map; @@ -75,9 +76,7 @@ public: int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) const; const RGWZone& get_zone() const; - const RGWBucketSyncPolicyHandler *get_sync_policy_handler() const { - return sync_policy_handler; - } + std::shared_ptr get_sync_policy_handler(std::optional zone = nullopt) const; const string& zone_name() const; const string& zone_id() const; -- 2.39.5