From 6310a6837c224fd64ae0dca9839ff6992e4dc55f Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 16 Aug 2019 11:59:44 -0700 Subject: [PATCH] rgw: rework sync policy Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket_sync.cc | 76 ++++++++++------ src/rgw/rgw_bucket_sync.h | 18 +++- src/rgw/rgw_data_sync.cc | 29 ++++--- src/rgw/rgw_data_sync.h | 172 +++++++++++++++++++++---------------- src/rgw/rgw_json_enc.cc | 89 +++++++++---------- 5 files changed, 223 insertions(+), 161 deletions(-) diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index ecdffaca5be..8732bce32f8 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -7,6 +7,7 @@ #include "services/svc_zone.h" +#define dout_subsys ceph_subsys_rgw #if 0 void RGWBucketSyncPolicyInfo::post_init() @@ -23,6 +24,7 @@ void RGWBucketSyncPolicyInfo::post_init() } #endif + int RGWBucketSyncPolicyHandler::init() { const auto& zone_id = zone_svc->get_zone().id; @@ -33,32 +35,58 @@ int RGWBucketSyncPolicyHandler::init() auto& sync_policy = *bucket_info.sync_policy; - for (auto& entry : sync_policy.entries) { - if (!entry.bucket || - !(*entry.bucket == bucket_info.bucket)) { - continue; - } - - - } - - source_zones.clear(); - -#warning FIXME -#if 0 - if (!sync_policy || - !sync_policy->pipes) { - return 0; - } - - for (auto& p : *sync_policy->pipes) { - auto& pipe = p.second; - - if (pipe.target.zone_id == zone_id) { - source_zones.insert(pipe.source.zone_id()); + if (sync_policy.targets) { + for (auto& target : *sync_policy.targets) { + if (!(target.bucket || *target.bucket == bucket_info.bucket)) { + continue; + } + + if (!(target.type.empty() || + target.type == "rgw")) { + ldout(zone_svc->ctx(), 20) << "unsuppported sync target: " << target.type << dendl; + continue; + } + + if (target.zones.find("*") == target.zones.end() && + target.zones.find(zone_id) == target.zones.end()) { + continue; + } + + /* populate trivial peers */ + for (auto& rule : target.flow_rules) { + set source_zones; + set target_zones; + rule.get_zone_peers(zone_id, &source_zones, &target_zones); + + for (auto& sz : source_zones) { + peer_info sinfo; + sinfo.bucket = bucket_info.bucket; + sources[sz].insert(sinfo); + } + + for (auto& tz : target_zones) { + peer_info tinfo; + tinfo.bucket = bucket_info.bucket; + targets[tz].insert(tinfo); + } + } + + /* non trivial sources */ + for (auto& source : target.sources) { + if (!source.bucket || + *source.bucket == bucket_info.bucket) { + if ((source.type.empty() || source.type == "rgw") && + source.zone && + source.bucket) { + peer_info sinfo; + sinfo.type = source.type; + sinfo.bucket = *source.bucket; + sources[*source.zone].insert(sinfo); + } + } + } } } -#endif return 0; } diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 90a3d2e27b5..ca104f0c4ce 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -27,6 +27,22 @@ class RGWBucketSyncPolicyHandler { std::set source_zones; + struct peer_info { + std::string type; + rgw_bucket bucket; + /* need to have config for other type of sources */ + + bool operator<(const peer_info& si) const { + if (type == si.type) { + return (bucket < si.bucket); + } + return (type < si.type); + } + }; + + std::map > sources; + std::map > targets; + public: RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc, RGWBucketInfo& _bucket_info); @@ -34,7 +50,7 @@ public: int init(); bool zone_is_source(const string& zone_id) const { - return source_zones.find(zone_id) != source_zones.end(); + return sources.find(zone_id) != sources.end(); } }; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e7a2c4b5019..81f5686db0e 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -64,20 +64,25 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) { JSONDecoder::decode_json("entries", entries, obj); }; -void rgw_sync_group_info::encode(bufferlist& bl) const +void rgw_sync_flow_rule::get_zone_peers(const string& zone_id, + std::set *sources, + std::set *targets) const { - ENCODE_START(1, 1, bl); - encode(id, bl); - encode(config, bl); - ENCODE_FINISH(bl); -} + sources->clear(); + targets->clear(); -void rgw_sync_group_info::decode(bufferlist::const_iterator& bl) -{ - DECODE_START(1, bl); - decode(id, bl); - decode(config, bl); - DECODE_FINISH(bl); + if (directional) { + if (directional->target_zone == zone_id) { + sources->insert(directional->source_zone); + } else if (directional->source_zone == zone_id) { + targets->insert(directional->target_zone); + } + } else if (symmetrical && + symmetrical->find(zone_id) != symmetrical->end()) { + *sources = *symmetrical; + sources->erase(zone_id); + *targets = *sources; + } } class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR { diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 9ca1cdc7cf6..c67d3586e8c 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -259,122 +259,144 @@ struct rgw_bucket_entry_owner { void decode_json(JSONObj *obj); }; -struct rgw_sync_group_info { - static constexpr int CONFIG_FLAG_NONE = 0x0; - static constexpr int CONFIG_FLAG_DR = 0x1; +struct rgw_sync_flow_directional_rule { + string source_zone; + string target_zone; - std::string id; + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(source_zone, bl); + encode(target_zone, bl); + ENCODE_FINISH(bl); + } - struct _config { - int flags{CONFIG_FLAG_NONE}; + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(source_zone, bl); + decode(target_zone, bl); + DECODE_FINISH(bl); + } - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(flags, bl); - ENCODE_FINISH(bl); - } + void dump(ceph::Formatter *f) const; + void decode_json(JSONObj *obj); +}; +WRITE_CLASS_ENCODER(rgw_sync_flow_directional_rule) - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(flags, bl); - DECODE_FINISH(bl); - } +struct rgw_sync_flow_rule { + string id; + std::optional directional; + std::optional > symmetrical; - void dump(ceph::Formatter *f) const; - void decode_json(JSONObj *obj); - } config; + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(directional, bl); + encode(symmetrical, bl); + ENCODE_FINISH(bl); + } - void encode(bufferlist& bl) const; - void decode(bufferlist::const_iterator& bl); + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(directional, bl); + decode(symmetrical, bl); + DECODE_FINISH(bl); + } void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); -}; -WRITE_CLASS_ENCODER(rgw_sync_group_info::_config) -WRITE_CLASS_ENCODER(rgw_sync_group_info) -struct rgw_sync_instance_info { - std::string id; + void get_zone_peers(const string& zone_id, std::set *sources, std::set *targets) const; +}; +WRITE_CLASS_ENCODER(rgw_sync_flow_rule) - std::optional > sync_groups; /* name of groups entity belongs to */ - std::optional zone_id; +struct rgw_sync_source { + string id; + string type; + std::optional zone; std::optional bucket; - std::optional obj_prefix; - - struct sync_pipe { - string source; - string target_prefix; - - bool operator<(const sync_pipe& rhs) const { - if (source == rhs.source) { - return (target_prefix < rhs.target_prefix); - } - return (source < rhs.source); - } - - void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); - encode(source, bl); - encode(target_prefix, bl); - ENCODE_FINISH(bl); - } + /* FIXME: config */ - void decode(bufferlist::const_iterator& bl) { - DECODE_START(1, bl); - decode(source, bl); - decode(target_prefix, bl); - DECODE_FINISH(bl); - } + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(id, bl); + encode(type, bl); + encode(zone, bl); + encode(bucket, bl); + ENCODE_FINISH(bl); + } - void dump(ceph::Formatter *f) const; - void decode_json(JSONObj *obj); - }; + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(id, bl); + decode(type, bl); + decode(zone, bl); + decode(bucket, bl); + DECODE_FINISH(bl); + } - std::optional > sync_from; /* optional group/entity ids */ + void dump(ceph::Formatter *f) const; + void decode_json(JSONObj *obj); +}; +WRITE_CLASS_ENCODER(rgw_sync_source) +struct rgw_sync_target { + string id; + string type; + std::vector flow_rules; /* flow rules for trivial sources */ + std::set zones; /* target zones. Can be wildcard */ + /* FIXME: add config */ + + std::vector sources; /* non-trivial sources */ + std::optional bucket; /* can be explicit, or not set. If not set then depending + on the context */ + void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); encode(id, bl); - encode(sync_groups, bl); - encode(zone_id, bl); + encode(type, bl); + encode(flow_rules, bl); + encode(zones, bl); + encode(sources, bl); encode(bucket, bl); - encode(obj_prefix, bl); - encode(sync_from, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); decode(id, bl); - decode(sync_groups, bl); - decode(zone_id, bl); + decode(type, bl); + decode(flow_rules, bl); + decode(zones, bl); + decode(sources, bl); decode(bucket, bl); - decode(obj_prefix, bl); - decode(sync_from, bl); DECODE_FINISH(bl); } void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); }; -WRITE_CLASS_ENCODER(rgw_sync_instance_info::sync_pipe) -WRITE_CLASS_ENCODER(rgw_sync_instance_info) +WRITE_CLASS_ENCODER(rgw_sync_target) + struct rgw_sync_policy_info { - std::vector groups; - std::vector entries; + std::optional > flow_rules; + std::optional > sources; + std::optional > targets; void encode(bufferlist& bl) const { ENCODE_START(1, 1, bl); - encode(groups, bl); - encode(entries, bl); + encode(flow_rules, bl); + encode(sources, bl); + encode(targets, bl); ENCODE_FINISH(bl); } void decode(bufferlist::const_iterator& bl) { DECODE_START(1, bl); - decode(groups, bl); - decode(entries, bl); + decode(flow_rules, bl); + decode(sources, bl); + decode(targets, bl); DECODE_FINISH(bl); } @@ -382,8 +404,8 @@ struct rgw_sync_policy_info { void decode_json(JSONObj *obj); bool empty() const { - return groups.empty() && - entries.empty(); + return (!flow_rules || flow_rules->empty()) && + (!targets || targets->empty()); } }; diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index d08cc970409..c3740b233fe 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -832,89 +832,80 @@ void RGWBucketInfo::decode_json(JSONObj *obj) { reshard_status = (cls_rgw_reshard_status)rs; } -void rgw_sync_group_info::dump(Formatter *f) const +void rgw_sync_flow_directional_rule::dump(Formatter *f) const { - encode_json("id", id, f); - encode_json("config", config, f); + encode_json("source_zone", source_zone, f); + encode_json("target_zone", target_zone, f); } -void rgw_sync_group_info::decode_json(JSONObj *obj) +void rgw_sync_flow_directional_rule::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("id", id, obj); - JSONDecoder::decode_json("config", config, obj); + JSONDecoder::decode_json("source_zone", source_zone, obj); + JSONDecoder::decode_json("target_zone", target_zone, obj); } -static struct rgw_flags_desc sync_group_flags_desc[] = { - { rgw_sync_group_info::CONFIG_FLAG_DR, "mirror" }, - { rgw_sync_group_info::CONFIG_FLAG_NONE, "none" }, - { 0, NULL } -}; - -void rgw_sync_group_info::_config::dump(Formatter *f) const +void rgw_sync_flow_rule::dump(Formatter *f) const { - char buf[256]; - mask_to_str(sync_group_flags_desc, flags, buf, sizeof(buf)); - encode_json("flags", (const char *)buf, f); + encode_json("id", id, f); + encode_json("directional", directional, f); + encode_json("symmetrical", symmetrical, f); } -static struct rgw_name_to_flag sync_group_flags_mapping[] = { - {"mirror", rgw_sync_group_info::CONFIG_FLAG_DR}, - {"none", 0}, - {NULL, 0} }; - - - -void rgw_sync_group_info::_config::decode_json(JSONObj *obj) +void rgw_sync_flow_rule::decode_json(JSONObj *obj) { - string s; - JSONDecoder::decode_json("flags", s, obj); - uint32_t f = 0; - rgw_parse_list_of_flags(sync_group_flags_mapping, s, &f); - flags = f; + JSONDecoder::decode_json("id", id, obj); + JSONDecoder::decode_json("directional", directional, obj); + JSONDecoder::decode_json("symmetrical", symmetrical, obj); } -void rgw_sync_instance_info::dump(Formatter *f) const +void rgw_sync_source::dump(Formatter *f) const { encode_json("id", id, f); - encode_json("sync_groups", sync_groups, f); - encode_json("zone_id", zone_id, f); + encode_json("type", type, f); + encode_json("zone", zone, f); encode_json("bucket", bucket, f); - encode_json("obj_prefix", obj_prefix, f); - encode_json("sync_from", sync_from, f); } -void rgw_sync_instance_info::decode_json(JSONObj *obj) +void rgw_sync_source::decode_json(JSONObj *obj) { JSONDecoder::decode_json("id", id, obj); - JSONDecoder::decode_json("sync_groups", sync_groups, obj); - JSONDecoder::decode_json("zone_id", zone_id, obj); + JSONDecoder::decode_json("type", type, obj); + JSONDecoder::decode_json("zone", zone, obj); JSONDecoder::decode_json("bucket", bucket, obj); - JSONDecoder::decode_json("obj_prefix", obj_prefix, obj); - JSONDecoder::decode_json("sync_from", sync_from, obj); } -void rgw_sync_instance_info::sync_pipe::dump(Formatter *f) const +void rgw_sync_target::dump(Formatter *f) const { - encode_json("source", source, f); - encode_json("target_prefix", target_prefix, f); + encode_json("id", id, f); + encode_json("type", type, f); + encode_json("flow_rules", flow_rules, f); + encode_json("zones", zones, f); + encode_json("sources", sources, f); + encode_json("bucket", bucket, f); } -void rgw_sync_instance_info::sync_pipe::decode_json(JSONObj *obj) +void rgw_sync_target::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("source", source, obj); - JSONDecoder::decode_json("target_prefix", target_prefix, obj); + JSONDecoder::decode_json("id", id, obj); + JSONDecoder::decode_json("type", type, obj); + JSONDecoder::decode_json("flow_rules", flow_rules, obj); + JSONDecoder::decode_json("zones", zones, obj); + JSONDecoder::decode_json("sources", sources, obj); + JSONDecoder::decode_json("bucket", bucket, obj); } void rgw_sync_policy_info::dump(Formatter *f) const { - encode_json("groups", groups, f); - encode_json("entries", entries, f); + encode_json("flow_rules", flow_rules, f); + encode_json("sources", sources, f); + encode_json("targets", targets, f); } void rgw_sync_policy_info::decode_json(JSONObj *obj) { - JSONDecoder::decode_json("groups", groups, obj); - JSONDecoder::decode_json("entries", entries, obj); + JSONDecoder::decode_json("flow_rules", flow_rules, obj); + JSONDecoder::decode_json("sources", sources, obj); + JSONDecoder::decode_json("targets", targets, obj); } void rgw_obj_key::dump(Formatter *f) const -- 2.39.5