}
}
-#warning add support for tags
+bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key,
+ std::optional<rgw_user> *acl_translation_owner,
+ std::optional<string> *storage_class,
+ rgw_sync_pipe_params::Mode *mode,
+ bool *need_more_info) const
+{
+ std::optional<string> owner;
+
+ *need_more_info = false;
+
+ if (prefix_refs.empty()) {
+ return false;
+ }
+
+ auto iter = prefix_refs.upper_bound(key.name);
+ if (iter != prefix_refs.begin()) {
+ --iter;
+ }
+ if (iter == prefix_refs.end()) {
+ return false;
+ }
+
+ auto end = prefix_refs.upper_bound(key.name);
+
+ std::vector<decltype(iter)> iters;
+
+ std::optional<int> priority;
+
+ for (; iter != end; ++iter) {
+ auto& prefix = iter->first;
+ if (!boost::starts_with(key.name, prefix)) {
+ continue;
+ }
+
+ auto& rule_params = iter->second->params;
+ auto& filter = rule_params.source.filter;
+
+ if (rule_params.priority > priority) {
+ priority = rule_params.priority;
+
+ if (!filter.has_tags()) {
+ iters.clear();
+ }
+ iters.push_back(iter);
+
+ *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then
+ we can't be sure if it would be used.
+ We need to first read the info from the source object */
+ }
+ }
+
+ if (iters.empty()) {
+ return false;
+ }
+
+ bool conflict = false;
+
+ std::optional<rgw_sync_pipe_acl_translation> _acl_translation;
+ std::optional<string> _storage_class;
+ rgw_sync_pipe_params::Mode _mode;
+
+ int i = 0;
+ for (auto& iter : iters) {
+ auto& rule_params = iter->second->params;
+ if (++i == 0) {
+ _acl_translation = rule_params.dest.acl_translation;
+ _storage_class = rule_params.dest.storage_class;
+ _mode = rule_params.mode;
+ continue;
+ }
+
+ conflict = !(_acl_translation == rule_params.dest.acl_translation &&
+ _storage_class == rule_params.dest.storage_class &&
+ _mode == rule_params.mode);
+ if (conflict) {
+ *need_more_info = true;
+ return false;
+ }
+ }
+
+ if (_acl_translation) {
+ *acl_translation_owner = _acl_translation->owner;
+ }
+ *storage_class = _storage_class;
+ *mode = _mode;
+
+ return true;
+}
+
bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key,
const RGWObjTags::tag_map_t& tags,
rgw_sync_pipe_params *params) const
void insert(const rgw_sync_bucket_pipe& pipe);
+ bool find_basic_info_without_tags(const rgw_obj_key& key,
+ std::optional<rgw_user> *acl_translation,
+ std::optional<string> *storage_class,
+ rgw_sync_pipe_params::Mode *mode,
+ bool *need_more_info) const;
bool find_obj_params(const rgw_obj_key& key,
const RGWObjTags::tag_map_t& tags,
rgw_sync_pipe_params *params) const;
return source.specific() && dest.specific();
}
+ bool find_basic_info_without_tags(const rgw_obj_key& key,
+ std::optional<rgw_user> *acl_translation,
+ std::optional<string> *storage_class,
+ rgw_sync_pipe_params::Mode *mode,
+ bool *need_more_info) const {
+ if (!rules) {
+ return false;
+ }
+ return rules->find_basic_info_without_tags(key, acl_translation, storage_class, mode, need_more_info);
+ }
+
bool find_obj_params(const rgw_obj_key& key,
const RGWObjTags::tag_map_t& tags,
rgw_sync_pipe_params *params) const {
const map<string, bufferlist>& obj_attrs,
const rgw_placement_rule **prule)
{
+ int abort_err = -ERR_PRECONDITION_FAILED;
+
rgw_sync_pipe_params params;
RGWObjTags obj_tags;
auto iter = obj_attrs.find(RGW_ATTR_TAGS);
if (iter != obj_attrs.end()) {
- try{
+ try {
auto it = iter->second.cbegin();
obj_tags.decode(it);
} catch (buffer::error &err) {
if (!sync_pipe.info.handler.find_obj_params(source_key,
obj_tags.get_tags(),
¶ms)) {
- return -ERR_PRECONDITION_FAILED;
+ return abort_err;
+ }
+
+ std::optional<std::map<string, bufferlist> > new_attrs;
+
+ if (params.mode == rgw_sync_pipe_params::MODE_USER) {
+ RGWAccessControlPolicy policy;
+ auto iter = obj_attrs.find(RGW_ATTR_ACL);
+ if (iter == obj_attrs.end()) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": No policy header to object: aborting sync" << dendl;
+ return abort_err;
+ }
+ try {
+ auto it = iter->second.cbegin();
+ decode(policy, it);
+ } catch (buffer::error &err) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode policy " << dendl;
+ return abort_err;
+ }
+
+
}
if (!dest_placement_rule &&
}
+class RGWObjFetchCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+ rgw_bucket_sync_pipe& sync_pipe;
+ rgw_obj_key& key;
+ std::optional<uint64_t> versioned_epoch;
+ rgw_zone_set *zones_trace;
+
+ bool need_more_info{false};
+
+ ceph::real_time src_mtime;
+ uint64_t src_size;
+ string src_etag;
+ map<string, bufferlist> src_attrs;
+ map<string, string> src_headers;
+
+ std::optional<rgw_user> param_acl_translation;
+ std::optional<string> param_storage_class;
+ rgw_sync_pipe_params::Mode param_mode;
+public:
+ RGWObjFetchCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe,
+ rgw_obj_key& _key,
+ std::optional<uint64_t> _versioned_epoch,
+ rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct),
+ sc(_sc), sync_env(_sc->env),
+ sync_pipe(_sync_pipe),
+ key(_key),
+ versioned_epoch(_versioned_epoch),
+ zones_trace(_zones_trace) {}
+
+
+ int operate() override {
+ reenter(this) {
+
+ {
+ if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
+ ¶m_acl_translation,
+ ¶m_storage_class,
+ ¶m_mode,
+ &need_more_info)) {
+ if (!need_more_info) {
+ return set_cr_error(-ERR_PRECONDITION_FAILED);
+ }
+ }
+ }
+
+ if (need_more_info) {
+ ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
+ /*
+ * we need to fetch info about source object, so that we can determine
+ * the correct policy configuration. This can happen if there are multiple
+ * policy rules, and some depend on the object tagging */
+ yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
+ sync_env->store,
+ sc->source_zone,
+ sync_pipe.info.source_bs.bucket,
+ key,
+ &src_mtime,
+ &src_size,
+ &src_etag,
+ &src_attrs,
+ &src_headers));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ RGWObjTags obj_tags;
+
+ auto iter = src_attrs.find(RGW_ATTR_TAGS);
+ if (iter != src_attrs.end()) {
+ try {
+ auto it = iter->second.cbegin();
+ obj_tags.decode(it);
+ } catch (buffer::error &err) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+ }
+ }
+
+ rgw_sync_pipe_params params;
+ if (!sync_pipe.info.handler.find_obj_params(key,
+ obj_tags.get_tags(),
+ ¶ms)) {
+ return set_cr_error(-ERR_PRECONDITION_FAILED);
+ }
+ }
+
+ yield {
+ auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
+ call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+ sync_pipe.info.source_bs.bucket,
+ std::nullopt, sync_pipe.dest_bucket_info,
+ key, std::nullopt, versioned_epoch,
+ true,
+ std::static_pointer_cast<RGWFetchObjFilter>(filter),
+ zones_trace, sync_env->counters, sync_env->dpp));
+ }
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ return set_cr_done();
+ }
+ return 0;
+ }
+};
+
RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
{
auto sync_env = sc->env;
auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
- return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
- sync_pipe.info.source_bs.bucket,
- std::nullopt, sync_pipe.dest_bucket_info,
- key, std::nullopt, versioned_epoch,
- true,
- std::static_pointer_cast<RGWFetchObjFilter>(filter),
- zones_trace, sync_env->counters, sync_env->dpp);
+ return new RGWObjFetchCR(sc, sync_pipe, key, versioned_epoch, zones_trace);
}
RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,