From 82df2faa1dbacec1e105a23fe4907dc28d0ed940 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 19 Nov 2019 20:01:57 -0800 Subject: [PATCH] rgw: sync: user mode sync pipes fetch objects using effective uid Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 29 +++++++++++++++++++++++++++++ src/rgw/rgw_bucket_sync.cc | 16 +++++++++++++++- src/rgw/rgw_bucket_sync.h | 4 +++- src/rgw/rgw_cr_rados.cc | 3 +-- src/rgw/rgw_cr_rados.h | 10 +++++++++- src/rgw/rgw_data_sync.cc | 25 +++++++++++++++++++++++-- src/rgw/rgw_json_enc.cc | 2 ++ src/rgw/rgw_sync_policy.h | 7 ++++++- 8 files changed, 88 insertions(+), 8 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 7c68f405f8b..acfd7974152 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -2864,6 +2864,8 @@ class SyncPolicyContext rgw_sync_policy_info *policy{nullptr}; + std::optional owner; + public: SyncPolicyContext(const string& zonegroup_id, const string& zonegroup_name, @@ -2888,6 +2890,8 @@ public: return ret; } + owner = bucket_info.owner; + if (!bucket_info.sync_policy) { rgw_sync_policy_info new_policy; bucket_info.set_sync_policy(std::move(new_policy)); @@ -2921,6 +2925,9 @@ public: return *policy; } + std::optional& get_owner() { + return owner; + } }; void resolve_zone_id_opt(std::optional& zone_name, std::optional& zone_id) @@ -3203,6 +3210,7 @@ int main(int argc, const char **argv) std::optional opt_prefix_rm; std::optional opt_priority; + std::optional opt_mode; rgw::notify::EventTypeList event_types; @@ -3610,6 +3618,8 @@ int main(int argc, const char **argv) opt_prefix_rm = val; } else if (ceph_argparse_witharg(args, i, &val, "--priority", (char*)NULL)) { opt_priority = atoi(val.c_str()); + } else if (ceph_argparse_witharg(args, i, &val, "--mode", (char*)NULL)) { + opt_mode = val; } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) { // do nothing } else if (strncmp(*i, "-", 1) == 0) { @@ -8283,6 +8293,25 @@ next: if (opt_priority) { pipe->params.priority = *opt_priority; } + if (opt_mode) { + if (*opt_mode == "system") { + pipe->params.mode = rgw_sync_pipe_params::MODE_SYSTEM; + } else if (*opt_mode == "user") { + pipe->params.mode = rgw_sync_pipe_params::MODE_USER; + } else { + cerr << "ERROR: bad mode value: should be one of the following: system, user" << std::endl; + return EINVAL; + } + } + + if (!user_id.empty()) { + pipe->params.user = user_id; + } else if (pipe->params.user.empty()) { + auto owner = sync_policy_ctx.get_owner(); + if (owner) { + pipe->params.user = *owner; + } + } ret = sync_policy_ctx.write_policy(); if (ret < 0) { diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index a76fd43b783..3a57f546a73 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -289,6 +289,7 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pi } bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key, + std::optional *user, std::optional *acl_translation_owner, std::optional *storage_class, rgw_sync_pipe_params::Mode *mode, @@ -345,6 +346,7 @@ bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rg bool conflict = false; + std::optional _user; std::optional _acl_translation; std::optional _storage_class; rgw_sync_pipe_params::Mode _mode; @@ -353,13 +355,15 @@ bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rg for (auto& iter : iters) { auto& rule_params = iter->second->params; if (++i == 0) { + _user = rule_params.user; _acl_translation = rule_params.dest.acl_translation; _storage_class = rule_params.dest.storage_class; _mode = rule_params.mode; continue; } - conflict = !(_acl_translation == rule_params.dest.acl_translation && + conflict = !(_user == rule_params.user && + _acl_translation == rule_params.dest.acl_translation && _storage_class == rule_params.dest.storage_class && _mode == rule_params.mode); if (conflict) { @@ -368,6 +372,7 @@ bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rg } } + *user = _user; if (_acl_translation) { *acl_translation_owner = _acl_translation->owner; } @@ -668,6 +673,15 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy bucket_info(_bucket_info) { if (_bucket_info.sync_policy) { sync_policy = *_bucket_info.sync_policy; + + for (auto& entry : sync_policy.groups) { + for (auto& pipe : entry.second.pipes) { + if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER && + pipe.params.user.empty()) { + pipe.params.user = _bucket_info.owner; + } + } + } } bucket = _bucket_info.bucket; zone_svc = parent->zone_svc; diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index b6fa246877f..a57c461de85 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -147,6 +147,7 @@ public: void insert(const rgw_sync_bucket_pipe& pipe); bool find_basic_info_without_tags(const rgw_obj_key& key, + std::optional *user, std::optional *acl_translation, std::optional *storage_class, rgw_sync_pipe_params::Mode *mode, @@ -183,6 +184,7 @@ public: } bool find_basic_info_without_tags(const rgw_obj_key& key, + std::optional *user, std::optional *acl_translation, std::optional *storage_class, rgw_sync_pipe_params::Mode *mode, @@ -190,7 +192,7 @@ public: if (!rules) { return false; } - return rules->find_basic_info_without_tags(key, acl_translation, storage_class, mode, need_more_info); + return rules->find_basic_info_without_tags(key, user, acl_translation, storage_class, mode, need_more_info); } bool find_obj_params(const rgw_obj_key& key, diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index 5ad6bf287ff..93748ef2456 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -584,7 +584,6 @@ int RGWAsyncFetchRemoteObj::_send_request() { RGWObjectCtx obj_ctx(store); - string user_id; char buf[16]; snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id()); map attrs; @@ -595,7 +594,7 @@ int RGWAsyncFetchRemoteObj::_send_request() std::optional bytes_transferred; int r = store->getRados()->fetch_remote_obj(obj_ctx, - rgw_user(user_id), + user_id.value_or(rgw_user()), NULL, /* req_info */ source_zone, dest_obj, diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index f5c8f67b2b8..289ff546486 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -920,6 +920,8 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest { rgw::sal::RGWRadosStore *store; rgw_zone_id source_zone; + std::optional user_id; + rgw_bucket src_bucket; std::optional dest_placement_rule; RGWBucketInfo dest_bucket_info; @@ -941,6 +943,7 @@ protected: public: RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store, const rgw_zone_id& _source_zone, + std::optional& _user_id, const rgw_bucket& _src_bucket, std::optional _dest_placement_rule, const RGWBucketInfo& _dest_bucket_info, @@ -953,6 +956,7 @@ public: PerfCounters* counters, const DoutPrefixProvider *dpp) : RGWAsyncRadosRequest(caller, cn), store(_store), source_zone(_source_zone), + user_id(_user_id), src_bucket(_src_bucket), dest_placement_rule(_dest_placement_rule), dest_bucket_info(_dest_bucket_info), @@ -976,6 +980,8 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { rgw::sal::RGWRadosStore *store; rgw_zone_id source_zone; + std::optional user_id; + rgw_bucket src_bucket; std::optional dest_placement_rule; RGWBucketInfo dest_bucket_info; @@ -998,6 +1004,7 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine { public: RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store, const rgw_zone_id& _source_zone, + std::optional _user_id, const rgw_bucket& _src_bucket, std::optional _dest_placement_rule, const RGWBucketInfo& _dest_bucket_info, @@ -1011,6 +1018,7 @@ public: : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()), async_rados(_async_rados), store(_store), source_zone(_source_zone), + user_id(_user_id), src_bucket(_src_bucket), dest_placement_rule(_dest_placement_rule), dest_bucket_info(_dest_bucket_info), @@ -1036,7 +1044,7 @@ public: int send_request() override { req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, - source_zone, src_bucket, dest_placement_rule, dest_bucket_info, + source_zone, user_id, src_bucket, dest_placement_rule, dest_bucket_info, key, dest_key, versioned_epoch, copy_if_newer, filter, zones_trace, counters, dpp); async_rados->queue(req); diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 7e1b7494af9..101b6199741 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1974,6 +1974,10 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default { rgw_bucket_sync_pipe sync_pipe; + std::optional mtime; + std::optional etag; + std::optional obj_size; + public: RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) { } @@ -2018,6 +2022,7 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct, std::optional > new_attrs; if (params.mode == rgw_sync_pipe_params::MODE_USER) { +#if 0 RGWAccessControlPolicy policy; auto iter = obj_attrs.find(RGW_ATTR_ACL); if (iter == obj_attrs.end()) { @@ -2031,7 +2036,7 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct, ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode policy " << dendl; return abort_err; } - +#endif } @@ -2061,6 +2066,7 @@ class RGWObjFetchCR : public RGWCoroutine { rgw_zone_set *zones_trace; bool need_more_info{false}; + bool check_change{false}; ceph::real_time src_mtime; uint64_t src_size; @@ -2068,6 +2074,7 @@ class RGWObjFetchCR : public RGWCoroutine { map src_attrs; map src_headers; + std::optional param_user; std::optional param_acl_translation; std::optional param_storage_class; rgw_sync_pipe_params::Mode param_mode; @@ -2089,6 +2096,7 @@ public: { if (!sync_pipe.info.handler.find_basic_info_without_tags(key, + ¶m_user, ¶m_acl_translation, ¶m_storage_class, ¶m_mode, @@ -2137,12 +2145,25 @@ public: ¶ms)) { return set_cr_error(-ERR_PRECONDITION_FAILED); } + + param_user = params.user; + if (params.dest.acl_translation) { + param_acl_translation = params.dest.acl_translation->owner; + } + param_storage_class = params.dest.storage_class; + param_mode = params.mode; } yield { auto filter = make_shared(sync_pipe); + std::optional uid; + if (param_mode == rgw_sync_pipe_params::MODE_USER) { + uid = param_user; + } +#warning FIXME: race guard call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, + uid, sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, std::nullopt, versioned_epoch, @@ -2245,7 +2266,7 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck auto filter = make_shared(sync_pipe); - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, + return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, nullopt, sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info, key, dest_key, versioned_epoch, true, diff --git a/src/rgw/rgw_json_enc.cc b/src/rgw/rgw_json_enc.cc index 347925f902d..f0aa4b71de5 100644 --- a/src/rgw/rgw_json_enc.cc +++ b/src/rgw/rgw_json_enc.cc @@ -956,6 +956,7 @@ void rgw_sync_pipe_params::dump(Formatter *f) const s = "user"; } encode_json("mode", s, f); + encode_json("user", user, f); } void rgw_sync_pipe_params::decode_json(JSONObj *obj) @@ -970,6 +971,7 @@ void rgw_sync_pipe_params::decode_json(JSONObj *obj) } else { mode = MODE_USER; } + JSONDecoder::decode_json("user", user, obj); } diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index a4d2fb005a5..210544cc254 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -321,13 +321,15 @@ struct rgw_sync_pipe_params { MODE_USER = 1, } mode{MODE_SYSTEM}; int32_t priority{0}; + rgw_user user; void encode(bufferlist& bl) const { - ENCODE_START(1, 1, bl); + ENCODE_START(2, 1, bl); encode(source, bl); encode(dest, bl); encode(priority, bl); encode((uint8_t)mode, bl); + encode(user, bl); ENCODE_FINISH(bl); } @@ -339,6 +341,9 @@ struct rgw_sync_pipe_params { uint8_t m; decode(m, bl); mode = (Mode)m; + if (struct_v >= 2) { + decode(user, bl); + } DECODE_FINISH(bl); } -- 2.39.5