}
-static std::vector<rgw_sync_bucket_pipes> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes,
+static std::vector<rgw_sync_bucket_pipe> filter_relevant_pipes(const std::vector<rgw_sync_bucket_pipes>& pipes,
const string& source_zone,
const string& dest_zone)
{
- std::vector<rgw_sync_bucket_pipes> relevant_pipes;
- for (auto& pipe : pipes) {
- if (pipe.source.match_zone(source_zone) &&
- pipe.dest.match_zone(dest_zone)) {
- relevant_pipes.push_back(pipe);
+ std::vector<rgw_sync_bucket_pipe> relevant_pipes;
+ for (auto& p : pipes) {
+ if (p.source.match_zone(source_zone) &&
+ p.dest.match_zone(dest_zone)) {
+ for (auto pipe : p.expand()) {
+ pipe.source.apply_zone(source_zone);
+ pipe.dest.apply_zone(dest_zone);
+ relevant_pipes.push_back(pipe);
+ }
}
}
}
auto relevant_pipes = filter_relevant_pipes(pipes, source_zone, dest_zone);
- for (auto& pipes : relevant_pipes) {
- for (auto& pipe : pipes.expand()) {
- rgw_sync_bucket_entity zb;
- if (!call_filter_cb(pipe, &zb)) {
- continue;
- }
- pipe_map->insert(make_pair(zb, pipe));
+ for (auto& pipe : relevant_pipes) {
+ rgw_sync_bucket_entity zb;
+ if (!call_filter_cb(pipe, &zb)) {
+ continue;
}
+ pipe_map->insert(make_pair(zb, pipe));
}
}
void RGWBucketSyncFlowManager::pipe_flow::dump(ceph::Formatter *f) const
{
-#warning cleanup
-#if 0
- {
- Formatter::ArraySection os(*f, "flow_groups");
- for (auto& g : flow_groups) {
- encode_json("group", *g, f);
- }
- }
-#endif
-
encode_json("pipes", pipes, f);
}
case rgw_sync_policy_group::Status::ENABLED:
found = true;
found_activated = true;
- /* fall through */
+ break;
case rgw_sync_policy_group::Status::ALLOWED:
found = true;
break;
return m.find(rgw_bucket());
}
-#warning cleanup
-#if 0
-void RGWBucketSyncFlowManager::update_flow_maps(const rgw_sync_bucket_pipes& pipe) {
- auto source_bucket = pipe.source.get_bucket();
- auto dest_bucket = pipe.dest.get_bucket();
-
- if (pipe.match_dest(zone_name, bucket)) { /* we're the dest */
- auto& by_source = flow_by_source[source_bucket];
- by_source.pipe.push_back(pipe);
- }
-
- if (pipe.match_source(zone_name, bucket)) { /* we're the source */
- auto& by_dest = flow_by_dest[dest_bucket];
- by_dest.pipe.push_back(pipe);
- }
-
-#if 0
- if (!bucket ||
- *bucket != source_bucket) {
- auto& by_source = flow_by_source[source_bucket];
- by_source.flow_groups.push_back(flow_group);
- by_source.pipe.push_back(pipe);
- }
-
- if (!bucket ||
- *bucket != dest_bucket) {
- auto& by_dest = flow_by_dest[dest_bucket];
- by_dest.flow_groups.push_back(flow_group);
- by_dest.pipe.push_back(pipe);
- }
-#endif
-}
-#endif
-
void RGWBucketSyncFlowManager::init(const rgw_sync_policy_info& sync_policy) {
for (auto& item : sync_policy.groups) {
auto& group = item.second;
for (auto& item : flow_groups) {
auto& flow_group_map = item.second;
for (auto& entry : flow_group_map.sources) {
- rgw_sync_bucket_pipe pipe;
- rgw_sync_bucket_entity source;
- pipe.source = entry.first;
+ rgw_sync_bucket_pipe pipe = entry.second;
+ if (!pipe.dest.match_bucket(effective_bucket)) {
+ continue;
+ }
+
pipe.source.apply_bucket(effective_bucket);
- pipe.dest = entity;
+ pipe.dest.apply_bucket(effective_bucket);
auto& by_source = (*flow_by_source)[pipe.source.get_bucket()];
by_source.pipes.insert(pipe);
}
for (auto& entry : flow_group_map.dests) {
- rgw_sync_bucket_pipe pipe;
- rgw_sync_bucket_entity dest;
- pipe.dest = entry.first;
+ rgw_sync_bucket_pipe pipe = entry.second;
+
+ if (!pipe.source.match_bucket(effective_bucket)) {
+ continue;
+ }
+
+ pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
- pipe.source = entity;
auto& by_dest = (*flow_by_dest)[pipe.dest.get_bucket()];
by_dest.pipes.insert(pipe);
#include "rgw_common.h"
-#if 0
-struct rgw_sync_flow_directional_rule {
- string source_zone;
- string target_zone;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(source_zone, bl);
- encode(target_zone, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(source_zone, bl);
- decode(target_zone, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(rgw_sync_flow_directional_rule)
-
-struct rgw_sync_flow_rule {
- string id;
- std::optional<rgw_sync_flow_directional_rule> directional;
- std::optional<std::set<string> > symmetrical;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(id, bl);
- encode(directional, bl);
- encode(symmetrical, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(id, bl);
- decode(directional, bl);
- decode(symmetrical, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
-
- void get_zone_peers(const string& zone_id, std::set<string> *sources, std::set<string> *targets) const;
-};
-WRITE_CLASS_ENCODER(rgw_sync_flow_rule)
-
-struct rgw_sync_source {
- string id;
- string type;
- std::optional<string> zone;
- std::optional<rgw_bucket> bucket;
- /* FIXME: config */
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(id, bl);
- encode(type, bl);
- encode(zone, bl);
- encode(bucket, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(id, bl);
- decode(type, bl);
- decode(zone, bl);
- decode(bucket, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(rgw_sync_source)
-
-struct rgw_sync_target {
- string id;
- string type;
- std::optional<std::vector<rgw_sync_flow_rule> > flow_rules; /* flow rules for trivial sources,
- if set then needs to be a subset of higher level rules */
- std::set<string> zones; /* target zones. Can be wildcard */
- /* FIXME: add config */
-
- std::vector<rgw_sync_source> sources; /* non-trivial sources */
- std::optional<rgw_bucket> bucket; /* can be explicit, or not set. If not set then depending
- on the context */
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(id, bl);
- encode(type, bl);
- encode(flow_rules, bl);
- encode(zones, bl);
- encode(sources, bl);
- encode(bucket, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(id, bl);
- decode(type, bl);
- decode(flow_rules, bl);
- decode(zones, bl);
- decode(sources, bl);
- decode(bucket, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(rgw_sync_target)
-
-
-struct rgw_sync_policy_info {
- std::optional<std::vector<rgw_sync_flow_rule> > flow_rules;
- std::optional<std::vector<rgw_sync_source> > sources;
- std::optional<std::vector<rgw_sync_target> > targets;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(flow_rules, bl);
- encode(sources, bl);
- encode(targets, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(flow_rules, bl);
- decode(sources, bl);
- decode(targets, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
-
- bool empty() const {
- return (!flow_rules || flow_rules->empty()) &&
- (!targets || targets->empty());
- }
-
-};
-WRITE_CLASS_ENCODER(rgw_sync_policy_info)
-
-#endif
struct rgw_sync_symmetric_group {
string id;
WRITE_CLASS_ENCODER(rgw_sync_bucket_pipe)
struct rgw_sync_bucket_entities {
-private:
- bool match_str(const string& s1, const string& s2) const { /* empty string is wildcard */
- return (s1.empty() ||
- s2.empty() ||
- s1 == s2);
- }
-
-public:
std::optional<rgw_bucket> bucket; /* define specific bucket */
std::optional<std::set<string> > zones; /* define specific zones, if not set then all zones */
return true;
}
- return (match_str(bucket->tenant, b->tenant) &&
- match_str(bucket->name, b->name) &&
- match_str(bucket->bucket_id, b->bucket_id));
+ return (rgw_sync_bucket_entity::match_str(bucket->tenant, b->tenant) &&
+ rgw_sync_bucket_entity::match_str(bucket->name, b->name) &&
+ rgw_sync_bucket_entity::match_str(bucket->bucket_id, b->bucket_id));
}
void add_zones(const std::vector<string>& new_zones);
WRITE_CLASS_ENCODER(rgw_sync_bucket_entities)
struct rgw_sync_bucket_pipes {
-private:
- void symmetrical_copy_if_empty(string& s1, string& s2) const {
- if (s1.empty()) {
- s1 = s2;
- } else if (s2.empty()) {
- s2 = s1;
- }
- }
-
-public:
string id;
rgw_sync_bucket_entities source;
rgw_sync_bucket_entities dest;
DECODE_FINISH(bl);
}
- bool contains_bucket(std::optional<rgw_bucket> b) const {
- return (source.match_bucket(b) || dest.match_bucket(b));
- }
- bool contains_zone(const string& zone) const {
- return (source.match_zone(zone) || dest.match_zone(zone));
- }
-
bool match_source(const string& zone, std::optional<rgw_bucket> b) const {
return (source.match_zone(zone) && source.match_bucket(b));
}