From 8a6863f515c5f524959cbe13c4a9f39467ab8415 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 12 Nov 2019 20:50:11 -0800 Subject: [PATCH] rgw: data sync: incremental prefixes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 7 +++++++ src/rgw/rgw_data_sync.cc | 29 +++++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index bed629ee5c183..a78b6daf06a96 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -3124,6 +3124,8 @@ int main(int argc, const char **argv) std::optional opt_prefix; std::optional opt_prefix_rm; + std::optional opt_priority; + rgw::notify::EventTypeList event_types; SimpleCmd cmd(all_cmds, cmd_aliases); @@ -3520,6 +3522,8 @@ int main(int argc, const char **argv) opt_prefix = val; } else if (ceph_argparse_witharg(args, i, &val, "--prefix-rm", (char*)NULL)) { opt_prefix_rm = val; + } else if (ceph_argparse_witharg(args, i, &val, "--priority", (char*)NULL)) { + opt_priority = atoi(val.c_str()); } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) { // do nothing } else if (strncmp(*i, "-", 1) == 0) { @@ -8183,6 +8187,9 @@ next: pipe->params.filter.set_prefix(opt_prefix, !!opt_prefix_rm); pipe->params.filter.set_tags(tags_add, tags_rm); + if (opt_priority) { + pipe->params.priority = *opt_priority; + } ret = sync_policy_ctx.write_policy(); if (ret < 0) { diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 7f4f82357fa12..ce212ba8f01c9 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -3172,7 +3172,7 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { return true; } - bool check_key(const rgw_obj_key& key) { + bool check_key_handled(const rgw_obj_key& key) { if (!rules) { return false; } @@ -3227,6 +3227,8 @@ int RGWBucketShardFullSyncCR::operate() tn->log(20, "listing bucket for full sync"); if (!prefix_handler.revalidate_marker(&list_marker)) { + set_status() << "finished iterating over all available prefixes: last marker=" << list_marker; + tn->log(20, SSTR("finished iterating over all available prefixes: last marker=" << list_marker)); break; } @@ -3250,7 +3252,9 @@ int RGWBucketShardFullSyncCR::operate() << bucket_shard_str{bs} << "/" << entries_iter->key)); entry = &(*entries_iter); list_marker = entries_iter->key; - if (!prefix_handler.check_key(entries_iter->key)) { + if (!prefix_handler.check_key_handled(entries_iter->key)) { + set_status() << "skipping entry due to policy rules: " << entries_iter->key; + tn->log(20, SSTR("skipping entry due to policy rules: " << entries_iter->key)); continue; } total_entries++; @@ -3332,6 +3336,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { RGWDataSyncCtx *sc; RGWDataSyncEnv *sync_env; rgw_bucket_sync_pipe& sync_pipe; + RGWBucketSyncFlowManager::pipe_rules_ref rules; rgw_bucket_shard& bs; boost::intrusive_ptr lease_cr; list list_result; @@ -3351,6 +3356,7 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { bool syncstopped{false}; RGWSyncTraceNodeRef tn; + public: RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc, rgw_bucket_sync_pipe& _sync_pipe, @@ -3370,6 +3376,18 @@ public: << bucket_shard_str{bs}; set_status("init"); marker_tracker.set_tn(tn); + rules = sync_pipe.get_rules(); + } + + bool check_key_handled(const rgw_obj_key& key) { + if (!rules) { + return false; + } + auto iter = rules->prefix_search(key.name); + if (iter == rules->prefix_end()) { + return false; + } + return boost::starts_with(key.name, iter->first); } int operate() override; @@ -3466,6 +3484,13 @@ int RGWBucketShardIncrementalSyncCR::operate() continue; } + if (!check_key_handled(key)) { + set_status() << "skipping entry due to policy rules: " << entry->object; + tn->log(20, SSTR("skipping entry due to policy rules: " << entry->object)); + marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp); + continue; + } + set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op; if (entry->op == CLS_RGW_OP_CANCEL) { set_status() << "canceled operation, skipping"; -- 2.39.5