]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync: pipe rules handler
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 12 Nov 2019 00:19:56 +0000 (16:19 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_json_enc.cc
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h

index e20d26da19e1b99dc29238f4ccd439015186343d..79a3f65ed0b255547acbf34572d6c2c2bbc31d9f 100644 (file)
@@ -267,6 +267,58 @@ vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const string& s
   return vector<rgw_sync_bucket_pipe>();
 }
 
+void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
+{
+  auto ppipe = &pipe_map[pipe.id];
+
+  auto prefix = pipe.params.filter.prefix.value_or(string());
+
+  prefix_by_size.insert(make_pair(prefix.size(), ppipe));
+
+  for (auto& tag : pipe.params.filter.tags) {
+    auto titer = tag_refs.find(tag);
+    if (titer != tag_refs.end() &&
+        pipe.params.priority > titer->second->params.priority) {
+      titer->second = ppipe;
+    } else {
+      tag_refs[tag] = ppipe;
+    }
+  }
+}
+
+void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe)
+{
+  auto prefix = ppipe->params.filter.prefix.value_or(string());
+  auto iter = prefix_refs.lower_bound(prefix);
+
+  while (iter != prefix_refs.end()) {
+    auto cur_iter = iter++;
+
+    auto& cur_pipe = *cur_iter->second;
+    auto cur_prefix = cur_pipe.params.filter.prefix.value_or(string());
+
+    if (!boost::starts_with(cur_prefix, prefix)) {
+      return;
+    }
+
+    if (cur_pipe.params.priority > ppipe->params.priority) {
+      continue;
+    }
+
+    prefix_refs.erase(cur_iter);
+  }
+}
+
+void RGWBucketSyncFlowManager::pipe_rules::finish_init()
+{
+  /* go from the bigger prefixes to the shorter ones, this way we know that we covered all
+   * overlapping prefixes
+   */
+  for (auto iter = prefix_by_size.rbegin(); iter != prefix_by_size.rend(); ++iter) {
+    resolve_prefix(iter->second);
+  }
+}
+
 void RGWBucketSyncFlowManager::pipe_set::dump(ceph::Formatter *f) const
 {
   encode_json("pipes", pipes, f);
index d413ab04e50396967d40c6a1dd753dd2fa9d36d7..7dcb757eccd9f1e43675ed94a87528fc7f6e4c8e 100644 (file)
@@ -111,7 +111,45 @@ public:
 class RGWBucketSyncFlowManager {
   friend class RGWBucketSyncPolicyHandler;
 public:
+  struct endpoints_pair {
+    rgw_sync_bucket_entity source;
+    rgw_sync_bucket_entity dest;
+
+    endpoints_pair() {}
+    endpoints_pair(const rgw_sync_bucket_pipe& pipe) {
+      source = pipe.source;
+      dest = pipe.dest;
+    }
+
+    bool operator<(const endpoints_pair& e) const {
+      if (source < e.source) {
+        return true;
+      }
+      if (e.source < source) {
+        return false;
+      }
+      return (dest < e.dest);
+    }
+  };
+
+  class pipe_rules {
+    void resolve_prefix(rgw_sync_bucket_pipe *ppipe);
+
+  public:
+    std::map<string, rgw_sync_bucket_pipe> pipe_map; /* id to pipe */
+
+    std::multimap<size_t, rgw_sync_bucket_pipe *> prefix_by_size;
+
+    map<rgw_sync_pipe_filter_tag, rgw_sync_bucket_pipe *> tag_refs;
+    map<string, rgw_sync_bucket_pipe *> prefix_refs;
+
+    void insert(const rgw_sync_bucket_pipe& pipe);
+
+    void finish_init();
+  };
+
   struct pipe_set {
+    std::map<endpoints_pair, pipe_rules> rules;
     std::set<rgw_sync_bucket_pipe> pipes;
 
     using iterator = std::set<rgw_sync_bucket_pipe>::iterator;
index bbc4bc6717540f3ab7a01644bdf4cf09a396414c..7659877d7210f6a5af9d3de12c987076e19a60cb 100644 (file)
@@ -1277,7 +1277,6 @@ class RGWDataSyncSingleEntryCR : public RGWCoroutine {
   string entry_marker;
 
   rgw_bucket_shard source_bs;
-  rgw_bucket_sync_pair_info sync_pair;
 
   int sync_status;
 
@@ -3765,38 +3764,12 @@ RGWRunBucketSourcesSyncCR::RGWRunBucketSourcesSyncCR(RGWDataSyncCtx *_sc,
 int RGWRunBucketSourcesSyncCR::operate()
 {
   reenter(this) {
-#if 0
-    yield {
-      set_status("acquiring sync lock");
-      auto store = sync_env->store;
-      lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
-                                              sources_obj,
-                                              "sync_lock",
-                                              cct->_conf->rgw_sync_lease_period,
-                                              this));
-      lease_stack.reset(spawn(lease_cr.get(), false));
-    }
-    while (!lease_cr->is_locked()) {
-      if (lease_cr->is_done()) {
-        tn->log(5, "failed to take lease");
-        set_status("lease lock failed, early abort");
-        return set_cr_error(lease_cr->get_ret_status());
-      }
-      set_sleeping(true);
-      yield;
-    }
-
-    tn->log(10, "took lease");
-#endif
     yield call(new RGWGetBucketPeersCR(sync_env, target_bucket, sc->source_zone, source_bucket, &pipes, tn));
     if (retcode < 0 && retcode != -ENOENT) {
       tn->log(0, "ERROR: failed to read sync status for bucket");
-#if 0
-      lease_cr->go_down();
-      drain_all();
-#endif
       return set_cr_error(retcode);
     }
+
     ldpp_dout(sync_env->dpp, 20) << __func__ << "(): requested source_bs=" << source_bs << " target_bs=" << target_bs << dendl;
 
     if (pipes.empty()) {
index 06c93880571435ee88f7959d598e86c578267bb7..4a6abb6d51350f1b8635d833e00e65951e9141d0 100644 (file)
@@ -886,13 +886,13 @@ void rgw_sync_bucket_entity::decode_json(JSONObj *obj)
   }
 }
 
-void rgw_sync_pipe_filter::_tag::dump(Formatter *f) const
+void rgw_sync_pipe_filter_tag::dump(Formatter *f) const
 {
   encode_json("key", key, f);
   encode_json("value", value, f);
 }
 
-void rgw_sync_pipe_filter::_tag::decode_json(JSONObj *obj)
+void rgw_sync_pipe_filter_tag::decode_json(JSONObj *obj)
 {
   JSONDecoder::decode_json("key", key, obj);
   JSONDecoder::decode_json("value", value, obj);
index cbc979df789c0d4d5559262442b1afcc50390441..47763e286c0e2a1abd5c1efead02982b358914c4 100644 (file)
@@ -11,7 +11,7 @@ string rgw_sync_bucket_entity::bucket_key() const
   return rgw_sync_bucket_entities::bucket_key(bucket);
 }
 
-bool rgw_sync_pipe_filter::_tag::from_str(const string& s)
+bool rgw_sync_pipe_filter_tag::from_str(const string& s)
 {
   if (s.empty()) {
     return false;
@@ -61,20 +61,45 @@ void rgw_sync_pipe_filter::set_tags(std::list<std::string>& tags_add,
                                     std::list<std::string>& tags_rm)
 {
   for (auto& t : tags_rm) {
-    _tag tag;
+    rgw_sync_pipe_filter_tag tag;
     if (tag.from_str(t)) {
       tags.erase(tag);
     }
   }
 
   for (auto& t : tags_add) {
-    _tag tag;
+    rgw_sync_pipe_filter_tag tag;
     if (tag.from_str(t)) {
       tags.insert(tag);
     }
   }
 }
 
+bool rgw_sync_pipe_filter::is_subset_of(const rgw_sync_pipe_filter& f) const
+{
+  if (f.prefix) {
+    if (!prefix) {
+      return false;
+    }
+    /* f.prefix exists, and this->prefix is either equal or bigger,
+     * therefore this->prefix also set */
+
+    if (!boost::starts_with(*prefix, *f.prefix)) {
+      return false;
+    }
+  }
+
+  /* prefix is subset, now check tags. All our tags should exist in f.tags */
+
+  for (auto& t : tags) {
+    if (f.tags.find(t) == f.tags.end()) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
 void rgw_sync_bucket_entity::apply_bucket(std::optional<rgw_bucket> b)
 {
   if (!b) {
index 0fca7408fa59fd4cede3aeb0cfdc7ef03f6065fd..c61ce95bc6f2d2a94177d175d15568c74a4cd2fb 100644 (file)
@@ -176,43 +176,44 @@ struct rgw_sync_bucket_entity {
 };
 WRITE_CLASS_ENCODER(rgw_sync_bucket_entity)
 
-struct rgw_sync_pipe_filter {
-  struct _tag {
-    string key;
-    string value;
-
-    void encode(bufferlist& bl) const {
-      ENCODE_START(1, 1, bl);
-      encode(key, bl);
-      encode(value, bl);
-      ENCODE_FINISH(bl);
-    }
+struct rgw_sync_pipe_filter_tag {
+  string key;
+  string value;
 
-    void decode(bufferlist::const_iterator& bl) {
-      DECODE_START(1, bl);
-      decode(key, bl);
-      decode(value, bl);
-      DECODE_FINISH(bl);
-    }
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(key, bl);
+    encode(value, bl);
+    ENCODE_FINISH(bl);
+  }
 
-    void dump(ceph::Formatter *f) const;
-    void decode_json(JSONObj *obj);
+  void decode(bufferlist::const_iterator& bl) {
+    DECODE_START(1, bl);
+    decode(key, bl);
+    decode(value, bl);
+    DECODE_FINISH(bl);
+  }
 
-    bool from_str(const string& s);
+  void dump(ceph::Formatter *f) const;
+  void decode_json(JSONObj *obj);
+
+  bool from_str(const string& s);
 
-    bool operator<(const _tag& t) const {
-      if (key < t.key) {
-        return true;
-      }
-      if (t.key < key) {
-        return false;
-      }
-      return (value < t.value);
+  bool operator<(const rgw_sync_pipe_filter_tag& t) const {
+    if (key < t.key) {
+      return true;
+    }
+    if (t.key < key) {
+      return false;
     }
-  };
+    return (value < t.value);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_sync_pipe_filter_tag)
 
+struct rgw_sync_pipe_filter {
   std::optional<string> prefix;
-  std::set<_tag> tags;
+  std::set<rgw_sync_pipe_filter_tag> tags;
 
   void set_prefix(std::optional<std::string> opt_prefix,
                   bool prefix_rm);
@@ -224,9 +225,10 @@ struct rgw_sync_pipe_filter {
 
   void dump(ceph::Formatter *f) const;
   void decode_json(JSONObj *obj);
+
+  bool is_subset_of(const rgw_sync_pipe_filter& f) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)
-WRITE_CLASS_ENCODER(rgw_sync_pipe_filter::_tag)
 
 struct rgw_sync_pipe_acl_translation {
   rgw_user owner;