Formatter::ObjectSection top_section(*f, name);
Formatter::ArraySection as(*f, "entries");
- for (auto& pipe : pset.pipes) {
- encode_json("pipe", pipe, f);
+ for (auto& pipe_handler : pset) {
+ Formatter::ObjectSection hs(*f, "handler");
+ encode_json("source", pipe_handler.source, f);
+ encode_json("dest", pipe_handler.dest, f);
}
}
handler = bucket_handler;
}
- RGWBucketSyncFlowManager::pipe_set *sources;
- RGWBucketSyncFlowManager::pipe_set *dests;
+ std::set<rgw_sync_bucket_pipe> sources;
+ std::set<rgw_sync_bucket_pipe> dests;
- handler->get_pipes(&sources, &dests);
+ handler->get_pipes(&sources, &dests, std::nullopt);
auto source_hints_vec = convert_bucket_set_to_str_vec(handler->get_source_hints());
auto target_hints_vec = convert_bucket_set_to_str_vec(handler->get_target_hints());
- RGWBucketSyncFlowManager::pipe_set resolved_sources;
- RGWBucketSyncFlowManager::pipe_set resolved_dests;
+ std::set<rgw_sync_bucket_pipe> resolved_sources;
+ std::set<rgw_sync_bucket_pipe> resolved_dests;
rgw_sync_bucket_entity self_entity(zone_name, opt_bucket);
{
Formatter::ObjectSection os(*formatter, "result");
- encode_json("sources", *sources, formatter);
- encode_json("dests", *dests, formatter);
+ encode_json("sources", sources, formatter);
+ encode_json("dests", dests, formatter);
{
Formatter::ObjectSection hints_section(*formatter, "hints");
encode_json("sources", source_hints_vec, formatter);
for (auto& m : sources) {
auto& zone = m.first;
out << indented{width, "source zone"} << zone << std::endl;
- for (auto& pipe : m.second.pipes) {
- out << indented{width, "bucket"} << *pipe.source.bucket << std::endl;
+ for (auto& pipe_handler : m.second) {
+ out << indented{width, "bucket"} << *pipe_handler.source.bucket << std::endl;
}
}
}
for (auto& m : sources) {
- for (auto& pipe : m.second.pipes) {
+ for (auto& entry : m.second.pipe_map) {
+ auto& pipe = entry.second;
if (pipe.source.zone.value_or("") == z->second.id) {
bucket_source_sync_status(store, zone, z->second,
c->second,
return vector<rgw_sync_bucket_pipe>();
}
-void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
+void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe)
{
- auto ppipe = &pipe_map[pipe.id];
-
- auto prefix = pipe.params.filter.prefix.value_or(string());
+ auto prefix = ppipe->params.filter.prefix.value_or(string());
prefix_by_size.insert(make_pair(prefix.size(), ppipe));
- for (auto& tag : pipe.params.filter.tags) {
+ for (auto& tag : ppipe->params.filter.tags) {
auto titer = tag_refs.find(tag);
if (titer != tag_refs.end() &&
- pipe.params.priority > titer->second->params.priority) {
+ ppipe->params.priority > titer->second->params.priority) {
titer->second = ppipe;
} else {
tag_refs[tag] = ppipe;
}
}
+void RGWBucketSyncFlowManager::pipe_set::finish_init()
+{
+ for (auto& entry : rules) {
+ entry.second.finish_init();
+ }
+}
+
void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe)
{
auto prefix = ppipe->params.filter.prefix.value_or(string());
}
}
+void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
+ auto ppipe = &pipe_map[pipe.id];
+ *ppipe = pipe;
+
+ auto prules = &rules[endpoints_pair(*ppipe)];
+
+ prules->insert(ppipe);
+
+ pipe_handler h(prules, pipe);
+
+ handlers.insert(h);
+}
+
void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
{
- encode_json("pipes", pipes, f);
+ encode_json("pipes", pipe_map, f);
}
bool RGWBucketSyncFlowManager::allowed_data_flow(const string& source_zone,
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
- source_pipes->pipes.insert(pipe);
+ source_pipes->insert(pipe);
}
for (auto& entry : flow_group_map.dests) {
pipe.source.apply_bucket(effective_bucket);
pipe.dest.apply_bucket(effective_bucket);
- dest_pipes->pipes.insert(pipe);
+ dest_pipes->insert(pipe);
}
}
+
+ source_pipes->finish_init();
+ dest_pipes->finish_init();
}
flow_mgr->reflect(bucket, &_sources_by_name, &_targets_by_name, only_enabled);
- for (auto& pipe : _sources_by_name.pipes) {
+ for (auto& entry : _sources_by_name.pipe_map) {
+ auto& pipe = entry.second;
if (!pipe.source.zone) {
continue;
}
if (zone_svc->find_zone_id_by_name(*pipe.source.zone, &zone_id)) {
new_pipe.source.zone = zone_id;
}
- _sources[*new_pipe.source.zone].pipes.insert(new_pipe);
+ _sources[*new_pipe.source.zone].insert(new_pipe);
+ }
+ for (auto& s : _sources) {
+ s.second.finish_init();
}
- for (auto& pipe : _targets_by_name.pipes) {
+
+ for (auto& entry : _targets_by_name.pipe_map) {
+ auto& pipe = entry.second;
if (!pipe.dest.zone) {
continue;
}
if (zone_svc->find_zone_id_by_name(*pipe.dest.zone, &zone_id)) {
new_pipe.dest.zone = zone_id;
}
- _targets[*new_pipe.dest.zone].pipes.insert(new_pipe);
+ _targets[*new_pipe.dest.zone].insert(new_pipe);
+ }
+ for (auto& t : _targets) {
+ t.second.finish_init();
}
if (psources_by_name) {
}
}
-void RGWBucketSyncPolicyHandler::get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets,
+void RGWBucketSyncPolicyHandler::get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets,
std::optional<rgw_sync_bucket_entity> filter_peer) { /* return raw pipes (with zone name) */
- if (!filter_peer) {
- *sources = sources_by_name;
- *targets = targets_by_name;
- return;
- }
-
- auto& filter = *filter_peer;
-
- for (auto& source_pipe : sources_by_name.pipes) {
- if (source_pipe.source.match(filter)) {
- sources->pipes.insert(source_pipe);
+ for (auto& entry : sources_by_name.pipe_map) {
+ auto& source_pipe = entry.second;
+ if (!filter_peer ||
+ source_pipe.source.match(*filter_peer)) {
+ sources->insert(source_pipe);
}
}
- for (auto& target_pipe : targets_by_name.pipes) {
- if (target_pipe.dest.match(filter)) {
- targets->pipes.insert(target_pipe);
+ for (auto& entry : targets_by_name.pipe_map) {
+ auto& target_pipe = entry.second;
+ if (!filter_peer ||
+ target_pipe.dest.match(*filter_peer)) {
+ targets->insert(target_pipe);
}
}
}
}
};
+ /*
+ * pipe_rules: deal with a set of pipes that have common endpoints_pair
+ */
class pipe_rules {
void resolve_prefix(rgw_sync_bucket_pipe *ppipe);
public:
- std::map<string, rgw_sync_bucket_pipe> pipe_map; /* id to pipe */
-
std::multimap<size_t, rgw_sync_bucket_pipe *> prefix_by_size;
map<rgw_sync_pipe_filter_tag, rgw_sync_bucket_pipe *> tag_refs;
map<string, rgw_sync_bucket_pipe *> prefix_refs;
- void insert(const rgw_sync_bucket_pipe& pipe);
+ void insert(rgw_sync_bucket_pipe *pipe);
void finish_init();
};
+ /*
+ * pipe_handler: extends endpoints_rule to point at the corresponding rules handler
+ */
+ struct pipe_handler : public endpoints_pair {
+ pipe_rules *rules;
+
+ pipe_handler() {}
+ pipe_handler(pipe_rules *_rules,
+ const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe),
+ rules(_rules) {}
+ bool specific() const {
+ return source.specific() && dest.specific();
+ }
+ };
+
struct pipe_set {
std::map<endpoints_pair, pipe_rules> rules;
- std::set<rgw_sync_bucket_pipe> pipes;
+ std::map<string, rgw_sync_bucket_pipe> pipe_map;
- using iterator = std::set<rgw_sync_bucket_pipe>::iterator;
+ std::set<pipe_handler> handlers;
+
+ using iterator = std::set<pipe_handler>::iterator;
void clear() {
- pipes.clear();
+ rules.clear();
+ pipe_map.clear();
+ handlers.clear();
}
- void insert(const rgw_sync_bucket_pipe& pipe) {
- pipes.insert(pipe);
- }
+ void insert(const rgw_sync_bucket_pipe& pipe);
+ void finish_init();
- iterator begin() {
- return pipes.begin();
+ iterator begin() const {
+ return handlers.begin();
}
- iterator end() {
- return pipes.end();
+ iterator end() const {
+ return handlers.end();
}
void dump(ceph::Formatter *f) const;
};
+ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
+ os << e.dest << " -> " << e.source;
+ return os;
+}
+
class RGWBucketSyncPolicyHandler {
const RGWBucketSyncPolicyHandler *parent{nullptr};
RGWSI_Zone *zone_svc;
*sources = &sources_by_name;
*targets = &targets_by_name;
}
- void get_pipes(RGWBucketSyncFlowManager::pipe_set *sources, RGWBucketSyncFlowManager::pipe_set *targets,
- std::optional<rgw_sync_bucket_entity> filter_peer);
+ void get_pipes(std::set<rgw_sync_bucket_pipe> *sources, std::set<rgw_sync_bucket_pipe> *targets,
+ std::optional<rgw_sync_bucket_entity> filter_peer);
const std::set<rgw_bucket>& get_source_hints() const {
return source_hints;
return out;
}
-struct rgw_sync_pipe_info {
- rgw_sync_pipe_params params;
+struct rgw_sync_pipe_handler_info {
+ RGWBucketSyncFlowManager::pipe_handler handler;
rgw_sync_pipe_info_entity source;
rgw_sync_pipe_info_entity target;
- rgw_sync_pipe_info() {}
- rgw_sync_pipe_info(const rgw_sync_bucket_pipe& pipe,
+ rgw_sync_pipe_handler_info() {}
+ rgw_sync_pipe_handler_info(const RGWBucketSyncFlowManager::pipe_handler& _handler,
std::optional<RGWBucketInfo> source_bucket_info,
- std::optional<RGWBucketInfo> target_bucket_info) : source(pipe.source, source_bucket_info),
- target(pipe.dest, target_bucket_info) {
- params = pipe.params;
+ std::optional<RGWBucketInfo> target_bucket_info) : handler(_handler),
+ source(handler.source, source_bucket_info),
+ target(handler.dest, target_bucket_info) {
}
- bool operator<(const rgw_sync_pipe_info& p) const {
+ bool operator<(const rgw_sync_pipe_handler_info& p) const {
if (source < p.source) {
return true;
}
}
};
-std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_info& p) {
+std::ostream& operator<<(std::ostream& out, const rgw_sync_pipe_handler_info& p) {
out << p.source << ">" << p.target;
return out;
}
struct rgw_sync_pipe_info_set {
- std::set<rgw_sync_pipe_info> pipes;
+ std::set<rgw_sync_pipe_handler_info> handlers;
- using iterator = std::set<rgw_sync_pipe_info>::iterator;
+ using iterator = std::set<rgw_sync_pipe_handler_info>::iterator;
void clear() {
- pipes.clear();
+ handlers.clear();
}
- void insert(const rgw_sync_bucket_pipe& pipe,
+ void insert(const RGWBucketSyncFlowManager::pipe_handler& handler,
std::optional<RGWBucketInfo>& source_bucket_info,
std::optional<RGWBucketInfo>& target_bucket_info) {
- rgw_sync_pipe_info p(pipe, source_bucket_info, target_bucket_info);
- pipes.insert(p);
+ rgw_sync_pipe_handler_info p(handler, source_bucket_info, target_bucket_info);
+ handlers.insert(p);
}
iterator begin() {
- return pipes.begin();
+ return handlers.begin();
}
iterator end() {
- return pipes.end();
+ return handlers.end();
}
bool empty() const {
- return pipes.empty();
+ return handlers.empty();
}
void update_empty_bucket_info(const std::map<rgw_bucket, RGWBucketInfo>& buckets_info) {
return;
}
- std::set<rgw_sync_pipe_info> p;
+ std::set<rgw_sync_pipe_handler_info> p;
- for (auto pipe : pipes) {
+ for (auto pipe : handlers) {
pipe.update_empty_bucket_info(buckets_info);
p.insert(pipe);
}
- pipes = std::move(p);
+ handlers = std::move(p);
}
};
<< " all_sources.size()=" << all_sources.size() << dendl;
auto iters = get_pipe_iters(all_sources, source_zone);
for (auto i = iters.first; i != iters.second; ++i) {
- for (auto& pipe : i->second.pipes) {
- if (!pipe.specific()) {
- ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
+ for (auto& handler : i->second) {
+ if (!handler.specific()) {
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
continue;
}
if (source_bucket &&
- !source_bucket->match(*pipe.source.bucket)) {
+ !source_bucket->match(*handler.source.bucket)) {
continue;
}
- ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
- result->insert(pipe, source_bucket_info, target_bucket_info);
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
+ result->insert(handler, source_bucket_info, target_bucket_info);
}
}
}
<< " all_targets.size()=" << all_targets.size() << dendl;
auto iters = get_pipe_iters(all_targets, target_zone);
for (auto i = iters.first; i != iters.second; ++i) {
- for (auto& pipe : i->second.pipes) {
+ for (auto& handler : i->second) {
if (target_bucket &&
- pipe.dest.bucket &&
- !target_bucket->match(*pipe.dest.bucket)) {
- ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": skipping" << dendl;
+ handler.dest.bucket &&
+ !target_bucket->match(*handler.dest.bucket)) {
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": skipping" << dendl;
continue;
}
- ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe=" << pipe << ": adding" << dendl;
- result->insert(pipe, source_bucket_info, target_bucket_info);
+ ldpp_dout(sync_env->dpp, 20) << __func__ << ": pipe_handler=" << handler << ": adding" << dendl;
+ result->insert(handler, source_bucket_info, target_bucket_info);
}
}
}
}
sync_pair.dest_bs.bucket = siter->target.get_bucket();
- sync_pair.params = siter->params;
+ sync_pair.handler = siter->handler;
if (sync_pair.source_bs.shard_id >= 0) {
num_shards = 1;
#include "rgw_sync_trace.h"
#include "rgw_sync_policy.h"
+#include "rgw_bucket_sync.h"
+
class JSONObj;
struct rgw_sync_bucket_pipe;
struct rgw_bucket_sync_pair_info {
- rgw_sync_pipe_params params;
+ RGWBucketSyncFlowManager::pipe_handler handler; /* responsible for sync filters */
rgw_bucket_shard source_bs;
rgw_bucket_shard dest_bs;
};
out << p.source_bs;
- if (p.params.filter.prefix) {
- out << "/" << *p.params.filter.prefix + "*";
- }
-
out << "->" << p.dest_bs.bucket;
return out;