return vector<rgw_sync_bucket_pipe>();
}
+void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
+{
+ auto ppipe = &pipe_map[pipe.id];
+
+ auto prefix = pipe.params.filter.prefix.value_or(string());
+
+ prefix_by_size.insert(make_pair(prefix.size(), ppipe));
+
+ for (auto& tag : pipe.params.filter.tags) {
+ auto titer = tag_refs.find(tag);
+ if (titer != tag_refs.end() &&
+ pipe.params.priority > titer->second->params.priority) {
+ titer->second = ppipe;
+ } else {
+ tag_refs[tag] = ppipe;
+ }
+ }
+}
+
+void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe)
+{
+ auto prefix = ppipe->params.filter.prefix.value_or(string());
+ auto iter = prefix_refs.lower_bound(prefix);
+
+ while (iter != prefix_refs.end()) {
+ auto cur_iter = iter++;
+
+ auto& cur_pipe = *cur_iter->second;
+ auto cur_prefix = cur_pipe.params.filter.prefix.value_or(string());
+
+ if (!boost::starts_with(cur_prefix, prefix)) {
+ return;
+ }
+
+ if (cur_pipe.params.priority > ppipe->params.priority) {
+ continue;
+ }
+
+ prefix_refs.erase(cur_iter);
+ }
+}
+
+void RGWBucketSyncFlowManager::pipe_rules::finish_init()
+{
+ /* go from the bigger prefixes to the shorter ones, this way we know that we covered all
+ * overlapping prefixes
+ */
+ for (auto iter = prefix_by_size.rbegin(); iter != prefix_by_size.rend(); ++iter) {
+ resolve_prefix(iter->second);
+ }
+}
+
void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
{
encode_json("pipes", pipes, f);
class RGWBucketSyncFlowManager {
friend class RGWBucketSyncPolicyHandler;
public:
+ struct endpoints_pair {
+ rgw_sync_bucket_entity source;
+ rgw_sync_bucket_entity dest;
+
+ endpoints_pair() {}
+ endpoints_pair(const rgw_sync_bucket_pipe& pipe) {
+ source = pipe.source;
+ dest = pipe.dest;
+ }
+
+ bool operator<(const endpoints_pair& e) const {
+ if (source < e.source) {
+ return true;
+ }
+ if (e.source < source) {
+ return false;
+ }
+ return (dest < e.dest);
+ }
+ };
+
+ class pipe_rules {
+ void resolve_prefix(rgw_sync_bucket_pipe *ppipe);
+
+ public:
+ std::map<string, rgw_sync_bucket_pipe> pipe_map; /* id to pipe */
+
+ std::multimap<size_t, rgw_sync_bucket_pipe *> prefix_by_size;
+
+ map<rgw_sync_pipe_filter_tag, rgw_sync_bucket_pipe *> tag_refs;
+ map<string, rgw_sync_bucket_pipe *> prefix_refs;
+
+ void insert(const rgw_sync_bucket_pipe& pipe);
+
+ void finish_init();
+ };
+
struct pipe_set {
+ std::map<endpoints_pair, pipe_rules> rules;
std::set<rgw_sync_bucket_pipe> pipes;
using iterator = std::set<rgw_sync_bucket_pipe>::iterator;
string entry_marker;
rgw_bucket_shard source_bs;
- rgw_bucket_sync_pair_info sync_pair;
int sync_status;
int RGWRunBucketSourcesSyncCR::operate()
{
reenter(this) {
-#if 0
- yield {
- set_status("acquiring sync lock");
- auto store = sync_env->store;
- lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
- sources_obj,
- "sync_lock",
- cct->_conf->rgw_sync_lease_period,
- this));
- lease_stack.reset(spawn(lease_cr.get(), false));
- }
- while (!lease_cr->is_locked()) {
- if (lease_cr->is_done()) {
- tn->log(5, "failed to take lease");
- set_status("lease lock failed, early abort");
- return set_cr_error(lease_cr->get_ret_status());
- }
- set_sleeping(true);
- yield;
- }
-
- tn->log(10, "took lease");
-#endif
yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
if (retcode < 0 && retcode != -ENOENT) {
tn->log(0, "ERROR: failed to read sync status for bucket");
-#if 0
- lease_cr->go_down();
- drain_all();
-#endif
return set_cr_error(retcode);
}
+
ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
if (pipes.empty()) {
}
}
-void rgw_sync_pipe_filter::_tag::dump(Formatter *f) const
+void rgw_sync_pipe_filter_tag::dump(Formatter *f) const
{
encode_json("key", key, f);
encode_json("value", value, f);
}
-void rgw_sync_pipe_filter::_tag::decode_json(JSONObj *obj)
+void rgw_sync_pipe_filter_tag::decode_json(JSONObj *obj)
{
JSONDecoder::decode_json("key", key, obj);
JSONDecoder::decode_json("value", value, obj);
return rgw_sync_bucket_entities::bucket_key(bucket);
}
-bool rgw_sync_pipe_filter::_tag::from_str(const string& s)
+bool rgw_sync_pipe_filter_tag::from_str(const string& s)
{
if (s.empty()) {
return false;
std::list<std::string>& tags_rm)
{
for (auto& t : tags_rm) {
- _tag tag;
+ rgw_sync_pipe_filter_tag tag;
if (tag.from_str(t)) {
tags.erase(tag);
}
}
for (auto& t : tags_add) {
- _tag tag;
+ rgw_sync_pipe_filter_tag tag;
if (tag.from_str(t)) {
tags.insert(tag);
}
}
}
+bool rgw_sync_pipe_filter::is_subset_of(const rgw_sync_pipe_filter& f) const
+{
+ if (f.prefix) {
+ if (!prefix) {
+ return false;
+ }
+ /* f.prefix exists, and this->prefix is either equal or bigger,
+ * therefore this->prefix also set */
+
+ if (!boost::starts_with(*prefix, *f.prefix)) {
+ return false;
+ }
+ }
+
+ /* prefix is subset, now check tags. All our tags should exist in f.tags */
+
+ for (auto& t : tags) {
+ if (f.tags.find(t) == f.tags.end()) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
void rgw_sync_bucket_entity::apply_bucket(std::optional<rgw_bucket> b)
{
if (!b) {
};
WRITE_CLASS_ENCODER(rgw_sync_bucket_entity)
-struct rgw_sync_pipe_filter {
- struct _tag {
- string key;
- string value;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- encode(key, bl);
- encode(value, bl);
- ENCODE_FINISH(bl);
- }
+struct rgw_sync_pipe_filter_tag {
+ string key;
+ string value;
- void decode(bufferlist::const_iterator& bl) {
- DECODE_START(1, bl);
- decode(key, bl);
- decode(value, bl);
- DECODE_FINISH(bl);
- }
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(key, bl);
+ encode(value, bl);
+ ENCODE_FINISH(bl);
+ }
- void dump(ceph::Formatter *f) const;
- void decode_json(JSONObj *obj);
+ void decode(bufferlist::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(key, bl);
+ decode(value, bl);
+ DECODE_FINISH(bl);
+ }
- bool from_str(const string& s);
+ void dump(ceph::Formatter *f) const;
+ void decode_json(JSONObj *obj);
+
+ bool from_str(const string& s);
- bool operator<(const _tag& t) const {
- if (key < t.key) {
- return true;
- }
- if (t.key < key) {
- return false;
- }
- return (value < t.value);
+ bool operator<(const rgw_sync_pipe_filter_tag& t) const {
+ if (key < t.key) {
+ return true;
+ }
+ if (t.key < key) {
+ return false;
}
- };
+ return (value < t.value);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_sync_pipe_filter_tag)
+struct rgw_sync_pipe_filter {
std::optional<string> prefix;
- std::set<_tag> tags;
+ std::set<rgw_sync_pipe_filter_tag> tags;
void set_prefix(std::optional<std::string> opt_prefix,
bool prefix_rm);
void dump(ceph::Formatter *f) const;
void decode_json(JSONObj *obj);
+
+ bool is_subset_of(const rgw_sync_pipe_filter& f) const;
};
WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)
-WRITE_CLASS_ENCODER(rgw_sync_pipe_filter::_tag)
struct rgw_sync_pipe_acl_translation {
rgw_user owner;