]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: introduce RGWCopyObjDPF for RGWCopyObj
authorSeena Fallah <seenafallah@gmail.com>
Sat, 7 Jun 2025 20:13:22 +0000 (22:13 +0200)
committerSeena Fallah <seenafallah@gmail.com>
Wed, 19 Nov 2025 19:00:46 +0000 (20:00 +0100)
This will construct a DataProcessorFactory where it will apply
filters such as decrypt and decompress when a data is being read
and encrypt and compression when the data wants to be written.

Co-authored-by: Marcus Watts <mwatts@redhat.com>
Signed-off-by: Seena Fallah <seenafallah@gmail.com>
src/rgw/rgw_op.cc

index 20ca8caa31b62183c05c63091abd6ad655503391..c2293309ac793fd2b82912be73b66638bf89fc76 100644 (file)
@@ -5735,6 +5735,148 @@ void RGWDeleteObj::execute(optional_yield y)
   }
 }
 
+class RGWCopyObjDPF : public rgw::sal::DataProcessorFactory {
+  rgw::sal::Driver* driver;
+  req_state* s;
+  uint64_t &obj_size;
+  std::map<std::string, std::string>& crypt_http_responses;
+  DataProcessorFilter cb;
+  RGWGetObj_Filter* filter{&cb};
+  bool need_decompress{false};
+  RGWCompressionInfo cs_info;
+  boost::optional<RGWGetObj_Decompress> decompress;
+  std::unique_ptr<RGWGetObj_Filter> decrypt;
+  std::unique_ptr<rgw::sal::DataProcessor> encrypt;
+  std::optional<RGWPutObj_Compress> compressor;
+  off_t ofs_x{0};
+  off_t end_x = obj_size;
+
+public:
+  RGWCopyObjDPF(rgw::sal::Driver* _driver,
+                req_state* _s,
+                uint64_t& _obj_size,
+                std::map<std::string, std::string>& _crypt_http_responses)
+    : driver(_driver),
+      s(_s),
+      obj_size(_obj_size),
+      crypt_http_responses(_crypt_http_responses)
+  {}
+  ~RGWCopyObjDPF() override {}
+
+  int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+                         rgw::sal::DataProcessor *cb,
+                         rgw::sal::Attrs& attrs)
+  {
+    std::unique_ptr<BlockCrypt> block_crypt;
+    int res = rgw_s3_prepare_encrypt(s, s->yield, attrs, &block_crypt,
+                                     crypt_http_responses);
+    if (res == 0 && block_crypt != nullptr) {
+      filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
+    }
+    return res;
+  }
+
+  int set_writer(rgw::sal::DataProcessor* writer,
+                 rgw::sal::Attrs& attrs,
+                 const DoutPrefixProvider *dpp,
+                 optional_yield y) override
+  {
+    /* RGWGetObj_Filter */
+    // decompress
+    int ret = rgw_compression_info_from_attrset(s->src_object->get_attrs(), need_decompress, cs_info);
+    if (ret < 0) {
+      return ret;
+    }
+
+    bool src_encrypted = s->src_object->get_attrs().count(RGW_ATTR_CRYPT_MODE);
+    if (need_decompress && !src_encrypted) {
+      obj_size = cs_info.orig_size;
+      s->src_object->set_obj_size(obj_size);
+      static constexpr bool partial_content = false;
+      decompress.emplace(s->cct, &cs_info, partial_content, filter);
+      filter = &*decompress;
+      end_x = obj_size;
+    }
+
+    // decrypt
+    if (src_encrypted) {
+      auto attr_iter = s->src_object->get_attrs().find(RGW_ATTR_MANIFEST);
+      ret = get_decrypt_filter(&decrypt, filter, s, s->src_object->get_attrs(),
+                               attr_iter != s->src_object->get_attrs().end() ? &attr_iter->second : nullptr,
+                               nullptr);
+      if (ret < 0) {
+        return ret;
+      }
+      if (decrypt != nullptr) {
+        filter = decrypt.get();
+      }
+    }
+
+    filter->fixup_range(ofs_x, end_x);
+
+    /* rgw::sal::DataProcessor */
+    // encrypt
+    rgw::sal::DataProcessor* processor = writer;
+
+    ret = get_encrypt_filter(&encrypt, processor, attrs);
+    if (ret < 0) {
+      return ret;
+    }
+    if (encrypt != nullptr) {
+      processor = &*encrypt;
+    }
+
+    // compression
+    attrs.erase(RGW_ATTR_COMPRESSION); // remove any existing compression info from source object
+    // a zonegroup feature is required to combine compression and encryption
+    const RGWZoneGroup& zonegroup = s->penv.site->get_zonegroup();
+    const bool compress_encrypted = zonegroup.supports(rgw::zone_features::compress_encrypted);
+    const auto& compression_type = driver->get_compression_type(s->dest_placement);
+    if (compression_type != "none" &&
+        (encrypt == nullptr || compress_encrypted)) {
+      CompressorRef plugin = get_compressor_plugin(s, compression_type);
+      if (!plugin) {
+        ldpp_dout(s, 1) << "Cannot load plugin for compression type "
+            << compression_type << dendl;
+      } else {
+        compressor.emplace(s->cct, plugin, processor);
+        processor = &*compressor;
+        // always send incompressible hint when rgw is itself doing compression
+        s->object->set_compressed();
+      }
+    }
+
+    cb.set_processor(processor);
+
+    return 0;
+  }
+
+  bool need_copy_data() override {
+    // if source object is encrypted, we need to copy data
+    if (s->src_object->get_attrs().count(RGW_ATTR_CRYPT_MODE)) {
+      return true;
+    }
+
+    // check if it's requested to be encrypted
+    static const string crypt_attrs[] = {
+      "x-amz-server-side-encryption-customer-algorithm", // SSE-C
+      "x-amz-server-side-encryption", // SSE-S3, SSE-KMS
+    };
+    for (const auto& attr : crypt_attrs) {
+      if (s->info.crypt_attribute_map.find(attr) !=
+          s->info.crypt_attribute_map.end()) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  RGWGetObj_Filter* get_filter() override {
+    return filter;
+  }
+};
+
 bool RGWCopyObj::parse_copy_location(const std::string_view& url_src,
                                     string& bucket_name,
                                     rgw_obj_key& key,