From: Seena Fallah Date: Sat, 7 Jun 2025 20:13:22 +0000 (+0200) Subject: rgw: introduce RGWCopyObjDPF for RGWCopyObj X-Git-Tag: testing/wip-vshankar-testing-20260219.051425~1^2~4^2~8 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e6a74d83cf8cc2339cfc780c2112c7fcb3b99134;p=ceph-ci.git rgw: introduce RGWCopyObjDPF for RGWCopyObj This will construct a DataProcessorFactory where it will apply filters such as decrypt and decompress when a data is being read and encrypt and compression when the data wants to be written. Co-authored-by: Marcus Watts Signed-off-by: Seena Fallah --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 20ca8caa31b..c2293309ac7 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -5735,6 +5735,148 @@ void RGWDeleteObj::execute(optional_yield y) } } +class RGWCopyObjDPF : public rgw::sal::DataProcessorFactory { + rgw::sal::Driver* driver; + req_state* s; + uint64_t &obj_size; + std::map& crypt_http_responses; + DataProcessorFilter cb; + RGWGetObj_Filter* filter{&cb}; + bool need_decompress{false}; + RGWCompressionInfo cs_info; + boost::optional decompress; + std::unique_ptr decrypt; + std::unique_ptr encrypt; + std::optional compressor; + off_t ofs_x{0}; + off_t end_x = obj_size; + +public: + RGWCopyObjDPF(rgw::sal::Driver* _driver, + req_state* _s, + uint64_t& _obj_size, + std::map& _crypt_http_responses) + : driver(_driver), + s(_s), + obj_size(_obj_size), + crypt_http_responses(_crypt_http_responses) + {} + ~RGWCopyObjDPF() override {} + + int get_encrypt_filter(std::unique_ptr *filter, + rgw::sal::DataProcessor *cb, + rgw::sal::Attrs& attrs) + { + std::unique_ptr block_crypt; + int res = rgw_s3_prepare_encrypt(s, s->yield, attrs, &block_crypt, + crypt_http_responses); + if (res == 0 && block_crypt != nullptr) { + filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield)); + } + return res; + } + + int set_writer(rgw::sal::DataProcessor* writer, + rgw::sal::Attrs& attrs, + const DoutPrefixProvider *dpp, + optional_yield y) override + { + /* RGWGetObj_Filter */ + // decompress + int ret = rgw_compression_info_from_attrset(s->src_object->get_attrs(), need_decompress, cs_info); + if (ret < 0) { + return ret; + } + + bool src_encrypted = s->src_object->get_attrs().count(RGW_ATTR_CRYPT_MODE); + if (need_decompress && !src_encrypted) { + obj_size = cs_info.orig_size; + s->src_object->set_obj_size(obj_size); + static constexpr bool partial_content = false; + decompress.emplace(s->cct, &cs_info, partial_content, filter); + filter = &*decompress; + end_x = obj_size; + } + + // decrypt + if (src_encrypted) { + auto attr_iter = s->src_object->get_attrs().find(RGW_ATTR_MANIFEST); + ret = get_decrypt_filter(&decrypt, filter, s, s->src_object->get_attrs(), + attr_iter != s->src_object->get_attrs().end() ? &attr_iter->second : nullptr, + nullptr); + if (ret < 0) { + return ret; + } + if (decrypt != nullptr) { + filter = decrypt.get(); + } + } + + filter->fixup_range(ofs_x, end_x); + + /* rgw::sal::DataProcessor */ + // encrypt + rgw::sal::DataProcessor* processor = writer; + + ret = get_encrypt_filter(&encrypt, processor, attrs); + if (ret < 0) { + return ret; + } + if (encrypt != nullptr) { + processor = &*encrypt; + } + + // compression + attrs.erase(RGW_ATTR_COMPRESSION); // remove any existing compression info from source object + // a zonegroup feature is required to combine compression and encryption + const RGWZoneGroup& zonegroup = s->penv.site->get_zonegroup(); + const bool compress_encrypted = zonegroup.supports(rgw::zone_features::compress_encrypted); + const auto& compression_type = driver->get_compression_type(s->dest_placement); + if (compression_type != "none" && + (encrypt == nullptr || compress_encrypted)) { + CompressorRef plugin = get_compressor_plugin(s, compression_type); + if (!plugin) { + ldpp_dout(s, 1) << "Cannot load plugin for compression type " + << compression_type << dendl; + } else { + compressor.emplace(s->cct, plugin, processor); + processor = &*compressor; + // always send incompressible hint when rgw is itself doing compression + s->object->set_compressed(); + } + } + + cb.set_processor(processor); + + return 0; + } + + bool need_copy_data() override { + // if source object is encrypted, we need to copy data + if (s->src_object->get_attrs().count(RGW_ATTR_CRYPT_MODE)) { + return true; + } + + // check if it's requested to be encrypted + static const string crypt_attrs[] = { + "x-amz-server-side-encryption-customer-algorithm", // SSE-C + "x-amz-server-side-encryption", // SSE-S3, SSE-KMS + }; + for (const auto& attr : crypt_attrs) { + if (s->info.crypt_attribute_map.find(attr) != + s->info.crypt_attribute_map.end()) { + return true; + } + } + + return false; + } + + RGWGetObj_Filter* get_filter() override { + return filter; + } +}; + bool RGWCopyObj::parse_copy_location(const std::string_view& url_src, string& bucket_name, rgw_obj_key& key,