]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync: full sync checks for rules prefixes
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 12 Nov 2019 23:50:00 +0000 (15:50 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h

index 144cd640b7a5759a60bc75b9096f1e57612fc0bb..8ec6001bd03c0405ad1a58b7bd74a39b1ddc8873 100644 (file)
@@ -267,13 +267,17 @@ vector<rgw_sync_bucket_pipe> rgw_sync_group_pipe_map::find_pipes(const string& s
   return vector<rgw_sync_bucket_pipe>();
 }
 
-void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe)
+void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pipe)
 {
+  pipes.push_back(pipe);
+
+  auto ppipe = &pipes.back();
   auto prefix = ppipe->params.filter.prefix.value_or(string());
 
-  prefix_by_size.insert(make_pair(prefix.size(), ppipe));
+  prefix_refs.insert(make_pair(prefix, ppipe));
 
-  for (auto& tag : ppipe->params.filter.tags) {
+  for (auto& t : ppipe->params.filter.tags) {
+    string tag = t.key + "=" + t.value;
     auto titer = tag_refs.find(tag);
     if (titer != tag_refs.end() &&
         ppipe->params.priority > titer->second->params.priority) {
@@ -284,55 +288,80 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(rgw_sync_bucket_pipe *ppipe)
   }
 }
 
-void RGWBucketSyncFlowManager::pipe_set::finish_init()
+#warning add support for tags
+bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key,
+                                                           const vector<string>& tags,
+                                                           rgw_sync_pipe_params *params) const
 {
-  for (auto& entry : rules) {
-    entry.second.finish_init();
+  auto iter = prefix_refs.lower_bound(key.name);
+  if (iter == prefix_refs.end()) {
+    return false;
   }
-}
-
-void RGWBucketSyncFlowManager::pipe_rules::resolve_prefix(rgw_sync_bucket_pipe *ppipe)
-{
-  auto prefix = ppipe->params.filter.prefix.value_or(string());
-  auto iter = prefix_refs.lower_bound(prefix);
 
-  while (iter != prefix_refs.end()) {
-    auto cur_iter = iter++;
+  auto max = prefix_refs.end();
 
-    auto& cur_pipe = *cur_iter->second;
-    auto cur_prefix = cur_pipe.params.filter.prefix.value_or(string());
+  std::optional<int> priority;
 
-    if (!boost::starts_with(cur_prefix, prefix)) {
-      return;
+  for (; iter != prefix_refs.end(); ++iter) {
+    auto& prefix = iter->first;
+    if (!boost::starts_with(key.name, prefix)) {
+      break;
     }
 
-    if (cur_pipe.params.priority > ppipe->params.priority) {
+    auto& rule_params = iter->second->params;
+    auto& filter = rule_params.filter;
+
+    if (!filter.check_tags(tags)) {
       continue;
     }
 
-    prefix_refs.erase(cur_iter);
+    if (rule_params.priority > priority) {
+      priority = rule_params.priority;
+      max = iter;
+    }
+  }
+
+  if (max == prefix_refs.end()) {
+    return false;
   }
+
+  *params = max->second->params;
+  return true;
 }
 
-void RGWBucketSyncFlowManager::pipe_rules::finish_init()
+/*
+ * return either the current prefix for s, or the next one if s is not within a prefix
+ */
+
+RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator RGWBucketSyncFlowManager::pipe_rules::prefix_search(const std::string& s) const
 {
-  /* go from the bigger prefixes to the shorter ones, this way we know that we covered all
-   * overlapping prefixes
-   */
-  for (auto iter = prefix_by_size.rbegin(); iter != prefix_by_size.rend(); ++iter) {
-    resolve_prefix(iter->second);
+  if (prefix_refs.empty()) {
+    return prefix_refs.end();
+  }
+  auto next = prefix_refs.upper_bound(s);
+  auto iter = next;
+  if (iter != prefix_refs.begin()) {
+    --iter;
+  }
+  if (!boost::starts_with(s, iter->first)) {
+    return next;
   }
+
+  return iter;
 }
 
 void RGWBucketSyncFlowManager::pipe_set::insert(const rgw_sync_bucket_pipe& pipe) {
-  auto ppipe = &pipe_map[pipe.id];
-  *ppipe = pipe;
+  pipe_map[pipe.id] = pipe;
+
+  auto& rules_ref = rules[endpoints_pair(pipe)];
 
-  auto prules = &rules[endpoints_pair(*ppipe)];
+  if (!rules_ref) {
+    rules_ref = make_shared<RGWBucketSyncFlowManager::pipe_rules>();
+  }
 
-  prules->insert(ppipe);
+  rules_ref->insert(pipe);
 
-  pipe_handler h(prules, pipe);
+  pipe_handler h(rules_ref, pipe);
 
   handlers.insert(h);
 }
@@ -460,9 +489,6 @@ void RGWBucketSyncFlowManager::reflect(std::optional<rgw_bucket> effective_bucke
       dest_pipes->insert(pipe);
     }
   }
-
-  source_pipes->finish_init();
-  dest_pipes->finish_init();
 }
 
 
@@ -632,9 +658,6 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
     }
     _sources[*new_pipe.source.zone].insert(new_pipe);
   }
-  for (auto& s : _sources) {
-    s.second.finish_init();
-  }
 
   for (auto& entry : _targets_by_name.pipe_map) {
     auto& pipe = entry.second;
@@ -649,9 +672,6 @@ void RGWBucketSyncPolicyHandler::reflect(RGWBucketSyncFlowManager::pipe_set *pso
     }
     _targets[*new_pipe.dest.zone].insert(new_pipe);
   }
-  for (auto& t : _targets) {
-    t.second.finish_init();
-  }
 
   if (psources_by_name) {
     *psources_by_name = std::move(_sources_by_name);
index 46a5a58952be286a6f06342d7a3b63e06411da6e..23446bffadda50b61b72be9a9da04436f6a2c94c 100644 (file)
@@ -136,36 +136,59 @@ public:
    * pipe_rules: deal with a set of pipes that have common endpoints_pair
    */
   class pipe_rules {
-    void resolve_prefix(rgw_sync_bucket_pipe *ppipe);
+    std::vector<rgw_sync_bucket_pipe> pipes;
 
   public:
-    std::multimap<size_t, rgw_sync_bucket_pipe *> prefix_by_size;
+    using prefix_map_t = multimap<string, rgw_sync_bucket_pipe *>;
 
-    map<rgw_sync_pipe_filter_tag, rgw_sync_bucket_pipe *> tag_refs;
-    map<string, rgw_sync_bucket_pipe *> prefix_refs;
+    map<string, rgw_sync_bucket_pipe *> tag_refs;
+    prefix_map_t prefix_refs;
 
-    void insert(rgw_sync_bucket_pipe *pipe);
+    void insert(const rgw_sync_bucket_pipe& pipe);
+
+    bool find_obj_params(const rgw_obj_key& key, 
+                         const vector<string>& tags,
+                         rgw_sync_pipe_params *params) const;
+
+    void scan_prefixes(std::vector<string> *prefixes) const;
 
-    void finish_init();
+    prefix_map_t::const_iterator prefix_begin() const {
+      return prefix_refs.begin();
+    }
+    prefix_map_t::const_iterator prefix_search(const std::string& s) const;
+    prefix_map_t::const_iterator prefix_end() const {
+      return prefix_refs.end();
+    }
   };
 
+  using pipe_rules_ref = std::shared_ptr<pipe_rules>;
+
   /*
    * pipe_handler: extends endpoints_rule to point at the corresponding rules handler
    */
   struct pipe_handler : public endpoints_pair {
-    pipe_rules *rules;
+    pipe_rules_ref rules;
 
     pipe_handler() {}
-    pipe_handler(pipe_rules *_rules,
+    pipe_handler(pipe_rules_ref& _rules,
                  const rgw_sync_bucket_pipe& _pipe) : endpoints_pair(_pipe),
                                                       rules(_rules) {}
     bool specific() const {
       return source.specific() && dest.specific();
     }
+    
+    bool find_obj_params(const rgw_obj_key& key,
+                         const std::vector<string>& tags,
+                         rgw_sync_pipe_params *params) const {
+      if (!rules) {
+        return false;
+      }
+      return rules->find_obj_params(key, tags, params);
+    }
   };
 
   struct pipe_set {
-    std::map<endpoints_pair, pipe_rules> rules;
+    std::map<endpoints_pair, pipe_rules_ref> rules;
     std::map<string, rgw_sync_bucket_pipe> pipe_map;
 
     std::set<pipe_handler> handlers;
@@ -179,7 +202,6 @@ public:
     }
 
     void insert(const rgw_sync_bucket_pipe& pipe);
-    void finish_init();
 
     iterator begin() const {
       return handlers.begin();
@@ -229,7 +251,7 @@ public:
 
 };
 
-ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
+static inline ostream& operator<<(ostream& os, const RGWBucketSyncFlowManager::endpoints_pair& e) {
   os << e.dest << " -> " << e.source;
   return os;
 }
index dea3cec3cad60b6e77db3f905daff75d77a19cfc..7f4f82357fa1222260e3cd08a3630efeb51db590 100644 (file)
@@ -3144,6 +3144,51 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
   rgw_zone_set zones_trace;
 
   RGWSyncTraceNodeRef tn;
+
+  struct _prefix_handler {
+    RGWBucketSyncFlowManager::pipe_rules_ref rules;
+    RGWBucketSyncFlowManager::pipe_rules::prefix_map_t::const_iterator iter;
+    std::optional<string> cur_prefix;
+
+    void set_rules(RGWBucketSyncFlowManager::pipe_rules_ref& _rules) {
+      rules = _rules;
+    }
+
+    bool revalidate_marker(rgw_obj_key *marker) {
+      if (cur_prefix &&
+          boost::starts_with(marker->name, *cur_prefix)) {
+        return true;
+      }
+      if (!rules) {
+        return false;
+      }
+      iter = rules->prefix_search(marker->name);
+      if (iter == rules->prefix_end()) {
+        return false;
+      }
+      cur_prefix = iter->first;
+      marker->name = *cur_prefix;
+      marker->instance.clear();
+      return true;
+    }
+
+    bool check_key(const rgw_obj_key& key) {
+      if (!rules) {
+        return false;
+      }
+      if (cur_prefix &&
+          boost::starts_with(key.name, *cur_prefix)) {
+        return true;
+      }
+      iter = rules->prefix_search(key.name);
+      if (iter == rules->prefix_end()) {
+        return false;
+      }
+      cur_prefix = iter->first;
+      return boost::starts_with(key.name, iter->first);
+    }
+  } prefix_handler;
+
 public:
   RGWBucketShardFullSyncCR(RGWDataSyncCtx *_sc,
                            rgw_bucket_sync_pipe& _sync_pipe,
@@ -3160,6 +3205,7 @@ public:
                                          SSTR(bucket_shard_str{bs}))) {
     zones_trace.insert(sc->source_zone);
     marker_tracker.set_tn(tn);
+    prefix_handler.set_rules(sync_pipe.get_rules());
   }
 
   int operate() override;
@@ -3179,6 +3225,11 @@ int RGWBucketShardFullSyncCR::operate()
       }
       set_status("listing remote bucket");
       tn->log(20, "listing bucket for full sync");
+
+      if (!prefix_handler.revalidate_marker(&list_marker)) {
+        break;
+      }
+
       yield call(new RGWListBucketShardCR(sc, bs, list_marker,
                                           &list_result));
       if (retcode < 0 && retcode != -ENOENT) {
@@ -3198,11 +3249,15 @@ int RGWBucketShardFullSyncCR::operate()
         tn->log(20, SSTR("[full sync] syncing object: "
             << bucket_shard_str{bs} << "/" << entries_iter->key));
         entry = &(*entries_iter);
-        total_entries++;
         list_marker = entries_iter->key;
+        if (!prefix_handler.check_key(entries_iter->key)) {
+          continue;
+        }
+        total_entries++;
         if (!marker_tracker.start(entry->key, total_entries, real_time())) {
           tn->log(0, SSTR("ERROR: cannot start syncing " << entry->key << ". Duplicate entry?"));
         } else {
+#warning look in here
           using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
           yield spawn(new SyncCR(sc, sync_pipe, entry->key,
                                  false, /* versioned, only matters for object removal */
index 4ff43c0ed7ccabc661b7d36218e2535a98bd9c27..4ea134400686fdcbb6ccb3207c48ee37b99beff0 100644 (file)
@@ -44,6 +44,10 @@ struct rgw_bucket_sync_pipe {
   rgw_bucket_sync_pair_info info;
   RGWBucketInfo source_bucket_info;
   RGWBucketInfo dest_bucket_info;
+
+  RGWBucketSyncFlowManager::pipe_rules_ref& get_rules() {
+    return info.handler.rules;
+  }
 };
 
 inline ostream& operator<<(ostream& out, const rgw_bucket_sync_pipe& p) {
index 47763e286c0e2a1abd5c1efead02982b358914c4..5d0cad7e1e48f268b8fb6ece6dbb3f2c25c333b2 100644 (file)
@@ -31,6 +31,21 @@ bool rgw_sync_pipe_filter_tag::from_str(const string& s)
   return true;
 }
 
+bool rgw_sync_pipe_filter_tag::operator==(const string& s) const
+{
+  if (s.empty()) {
+    return false;
+  }
+
+  auto pos = s.find('=');
+  if (pos == string::npos) {
+    return value.empty() && (s == key);
+  }
+
+  return s.compare(0, pos, s) == 0 &&
+         s.compare(pos + 1, s.size() - pos - 1, value) == 0;
+}
+
 void rgw_sync_pipe_filter::encode(bufferlist& bl) const
 {
   ENCODE_START(1, 1, bl);
@@ -100,6 +115,30 @@ bool rgw_sync_pipe_filter::is_subset_of(const rgw_sync_pipe_filter& f) const
   return true;
 }
 
+bool rgw_sync_pipe_filter::check_tag(const string& s) const
+{
+  if (tags.empty()) { /* tag filter wasn't defined */
+    return true;
+  }
+
+  for (auto& t : tags) {
+    if (t == s) {
+      return true;
+    }
+  }
+  return false;
+}
+
+bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& tags) const
+{
+  for (auto& t : tags) {
+    if (check_tag(t)) {
+      return true;
+    }
+  }
+  return false;
+}
+
 void rgw_sync_bucket_entity::apply_bucket(std::optional<rgw_bucket> b)
 {
   if (!b) {
index c61ce95bc6f2d2a94177d175d15568c74a4cd2fb..d312f7e790845c1062f87170272413a752f718da 100644 (file)
@@ -208,6 +208,8 @@ struct rgw_sync_pipe_filter_tag {
     }
     return (value < t.value);
   }
+
+  bool operator==(const string& s) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_filter_tag)
 
@@ -227,6 +229,9 @@ struct rgw_sync_pipe_filter {
   void decode_json(JSONObj *obj);
 
   bool is_subset_of(const rgw_sync_pipe_filter& f) const;
+
+  bool check_tag(const string& s) const;
+  bool check_tags(const std::vector<string>& tags) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)