std::optional<string> opt_prefix;
std::optional<string> opt_prefix_rm;
+ std::optional<int> opt_priority;
+
rgw::notify::EventTypeList event_types;
SimpleCmd cmd(all_cmds, cmd_aliases);
opt_prefix = val;
} else if (ceph_argparse_witharg(args, i, &val, "--prefix-rm", (char*)NULL)) {
opt_prefix_rm = val;
+ } else if (ceph_argparse_witharg(args, i, &val, "--priority", (char*)NULL)) {
+ opt_priority = atoi(val.c_str());
} else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
// do nothing
} else if (strncmp(*i, "-", 1) == 0) {
pipe->params.filter.set_prefix(opt_prefix, !!opt_prefix_rm);
pipe->params.filter.set_tags(tags_add, tags_rm);
+ if (opt_priority) {
+ pipe->params.priority = *opt_priority;
+ }
ret = sync_policy_ctx.write_policy();
if (ret < 0) {
return true;
}
- bool check_key(const rgw_obj_key& key) {
+ bool check_key_handled(const rgw_obj_key& key) {
if (!rules) {
return false;
}
tn->log(20, "listing bucket for full sync");
if (!prefix_handler.revalidate_marker(&list_marker)) {
+ set_status() << "finished iterating over all available prefixes: last marker=" << list_marker;
+ tn->log(20, SSTR("finished iterating over all available prefixes: last marker=" << list_marker));
break;
}
<< bucket_shard_str{bs} << "/" << entries_iter->key));
entry = &(*entries_iter);
list_marker = entries_iter->key;
- if (!prefix_handler.check_key(entries_iter->key)) {
+ if (!prefix_handler.check_key_handled(entries_iter->key)) {
+ set_status() << "skipping entry due to policy rules: " << entries_iter->key;
+ tn->log(20, SSTR("skipping entry due to policy rules: " << entries_iter->key));
continue;
}
total_entries++;
RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
rgw_bucket_sync_pipe& sync_pipe;
+ RGWBucketSyncFlowManager::pipe_rules_ref rules;
rgw_bucket_shard& bs;
boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
list<rgw_bi_log_entry> list_result;
bool syncstopped{false};
RGWSyncTraceNodeRef tn;
+
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncCtx *_sc,
rgw_bucket_sync_pipe& _sync_pipe,
<< bucket_shard_str{bs};
set_status("init");
marker_tracker.set_tn(tn);
+ rules = sync_pipe.get_rules();
+ }
+
+ bool check_key_handled(const rgw_obj_key& key) {
+ if (!rules) {
+ return false;
+ }
+ auto iter = rules->prefix_search(key.name);
+ if (iter == rules->prefix_end()) {
+ return false;
+ }
+ return boost::starts_with(key.name, iter->first);
}
int operate() override;
continue;
}
+ if (!check_key_handled(key)) {
+ set_status() << "skipping entry due to policy rules: " << entry->object;
+ tn->log(20, SSTR("skipping entry due to policy rules: " << entry->object));
+ marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
+ continue;
+ }
+
set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
if (entry->op == CLS_RGW_OP_CANCEL) {
set_status() << "canceled operation, skipping";