]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: data sync: incremental prefixes
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 13 Nov 2019 04:50:11 +0000 (20:50 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:38 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc

index bed629ee5c1832f2d52ef13d2d6f94961e96dfcb..a78b6daf06a96f6fc910df3f6806dc1c20342c3b 100644 (file)
@@ -3124,6 +3124,8 @@ int main(int argc, const char **argv)
   std::optional<string> opt_prefix;
   std::optional<string> opt_prefix_rm;
 
+  std::optional<int> opt_priority;
+
   rgw::notify::EventTypeList event_types;
 
   SimpleCmd cmd(all_cmds, cmd_aliases);
@@ -3520,6 +3522,8 @@ int main(int argc, const char **argv)
       opt_prefix = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--prefix-rm", (char*)NULL)) {
       opt_prefix_rm = val;
+    } else if (ceph_argparse_witharg(args, i, &val, "--priority", (char*)NULL)) {
+      opt_priority = atoi(val.c_str());
     } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
       // do nothing
     } else if (strncmp(*i, "-", 1) == 0) {
@@ -8183,6 +8187,9 @@ next:
 
     pipe->params.filter.set_prefix(opt_prefix, !!opt_prefix_rm);
     pipe->params.filter.set_tags(tags_add, tags_rm);
+    if (opt_priority) {
+      pipe->params.priority = *opt_priority;
+    }
 
     ret = sync_policy_ctx.write_policy();
     if (ret < 0) {
index 7f4f82357fa1222260e3cd08a3630efeb51db590..ce212ba8f01c97bb56f9f1c2bd5b03fd9da46ac3 100644 (file)
@@ -3172,7 +3172,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine {
       return true;
     }
 
-    bool check_key(const rgw_obj_key& key) {
+    bool check_key_handled(const rgw_obj_key& key) {
       if (!rules) {
         return false;
       }
@@ -3227,6 +3227,8 @@ int RGWBucketShardFullSyncCR::operate()
       tn->log(20, "listing bucket for full sync");
 
       if (!prefix_handler.revalidate_marker(&list_marker)) {
+        set_status() << "finished iterating over all available prefixes: last marker=" << list_marker;
+        tn->log(20, SSTR("finished iterating over all available prefixes: last marker=" << list_marker));
         break;
       }
 
@@ -3250,7 +3252,9 @@ int RGWBucketShardFullSyncCR::operate()
             << bucket_shard_str{bs} << "/" << entries_iter->key));
         entry = &(*entries_iter);
         list_marker = entries_iter->key;
-        if (!prefix_handler.check_key(entries_iter->key)) {
+        if (!prefix_handler.check_key_handled(entries_iter->key)) {
+          set_status() << "skipping entry due to policy rules: " << entries_iter->key;
+          tn->log(20, SSTR("skipping entry due to policy rules: " << entries_iter->key));
           continue;
         }
         total_entries++;
@@ -3332,6 +3336,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
   rgw_bucket_sync_pipe& sync_pipe;
+  RGWBucketSyncFlowManager::pipe_rules_ref rules;
   rgw_bucket_shard& bs;
   boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
   list<rgw_bi_log_entry> list_result;
@@ -3351,6 +3356,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
   bool syncstopped{false};
 
   RGWSyncTraceNodeRef tn;
+
 public:
   RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
                                   rgw_bucket_sync_pipe& _sync_pipe,
@@ -3370,6 +3376,18 @@ public:
         << bucket_shard_str{bs};
     set_status("init");
     marker_tracker.set_tn(tn);
+    rules = sync_pipe.get_rules();
+  }
+
+  bool check_key_handled(const rgw_obj_key& key) {
+    if (!rules) {
+      return false;
+    }
+    auto iter = rules->prefix_search(key.name);
+    if (iter == rules->prefix_end()) {
+      return false;
+    }
+    return boost::starts_with(key.name, iter->first);
   }
 
   int operate() override;
@@ -3466,6 +3484,13 @@ int RGWBucketShardIncrementalSyncCR::operate()
           continue;
         }
 
+        if (!check_key_handled(key)) {
+          set_status() << "skipping entry due to policy rules: " << entry->object;
+          tn->log(20, SSTR("skipping entry due to policy rules: " << entry->object));
+          marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
+          continue;
+        }
+
         set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
         if (entry->op == CLS_RGW_OP_CANCEL) {
           set_status() << "canceled operation, skipping";