return vector<rgw_sync_bucket_pipe>();
}
-void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe)
+void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
{
+ pipes.push_back(pipe);
+
+ auto ppipe = &pipes.back();
auto prefix = ppipe->params.filter.prefix.value_or(string());
- prefix_by_size.insert(make_pair(prefix.size(), ppipe));
+ prefix_refs.insert(make_pair(prefix, ppipe));
- for (auto& tag : ppipe->params.filter.tags) {
+ for (auto& t : ppipe->params.filter.tags) {
+ string tag = t.key + "=" + t.value;
auto titer = tag_refs.find(tag);
if (titer != tag_refs.end() &&
ppipe->params.priority > titer->second->params.priority) {
}
}
-void RGWBucketSyncFlowManager::pipe_set::finish_init()
+#warning add support for tags
+/*
+ * find_obj_params: pick the sync params that apply to an object.
+ *
+ * Considers every rule whose filter prefix is a prefix of key.name and whose
+ * tag filter accepts 'tags'; among those, the rule with the highest priority
+ * wins (the first one seen wins a tie).
+ *
+ * key:    object being synced; only key.name is examined
+ * tags:   the object's tags, handed to rgw_sync_pipe_filter::check_tags()
+ * params: out, receives a copy of the winning rule's params; untouched when
+ *         false is returned
+ *
+ * returns true if a rule matched, false otherwise
+ */
+bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key,
+                                                           const vector<string>& tags,
+                                                           rgw_sync_pipe_params *params) const
{
- for (auto& entry : rules) {
- entry.second.finish_init();
+  if (prefix_refs.empty()) {
+    return false;
}
-}
-
-void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe)
-{
- auto prefix = ppipe->params.filter.prefix.value_or(string());
- auto iter = prefix_refs.lower_bound(prefix);
- while (iter != prefix_refs.end()) {
- auto cur_iter = iter++;
+  /*
+   * A prefix of key.name never sorts after key.name, so every candidate lives
+   * in [begin, upper_bound(key.name)). Starting at lower_bound(key.name) would
+   * skip all proper prefixes, including the default empty one.
+   */
+  const auto end = prefix_refs.upper_bound(key.name);
+  auto max = end;
- auto& cur_pipe = *cur_iter->second;
- auto cur_prefix = cur_pipe.params.filter.prefix.value_or(string());
+  std::optional<int> priority;
- if (!boost::starts_with(cur_prefix, prefix)) {
- return;
+  for (auto iter = prefix_refs.begin(); iter != end; ++iter) {
+    /* entries that are not prefixes of key.name can sit between ones that are, so skip rather than stop */
+    if (!boost::starts_with(key.name, iter->first)) {
+      continue;
}
- if (cur_pipe.params.priority > ppipe->params.priority) {
+    auto& rule_params = iter->second->params;
+
+    if (!rule_params.filter.check_tags(tags)) {
continue;
}
- prefix_refs.erase(cur_iter);
+    /* an empty optional compares below any value, so the first match is always taken */
+    if (rule_params.priority > priority) {
+      priority = rule_params.priority;
+      max = iter;
+    }
+  }
+
+  if (max == end) {
+    return false;
}
+
+  *params = max->second->params;
+  return true;
}
-void RGWBucketSyncFlowManager::pipe_rules::finish_init()
+/*
+ * return either the current prefix for s, or the next one if s is not within a prefix
+ */
+
+RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const
{
- /* go from the bigger prefixes to the shorter ones, this way we know that we covered all
- * overlapping prefixes
- */
- for (auto iter = prefix_by_size.rbegin(); iter != prefix_by_size.rend(); ++iter) {
- resolve_prefix(iter->second);
+ if (prefix_refs.empty()) { // no prefixes registered: nothing covers s and nothing follows it
+ return prefix_refs.end();
+ }
+ auto next = prefix_refs.upper_bound(s); // first entry whose prefix sorts strictly after s (may be end())
+ auto iter = next;
+ if (iter != prefix_refs.begin()) { // step back to the last entry sorting <= s; when next is begin() iter stays equal to next
+ --iter;
+ }
+ if (!boost::starts_with(s, iter->first)) { // NOTE(review): only this single predecessor is tested; a shorter prefix further back that also covers s (e.g. "a" for "ac" when "ab" exists) is not considered - confirm intended
+ return next;
}
+
+ return iter;
}
void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
- auto ppipe = &pipe_map[pipe.id];
- *ppipe = pipe;
+ pipe_map[pipe.id] = pipe; // keep a copy of the pipe keyed by its id (an existing entry with that id is overwritten)
+
+ auto& rules_ref = rules[endpoints_pair(pipe)]; // operator[] yields a null ref the first time this endpoints pair is seen
- auto prules = &rules[endpoints_pair(*ppipe)];
+ if (!rules_ref) {
+ rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>(); // create the rules object for this pair on first use
+ }
- prules->insert(ppipe);
+ rules_ref->insert(pipe); // pipe_rules::insert takes the pipe by const reference
- pipe_handler h(prules, pipe);
+ pipe_handler h(rules_ref, pipe); // the handler copies the shared_ptr, so it co-owns the rules
handlers.insert(h);
}
dest_pipes->insert(pipe);
}
}
-
- source_pipes->finish_init();
- dest_pipes->finish_init();
}
}
_sources[*new_pipe.source.zone].insert(new_pipe);
}
- for (auto& s : _sources) {
- s.second.finish_init();
- }
for (auto& entry : _targets_by_name.pipe_map) {
auto& pipe = entry.second;
}
_targets[*new_pipe.dest.zone].insert(new_pipe);
}
- for (auto& t : _targets) {
- t.second.finish_init();
- }
if (psources_by_name) {
*psources_by_name = std::move(_sources_by_name);
* pipe_rules: deal with a set of pipes that have common endpoints_pair
*/
class pipe_rules {
- void resolve_prefix(rgw_sync_bucket_pipe *ppipe);
+    /*
+     * Owns the pipes. tag_refs and prefix_refs below hold raw pointers to the
+     * elements stored here, and insert() appends with push_back(), so the
+     * container must never relocate an element once it has been added.
+     * std::list guarantees that; std::vector may reallocate on growth and
+     * leave every previously stored pointer dangling. (needs <list>)
+     */
+    std::list<rgw_sync_bucket_pipe> pipes;
public:
- std::multimap<size_t, rgw_sync_bucket_pipe *> prefix_by_size;
+    /* filter prefix -> pipe; a multimap, so more than one pipe can be filed under the same prefix */
+    using prefix_map_t = multimap<string, rgw_sync_bucket_pipe *>;
- map<rgw_sync_pipe_filter_tag, rgw_sync_bucket_pipe *> tag_refs;
- map<string, rgw_sync_bucket_pipe *> prefix_refs;
+    /* "key=value" tag string -> pipe */
+    map<string, rgw_sync_bucket_pipe *> tag_refs;
+    prefix_map_t prefix_refs;
- void insert(rgw_sync_bucket_pipe *pipe);
+    /* store a copy of pipe and index that copy by its filter prefix and tags */
+    void insert(const rgw_sync_bucket_pipe& pipe);
+
+    /*
+     * look up the params that apply to an object; on success *params is
+     * filled in and true is returned, otherwise false
+     */
+    bool find_obj_params(const rgw_obj_key& key,
+                         const vector<string>& tags,
+                         rgw_sync_pipe_params *params) const;
+
+    void scan_prefixes(std::vector<string> *prefixes) const;
- void finish_init();
+    /* read-only iteration over the prefix index */
+    prefix_map_t::const_iterator prefix_begin() const {
+      return prefix_refs.begin();
+    }
+    /* the entry whose prefix covers s, or else the next entry after s */
+    prefix_map_t::const_iterator prefix_search(const std::string& s) const;
+    prefix_map_t::const_iterator prefix_end() const {
+      return prefix_refs.end();
+    }
};
+ using pipe_rules_ref = std::shared_ptr<pipe_rules>;
+
/*
* pipe_handler: extends endpoints_rule to point at the corresponding rules handler
*/
struct pipe_handler : public endpoints_pair {
- pipe_rules *rules;
+ pipe_rules_ref rules; // shared_ptr to the rules for this endpoints pair; null after default construction
pipe_handler() {}
- pipe_handler(pipe_rules *_rules,
+ pipe_handler(pipe_rules_ref& _rules, // copied into 'rules', so the handler shares ownership with the caller
const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe),
rules(_rules) {}
bool specific() const {
return source.specific() && dest.specific();
}
+
+ bool find_obj_params(const rgw_obj_key& key, // forwards to pipe_rules::find_obj_params() on the attached rules
+ const std::vector<string>& tags,
+ rgw_sync_pipe_params *params) const {
+ if (!rules) { // no rules attached: report "no match" instead of dereferencing null
+ return false;
+ }
+ return rules->find_obj_params(key, tags, params);
+ }
};
struct pipe_set {
- std::map<endpoints_pair, pipe_rules> rules;
+ std::map<endpoints_pair, pipe_rules_ref> rules;
std::map<string, rgw_sync_bucket_pipe> pipe_map;
std::set<pipe_handler> handlers;
}
void insert(const rgw_sync_bucket_pipe& pipe);
- void finish_init();
iterator begin() const {
return handlers.begin();
};
-ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
+static inline ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) { // streams e.dest, then " -> ", then e.source; NOTE(review): dest is printed on the left of the arrow - confirm that order is intended
os << e.dest << " -> " << e.source;
return os;
}
rgw_zone_set zones_trace;
RGWSyncTraceNodeRef tn;
+
+  /*
+   * _prefix_handler: tracks which rule prefix the bucket listing is currently
+   * inside of, so that keys not covered by any prefix can be skipped.
+   */
+  struct _prefix_handler {
+    RGWBucketSyncFlowManager::pipe_rules_ref rules;
+    RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator iter;
+    std::optional<string> cur_prefix;
+
+    void set_rules(RGWBucketSyncFlowManager::pipe_rules_ref& _rules) {
+      rules = _rules;
+    }
+
+    /*
+     * Called before a listing request. Returns true when there is something
+     * left to list; false when no rules are set or no prefix at or after the
+     * marker remains. When the marker is outside the cached prefix it is
+     * repositioned to the prefix returned by prefix_search().
+     *
+     * NOTE(review): when prefix_search() returns a prefix that already covers
+     * the marker (e.g. cur_prefix not yet cached), the marker is moved back to
+     * the start of that prefix - confirm the rewind is intended.
+     */
+    bool revalidate_marker(rgw_obj_key *marker) {
+      const bool within_cur = cur_prefix &&
+                              boost::starts_with(marker->name, *cur_prefix);
+      if (within_cur) {
+        return true;
+      }
+      if (!rules) {
+        return false;
+      }
+
+      iter = rules->prefix_search(marker->name);
+      if (iter == rules->prefix_end()) {
+        return false;
+      }
+
+      cur_prefix = iter->first;
+      marker->name = *cur_prefix;
+      marker->instance.clear();
+      return true;
+    }
+
+    /*
+     * Returns true when key.name falls under a rule prefix. The cached prefix
+     * is tried first; on a miss the prefix is looked up again and cached,
+     * whether or not it ends up covering the key.
+     */
+    bool check_key(const rgw_obj_key& key) {
+      if (!rules) {
+        return false;
+      }
+
+      const bool within_cur = cur_prefix &&
+                              boost::starts_with(key.name, *cur_prefix);
+      if (within_cur) {
+        return true;
+      }
+
+      iter = rules->prefix_search(key.name);
+      if (iter == rules->prefix_end()) {
+        return false;
+      }
+
+      cur_prefix = iter->first;
+      return boost::starts_with(key.name, *cur_prefix);
+    }
+  } prefix_handler;
+
public:
RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
SSTR(bucket_shard_str{bs}))) {
zones_trace.insert(sc->source_zone);
marker_tracker.set_tn(tn);
+ prefix_handler.set_rules(sync_pipe.get_rules());
}
int operate() override;
}
set_status("listing remote bucket");
tn->log(20, "listing bucket for full sync");
+
+ if (!prefix_handler.revalidate_marker(&list_marker)) {
+ break;
+ }
+
yield call(new RGWListBucketShardCR(sc, bs, list_marker,
&list_result));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(20, SSTR("[full sync] syncing object: "
<< bucket_shard_str{bs} << "/" << entries_iter->key));
entry = &(*entries_iter);
- total_entries++;
list_marker = entries_iter->key;
+ if (!prefix_handler.check_key(entries_iter->key)) {
+ continue;
+ }
+ total_entries++;
if (!marker_tracker.start(entry->key, total_entries, real_time())) {
tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
} else {
+#warning look in here
using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
yield spawn(new SyncCR(sc, sync_pipe, entry->key,
false, /* versioned, only matters for object removal */
rgw_bucket_sync_pair_info info;
RGWBucketInfo source_bucket_info;
RGWBucketInfo dest_bucket_info;
+
+ RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() { // hands out a mutable reference to the rules ref held in info.handler; no copy is made
+ return info.handler.rules;
+ }
};
inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
return true;
}
+/*
+ * Compare this tag with its string form.
+ *
+ * s is either "key" or "key=value" (split at the first '='):
+ *  - "key" matches only a tag with that key and an empty value
+ *  - "key=value" matches when both parts equal this tag's key and value
+ * An empty string never matches.
+ */
+bool rgw_sync_pipe_filter_tag::operator==(const string& s) const
+{
+  if (s.empty()) {
+    return false;
+  }
+
+  const auto pos = s.find('=');
+  if (pos == string::npos) {
+    return value.empty() && (s == key);
+  }
+
+  /* s[0, pos) must equal key; comparing it against s itself could never succeed */
+  return s.compare(0, pos, key) == 0 &&
+         s.compare(pos + 1, string::npos, value) == 0;
+}
+
void rgw_sync_pipe_filter::encode(bufferlist& bl) const
{
ENCODE_START(1, 1, bl);
return true;
}
+/*
+ * check_tag: does a single tag string pass this filter?
+ *
+ * A filter without any tags accepts everything; otherwise s has to compare
+ * equal to at least one of the filter's tags.
+ */
+bool rgw_sync_pipe_filter::check_tag(const string& s) const
+{
+  /* tag filter wasn't defined -> accept without looking at s */
+  bool matched = tags.empty();
+
+  for (auto it = tags.begin(); !matched && it != tags.end(); ++it) {
+    matched = (*it == s);
+  }
+
+  return matched;
+}
+
+/*
+ * check_tags: does an object carrying the given tags pass this filter?
+ *
+ * tags: the object's tags as strings (note: the parameter shadows the
+ *       filter's own 'tags' member, which is reached through this->)
+ *
+ * Returns true when the filter defines no tags at all, or when at least one
+ * of the object's tags matches one of the filter's tags.
+ */
+bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& tags) const
+{
+  /*
+   * No tag filter defined: accept even an object that has no tags. Without
+   * this the loop below never runs for an empty list and the object would be
+   * rejected.
+   */
+  if (this->tags.empty()) {
+    return true;
+  }
+
+  for (auto& t : tags) {
+    if (check_tag(t)) {
+      return true;
+    }
+  }
+  return false;
+}
+
void rgw_sync_bucket_entity::apply_bucket(std::optional<rgw_bucket> b)
{
if (!b) {
}
return (value < t.value);
}
+
+ bool operator==(const string& s) const;
};
WRITE_CLASS_ENCODER(rgw_sync_pipe_filter_tag)
void decode_json(JSONObj *obj);
bool is_subset_of(const rgw_sync_pipe_filter& f) const;
+
+ bool check_tag(const string& s) const;
+ bool check_tags(const std::vector<string>& tags) const;
};
WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)