]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: filter fetch remote obj after reading remote metadata
authorYehuda Sadeh <yehuda@redhat.com>
Tue, 19 Nov 2019 18:59:59 +0000 (10:59 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:39 +0000 (10:20 -0800)
In the sync case: find the appropriate pipe params that match this
remote object (if matches tags / prefix), and adjust fetch if needed --
abort if doesn't match tags. Later will adjust acls as needed (if user
sync and not system).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_putobj_processor.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_sync_policy.cc
src/rgw/rgw_sync_policy.h

index b34337ff4315eb2868f483ed58480df96e8a8f22..5ad6bf287ff35d97e1d4cb266592f42d38bc6f13 100644 (file)
@@ -621,6 +621,7 @@ int RGWAsyncFetchRemoteObj::_send_request()
                        NULL, /* void (*progress_cb)(off_t, void *), */
                        NULL, /* void *progress_data*); */
                        dpp,
+                       filter.get(),
                        &zones_trace,
                        &bytes_transferred);
 
index 5b689e6d493d76753b1668030f4cd0e2a043e638..f5c8f67b2b8ee07a3e01f3ba6053a1f315ffbc2b 100644 (file)
@@ -931,6 +931,7 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
   real_time src_mtime;
 
   bool copy_if_newer;
+  std::shared_ptr<RGWFetchObjFilter> filter;
   rgw_zone_set zones_trace;
   PerfCounters* counters;
   const DoutPrefixProvider *dpp;
@@ -946,7 +947,9 @@ public:
                          const rgw_obj_key& _key,
                          const std::optional<rgw_obj_key>& _dest_key,
                          std::optional<uint64_t> _versioned_epoch,
-                         bool _if_newer, rgw_zone_set *_zones_trace,
+                         bool _if_newer,
+                         std::shared_ptr<RGWFetchObjFilter> _filter,
+                         rgw_zone_set *_zones_trace,
                          PerfCounters* counters, const DoutPrefixProvider *dpp)
     : RGWAsyncRadosRequest(caller, cn), store(_store),
       source_zone(_source_zone),
@@ -956,7 +959,9 @@ public:
       key(_key),
       dest_key(_dest_key),
       versioned_epoch(_versioned_epoch),
-      copy_if_newer(_if_newer), counters(counters),
+      copy_if_newer(_if_newer),
+      filter(_filter),
+      counters(counters),
       dpp(dpp)
   {
     if (_zones_trace) {
@@ -983,6 +988,8 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
 
   bool copy_if_newer;
 
+  std::shared_ptr<RGWFetchObjFilter> filter;
+
   RGWAsyncFetchRemoteObj *req;
   rgw_zone_set *zones_trace;
   PerfCounters* counters;
@@ -997,7 +1004,9 @@ public:
                       const rgw_obj_key& _key,
                       const std::optional<rgw_obj_key>& _dest_key,
                       std::optional<uint64_t> _versioned_epoch,
-                      bool _if_newer, rgw_zone_set *_zones_trace,
+                      bool _if_newer,
+                      std::shared_ptr<RGWFetchObjFilter> _filter,
+                      rgw_zone_set *_zones_trace,
                       PerfCounters* counters, const DoutPrefixProvider *dpp)
     : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
       async_rados(_async_rados), store(_store),
@@ -1008,7 +1017,9 @@ public:
       key(_key),
       dest_key(_dest_key),
       versioned_epoch(_versioned_epoch),
-      copy_if_newer(_if_newer), req(NULL),
+      copy_if_newer(_if_newer),
+      filter(_filter),
+      req(NULL),
       zones_trace(_zones_trace), counters(counters), dpp(dpp) {}
 
 
@@ -1026,7 +1037,7 @@ public:
   int send_request() override {
     req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
                                     source_zone, src_bucket, dest_placement_rule, dest_bucket_info,
-                                     key, dest_key, versioned_epoch, copy_if_newer,
+                                     key, dest_key, versioned_epoch, copy_if_newer, filter,
                                      zones_trace, counters, dpp);
     async_rados->queue(req);
     return 0;
index e0665db88118ecd4f7a350556e78fd26f05c13c0..b1a6aeca3e314fcb3bc273ffa39537258d7a733e 100644 (file)
@@ -1971,13 +1971,78 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl
   return 0;
 }
 
+class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
+  rgw_bucket_sync_pipe sync_pipe;
+
+public:
+  RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) {
+  }
+
+  int filter(CephContext *cct,
+             const rgw_obj_key& source_key,
+             const RGWBucketInfo& dest_bucket_info,
+             std::optional<rgw_placement_rule> dest_placement_rule,
+             const map<string, bufferlist>& obj_attrs,
+             const rgw_placement_rule **prule) override;
+};
+
+int RGWFetchObjFilter_Sync::filter(CephContext *cct,
+                                   const rgw_obj_key& source_key,
+                                   const RGWBucketInfo& dest_bucket_info,
+                                   std::optional<rgw_placement_rule> dest_placement_rule,
+                                   const map<string, bufferlist>& obj_attrs,
+                                   const rgw_placement_rule **prule)
+{
+  rgw_sync_pipe_params params;
+
+  RGWObjTags obj_tags;
+
+  auto iter = obj_attrs.find(RGW_ATTR_TAGS);
+  if (iter != obj_attrs.end()) {
+    try{
+      auto it = iter->second.cbegin();
+      obj_tags.decode(it);
+    } catch (buffer::error &err) {
+      ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+    }
+  }
+
+  if (!sync_pipe.info.handler.find_obj_params(source_key,
+                                              obj_tags.get_tags(),
+                                              &params)) {
+    return -ERR_PRECONDITION_FAILED;
+  }
+
+  if (!dest_placement_rule &&
+      params.dest.storage_class) {
+    dest_rule.storage_class = *params.dest.storage_class;
+    dest_rule.inherit_from(dest_bucket_info.placement_rule);
+    dest_placement_rule = dest_rule;
+    *prule = &dest_rule;
+  }
+
+  return RGWFetchObjFilter_Default::filter(cct,
+                                           source_key,
+                                           dest_bucket_info,
+                                           dest_placement_rule,
+                                           obj_attrs,
+                                           prule);
+}
+
+
 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
   auto sync_env = sc->env;
-  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, sync_pipe.info.source_bs.bucket,
+
+  auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
+  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+                                 sync_pipe.info.source_bs.bucket,
                                 std::nullopt, sync_pipe.dest_bucket_info,
                                  key, std::nullopt, versioned_epoch,
-                                 true, zones_trace, sync_env->counters, sync_env->dpp);
+                                 true,
+                                 std::static_pointer_cast<RGWFetchObjFilter>(filter),
+                                 zones_trace, sync_env->counters, sync_env->dpp);
 }
 
 RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
@@ -2054,10 +2119,14 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck
     }
   }
 
+  auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
+
   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
                                  sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
                                  key, dest_key, versioned_epoch,
-                                 true, zones_trace, nullptr, sync_env->dpp);
+                                 true,
+                                 std::static_pointer_cast<RGWFetchObjFilter>(filter),
+                                 zones_trace, nullptr, sync_env->dpp);
 }
 
 RGWCoroutine *RGWArchiveDataSyncModule::remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key,
index 499b6b097761afc404fb798cf0ab9a0d7a0be452..76ea90e3e5ff012623be14b10910fdef7b015e36 100644 (file)
@@ -154,6 +154,9 @@ class ManifestObjectProcessor : public HeadObjectProcessor,
         }
       }
 
+  void set_tail_placement(const rgw_placement_rule& tpr) {
+    tail_placement_rule = tpr;
+  }
   void set_tail_placement(const rgw_placement_rule&& tpr) {
     tail_placement_rule = tpr;
   }
index c68c9addb5b5106643b93eef6b11ad91a5b89f72..b211cf4033b8776771b9396935dd785728c86267 100644 (file)
@@ -3616,6 +3616,28 @@ int RGWRados::stat_remote_obj(RGWObjectCtx& obj_ctx,
   return 0;
 }
 
+int RGWFetchObjFilter_Default::filter(CephContext *cct,
+                                      const rgw_obj_key& source_key,
+                                      const RGWBucketInfo& dest_bucket_info,
+                                      std::optional<rgw_placement_rule> dest_placement_rule,
+                                      const map<string, bufferlist>& obj_attrs,
+                                      const rgw_placement_rule **prule)
+{
+  const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
+  if (!ptail_rule) {
+    auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
+    if (iter != obj_attrs.end()) {
+      dest_rule.storage_class = iter->second.to_str();
+      dest_rule.inherit_from(dest_bucket_info.placement_rule);
+      ptail_rule = &dest_rule;
+    } else {
+      ptail_rule = &dest_bucket_info.placement_rule;
+    }
+  }
+  *prule = ptail_rule;
+  return 0;
+}
+
 int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                const rgw_user& user_id,
                req_info *info,
@@ -3643,6 +3665,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                void (*progress_cb)(off_t, void *),
                void *progress_data,
                const DoutPrefixProvider *dpp,
+               RGWFetchObjFilter *filter,
                rgw_zone_set *zones_trace,
                std::optional<uint64_t>* bytes_transferred)
 {
@@ -3658,8 +3681,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
 
   rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
-  const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
-  AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, ptail_rule, user_id,
+  AtomicObjectProcessor processor(&aio, this->store, dest_bucket_info, nullptr, user_id,
                                   obj_ctx, dest_obj, olh_epoch, tag, dpp, null_yield);
   RGWRESTConn *conn;
   auto& zone_conn_map = svc.zone->get_zone_conn_map();
@@ -3685,25 +3707,30 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     conn = iter->second;
   }
 
-  string obj_name = dest_obj.bucket.name + "/" + dest_obj.get_oid();
-
   boost::optional<RGWPutObj_Compress> compressor;
   CompressorRef plugin;
 
-  rgw_placement_rule dest_rule;
+  RGWFetchObjFilter_Default source_filter;
+  if (!filter) {
+    filter = &source_filter;
+  }
+  
   RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data,
                     [&](const map<string, bufferlist>& obj_attrs) {
-                      if (!ptail_rule) {
-                        auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
-                        if (iter != obj_attrs.end()) {
-                          dest_rule.storage_class = iter->second.to_str();
-                          dest_rule.inherit_from(dest_bucket_info.placement_rule);
-                          processor.set_tail_placement(std::move(dest_rule));
-                          ptail_rule = &dest_rule;
-                        } else {
-                          ptail_rule = &dest_bucket_info.placement_rule;
-                        }
+                      const rgw_placement_rule *ptail_rule;
+                      int ret = filter->filter(cct,
+                                               src_obj.key,
+                                               dest_bucket_info,
+                                               dest_placement_rule,
+                                               obj_attrs,
+                                               &ptail_rule);
+                      if (ret < 0) {
+                        ldout(cct, 5) << "Aborting fetch: source object filter returned ret=" << ret << dendl;
+                        return ret;
                       }
+
+                      processor.set_tail_placement(*ptail_rule);
+
                       const auto& compression_type = svc.zone->get_zone_params().get_compression_type(*ptail_rule);
                       if (compression_type != "none") {
                         plugin = Compressor::create(cct, compression_type);
@@ -3713,7 +3740,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                         }
                       }
 
-                      int ret = processor.prepare(null_yield);
+                      ret = processor.prepare(null_yield);
                       if (ret < 0) {
                         return ret;
                       }
@@ -3992,7 +4019,8 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
                dest_placement, src_mtime, mtime, mod_ptr,
                unmod_ptr, high_precision_time,
                if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
-               olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp);
+               olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, dpp,
+               nullptr /* filter */);
   }
 
   map<string, bufferlist> src_attrs;
index e730a2f45aba64d85d3f8c51d974ea7472b49090..e9451ea19096b75964ebe61c0a740c934f0a759d 100644 (file)
@@ -202,6 +202,32 @@ struct RGWObjState {
   }
 };
 
+class RGWFetchObjFilter {
+public:
+  virtual ~RGWFetchObjFilter() {}
+
+  virtual int filter(CephContext *cct,
+                     const rgw_obj_key& source_key,
+                     const RGWBucketInfo& dest_bucket_info,
+                     std::optional<rgw_placement_rule> dest_placement_rule,
+                     const map<string, bufferlist>& obj_attrs,
+                     const rgw_placement_rule **prule) = 0;
+};
+
+class RGWFetchObjFilter_Default : public RGWFetchObjFilter {
+protected:
+  rgw_placement_rule dest_rule;
+public:
+  RGWFetchObjFilter_Default() {}
+
+  int filter(CephContext *cct,
+             const rgw_obj_key& source_key,
+             const RGWBucketInfo& dest_bucket_info,
+             std::optional<rgw_placement_rule> dest_placement_rule,
+             const map<string, bufferlist>& obj_attrs,
+             const rgw_placement_rule **prule) override;
+};
+
 class RGWObjectCtx {
   rgw::sal::RGWRadosStore *store;
   ceph::shared_mutex lock = ceph::make_shared_mutex("RGWObjectCtx");
@@ -1101,6 +1127,7 @@ public:
                        void (*progress_cb)(off_t, void *),
                        void *progress_data,
                        const DoutPrefixProvider *dpp,
+                       RGWFetchObjFilter *filter,
                        rgw_zone_set *zones_trace= nullptr,
                        std::optional<uint64_t>* bytes_transferred = 0);
   /**
index 01a1fb6ccde92dd066c1f34d7b61bd287c5ca380..9047b4b1862b9526f87ac8f171be9f67331687a2 100644 (file)
@@ -121,18 +121,42 @@ bool rgw_sync_pipe_filter::check_tag(const string& s) const
     return true;
   }
 
-  for (auto& t : tags) {
-    if (t == s) {
+  auto iter = tags.find(rgw_sync_pipe_filter_tag(s));
+  return (iter != tags.end());
+}
+
+bool rgw_sync_pipe_filter::check_tag(const string& k, const string& v) const
+{
+  if (tags.empty()) { /* tag filter wasn't defined */
+    return true;
+  }
+
+  auto iter = tags.find(rgw_sync_pipe_filter_tag(k, v));
+  return (iter != tags.end());
+}
+
+bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& _tags) const
+{
+  if (tags.empty()) {
+    return true;
+  }
+
+  for (auto& t : _tags) {
+    if (check_tag(t)) {
       return true;
     }
   }
   return false;
 }
 
-bool rgw_sync_pipe_filter::check_tags(const std::vector<string>& tags) const
+bool rgw_sync_pipe_filter::check_tags(const RGWObjTags::tag_map_t& _tags) const
 {
-  for (auto& t : tags) {
-    if (check_tag(t)) {
+  if (tags.empty()) {
+    return true;
+  }
+
+  for (auto& item : _tags) {
+    if (check_tag(item.first, item.second)) {
       return true;
     }
   }
index 07623583a3bbe506a54b8f9fb46153cebb4ff762..700025486fd4e8050812d9810055a933c2c9190f 100644 (file)
@@ -180,6 +180,14 @@ struct rgw_sync_pipe_filter_tag {
   string key;
   string value;
 
+  rgw_sync_pipe_filter_tag() {}
+  rgw_sync_pipe_filter_tag(const string& s) {
+    from_str(s);
+  }
+  rgw_sync_pipe_filter_tag(const string& _key,
+                           const string& _value) : key(_key),
+                                                   value(_value) {}
+
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
     encode(key, bl);
@@ -231,7 +239,9 @@ struct rgw_sync_pipe_filter {
   bool is_subset_of(const rgw_sync_pipe_filter& f) const;
 
   bool check_tag(const string& s) const;
+  bool check_tag(const string& k, const string& v) const;
   bool check_tags(const std::vector<string>& tags) const;
+  bool check_tags(const RGWObjTags::tag_map_t& tags) const;
 };
 WRITE_CLASS_ENCODER(rgw_sync_pipe_filter)