From a8eb80db2b06423ed81ee19bc45c743ab1095236 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 13 Nov 2019 19:26:33 -0800 Subject: [PATCH] rgw: sync: resolve hints when initializing policy handler Signed-off-by: Yehuda Sadeh --- src/cls/rgw/cls_rgw_types.cc | 5 +- src/cls/rgw/cls_rgw_types.h | 1 - src/rgw/rgw_bucket.cc | 2 +- src/rgw/rgw_bucket_sync.h | 17 +++ src/rgw/services/svc_bucket_sync.h | 2 +- src/rgw/services/svc_bucket_sync_sobj.cc | 125 +++++++++++++++++++++-- src/rgw/services/svc_bucket_sync_sobj.h | 41 +++++++- 7 files changed, 177 insertions(+), 16 deletions(-) diff --git a/src/cls/rgw/cls_rgw_types.cc b/src/cls/rgw/cls_rgw_types.cc index 6dcb75bf32c..a2d4580cbaf 100644 --- a/src/cls/rgw/cls_rgw_types.cc +++ b/src/cls/rgw/cls_rgw_types.cc @@ -64,10 +64,7 @@ bool rgw_zone_set::exists(const string& zone, std::optional location_key void encode_json(const char *name, const rgw_zone_set& zs, ceph::Formatter *f) { - Formatter::ArraySection as(*f, name); - for (auto& e : zs.entries) { - encode_json("entry", e, f); - } + encode_json(name, zs.entries, f); } void decode_json_obj(rgw_zone_set& zs, JSONObj *obj) diff --git a/src/cls/rgw/cls_rgw_types.h b/src/cls/rgw/cls_rgw_types.h index 8cc5c823bfc..db8a3bcd18a 100644 --- a/src/cls/rgw/cls_rgw_types.h +++ b/src/cls/rgw/cls_rgw_types.h @@ -67,7 +67,6 @@ struct rgw_zone_set { /* no DECODE_START, DECODE_END for backward compatibility */ ceph::decode(entries, bl); } - void decode_json(JSONObj *obj); void insert(const string& zone, std::optional location_key); bool exists(const string& zone, std::optional location_key) const; diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index a433b30ba18..4086f0c858b 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -3669,7 +3669,7 @@ int RGWBucketCtl::get_sync_policy_handler(std::optional zone, optional_yield y) { int r = call([&](RGWSI_Bucket_X_Ctx& ctx) { - return svc.bucket_sync->get_policy_handler(ctx.bi, zone, bucket, phandler, y); + return svc.bucket_sync->get_policy_handler(ctx, zone, bucket, phandler, y); }); if (r < 0) { ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl; diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 23446bffadd..8f8ea1cb32d 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -277,6 +277,9 @@ class RGWBucketSyncPolicyHandler { std::set source_hints; std::set target_hints; + std::set resolved_sources; + std::set resolved_dests; + bool bucket_is_sync_source() const { return !targets.empty(); @@ -312,6 +315,20 @@ public: std::set *ptarget_zones, bool only_enabled) const; + void set_resolved_hints(std::set&& _resolved_sources, + std::set&& _resolved_dests) { + resolved_sources = std::move(_resolved_sources); + resolved_dests = std::move(_resolved_dests); + } + + const std::set& get_resolved_source_hints() { + return resolved_sources; + } + + const std::set& get_resolved_dest_hints() { + return resolved_dests; + } + const std::set& get_source_zones() const { return source_zones; } diff --git a/src/rgw/services/svc_bucket_sync.h b/src/rgw/services/svc_bucket_sync.h index 250e874f006..c0d6bdd9a8a 100644 --- a/src/rgw/services/svc_bucket_sync.h +++ b/src/rgw/services/svc_bucket_sync.h @@ -30,7 +30,7 @@ class RGWSI_Bucket_Sync : public RGWServiceInstance public: RGWSI_Bucket_Sync(CephContext *cct) : RGWServiceInstance(cct) {} - virtual int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, + virtual int get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, std::optional zone, std::optional bucket, RGWBucketSyncPolicyHandlerRef *handler, diff --git a/src/rgw/services/svc_bucket_sync_sobj.cc b/src/rgw/services/svc_bucket_sync_sobj.cc index 0f488c1cc3e..31fa3ca1539 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.cc +++ b/src/rgw/services/svc_bucket_sync_sobj.cc @@ -5,6 +5,7 @@ #include "rgw/rgw_bucket_sync.h" #include "rgw/rgw_zone.h" +#include "rgw/rgw_sync_policy.h" #define dout_subsys ceph_subsys_rgw @@ -66,12 +67,97 @@ int RGWSI_Bucket_Sync_SObj::do_start() return 0; } - -int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, - std::optional zone, - std::optional _bucket, - RGWBucketSyncPolicyHandlerRef *handler, +void RGWSI_Bucket_Sync_SObj::get_hint_entities(RGWSI_Bucket_X_Ctx& ctx, + const std::set& zone_names, + const std::set& buckets, + std::set *hint_entities, optional_yield y) +{ + for (auto& zone : zone_names) { + for (auto& b : buckets) { + string zid; + if (!svc.zone->find_zone_id_by_name(zone, &zid)) { + cerr << "WARNING: cannot find zone id for zone=" << zone << ", skippping" << std::endl; + continue; + } + + RGWBucketInfo hint_bucket_info; + int ret = svc.bucket_sobj->read_bucket_info(ctx, b, &hint_bucket_info, + nullptr, nullptr, boost::none, + y); + if (ret < 0) { + ldout(cct, 20) << "could not init bucket info for hint bucket=" << b << " ... skipping" << dendl; + continue; + } + + hint_entities->insert(rgw_sync_bucket_entity(zid, hint_bucket_info.bucket)); + } + } +} + +int RGWSI_Bucket_Sync_SObj::resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx, + rgw_sync_bucket_entity& self_entity, + RGWBucketSyncPolicyHandlerRef& handler, + RGWBucketSyncPolicyHandlerRef& zone_policy_handler, + std::map& temp_map, + optional_yield y) +{ + set source_zones; + set target_zones; + + zone_policy_handler->reflect(nullptr, nullptr, + nullptr, nullptr, + &source_zones, + &target_zones, + false); /* relaxed: also get all zones that we allow to sync to/from */ + + std::set hint_entities; + + get_hint_entities(ctx, source_zones, handler->get_source_hints(), &hint_entities, y); + get_hint_entities(ctx, target_zones, handler->get_target_hints(), &hint_entities, y); + + std::set resolved_sources; + std::set resolved_dests; + + for (auto& hint_entity : hint_entities) { + if (!hint_entity.zone || + !hint_entity.bucket) { + continue; /* shouldn't really happen */ + } + + auto& zid = *hint_entity.zone; + auto& hint_bucket = *hint_entity.bucket; + + RGWBucketSyncPolicyHandlerRef hint_bucket_handler; + + auto iter = temp_map.find(optional_zone_bucket(zid, hint_bucket)); + if (iter != temp_map.end()) { + hint_bucket_handler = iter->second; + } else { + int r = do_get_policy_handler(ctx, zid, hint_bucket, temp_map, &hint_bucket_handler, y); + if (r < 0) { + ldout(cct, 20) << "could not get bucket sync policy handler for hint bucket=" << hint_bucket << " ... skipping" << dendl; + continue; + } + } + + hint_bucket_handler->get_pipes(&resolved_dests, + &resolved_sources, + self_entity); /* flipping resolved dests and sources as these are + relative to the remote entity */ + } + + handler->set_resolved_hints(std::move(resolved_sources), std::move(resolved_dests)); + + return 0; +} + +int RGWSI_Bucket_Sync_SObj::do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, + std::optional zone, + std::optional _bucket, + std::map& temp_map, + RGWBucketSyncPolicyHandlerRef *handler, + optional_yield y) { if (!_bucket) { *handler = svc.zone->get_sync_policy_handler(zone); @@ -101,7 +187,7 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, RGWBucketInfo bucket_info; - int r = svc.bucket_sobj->read_bucket_instance_info(ctx, + int r = svc.bucket_sobj->read_bucket_instance_info(ctx.bi, bucket_key, &bucket_info, nullptr, @@ -115,7 +201,9 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, return r; } - e.handler.reset(svc.zone->get_sync_policy_handler(zone)->alloc_child(bucket_info)); + auto zone_policy_handler = svc.zone->get_sync_policy_handler(zone); + + e.handler.reset(zone_policy_handler->alloc_child(bucket_info)); r = e.handler->init(y); if (r < 0) { @@ -123,6 +211,19 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, return r; } + temp_map.emplace(optional_zone_bucket{zone, bucket}, e.handler); + + rgw_sync_bucket_entity self_entity(zone.value_or(svc.zone->zone_id()), bucket); + + r = resolve_policy_hints(ctx, self_entity, + e.handler, + zone_policy_handler, + temp_map, y); + if (r < 0) { + ldout(cct, 20) << "ERROR: failed to resolve policy hints: bucket_key=" << bucket_key << ", r=" << r << dendl; + return r; + } + if (!sync_policy_cache->put(svc.cache, cache_key, &e, {&cache_info})) { ldout(cct, 20) << "couldn't put bucket_sync_policy cache entry, might have raced with data changes" << dendl; } @@ -132,6 +233,16 @@ int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, return 0; } +int RGWSI_Bucket_Sync_SObj::get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, + std::optional zone, + std::optional _bucket, + RGWBucketSyncPolicyHandlerRef *handler, + optional_yield y) +{ + std::map temp_map; + return do_get_policy_handler(ctx, zone, _bucket, temp_map, handler, y); +} + static bool diff_sets(std::set& orig_set, std::set& new_set, vector *added, diff --git a/src/rgw/services/svc_bucket_sync_sobj.h b/src/rgw/services/svc_bucket_sync_sobj.h index 02fa66b01c1..ae497f3b580 100644 --- a/src/rgw/services/svc_bucket_sync_sobj.h +++ b/src/rgw/services/svc_bucket_sync_sobj.h @@ -31,6 +31,8 @@ class RGWChainedCacheImpl; class RGWSI_Bucket_Sync_SObj_HintIndexManager; +struct rgw_sync_bucket_entity; + class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync { struct bucket_sync_policy_cache_entry { @@ -44,6 +46,41 @@ class RGWSI_Bucket_Sync_SObj : public RGWSI_Bucket_Sync int do_start() override; + struct optional_zone_bucket { + optional zone; + optional bucket; + + optional_zone_bucket(const optional& _zone, + const optional& _bucket) : zone(_zone), bucket(_bucket) {} + + bool operator<(const optional_zone_bucket& ozb) const { + if (zone < ozb.zone) { + return true; + } + if (zone > ozb.zone) { + return false; + } + return bucket < ozb.bucket; + } + }; + + void get_hint_entities(RGWSI_Bucket_X_Ctx& ctx, + const std::set& zone_names, + const std::set& buckets, + std::set *hint_entities, + optional_yield y); + int resolve_policy_hints(RGWSI_Bucket_X_Ctx& ctx, + rgw_sync_bucket_entity& self_entity, + RGWBucketSyncPolicyHandlerRef& handler, + RGWBucketSyncPolicyHandlerRef& zone_policy_handler, + std::map& temp_map, + optional_yield y); + int do_get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, + std::optional zone, + std::optional _bucket, + std::map& temp_map, + RGWBucketSyncPolicyHandlerRef *handler, + optional_yield y); public: struct Svc { RGWSI_Zone *zone{nullptr}; @@ -61,11 +98,11 @@ public: RGWSI_Bucket_SObj *_bucket_sobj_svc); - int get_policy_handler(RGWSI_Bucket_BI_Ctx& ctx, + int get_policy_handler(RGWSI_Bucket_X_Ctx& ctx, std::optional zone, std::optional bucket, RGWBucketSyncPolicyHandlerRef *handler, - optional_yield y) override; + optional_yield y); int handle_bi_update(RGWBucketInfo& bucket_info, RGWBucketInfo *orig_bucket_info, -- 2.39.5