#include "services/svc_zone.h"
+#define dout_subsys ceph_subsys_rgw
#if 0
void RGWBucketSyncPolicyInfo::post_init()
}
#endif
+
int RGWBucketSyncPolicyHandler::init()
{
const auto& zone_id = zone_svc->get_zone().id;
auto& sync_policy = *bucket_info.sync_policy;
- for (auto& entry : sync_policy.entries) {
- if (!entry.bucket ||
- !(*entry.bucket == bucket_info.bucket)) {
- continue;
- }
-
-
- }
-
- source_zones.clear();
-
-#warning FIXME
-#if 0
- if (!sync_policy ||
- !sync_policy->pipes) {
- return 0;
- }
-
- for (auto& p : *sync_policy->pipes) {
- auto& pipe = p.second;
-
- if (pipe.target.zone_id == zone_id) {
- source_zones.insert(pipe.source.zone_id());
+ if (sync_policy.targets) {
+ for (auto& target : *sync_policy.targets) {
+ if (!(target.bucket || *target.bucket == bucket_info.bucket)) {
+ continue;
+ }
+
+ if (!(target.type.empty() ||
+ target.type == "rgw")) {
+ ldout(zone_svc->ctx(), 20) << "unsuppported sync target: " << target.type << dendl;
+ continue;
+ }
+
+ if (target.zones.find("*") == target.zones.end() &&
+ target.zones.find(zone_id) == target.zones.end()) {
+ continue;
+ }
+
+ /* populate trivial peers */
+ for (auto& rule : target.flow_rules) {
+ set<string> source_zones;
+ set<string> target_zones;
+ rule.get_zone_peers(zone_id, &source_zones, &target_zones);
+
+ for (auto& sz : source_zones) {
+ peer_info sinfo;
+ sinfo.bucket = bucket_info.bucket;
+ sources[sz].insert(sinfo);
+ }
+
+ for (auto& tz : target_zones) {
+ peer_info tinfo;
+ tinfo.bucket = bucket_info.bucket;
+ targets[tz].insert(tinfo);
+ }
+ }
+
+ /* non trivial sources */
+ for (auto& source : target.sources) {
+ if (!source.bucket ||
+ *source.bucket == bucket_info.bucket) {
+ if ((source.type.empty() || source.type == "rgw") &&
+ source.zone &&
+ source.bucket) {
+ peer_info sinfo;
+ sinfo.type = source.type;
+ sinfo.bucket = *source.bucket;
+ sources[*source.zone].insert(sinfo);
+ }
+ }
+ }
}
}
-#endif
return 0;
}
std::set<string> source_zones;
+ struct peer_info {
+ std::string type;
+ rgw_bucket bucket;
+ /* need to have config for other type of sources */
+
+ bool operator<(const peer_info& si) const {
+ if (type == si.type) {
+ return (bucket < si.bucket);
+ }
+ return (type < si.type);
+ }
+ };
+
+ std::map<string, std::set<peer_info> > sources;
+ std::map<string, std::set<peer_info> > targets;
+
public:
RGWBucketSyncPolicyHandler(RGWSI_Zone *_zone_svc,
RGWBucketInfo& _bucket_info);
int init();
bool zone_is_source(const string& zone_id) const {
- return source_zones.find(zone_id) != source_zones.end();
+ return sources.find(zone_id) != sources.end();
}
};
JSONDecoder::decode_json("entries", entries, obj);
};
-void rgw_sync_group_info::encode(bufferlist& bl) const
+void rgw_sync_flow_rule::get_zone_peers(const string& zone_id,
+ std::set<string> *sources,
+ std::set<string> *targets) const
{
- ENCODE_START(1, 1, bl);
- encode(id, bl);
- encode(config, bl);
- ENCODE_FINISH(bl);
-}
+ sources->clear();
+ targets->clear();
-void rgw_sync_group_info::decode(bufferlist::const_iterator& bl)
-{
- DECODE_START(1, bl);
- decode(id, bl);
- decode(config, bl);
- DECODE_FINISH(bl);
+ if (directional) {
+ if (directional->target_zone == zone_id) {
+ sources->insert(directional->source_zone);
+ } else if (directional->source_zone == zone_id) {
+ targets->insert(directional->target_zone);
+ }
+ } else if (symmetrical &&
+ symmetrical->find(zone_id) != symmetrical->end()) {
+ *sources = *symmetrical;
+ sources->erase(zone_id);
+ *targets = *sources;
+ }
}
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
void decode_json(JSONObj *obj);
};
-struct rgw_sync_group_info {
- static constexpr int CONFIG_FLAG_NONE = 0x0;
- static constexpr int CONFIG_FLAG_DR = 0x1;
+struct rgw_sync_flow_directional_rule {
+ string source_zone;
+ string target_zone;
- std::string id;
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(source_zone, bl);
+ encode(target_zone, bl);
+ ENCODE_FINISH(bl);
+ }
- struct _config {
- int flags{CONFIG_FLAG_NONE};
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(source_zone, bl);
+ decode(target_zone, bl);
+ DECODE_FINISH(bl);
+ }
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(flags, bl);
- ENCODE_FINISH(bl);
- }
+ void dump(ceph::Formatter *f) const;
+ void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(rgw_sync_flow_directional_rule)
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(flags, bl);
- DECODE_FINISH(bl);
- }
+struct rgw_sync_flow_rule {
+ string id;
+ std::optional<rgw_sync_flow_directional_rule> directional;
+ std::optional<std::set<string> > symmetrical;
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
- } config;
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(directional, bl);
+ encode(symmetrical, bl);
+ ENCODE_FINISH(bl);
+ }
- void encode(bufferlist& bl) const;
- void decode(bufferlist::const_iterator& bl);
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(directional, bl);
+ decode(symmetrical, bl);
+ DECODE_FINISH(bl);
+ }
void dump(ceph::Formatter *f) const;
void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(rgw_sync_group_info::_config)
-WRITE_CLASS_ENCODER(rgw_sync_group_info)
-struct rgw_sync_instance_info {
- std::string id;
+ void get_zone_peers(const string& zone_id, std::set<string> *sources, std::set<string> *targets) const;
+};
+WRITE_CLASS_ENCODER(rgw_sync_flow_rule)
- std::optional<std::set<std::string> > sync_groups; /* name of groups entity belongs to */
- std::optional<std::string> zone_id;
+struct rgw_sync_source {
+ string id;
+ string type;
+ std::optional<string> zone;
std::optional<rgw_bucket> bucket;
- std::optional<std::string> obj_prefix;
-
- struct sync_pipe {
- string source;
- string target_prefix;
-
- bool operator<(const sync_pipe& rhs) const {
- if (source == rhs.source) {
- return (target_prefix < rhs.target_prefix);
- }
- return (source < rhs.source);
- }
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(source, bl);
- encode(target_prefix, bl);
- ENCODE_FINISH(bl);
- }
+ /* FIXME: config */
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(source, bl);
- decode(target_prefix, bl);
- DECODE_FINISH(bl);
- }
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(id, bl);
+ encode(type, bl);
+ encode(zone, bl);
+ encode(bucket, bl);
+ ENCODE_FINISH(bl);
+ }
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
- };
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(id, bl);
+ decode(type, bl);
+ decode(zone, bl);
+ decode(bucket, bl);
+ DECODE_FINISH(bl);
+ }
- std::optional<std::set<sync_pipe> > sync_from; /* optional group/entity ids */
+ void dump(ceph::Formatter *f) const;
+ void decode_json(JSONObj *obj);
+};
+WRITE_CLASS_ENCODER(rgw_sync_source)
+struct rgw_sync_target {
+ string id;
+ string type;
+ std::vector<rgw_sync_flow_rule> flow_rules; /* flow rules for trivial sources */
+ std::set<string> zones; /* target zones. Can be wildcard */
+ /* FIXME: add config */
+
+ std::vector<rgw_sync_source> sources; /* non-trivial sources */
+ std::optional<rgw_bucket> bucket; /* can be explicit, or not set. If not set then depending
+ on the context */
+
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
encode(id, bl);
- encode(sync_groups, bl);
- encode(zone_id, bl);
+ encode(type, bl);
+ encode(flow_rules, bl);
+ encode(zones, bl);
+ encode(sources, bl);
encode(bucket, bl);
- encode(obj_prefix, bl);
- encode(sync_from, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
decode(id, bl);
- decode(sync_groups, bl);
- decode(zone_id, bl);
+ decode(type, bl);
+ decode(flow_rules, bl);
+ decode(zones, bl);
+ decode(sources, bl);
decode(bucket, bl);
- decode(obj_prefix, bl);
- decode(sync_from, bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter *f) const;
void decode_json(JSONObj *obj);
};
-WRITE_CLASS_ENCODER(rgw_sync_instance_info::sync_pipe)
-WRITE_CLASS_ENCODER(rgw_sync_instance_info)
+WRITE_CLASS_ENCODER(rgw_sync_target)
+
struct rgw_sync_policy_info {
- std::vector<rgw_sync_group_info> groups;
- std::vector<rgw_sync_instance_info> entries;
+ std::optional<std::vector<rgw_sync_flow_rule> > flow_rules;
+ std::optional<std::vector<rgw_sync_source> > sources;
+ std::optional<std::vector<rgw_sync_target> > targets;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- encode(groups, bl);
- encode(entries, bl);
+ encode(flow_rules, bl);
+ encode(sources, bl);
+ encode(targets, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- decode(groups, bl);
- decode(entries, bl);
+ decode(flow_rules, bl);
+ decode(sources, bl);
+ decode(targets, bl);
DECODE_FINISH(bl);
}
void decode_json(JSONObj *obj);
bool empty() const {
- return groups.empty() &&
- entries.empty();
+ return (!flow_rules || flow_rules->empty()) &&
+ (!targets || targets->empty());
}
};
reshard_status = (cls_rgw_reshard_status)rs;
}
-void rgw_sync_group_info::dump(Formatter *f) const
+void rgw_sync_flow_directional_rule::dump(Formatter *f) const
{
- encode_json("id", id, f);
- encode_json("config", config, f);
+ encode_json("source_zone", source_zone, f);
+ encode_json("target_zone", target_zone, f);
}
-void rgw_sync_group_info::decode_json(JSONObj *obj)
+void rgw_sync_flow_directional_rule::decode_json(JSONObj *obj)
{
- JSONDecoder::decode_json("id", id, obj);
- JSONDecoder::decode_json("config", config, obj);
+ JSONDecoder::decode_json("source_zone", source_zone, obj);
+ JSONDecoder::decode_json("target_zone", target_zone, obj);
}
-static struct rgw_flags_desc sync_group_flags_desc[] = {
- { rgw_sync_group_info::CONFIG_FLAG_DR, "mirror" },
- { rgw_sync_group_info::CONFIG_FLAG_NONE, "none" },
- { 0, NULL }
-};
-
-void rgw_sync_group_info::_config::dump(Formatter *f) const
+void rgw_sync_flow_rule::dump(Formatter *f) const
{
- char buf[256];
- mask_to_str(sync_group_flags_desc, flags, buf, sizeof(buf));
- encode_json("flags", (const char *)buf, f);
+ encode_json("id", id, f);
+ encode_json("directional", directional, f);
+ encode_json("symmetrical", symmetrical, f);
}
-static struct rgw_name_to_flag sync_group_flags_mapping[] = {
- {"mirror", rgw_sync_group_info::CONFIG_FLAG_DR},
- {"none", 0},
- {NULL, 0} };
-
-
-
-void rgw_sync_group_info::_config::decode_json(JSONObj *obj)
+void rgw_sync_flow_rule::decode_json(JSONObj *obj)
{
- string s;
- JSONDecoder::decode_json("flags", s, obj);
- uint32_t f = 0;
- rgw_parse_list_of_flags(sync_group_flags_mapping, s, &f);
- flags = f;
+ JSONDecoder::decode_json("id", id, obj);
+ JSONDecoder::decode_json("directional", directional, obj);
+ JSONDecoder::decode_json("symmetrical", symmetrical, obj);
}
-void rgw_sync_instance_info::dump(Formatter *f) const
+void rgw_sync_source::dump(Formatter *f) const
{
encode_json("id", id, f);
- encode_json("sync_groups", sync_groups, f);
- encode_json("zone_id", zone_id, f);
+ encode_json("type", type, f);
+ encode_json("zone", zone, f);
encode_json("bucket", bucket, f);
- encode_json("obj_prefix", obj_prefix, f);
- encode_json("sync_from", sync_from, f);
}
-void rgw_sync_instance_info::decode_json(JSONObj *obj)
+void rgw_sync_source::decode_json(JSONObj *obj)
{
JSONDecoder::decode_json("id", id, obj);
- JSONDecoder::decode_json("sync_groups", sync_groups, obj);
- JSONDecoder::decode_json("zone_id", zone_id, obj);
+ JSONDecoder::decode_json("type", type, obj);
+ JSONDecoder::decode_json("zone", zone, obj);
JSONDecoder::decode_json("bucket", bucket, obj);
- JSONDecoder::decode_json("obj_prefix", obj_prefix, obj);
- JSONDecoder::decode_json("sync_from", sync_from, obj);
}
-void rgw_sync_instance_info::sync_pipe::dump(Formatter *f) const
+void rgw_sync_target::dump(Formatter *f) const
{
- encode_json("source", source, f);
- encode_json("target_prefix", target_prefix, f);
+ encode_json("id", id, f);
+ encode_json("type", type, f);
+ encode_json("flow_rules", flow_rules, f);
+ encode_json("zones", zones, f);
+ encode_json("sources", sources, f);
+ encode_json("bucket", bucket, f);
}
-void rgw_sync_instance_info::sync_pipe::decode_json(JSONObj *obj)
+void rgw_sync_target::decode_json(JSONObj *obj)
{
- JSONDecoder::decode_json("source", source, obj);
- JSONDecoder::decode_json("target_prefix", target_prefix, obj);
+ JSONDecoder::decode_json("id", id, obj);
+ JSONDecoder::decode_json("type", type, obj);
+ JSONDecoder::decode_json("flow_rules", flow_rules, obj);
+ JSONDecoder::decode_json("zones", zones, obj);
+ JSONDecoder::decode_json("sources", sources, obj);
+ JSONDecoder::decode_json("bucket", bucket, obj);
}
void rgw_sync_policy_info::dump(Formatter *f) const
{
- encode_json("groups", groups, f);
- encode_json("entries", entries, f);
+ encode_json("flow_rules", flow_rules, f);
+ encode_json("sources", sources, f);
+ encode_json("targets", targets, f);
}
void rgw_sync_policy_info::decode_json(JSONObj *obj)
{
- JSONDecoder::decode_json("groups", groups, obj);
- JSONDecoder::decode_json("entries", entries, obj);
+ JSONDecoder::decode_json("flow_rules", flow_rules, obj);
+ JSONDecoder::decode_json("sources", sources, obj);
+ JSONDecoder::decode_json("targets", targets, obj);
}
void rgw_obj_key::dump(Formatter *f) const