]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: prepare for system and user mode changes
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 20 Nov 2019 01:46:42 +0000 (17:46 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:39 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h

index ec76ef67c94add04cbc9282d953e3474ff91436d..a76fd43b7838e3fafdd63dd6d19cbfd138f24390 100644 (file)
@@ -288,7 +288,95 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pi
   }
 }
 
-#warning add support for tags
+bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key,
+                                                                        std::optional<rgw_user> *acl_translation_owner,
+                                                                        std::optional<string> *storage_class,
+                                                                        rgw_sync_pipe_params::Mode *mode,
+                                                                        bool *need_more_info) const
+{
+  std::optional<string> owner;
+
+  *need_more_info = false;
+
+  if (prefix_refs.empty()) {
+    return false;
+  }
+
+  auto iter = prefix_refs.upper_bound(key.name);
+  if (iter != prefix_refs.begin()) {
+    --iter;
+  }
+  if (iter == prefix_refs.end()) {
+    return false;
+  }
+
+  auto end = prefix_refs.upper_bound(key.name);
+
+  std::vector<decltype(iter)> iters;
+
+  std::optional<int> priority;
+
+  for (; iter != end; ++iter) {
+    auto& prefix = iter->first;
+    if (!boost::starts_with(key.name, prefix)) {
+      continue;
+    }
+
+    auto& rule_params = iter->second->params;
+    auto& filter = rule_params.source.filter;
+
+    if (rule_params.priority > priority) {
+      priority = rule_params.priority;
+
+      if (!filter.has_tags()) {
+        iters.clear();
+      }
+      iters.push_back(iter);
+
+      *need_more_info = filter.has_tags(); /* if highest priority filter has tags, then
+                                              we can't be sure if it would be used.
+                                              We need to first read the info from the source object */
+    }
+  }
+
+  if (iters.empty()) {
+    return false;
+  }
+
+  bool conflict = false;
+
+  std::optional<rgw_sync_pipe_acl_translation> _acl_translation;
+  std::optional<string> _storage_class;
+  rgw_sync_pipe_params::Mode _mode;
+
+  int i = 0;
+  for (auto& iter : iters) {
+    auto& rule_params = iter->second->params;
+    if (++i == 0) {
+      _acl_translation = rule_params.dest.acl_translation;
+      _storage_class = rule_params.dest.storage_class;
+      _mode = rule_params.mode;
+      continue;
+    }
+
+    conflict = !(_acl_translation == rule_params.dest.acl_translation &&
+                 _storage_class == rule_params.dest.storage_class &&
+                 _mode == rule_params.mode);
+    if (conflict) {
+      *need_more_info = true;
+      return false;
+    }
+  }
+
+  if (_acl_translation) {
+    *acl_translation_owner = _acl_translation->owner;
+  }
+  *storage_class = _storage_class;
+  *mode = _mode;
+
+  return true;
+}
+
 bool RGWBucketSyncFlowManager::pipe_rules::find_obj_params(const rgw_obj_key& key,
                                                            const RGWObjTags::tag_map_t& tags,
                                                            rgw_sync_pipe_params *params) const
index 63da7836a875ab6301eb59595213e297b5238c23..b6fa246877fae72a309f50c3c97b40f69aab7db4 100644 (file)
@@ -146,6 +146,11 @@ public:
 
     void insert(const rgw_sync_bucket_pipe& pipe);
 
+    bool find_basic_info_without_tags(const rgw_obj_key& key,
+                                      std::optional<rgw_user> *acl_translation,
+                                      std::optional<string> *storage_class,
+                                      rgw_sync_pipe_params::Mode *mode,
+                                      bool *need_more_info) const;
     bool find_obj_params(const rgw_obj_key& key, 
                          const RGWObjTags::tag_map_t& tags,
                          rgw_sync_pipe_params *params) const;
@@ -177,6 +182,17 @@ public:
       return source.specific() && dest.specific();
     }
     
+    bool find_basic_info_without_tags(const rgw_obj_key& key,
+                                      std::optional<rgw_user> *acl_translation,
+                                      std::optional<string> *storage_class,
+                                      rgw_sync_pipe_params::Mode *mode,
+                                      bool *need_more_info) const {
+      if (!rules) {
+        return false;
+      }
+      return rules->find_basic_info_without_tags(key, acl_translation, storage_class, mode, need_more_info);
+    }
+
     bool find_obj_params(const rgw_obj_key& key,
                          const RGWObjTags::tag_map_t& tags,
                          rgw_sync_pipe_params *params) const {
index 93eb73265172ed672eb4af75818792e16660a832..7e1b7494af98d2823a8467e141a86d9cae57fa3d 100644 (file)
@@ -1993,13 +1993,15 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct,
                                    const map<string, bufferlist>& obj_attrs,
                                    const rgw_placement_rule **prule)
 {
+  int abort_err = -ERR_PRECONDITION_FAILED;
+
   rgw_sync_pipe_params params;
 
   RGWObjTags obj_tags;
 
   auto iter = obj_attrs.find(RGW_ATTR_TAGS);
   if (iter != obj_attrs.end()) {
-    try{
+    try {
       auto it = iter->second.cbegin();
       obj_tags.decode(it);
     } catch (buffer::error &err) {
@@ -2010,7 +2012,27 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct,
   if (!sync_pipe.info.handler.find_obj_params(source_key,
                                               obj_tags.get_tags(),
                                               &params)) {
-    return -ERR_PRECONDITION_FAILED;
+    return abort_err;
+  }
+
+  std::optional<std::map<string, bufferlist> > new_attrs;
+
+  if (params.mode == rgw_sync_pipe_params::MODE_USER) {
+    RGWAccessControlPolicy policy;
+    auto iter = obj_attrs.find(RGW_ATTR_ACL);
+    if (iter == obj_attrs.end()) {
+      ldout(cct, 0) << "ERROR: " << __func__ << ": No policy header to object: aborting sync" << dendl;
+      return abort_err;
+    }
+    try {
+      auto it = iter->second.cbegin();
+      decode(policy, it);
+    } catch (buffer::error &err) {
+      ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode policy " << dendl;
+      return abort_err;
+    }
+
+    
   }
 
   if (!dest_placement_rule &&
@@ -2030,19 +2052,121 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct,
 }
 
 
+class RGWObjFetchCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  rgw_bucket_sync_pipe& sync_pipe;
+  rgw_obj_key& key;
+  std::optional<uint64_t> versioned_epoch;
+  rgw_zone_set *zones_trace;
+
+  bool need_more_info{false};
+
+  ceph::real_time src_mtime;
+  uint64_t src_size;
+  string src_etag;
+  map<string, bufferlist> src_attrs;
+  map<string, string> src_headers;
+
+  std::optional<rgw_user> param_acl_translation;
+  std::optional<string> param_storage_class;
+  rgw_sync_pipe_params::Mode param_mode;
+public:
+  RGWObjFetchCR(RGWDataSyncCtx *_sc,
+                rgw_bucket_sync_pipe& _sync_pipe,
+                rgw_obj_key& _key,
+                std::optional<uint64_t> _versioned_epoch,
+                rgw_zone_set *_zones_trace) : RGWCoroutine(_sc->cct),
+                                              sc(_sc), sync_env(_sc->env),
+                                              sync_pipe(_sync_pipe),
+                                              key(_key),
+                                              versioned_epoch(_versioned_epoch),
+                                              zones_trace(_zones_trace) {}
+
+
+  int operate() override {
+    reenter(this) {
+
+      {
+        if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
+                                                                 &param_acl_translation,
+                                                                 &param_storage_class,
+                                                                 &param_mode,
+                                                                 &need_more_info)) {
+          if (!need_more_info) {
+            return set_cr_error(-ERR_PRECONDITION_FAILED);
+          }
+        }
+      }
+
+      if (need_more_info) {
+        ldout(cct, 20) << "Could not determine exact policy rule for obj=" << key << ", will read source object attributes" << dendl;
+        /*
+         * we need to fetch info about source object, so that we can determine
+         * the correct policy configuration. This can happen if there are multiple
+         * policy rules, and some depend on the object tagging */
+        yield call(new RGWStatRemoteObjCR(sync_env->async_rados,
+                                          sync_env->store,
+                                          sc->source_zone,
+                                          sync_pipe.info.source_bs.bucket,
+                                          key,
+                                          &src_mtime,
+                                          &src_size,
+                                          &src_etag,
+                                          &src_attrs,
+                                          &src_headers));
+        if (retcode < 0) {
+          return set_cr_error(retcode);
+        }
+    
+        RGWObjTags obj_tags;
+
+        auto iter = src_attrs.find(RGW_ATTR_TAGS);
+        if (iter != src_attrs.end()) {
+          try {
+            auto it = iter->second.cbegin();
+            obj_tags.decode(it);
+          } catch (buffer::error &err) {
+            ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+          }
+        }
+
+        rgw_sync_pipe_params params;
+        if (!sync_pipe.info.handler.find_obj_params(key,
+                                                    obj_tags.get_tags(),
+                                                    &params)) {
+          return set_cr_error(-ERR_PRECONDITION_FAILED);
+        }
+      }
+
+      yield {
+        auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
+        call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+                                     sync_pipe.info.source_bs.bucket,
+                                     std::nullopt, sync_pipe.dest_bucket_info,
+                                     key, std::nullopt, versioned_epoch,
+                                     true,
+                                     std::static_pointer_cast<RGWFetchObjFilter>(filter),
+                                     zones_trace, sync_env->counters, sync_env->dpp));
+      }
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      return set_cr_done();
+    }
+    return 0;
+  }
+};
+
 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
   auto sync_env = sc->env;
 
   auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
 
-  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
-                                 sync_pipe.info.source_bs.bucket,
-                                std::nullopt, sync_pipe.dest_bucket_info,
-                                 key, std::nullopt, versioned_epoch,
-                                 true,
-                                 std::static_pointer_cast<RGWFetchObjFilter>(filter),
-                                 zones_trace, sync_env->counters, sync_env->dpp);
+  return new RGWObjFetchCR(sc, sync_pipe, key, versioned_epoch, zones_trace);
 }
 
 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
index 9047b4b1862b9526f87ac8f171be9f67331687a2..1e700299f350585e667ccdc1627126c65086950f 100644 (file)
@@ -135,6 +135,11 @@ bool rgw_sync_pipe_filter::check_tag(const string& k, const string& v) const
   return (iter != tags.end());
 }
 
+bool rgw_sync_pipe_filter::has_tags() const
+{
+  return !tags.empty();
+}
+
 bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& _tags) const
 {
   if (tags.empty()) {
index 700025486fd4e8050812d9810055a933c2c9190f..a4d2fb005a59fd3be06a1eec40611ffc1db071a8 100644 (file)
@@ -238,6 +238,7 @@ struct rgw_sync_pipe_filter {
 
   bool is_subset_of(const rgw_sync_pipe_filter& f) const;
 
+  bool has_tags() const;
   bool check_tag(const string& s) const;
   bool check_tag(const string& k, const string& v) const;
   bool check_tags(const std::vector<string>& tags) const;
@@ -262,6 +263,10 @@ struct rgw_sync_pipe_acl_translation {
 
   void dump(ceph::Formatter *f) const;
   void decode_json(JSONObj *obj);
+
+  bool operator==(const rgw_sync_pipe_acl_translation& aclt) const {
+    return (owner == aclt.owner);
+  }
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_acl_translation)