From 6d070766f159545aedae1c0e82d12f67da39a013 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 19 Nov 2019 17:46:42 -0800 Subject: [PATCH] rgw: prepare for system and user mode changes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_bucket_sync.cc | 90 ++++++++++++++++++++++- src/rgw/rgw_bucket_sync.h | 16 +++++ src/rgw/rgw_data_sync.cc | 142 ++++++++++++++++++++++++++++++++++--- src/rgw/rgw_sync_policy.cc | 5 ++ src/rgw/rgw_sync_policy.h | 5 ++ 5 files changed, 248 insertions(+), 10 deletions(-) diff --git a/src/rgw/rgw_bucket_sync.cc b/src/rgw/rgw_bucket_sync.cc index ec76ef67c94ad..a76fd43b7838e 100644 --- a/src/rgw/rgw_bucket_sync.cc +++ b/src/rgw/rgw_bucket_sync.cc @@ -288,7 +288,95 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pi } } -#warning add support for tags +bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key, + std::optional *acl_translation_owner, + std::optional *storage_class, + rgw_sync_pipe_params::Mode *mode, + bool *need_more_info) const +{ + std::optional owner; + + *need_more_info = false; + + if (prefix_refs.empty()) { + return false; + } + + auto iter = prefix_refs.upper_bound(key.name); + if (iter != prefix_refs.begin()) { + --iter; + } + if (iter == prefix_refs.end()) { + return false; + } + + auto end = prefix_refs.upper_bound(key.name); + + std::vector iters; + + std::optional priority; + + for (; iter != end; ++iter) { + auto& prefix = iter->first; + if (!boost::starts_with(key.name, prefix)) { + continue; + } + + auto& rule_params = iter->second->params; + auto& filter = rule_params.source.filter; + + if (rule_params.priority > priority) { + priority = rule_params.priority; + + if (!filter.has_tags()) { + iters.clear(); + } + iters.push_back(iter); + + *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then + we can't be sure if it would be used. + We need to first read the info from the source object */ + } + } + + if (iters.empty()) { + return false; + } + + bool conflict = false; + + std::optional _acl_translation; + std::optional _storage_class; + rgw_sync_pipe_params::Mode _mode; + + int i = 0; + for (auto& iter : iters) { + auto& rule_params = iter->second->params; + if (++i == 0) { + _acl_translation = rule_params.dest.acl_translation; + _storage_class = rule_params.dest.storage_class; + _mode = rule_params.mode; + continue; + } + + conflict = !(_acl_translation == rule_params.dest.acl_translation && + _storage_class == rule_params.dest.storage_class && + _mode == rule_params.mode); + if (conflict) { + *need_more_info = true; + return false; + } + } + + if (_acl_translation) { + *acl_translation_owner = _acl_translation->owner; + } + *storage_class = _storage_class; + *mode = _mode; + + return true; +} + bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key, const RGWObjTags::tag_map_t& tags, rgw_sync_pipe_params *params) const diff --git a/src/rgw/rgw_bucket_sync.h b/src/rgw/rgw_bucket_sync.h index 63da7836a875a..b6fa246877fae 100644 --- a/src/rgw/rgw_bucket_sync.h +++ b/src/rgw/rgw_bucket_sync.h @@ -146,6 +146,11 @@ public: void insert(const rgw_sync_bucket_pipe& pipe); + bool find_basic_info_without_tags(const rgw_obj_key& key, + std::optional *acl_translation, + std::optional *storage_class, + rgw_sync_pipe_params::Mode *mode, + bool *need_more_info) const; bool find_obj_params(const rgw_obj_key& key, const RGWObjTags::tag_map_t& tags, rgw_sync_pipe_params *params) const; @@ -177,6 +182,17 @@ public: return source.specific() && dest.specific(); } + bool find_basic_info_without_tags(const rgw_obj_key& key, + std::optional *acl_translation, + std::optional *storage_class, + rgw_sync_pipe_params::Mode *mode, + bool *need_more_info) const { + if (!rules) { + return false; + } + return rules->find_basic_info_without_tags(key, acl_translation, storage_class, mode, need_more_info); + } + bool find_obj_params(const rgw_obj_key& key, const RGWObjTags::tag_map_t& tags, rgw_sync_pipe_params *params) const { diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 93eb73265172e..7e1b7494af98d 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1993,13 +1993,15 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct, const map& obj_attrs, const rgw_placement_rule **prule) { + int abort_err = -ERR_PRECONDITION_FAILED; + rgw_sync_pipe_params params; RGWObjTags obj_tags; auto iter = obj_attrs.find(RGW_ATTR_TAGS); if (iter != obj_attrs.end()) { - try{ + try { auto it = iter->second.cbegin(); obj_tags.decode(it); } catch (buffer::error &err) { @@ -2010,7 +2012,27 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct, if (!sync_pipe.info.handler.find_obj_params(source_key, obj_tags.get_tags(), ¶ms)) { - return -ERR_PRECONDITION_FAILED; + return abort_err; + } + + std::optional > new_attrs; + + if (params.mode == rgw_sync_pipe_params::MODE_USER) { + RGWAccessControlPolicy policy; + auto iter = obj_attrs.find(RGW_ATTR_ACL); + if (iter == obj_attrs.end()) { + ldout(cct, 0) << "ERROR: " << __func__ << ": No policy header to object: aborting sync" << dendl; + return abort_err; + } + try { + auto it = iter->second.cbegin(); + decode(policy, it); + } catch (buffer::error &err) { + ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode policy " << dendl; + return abort_err; + } + + } if (!dest_placement_rule && @@ -2030,19 +2052,121 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct, } +class RGWObjFetchCR : public RGWCoroutine { + RGWDataSyncCtx *sc; + RGWDataSyncEnv *sync_env; + rgw_bucket_sync_pipe& sync_pipe; + rgw_obj_key& key; + std::optional versioned_epoch; + rgw_zone_set *zones_trace; + + bool need_more_info{false}; + + ceph::real_time src_mtime; + uint64_t src_size; + string src_etag; + map src_attrs; + map src_headers; + + std::optional param_acl_translation; + std::optional param_storage_class; + rgw_sync_pipe_params::Mode param_mode; +public: + RGWObjFetchCR(RGWDataSyncCtx *_sc, + rgw_bucket_sync_pipe& _sync_pipe, + rgw_obj_key& _key, + std::optional _versioned_epoch, + rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct), + sc(_sc), sync_env(_sc->env), + sync_pipe(_sync_pipe), + key(_key), + versioned_epoch(_versioned_epoch), + zones_trace(_zones_trace) {} + + + int operate() override { + reenter(this) { + + { + if (!sync_pipe.info.handler.find_basic_info_without_tags(key, + ¶m_acl_translation, + ¶m_storage_class, + ¶m_mode, + &need_more_info)) { + if (!need_more_info) { + return set_cr_error(-ERR_PRECONDITION_FAILED); + } + } + } + + if (need_more_info) { + ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl; + /* + * we need to fetch info about source object, so that we can determine + * the correct policy configuration. This can happen if there are multiple + * policy rules, and some depend on the object tagging */ + yield call(new RGWStatRemoteObjCR(sync_env->async_rados, + sync_env->store, + sc->source_zone, + sync_pipe.info.source_bs.bucket, + key, + &src_mtime, + &src_size, + &src_etag, + &src_attrs, + &src_headers)); + if (retcode < 0) { + return set_cr_error(retcode); + } + + RGWObjTags obj_tags; + + auto iter = src_attrs.find(RGW_ATTR_TAGS); + if (iter != src_attrs.end()) { + try { + auto it = iter->second.cbegin(); + obj_tags.decode(it); + } catch (buffer::error &err) { + ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl; + } + } + + rgw_sync_pipe_params params; + if (!sync_pipe.info.handler.find_obj_params(key, + obj_tags.get_tags(), + ¶ms)) { + return set_cr_error(-ERR_PRECONDITION_FAILED); + } + } + + yield { + auto filter = make_shared(sync_pipe); + + call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, + sync_pipe.info.source_bs.bucket, + std::nullopt, sync_pipe.dest_bucket_info, + key, std::nullopt, versioned_epoch, + true, + std::static_pointer_cast(filter), + zones_trace, sync_env->counters, sync_env->dpp)); + } + if (retcode < 0) { + return set_cr_error(retcode); + } + + return set_cr_done(); + } + return 0; + } +}; + RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional versioned_epoch, rgw_zone_set *zones_trace) { auto sync_env = sc->env; auto filter = make_shared(sync_pipe); - return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, - sync_pipe.info.source_bs.bucket, - std::nullopt, sync_pipe.dest_bucket_info, - key, std::nullopt, versioned_epoch, - true, - std::static_pointer_cast(filter), - zones_trace, sync_env->counters, sync_env->dpp); + return new RGWObjFetchCR(sc, sync_pipe, key, versioned_epoch, zones_trace); } RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, diff --git a/src/rgw/rgw_sync_policy.cc b/src/rgw/rgw_sync_policy.cc index 9047b4b1862b9..1e700299f3505 100644 --- a/src/rgw/rgw_sync_policy.cc +++ b/src/rgw/rgw_sync_policy.cc @@ -135,6 +135,11 @@ bool rgw_sync_pipe_filter::check_tag(const string& k, const string& v) const return (iter != tags.end()); } +bool rgw_sync_pipe_filter::has_tags() const +{ + return !tags.empty(); +} + bool rgw_sync_pipe_filter::check_tags(const std::vector& _tags) const { if (tags.empty()) { diff --git a/src/rgw/rgw_sync_policy.h b/src/rgw/rgw_sync_policy.h index 700025486fd4e..a4d2fb005a59f 100644 --- a/src/rgw/rgw_sync_policy.h +++ b/src/rgw/rgw_sync_policy.h @@ -238,6 +238,7 @@ struct rgw_sync_pipe_filter { bool is_subset_of(const rgw_sync_pipe_filter& f) const; + bool has_tags() const; bool check_tag(const string& s) const; bool check_tag(const string& k, const string& v) const; bool check_tags(const std::vector& tags) const; @@ -262,6 +263,10 @@ struct rgw_sync_pipe_acl_translation { void dump(ceph::Formatter *f) const; void decode_json(JSONObj *obj); + + bool operator==(const rgw_sync_pipe_acl_translation& aclt) const { + return (owner == aclt.owner); + } }; WRITE_CLASS_ENCODER(rgw_sync_pipe_acl_translation) -- 2.39.5