]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
copy object encryption fixes
authorMarcus Watts <mwatts@redhat.com>
Fri, 14 Apr 2023 09:19:59 +0000 (05:19 -0400)
committerThomas Serlin <tserlin@redhat.com>
Mon, 22 Sep 2025 19:18:18 +0000 (15:18 -0400)
This contains code to allow copyobject to copy encrypted objects.

It includes additional data paths to communicate data from the
rest layer down to the sal layer to handle decrypting
objects.  The data paths include logic to use filter chains
from get and put that process encryption and compression.
There are several hacks to deal with quirks of the filter chains.
The "get" path has to propgate flushes around the chain,
because a flush isn't guaranteed to propagate through it.
Also the "get" and "put" chains have conflicting uses of the
buffer list logic, so the buffer list has to be copied so that
they don't step on each other's toes.

Fixes: https://tracker.ceph.com/issues/23264
Signed-off-by: Marcus Watts <mwatts@redhat.com>
(cherry picked from commit bcaaf55f4182da0a980c87c1dbd7e1d3c868626c)

Resolves: rhbz#2300284

20 files changed:
src/rgw/driver/daos/rgw_sal_daos.cc
src/rgw/driver/daos/rgw_sal_daos.h
src/rgw/driver/motr/rgw_sal_motr.cc
src/rgw/driver/motr/rgw_sal_motr.h
src/rgw/driver/posix/rgw_sal_posix.cc
src/rgw/driver/posix/rgw_sal_posix.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_sal_rados.cc
src/rgw/driver/rados/rgw_sal_rados.h
src/rgw/rgw_crypt.cc
src/rgw/rgw_crypt.h
src/rgw/rgw_op.cc
src/rgw/rgw_rest_s3.cc
src/rgw/rgw_rest_s3.h
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_filter.cc
src/rgw/rgw_sal_filter.h

index c90a8770514ba5de879ed47fd7fbf25209afc4c8..4e21472cfa99dcad587ff6321e7bef4213ce0b23 100644 (file)
@@ -1229,7 +1229,8 @@ int DaosObject::copy_object(
     RGWObjCategory category, uint64_t olh_epoch,
     boost::optional<ceph::real_time> delete_at, std::string* version_id,
     std::string* tag, std::string* etag, void (*progress_cb)(off_t, void*),
-    void* progress_data, const DoutPrefixProvider* dpp, optional_yield y) {
+    void* progress_data, rgw::sal::ObjectFilter *read_filter,
+    const DoutPrefixProvider* dpp, optional_yield y) {
   return DAOS_NOT_IMPLEMENTED_LOG(dpp);
 }
 
index 65ecbdcbb28c8fc670ec86be97c65583fbc8d951..66c1bd3dce2fef3388a86cd0dfa5573184812348 100644 (file)
@@ -612,6 +612,7 @@ class DaosObject : public StoreObject {
       RGWObjCategory category, uint64_t olh_epoch,
       boost::optional<ceph::real_time> delete_at, std::string* version_id,
       std::string* tag, std::string* etag, void (*progress_cb)(off_t, void*),
+      rgw::sal::ObjectFilter *read_filter,
       void* progress_data, const DoutPrefixProvider* dpp,
       optional_yield y) override;
   virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
index 3cb654fe41bf4df878f74d5e67b8559e0a7a843a..f6d9279dac145ebf11f3447d5c307d4573786fa7 100644 (file)
@@ -1543,6 +1543,7 @@ int MotrObject::copy_object(const ACLOwner& owner,
     std::string* etag,
     void (*progress_cb)(off_t, void *),
     void* progress_data,
+    rgw::sal::ObjectFilter *read_filter,
     const DoutPrefixProvider* dpp,
     optional_yield y)
 {
index 709b77c34a565642432791bd3bbb04b61d9ac0b9..2dbff3cae32b844582576d57cce765dd8a36e046 100644 (file)
@@ -677,6 +677,7 @@ class MotrObject : public StoreObject {
         boost::optional<ceph::real_time> delete_at,
         std::string* version_id, std::string* tag, std::string* etag,
         void (*progress_cb)(off_t, void *), void* progress_data,
+        rgw::sal::ObjectFilter *read_filter,
         const DoutPrefixProvider* dpp, optional_yield y) override;
     virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
     virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
index 798a18bfb9e7e2a67dcc7054882ff46c93a397bf..49c5a08d2d35e382e9ed7bef2f070fcbdcb301c5 100644 (file)
@@ -2798,6 +2798,7 @@ int POSIXObject::copy_object(const ACLOwner& owner,
                               std::string* etag,
                               void (*progress_cb)(off_t, void *),
                               void* progress_data,
+                              rgw::sal::ObjectFilter *read_filter,
                               const DoutPrefixProvider* dpp,
                               optional_yield y)
 {
index b31acb3b860bd32d67137fbbe95d20198abf1544..32fe622ebc0892f74448224ef4385deeed65a2db 100644 (file)
@@ -654,6 +654,7 @@ public:
                boost::optional<ceph::real_time> delete_at,
                std::string* version_id, std::string* tag, std::string* etag,
                void (*progress_cb)(off_t, void *), void* progress_data,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider* dpp, optional_yield y) override;
   virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
   virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
index 552f2bca7469e4541be1a69ba3362bc1afb50cd5..adeb7e1da776496d8f6c342e821745a39b03c749 100644 (file)
@@ -3079,6 +3079,7 @@ int RGWRados::swift_versioning_copy(RGWObjectCtx& obj_ctx,
                NULL, /* string *petag */
                NULL, /* void (*progress_cb)(off_t, void *) */
                NULL, /* void *progress_data */
+               nullptr, /* rgw::sal::ObjectFilter *read_filter */
                dpp,
                y,
                no_trace);
@@ -3179,6 +3180,7 @@ int RGWRados::swift_versioning_restore(RGWObjectCtx& obj_ctx,
                        nullptr,       /* string *petag */
                        nullptr,       /* void (*progress_cb)(off_t, void *) */
                        nullptr,       /* void *progress_data */
+                       nullptr,       /* rgw::sal::ObjectFilter *read_filter */
                        dpp,
                        y,
                        no_trace);
@@ -3890,7 +3892,7 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj, c
   return copy_obj_data(octx, owner, dest_bucket_info,
                        dest_bucket_info.placement_rule,
                        read_op, obj_size - 1, obj, NULL, mtime,
-                       attrset, 0, real_time(), NULL, dpp, y);
+                       attrset, 0, real_time(), NULL, nullptr, dpp, y);
 }
 
 
@@ -4919,6 +4921,7 @@ int RGWRados::copy_obj(RGWObjectCtx& src_obj_ctx,
                string *petag,
                void (*progress_cb)(off_t, void *),
                void *progress_data,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider *dpp,
                optional_yield y,
                jspan_context& trace)
@@ -4982,16 +4985,10 @@ int RGWRados::copy_obj(RGWObjectCtx& src_obj_ctx,
   if (ret < 0) {
     return ret;
   }
-  if (src_attrs.count(RGW_ATTR_CRYPT_MODE)) {
-    // Current implementation does not follow S3 spec and even
-    // may result in data corruption silently when copying
-    // multipart objects across pools. So reject COPY operations
-    //on encrypted objects before it is fully functional.
-    ldpp_dout(dpp, 0) << "ERROR: copy op for encrypted object " << src_obj
-                  << " has not been implemented." << dendl;
-    return -ERR_NOT_IMPLEMENTED;
-  }
 
+  if (read_filter) {
+    read_filter->set_src_attrs(src_attrs);
+  }
   src_attrs[RGW_ATTR_ACL] = attrs[RGW_ATTR_ACL];
   src_attrs.erase(RGW_ATTR_DELETE_AT);
 
@@ -5078,6 +5075,10 @@ int RGWRados::copy_obj(RGWObjectCtx& src_obj_ctx,
     (*src_rule != dest_placement) ||
     (src_pool != dest_pool);
 
+  if (!copy_data && read_filter && read_filter->need_copy_data()) {
+    copy_data = true;
+  }
+
   bool copy_first = false;
   if (amanifest) {
     if (!amanifest->has_tail()) {
@@ -5105,7 +5106,8 @@ int RGWRados::copy_obj(RGWObjectCtx& src_obj_ctx,
   if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
     attrs.erase(RGW_ATTR_TAIL_TAG);
     return copy_obj_data(dest_obj_ctx, owner, dest_bucket_info, dest_placement, read_op, obj_size - 1, dest_obj,
-                         mtime, real_time(), attrs, olh_epoch, delete_at, petag, dpp, y);
+                         mtime, real_time(), attrs, olh_epoch, delete_at,
+                         petag, read_filter, dpp, y);
   }
 
   /* This has been in for 2 years, so we can safely assume amanifest is not NULL */
@@ -5273,6 +5275,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
                uint64_t olh_epoch,
               real_time delete_at,
                string *petag,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider *dpp,
                optional_yield y,
                bool log_op)
@@ -5283,10 +5286,26 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   auto aio = rgw::make_throttle(cct->_conf->rgw_put_obj_min_window_size, y);
   using namespace rgw::putobj;
   jspan_context no_trace{false, false};
-  AtomicObjectProcessor processor(aio.get(), this, dest_bucket_info,
-                                  &dest_placement, owner,
-                                  obj_ctx, dest_obj, olh_epoch, tag, dpp, y, no_trace);
-  int ret = processor.prepare(y);
+  struct PostPipe : public rgw::sal::DataProcessor {
+    const DoutPrefixProvider *dpp;     // in case of debugging
+    rgw::sal::DataProcessor *next;
+    explicit PostPipe(const DoutPrefixProvider *_dpp) : dpp(_dpp), next(nullptr) {} 
+    int process(bufferlist && data, uint64_t off) override {
+        auto ret = next->process(std::move(data), off);
+       return ret;
+    }
+  } pproc { dpp };
+  rgw::sal::DataProcessor &processor { read_filter ? read_filter->get_filter(pproc, y)
+    : static_cast< rgw::sal::DataProcessor&>(pproc)};
+  AtomicObjectProcessor aoproc(aio.get(), this, dest_bucket_info,
+                               &dest_placement, owner,
+                               obj_ctx, dest_obj, olh_epoch, tag, dpp, y, no_trace);
+  pproc.next = read_filter ? read_filter->get_output(aoproc, obj_ctx, dest_placement, y)
+    : static_cast< rgw::sal::DataProcessor* >(&aoproc);
+  if (!pproc.next) {
+    return read_filter->get_error();
+  }
+  int ret = aoproc.prepare(y);
   if (ret < 0)
     return ret;
 
@@ -5314,6 +5333,13 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   if (ret < 0) {
     return ret;
   }
+  if (read_filter) {
+    ret = read_filter->set_compression_attribute();
+    if (ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to set compression attributes" << dendl;
+      return ret;
+    }
+  }
 
   string etag;
   auto iter = attrs.find(RGW_ATTR_ETAG);
@@ -5339,7 +5365,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
   }
 
   const req_context rctx{dpp, y, nullptr};
-  return processor.complete(accounted_size, etag, mtime, set_mtime, attrs,
+  return aoproc.complete(accounted_size, etag, mtime, set_mtime, attrs,
                            rgw::cksum::no_cksum, delete_at,
                             nullptr, nullptr, nullptr, nullptr, nullptr, rctx,
                             log_op ? rgw::sal::FLAG_LOG_OP : 0);
@@ -5404,6 +5430,7 @@ int RGWRados::transition_obj(RGWObjectCtx& obj_ctx,
                       olh_epoch,
                       real_time(),
                       nullptr /* petag */,
+                      nullptr,
                       dpp,
                       y,
                       log_op);
index 13acd2cae84cfcba4740cd13287449e23eacb9cc..de671b787d25a4e74d5e99eb7229a1d5692596ba 100644 (file)
@@ -1256,6 +1256,7 @@ public:
                std::string *petag,
                void (*progress_cb)(off_t, void *),
                void *progress_data,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider *dpp,
                optional_yield y,
                jspan_context& trace);
@@ -1272,6 +1273,7 @@ public:
                uint64_t olh_epoch,
               ceph::real_time delete_at,
                std::string *petag,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider *dpp,
                optional_yield y,
                bool log_op = true);
index 77d0ccb2d47efc7f82e9c4f11aea49bff190c115..ae01f2da28cc93671cbdceee32b2fe9a68c40791 100644 (file)
@@ -3640,6 +3640,7 @@ int RadosObject::copy_object(const ACLOwner& owner,
                                std::string* etag,
                                void (*progress_cb)(off_t, void *),
                                void* progress_data,
+                               rgw::sal::ObjectFilter *read_filter,
                                const DoutPrefixProvider* dpp,
                                optional_yield y)
 {
@@ -3672,6 +3673,7 @@ int RadosObject::copy_object(const ACLOwner& owner,
                                     etag,
                                     progress_cb,
                                     progress_data,
+                                    read_filter,
                                     dpp,
                                     y,
                                      dest_object->get_trace());
index ecfb4ba8c4352e4dec408882be15ef86c6f32a3a..6f4a1296ac311b34e01d9023928e6041571235ba 100644 (file)
@@ -583,6 +583,7 @@ class RadosObject : public StoreObject {
               boost::optional<ceph::real_time> delete_at,
                std::string* version_id, std::string* tag, std::string* etag,
                void (*progress_cb)(off_t, void *), void* progress_data,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider* dpp, optional_yield y) override;
     virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
     virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
index 2196e982953f0ce7301545aba961710960db3bef..3bcebd43e515fa73cb3c32966306580f7328791c 100644 (file)
@@ -924,6 +924,16 @@ struct CryptAttributes {
   }
 };
 
+static inline const std::string rgw_str_find(const std::map<std::string,
+                                               ceph::bufferlist>& attrs,
+                                             const std::string &name) {
+  auto i = attrs.find(name);
+  if (i != attrs.end()) {
+    return rgw_bl_str(i->second);
+  }
+  return std::string();
+}
+
 std::string fetch_bucket_key_id(req_state *s)
 {
   auto kek_iter = s->bucket_attrs.find(RGW_ATTR_BUCKET_ENCRYPTION_KEY_ID);
@@ -1300,79 +1310,76 @@ int rgw_s3_prepare_encrypt(req_state* s, optional_yield y,
 }
 
 
-int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
+int rgw_s3_prepare_decrypt(RGWDecryptContext &cb, optional_yield y,
                            map<string, bufferlist>& attrs,
                            std::unique_ptr<BlockCrypt>* block_crypt,
                            std::map<std::string, std::string>& crypt_http_responses)
 {
   int res = 0;
   std::string stored_mode = get_str_attribute(attrs, RGW_ATTR_CRYPT_MODE);
-  ldpp_dout(s, 15) << "Encryption mode: " << stored_mode << dendl;
+  ldpp_dout(cb.dpp, 15) << "Encryption mode: " << stored_mode << dendl;
 
-  const char *req_sse = s->info.env->get("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION", NULL);
-  if (nullptr != req_sse && (s->op == OP_GET || s->op == OP_HEAD)) {
+  const char *req_sse = cb.env->get("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION");
+  if (nullptr != req_sse && cb.get_or_head) {
     return -ERR_INVALID_REQUEST;
   }
 
   if (stored_mode == "SSE-C-AES256") {
-    if (s->cct->_conf->rgw_crypt_require_ssl &&
-        !rgw_transport_is_secure(s->cct, *s->info.env)) {
-      ldpp_dout(s, 5) << "ERROR: Insecure request, rgw_crypt_require_ssl is set" << dendl;
+    if (!cb.secure_channel) {
+      ldpp_dout(cb.dpp, 5) << "ERROR: Insecure request, rgw_crypt_require_ssl is set" << dendl;
       return -ERR_INVALID_REQUEST;
     }
-    const char *req_cust_alg =
-        s->info.env->get("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM", NULL);
+    const char *req_cust_alg = cb.get_customer_algorithm();
 
     if (nullptr == req_cust_alg)  {
-      ldpp_dout(s, 5) << "ERROR: Request for SSE-C encrypted object missing "
+      ldpp_dout(cb.dpp, 5) << "ERROR: Request for SSE-C encrypted object missing "
                        << "x-amz-server-side-encryption-customer-algorithm"
                        << dendl;
-      s->err.message = "Requests specifying Server Side Encryption with Customer "
+      cb.error_message = "Requests specifying Server Side Encryption with Customer "
                        "provided keys must provide a valid encryption algorithm.";
       return -EINVAL;
     } else if (strcmp(req_cust_alg, "AES256") != 0) {
-      ldpp_dout(s, 5) << "ERROR: The requested encryption algorithm is not valid, must be AES256." << dendl;
-      s->err.message = "The requested encryption algorithm is not valid, must be AES256.";
+      ldpp_dout(cb.dpp, 5) << "ERROR: The requested encryption algorithm is not valid, must be AES256." << dendl;
+      cb.error_message = "The requested encryption algorithm is not valid, must be AES256.";
       return -ERR_INVALID_ENCRYPTION_ALGORITHM;
     }
 
     std::string key_bin;
     try {
-      key_bin = from_base64(s->info.env->get("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY", ""));
+      key_bin = from_base64(cb.get_customer_key(""));
     } catch (...) {
-      ldpp_dout(s, 5) << "ERROR: rgw_s3_prepare_decrypt invalid encryption key "
+      ldpp_dout(cb.dpp, 5) << "ERROR: rgw_s3_prepare_decrypt invalid encryption key "
                        << "which contains character that is not base64 encoded."
                        << dendl;
-      s->err.message = "Requests specifying Server Side Encryption with Customer "
+      cb.error_message = "Requests specifying Server Side Encryption with Customer "
                        "provided keys must provide an appropriate secret key.";
       return -EINVAL;
     }
 
     if (key_bin.size() != AES_256_CBC::AES_256_KEYSIZE) {
-      ldpp_dout(s, 5) << "ERROR: Invalid encryption key size" << dendl;
-      s->err.message = "Requests specifying Server Side Encryption with Customer "
+      ldpp_dout(cb.dpp, 5) << "ERROR: Invalid encryption key size" << dendl;
+      cb.error_message = "Requests specifying Server Side Encryption with Customer "
                        "provided keys must provide an appropriate secret key.";
       return -EINVAL;
     }
 
-    std::string keymd5 =
-        s->info.env->get("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5", "");
+    std::string keymd5 = cb.get_customer_key_md5("");
     std::string keymd5_bin;
     try {
       keymd5_bin = from_base64(keymd5);
     } catch (...) {
-      ldpp_dout(s, 5) << "ERROR: rgw_s3_prepare_decrypt invalid encryption key md5 "
+      ldpp_dout(cb.dpp, 5) << "ERROR: rgw_s3_prepare_decrypt invalid encryption key md5 "
                        << "which contains character that is not base64 encoded."
                        << dendl;
-      s->err.message = "Requests specifying Server Side Encryption with Customer "
+      cb.error_message = "Requests specifying Server Side Encryption with Customer "
                        "provided keys must provide an appropriate secret key md5.";
       return -EINVAL;
     }
 
 
     if (keymd5_bin.size() != CEPH_CRYPTO_MD5_DIGESTSIZE) {
-      ldpp_dout(s, 5) << "ERROR: Invalid key md5 size " << dendl;
-      s->err.message = "Requests specifying Server Side Encryption with Customer "
+      ldpp_dout(cb.dpp, 5) << "ERROR: Invalid key md5 size " << dendl;
+      cb.error_message = "Requests specifying Server Side Encryption with Customer "
                        "provided keys must provide an appropriate secret key md5.";
       return -EINVAL;
     }
@@ -1386,10 +1393,10 @@ int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
 
     if ((memcmp(key_hash_res, keymd5_bin.c_str(), CEPH_CRYPTO_MD5_DIGESTSIZE) != 0) ||
         (get_str_attribute(attrs, RGW_ATTR_CRYPT_KEYMD5) != keymd5_bin)) {
-      s->err.message = "The calculated MD5 hash of the key did not match the hash that was provided.";
+      cb.error_message = "The calculated MD5 hash of the key did not match the hash that was provided.";
       return -EINVAL;
     }
-    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(s, s->cct));
+    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(cb.dpp, cb.cct));
     aes->set_key(reinterpret_cast<const uint8_t*>(key_bin.c_str()), AES_256_CBC::AES_256_KEYSIZE);
     if (block_crypt) *block_crypt = std::move(aes);
 
@@ -1399,28 +1406,27 @@ int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
   }
 
   if (stored_mode == "SSE-KMS") {
-    if (s->cct->_conf->rgw_crypt_require_ssl &&
-        !rgw_transport_is_secure(s->cct, *s->info.env)) {
-      ldpp_dout(s, 5) << "ERROR: Insecure request, rgw_crypt_require_ssl is set" << dendl;
+    if (!cb.secure_channel) {
+      ldpp_dout(cb.dpp, 5) << "ERROR: Insecure request, rgw_crypt_require_ssl is set" << dendl;
       return -ERR_INVALID_REQUEST;
     }
     /* try to retrieve actual key */
     std::string key_id = get_str_attribute(attrs, RGW_ATTR_CRYPT_KEYID);
     std::string actual_key;
-    res = reconstitute_actual_key_from_kms(s, attrs, y, actual_key);
+    res = reconstitute_actual_key_from_kms(cb.dpp, attrs, y, actual_key);
     if (res != 0) {
-      ldpp_dout(s, 10) << "ERROR: failed to retrieve actual key from key_id: " << key_id << dendl;
-      s->err.message = "Failed to retrieve the actual key, kms-keyid: " + key_id;
+      ldpp_dout(cb.dpp, 10) << "ERROR: failed to retrieve actual key from key_id: " << key_id << dendl;
+      cb.error_message = "Failed to retrieve the actual key, kms-keyid: " + key_id;
       return res;
     }
     if (actual_key.size() != AES_256_KEYSIZE) {
-      ldpp_dout(s, 0) << "ERROR: key obtained from key_id:" <<
+      ldpp_dout(cb.dpp, 0) << "ERROR: key obtained from key_id:" <<
           key_id << " is not 256 bit size" << dendl;
-      s->err.message = "KMS provided an invalid key for the given kms-keyid.";
+      cb.error_message = "KMS provided an invalid key for the given kms-keyid.";
       return -EINVAL;
     }
 
-    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(s, s->cct));
+    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(cb.dpp, cb.cct));
     aes->set_key(reinterpret_cast<const uint8_t*>(actual_key.c_str()), AES_256_KEYSIZE);
     actual_key.replace(0, actual_key.length(), actual_key.length(), '\000');
     if (block_crypt) *block_crypt = std::move(aes);
@@ -1433,26 +1439,26 @@ int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
   if (stored_mode == "RGW-AUTO") {
     std::string master_encryption_key;
     try {
-      master_encryption_key = from_base64(std::string(s->cct->_conf->rgw_crypt_default_encryption_key));
+      master_encryption_key = from_base64(std::string(cb.cct->_conf->rgw_crypt_default_encryption_key));
     } catch (...) {
-      ldpp_dout(s, 5) << "ERROR: rgw_s3_prepare_decrypt invalid default encryption key "
+      ldpp_dout(cb.dpp, 5) << "ERROR: rgw_s3_prepare_decrypt invalid default encryption key "
                        << "which contains character that is not base64 encoded."
                        << dendl;
-      s->err.message = "The default encryption key is not valid base64.";
+      cb.error_message = "The default encryption key is not valid base64.";
       return -EINVAL;
     }
 
     if (master_encryption_key.size() != 256 / 8) {
-      ldpp_dout(s, 0) << "ERROR: failed to decode 'rgw crypt default encryption key' to 256 bit string" << dendl;
+      ldpp_dout(cb.dpp, 0) << "ERROR: failed to decode 'rgw crypt default encryption key' to 256 bit string" << dendl;
       return -EIO;
     }
     std::string attr_key_selector = get_str_attribute(attrs, RGW_ATTR_CRYPT_KEYSEL);
     if (attr_key_selector.size() != AES_256_CBC::AES_256_KEYSIZE) {
-      ldpp_dout(s, 0) << "ERROR: missing or invalid " RGW_ATTR_CRYPT_KEYSEL << dendl;
+      ldpp_dout(cb.dpp, 0) << "ERROR: missing or invalid " RGW_ATTR_CRYPT_KEYSEL << dendl;
       return -EIO;
     }
     uint8_t actual_key[AES_256_KEYSIZE];
-    if (AES_256_ECB_encrypt(s, s->cct,
+    if (AES_256_ECB_encrypt(cb.dpp, cb.cct,
                             reinterpret_cast<const uint8_t*>(master_encryption_key.c_str()),
                             AES_256_KEYSIZE,
                             reinterpret_cast<const uint8_t*>(attr_key_selector.c_str()),
@@ -1460,7 +1466,7 @@ int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
       ::ceph::crypto::zeroize_for_security(actual_key, sizeof(actual_key));
       return -EIO;
     }
-    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(s, s->cct));
+    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(cb.dpp, cb.cct));
     aes->set_key(actual_key, AES_256_KEYSIZE);
     ::ceph::crypto::zeroize_for_security(actual_key, sizeof(actual_key));
     if (block_crypt) *block_crypt = std::move(aes);
@@ -1472,20 +1478,20 @@ int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
     /* try to retrieve actual key */
     std::string key_id = get_str_attribute(attrs, RGW_ATTR_CRYPT_KEYID);
     std::string actual_key;
-    res = reconstitute_actual_key_from_sse_s3(s, attrs, y, actual_key);
+    res = reconstitute_actual_key_from_sse_s3(cb.dpp, attrs, y, actual_key);
     if (res != 0) {
-      ldpp_dout(s, 10) << "ERROR: failed to retrieve actual key" << dendl;
-      s->err.message = "Failed to retrieve the actual key";
+      ldpp_dout(cb.dpp, 10) << "ERROR: failed to retrieve actual key" << dendl;
+      cb.error_message = "Failed to retrieve the actual key";
       return res;
     }
     if (actual_key.size() != AES_256_KEYSIZE) {
-      ldpp_dout(s, 0) << "ERROR: key obtained " <<
+      ldpp_dout(cb.dpp, 0) << "ERROR: key obtained " <<
           "is not 256 bit size" << dendl;
-      s->err.message = "SSE-S3 provided an invalid key for the given keyid.";
+      cb.error_message = "SSE-S3 provided an invalid key for the given keyid.";
       return -EINVAL;
     }
 
-    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(s, s->cct));
+    auto aes = std::unique_ptr<AES_256_CBC>(new AES_256_CBC(cb.dpp, cb.cct));
     aes->set_key(reinterpret_cast<const uint8_t*>(actual_key.c_str()), AES_256_KEYSIZE);
     actual_key.replace(0, actual_key.length(), actual_key.length(), '\000');
     if (block_crypt) *block_crypt = std::move(aes);
@@ -1499,6 +1505,17 @@ int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
   return 0;
 }
 
+int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
+                       map<string, bufferlist>& attrs,
+                       std::unique_ptr<BlockCrypt>* block_crypt,
+                       std::map<std::string, std::string>& crypt_http_responses)
+{
+  RGWDecryptContext dec_cb(s);
+  int res;
+  res = rgw_s3_prepare_decrypt(dec_cb, y, attrs, block_crypt, crypt_http_responses);
+  return res;
+}
+
 int rgw_remove_sse_s3_bucket_key(req_state *s, optional_yield y)
 {
   int res;
@@ -1532,6 +1549,66 @@ int rgw_remove_sse_s3_bucket_key(req_state *s, optional_yield y)
   return res;
 }
 
+// dry run of "rgw_s3_prepare_encrypt" to map
+// critical s3 encryption parameters to internal mappings.
+// This is for use by rgw_need_copy_data, to determine if it is
+// necessary to use copy_obj_data().  Only collects the data
+// necessary to determine compatibility; no need for the rest.
+static int dummy_prepare_encrypt(req_state* s,
+                           std::map<std::string, ceph::bufferlist>& attrs)
+{
+  CryptAttributes crypt_attributes { s };
+
+  std::string_view req_sse_ca =
+      crypt_attributes.get(X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM);
+  if (! req_sse_ca.empty()) {
+    if (req_sse_ca != "AES256") {
+//  The requested encryption algorithm is not valid, must be AES256.
+      return -ERR_INVALID_ENCRYPTION_ALGORITHM;
+    }
+    set_attr(attrs, RGW_ATTR_CRYPT_MODE, "SSE-C-AES256");
+    return 0;
+  }
+
+  /* AMAZON server side encryption with KMS (key management service) */
+  std::string_view req_sse =
+      crypt_attributes.get(X_AMZ_SERVER_SIDE_ENCRYPTION);
+  if (! req_sse.empty()) {
+    if (req_sse == "aws:kms") {
+      set_attr(attrs, RGW_ATTR_CRYPT_MODE, "SSE-KMS");
+      return 0;
+    } else if (req_sse != "AES256") {
+//  ERROR: Invalid value for header x-amz-server-side-encryption
+      return -EINVAL;
+    }
+    set_attr(attrs, RGW_ATTR_CRYPT_MODE, "AES256");
+//    set_attr(attrs, RGW_ATTR_CRYPT_KEYID, key_id);
+    return 0;
+  } else if (s->cct->_conf->rgw_crypt_default_encryption_key != "") {
+    set_attr(attrs, RGW_ATTR_CRYPT_MODE, "RGW-AUTO");
+  }
+  return 0;
+}
+
+// When copying an object, if it is encrypted, only when the key is the
+// same can we just copy the data.  Otherwise, must do the decrypt/re-encrypt
+// loop.  This routine determines when the original and proposed
+// encryption requires that we re-encrypt the object.
+
+bool rgw_need_copy_data( std::map<std::string, ceph::bufferlist>& src_attrs,
+  req_state* s) {
+  std::map<std::string, ceph::bufferlist> enc_attrs;
+  int err = dummy_prepare_encrypt( s, enc_attrs );
+  if (err) {
+    return true;
+  }
+  std::string src_mode = get_str_attribute(src_attrs, RGW_ATTR_CRYPT_MODE);
+  std::string enc_mode = get_str_attribute(enc_attrs, RGW_ATTR_CRYPT_MODE);
+
+  bool r = src_mode != "" || enc_mode !=  "";
+  return r;
+}
+
 /*********************************************************************
 *      "BOTTOM OF FILE"
 *      I've left some commented out lines above.  They are there for
index 51208388f4c80f5dd064e05841f6152b572129e8..37544bc3968f3a0cedec1a2d9076121251d9ca48 100644 (file)
@@ -146,6 +146,59 @@ public:
   int process(bufferlist&& data, uint64_t logical_offset) override;
 }; /* RGWPutObj_BlockEncrypt */
 
+struct RGWDecryptContext {
+  const DoutPrefixProvider *dpp;
+  CephContext* cct;
+  std::string &error_message;
+  bool get_or_head;
+  bool secure_channel;
+  const RGWEnv *env;
+  const char *sse_ca;
+  const char *sse_c_key;
+  const char *sse_c_md5;
+  const char *get_customer_algorithm(const char *def_val = nullptr) {
+    return env->get(sse_ca, def_val);
+  }
+  const char *get_customer_key(const char *def_val = nullptr) {
+    return env->get(sse_c_key, def_val);
+  }
+  const char *get_customer_key_md5(const char *def_val = nullptr) {
+    return env->get(sse_c_md5, def_val);
+  }
+  RGWDecryptContext(req_state *s) : dpp(s), cct(s->cct), error_message(s->err.message),
+       get_or_head(s->op == OP_GET || s->op == OP_HEAD),
+       secure_channel(!s->cct->_conf->rgw_crypt_require_ssl ||
+               rgw_transport_is_secure(s->cct, *s->info.env)),
+       env(s->info.env),
+       sse_ca("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM"),
+       sse_c_key("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY"),
+       sse_c_md5("HTTP_X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5") {
+  };
+#if 0
+  RGWDecryptContext(req_state *s, bool customer_side) : dpp(s), cct(s->cct),
+        error_message(s->err.message),
+       get_or_head(s->op == OP_GET || s->op == OP_HEAD),
+       secure_channel(customer_side),
+       env(s->info.env),
+       sse_ca("HTTP_X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM"),
+       sse_c_key("HTTP_X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY"),
+       sse_c_md5("HTTP_X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5") {
+  };
+#endif
+  RGWDecryptContext(const DoutPrefixProvider* _dpp, CephContext* _cct,
+            std::string &_error_message,
+            bool _get_or_head, bool _secure_channel,
+            const RGWEnv *env)
+        : dpp(_dpp), cct(_cct), error_message(_error_message),
+       get_or_head(_get_or_head),
+       secure_channel(_secure_channel),
+       env(env),
+        sse_ca("HTTP_X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM"),
+        sse_c_key("HTTP_X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY"),
+        sse_c_md5("HTTP_X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5") {
+  };
+}; /* RGWDecryptContext */
+
 
 int rgw_s3_prepare_encrypt(req_state* s, optional_yield y,
                            std::map<std::string, ceph::bufferlist>& attrs,
@@ -159,6 +212,12 @@ int rgw_s3_prepare_decrypt(req_state* s, optional_yield y,
                            std::map<std::string,
                                     std::string>& crypt_http_responses);
 
+int rgw_s3_prepare_decrypt(RGWDecryptContext &cb, optional_yield y,
+                           std::map<std::string, ceph::bufferlist>& attrs,
+                           std::unique_ptr<BlockCrypt>* block_crypt,
+                           std::map<std::string,
+                                    std::string>& crypt_http_responses);
+
 static inline void set_attr(std::map<std::string, bufferlist>& attrs,
                             const char* key,
                             std::string_view value)
@@ -179,3 +238,5 @@ static inline std::string get_str_attribute(const std::map<std::string, bufferli
 }
 
 int rgw_remove_sse_s3_bucket_key(req_state *s, optional_yield y);
+bool rgw_need_copy_data( std::map<std::string, ceph::bufferlist>& src_attrs,
+                    req_state *s);
index 3c723761796e8a1419c455b1de4b0352648f4838..f8f4c048baf64aa4056365201f77db5cc68d74f5 100644 (file)
@@ -77,6 +77,9 @@
 #include "cls/lock/cls_lock_client.h"
 #include "cls/rgw/cls_rgw_client.h"
 
+#include "rgw_auth.h"
+#include "rgw_auth_registry.h"
+
 
 #include "include/ceph_assert.h"
 
@@ -5941,6 +5944,281 @@ void RGWCopyObj::pre_exec()
   rgw_bucket_object_pre_exec(s);
 }
 
+class RGWCOE_filter_from_proc: public RGWGetObj_Filter {
+  const DoutPrefixProvider *dpp;
+  rgw::sal::DataProcessor* processor;
+  off_t ofs;
+  bool flushed;
+public:
+  RGWCOE_filter_from_proc() {}
+  RGWCOE_filter_from_proc(const DoutPrefixProvider *_dpp,
+    rgw::sal::DataProcessor &_p) : dpp(_dpp), processor(&_p), ofs(0), flushed(false) {
+  };
+  ~RGWCOE_filter_from_proc() {};
+  int handle_data(bufferlist &bl, off_t bl_ofs, off_t bl_len) override {
+    if (flushed) {
+      ldpp_dout(dpp, 0) << "ERROR: RGWCOE_filter_from_proc::handle_data: data after flush" << dendl;
+      return -EIO;
+    }
+    uint64_t read_len = bl_len;
+    bufferlist copy;
+    bl.begin(bl_ofs).copy(bl_len, copy);
+    int ret = processor->process(std::move(copy), ofs);
+    if (ret < 0) return ret;
+    ofs += read_len;
+    return read_len;
+  };
+  // this is the bottom filter; so allow for double flushing just in case
+  int flush() override {
+    if (flushed) return 0;
+    flushed = true;
+    int ret = processor->process({}, ofs);
+    return ret;
+  };
+  // no override: virtual int fixup_range(off&ofs, off_t&end){ return 0; }
+};
+
+// getobject filter pipeline
+// get_filter uses this as the apex of the filter chain it constructs
+class RGWCOE_proc_from_filters : public rgw::sal::DataProcessor
+{
+  const DoutPrefixProvider *dpp;       // in case of debugging...
+  RGWGetObj_Filter &filter;
+  RGWGetObj_Filter &cb;
+public:
+  RGWCOE_proc_from_filters(const DoutPrefixProvider *_dpp, RGWGetObj_Filter &f, RGWGetObj_Filter &n)
+        : dpp(_dpp), filter(f), cb(n) {
+  }
+  int process(bufferlist &&bl, uint64_t ofs) override {
+    off_t len = bl.length();
+    int ret;
+    if (len > 0) {
+      ret = filter.handle_data(bl, ofs, len);
+    } else {
+      ret = filter.flush();
+      if (ret >= 0) {          // GetObj filters not guaranteed to propagate
+        ret = cb.flush();      // flush, so call bottom filter directly.
+      }
+    }
+    return ret;
+  };
+};
+
+class RGWCOE_make_filter_pipeline : public rgw::sal::ObjectFilter {
+  CephContext *cct;
+  int op_ret;
+  map<string, bufferlist> &attrs;
+  bool need_decompress;
+  RGWCompressionInfo cs_info;
+  bool encrypted;
+  std::unique_ptr<RGWGetObj_Filter> decrypt;
+  int64_t ofs_x, end_x;
+  std::unique_ptr<RGWGetObj_Filter> cb;
+  std::map<std::string, ceph::buffer::list> src_attrs;
+  std::map<std::string, ceph::buffer::list> enc_attrs;
+  bool skip_decrypt;
+  DoutPrefixProvider *dpp;
+  boost::optional<RGWGetObj_Decompress> decompress;
+  bool partial_content = false;
+  std::map<std::string, std::string> crypt_http_responses;     // XXX who consumes?
+  std::unique_ptr<rgw::sal::DataProcessor> oproc;
+  const RGWEnv *env;
+  struct rgw_err &err;
+  std::unique_ptr<rgw::sal::Object> &object;
+  uint64_t &obj_size;
+  RGWDecryptContext dctx;
+  req_state *s;                        // destination only, not for source!
+  std::unique_ptr<rgw::sal::DataProcessor> encrypt;
+  boost::optional<RGWPutObj_Compress> compressor;
+  CompressorRef plugin;
+public:
+  RGWCOE_make_filter_pipeline(req_state *_s, DoutPrefixProvider *_dpp,
+      map<string, bufferlist> &_a, bool _skip_decrypt,
+      std::unique_ptr<rgw::sal::Object> & _object, uint64_t &_obj_size)
+    : cct(_s->cct), attrs(_a), encrypted( attrs.count(RGW_ATTR_CRYPT_MODE)),
+      skip_decrypt(_skip_decrypt), dpp(_dpp),
+      env(_s->info.env), err(_s->err),
+      object(_object),
+      obj_size(_obj_size),
+      dctx( dpp, cct,
+        err.message,
+        false, 
+        !cct->_conf->rgw_crypt_require_ssl
+            || rgw_transport_is_secure(cct, *env),
+        env),
+      s(_s), plugin(nullptr) {
+  };
+  int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter> *filter,
+      RGWGetObj_Filter* cb,
+      bufferlist* manifest_bl,
+      optional_yield y) {
+    if (skip_decrypt) {
+      return 0;
+    }
+    std::unique_ptr<BlockCrypt> block_crypt;
+    int res = rgw_s3_prepare_decrypt(dctx, y, src_attrs, &block_crypt, crypt_http_responses);
+    if (res < 0) {
+      return res;
+    }
+    if (block_crypt == nullptr) {
+      return 0;
+    }
+    std::vector<size_t> parts_len;
+    res = RGWGetObj_BlockDecrypt::read_manifest_parts(dpp, *manifest_bl, parts_len);
+    if (res < 0) {
+      return res;
+    }
+    *filter = std::make_unique<RGWGetObj_BlockDecrypt>(dpp, cct, cb,
+        std::move(block_crypt), std::move(parts_len),
+        y);
+    return res;
+  }
+  int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+      rgw::sal::DataProcessor *cb)
+  {
+    int res = 0;
+    std::unique_ptr<BlockCrypt> block_crypt;
+    res = rgw_s3_prepare_encrypt(s, s->yield, attrs, &block_crypt,
+                                crypt_http_responses);
+    if (res == 0 && block_crypt != nullptr) {
+      filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
+    }
+    return res;
+  }
+
+  int set_compression_attribute() override {
+    if (compressor && compressor->is_compressed()) {
+      ceph::bufferlist tmp;
+      RGWCompressionInfo cs_info;
+      assert(plugin != nullptr);
+      // plugin exists when the compressor does
+      // coverity[dereference:SUPPRESS]
+      cs_info.compression_type = plugin->get_type_name();
+      cs_info.orig_size = obj_size;
+      cs_info.compressor_message = compressor->get_compressor_message();
+      cs_info.blocks = std::move(compressor->get_compression_blocks());
+      encode(cs_info, tmp);
+      attrs.emplace(RGW_ATTR_COMPRESSION, std::move(tmp));
+    }
+    return 0;
+  }
+  void clear_encryption_attrs( std::map<std::string, ceph::buffer::list> &a) {
+    for ( auto it = a.begin(); it != a.end(); ) {
+      if ( boost::algorithm::starts_with(it->first, RGW_ATTR_CRYPT_PREFIX)) {
+       it = a.erase(it);
+      }
+      else {
+        ++it;
+      }
+    }
+  }
+  std::map<std::string, ceph::buffer::list>
+  filter_encryption_compression_attrs(std::map<std::string, ceph::buffer::list> &_a, bool keep_manifest) {
+     std::map<std::string, ceph::buffer::list> r;
+     for ( auto& it : _a) {
+       const auto& attr_name = it.first;
+       bufferlist &val = it.second;
+       if ( !boost::algorithm::starts_with(attr_name, RGW_ATTR_CRYPT_PREFIX)
+         && (!keep_manifest || attr_name != RGW_ATTR_MANIFEST)
+         && attr_name != RGW_ATTR_COMPRESSION ) {
+         continue;
+       }
+       ceph::buffer::list temp;
+       temp.append(val);
+       r.emplace(std::string(attr_name), std::move(temp));
+     }
+     return r;
+   };
+  void merge_attrs(std::map<std::string, ceph::buffer::list> &additions,std::map<std::string, ceph::buffer::list> &target) {
+     for ( auto &it : additions) {
+       ceph::buffer::list temp;
+       temp.append(it.second);
+       target.emplace(std::string(it.first), std::move(temp));
+     }
+  }
+
+  rgw::sal::DataProcessor & get_filter(rgw::sal::DataProcessor&next, optional_yield y) override {
+    ofs_x = 0;
+    end_x = obj_size;
+    encrypted = false;
+    cb = std::make_unique<RGWCOE_filter_from_proc>(dpp, next);
+    RGWGetObj_Filter *filter = &*cb;
+    // decompress
+    op_ret = rgw_compression_info_from_attrset(src_attrs, need_decompress, cs_info);
+    if (op_ret < 0) {
+      ldpp_dout(dpp, 0) << "ERROR: failed to decode compression info, cannot decompress" << dendl;
+      throw op_ret;
+    }
+    if (need_decompress && (!encrypted || !skip_decrypt)) {
+      obj_size = cs_info.orig_size;                    // XXX where?
+      object->set_obj_size(cs_info.orig_size);         // XXX where?
+      decompress.emplace(cct, &cs_info, partial_content, filter);
+      filter = &*decompress;
+ldpp_dout(dpp, 0) << "TEMP: end_x=" << end_x << " obj_size=" << obj_size << dendl;
+      end_x = obj_size;
+    }
+    // decrypt
+    filter->fixup_range(ofs_x, end_x);
+
+    auto attr_iter = src_attrs.find(RGW_ATTR_MANIFEST);
+    op_ret = this->get_decrypt_filter(&decrypt, filter,
+                                     attr_iter != src_attrs.end() ? &(attr_iter->second) : nullptr, y);
+    if (decrypt != nullptr) {
+      filter = decrypt.get();
+      filter->fixup_range(ofs_x, end_x);
+    }
+    if (op_ret < 0) {
+      throw op_ret;
+    }
+    oproc = std::make_unique<RGWCOE_proc_from_filters>(RGWCOE_proc_from_filters(dpp, *filter,
+                                                       *cb));
+    return *oproc;
+  };
+  rgw::sal::DataProcessor * get_output(rgw::sal::DataProcessor&next, RGWObjectCtx& obj_ctx, const rgw_placement_rule& dest_placement, optional_yield y) override {
+    rgw::sal::DataProcessor *filter = &next;
+    // at this point, attrs enc/compression settings are a muddle of src and request
+    // we need it to be just the requested settings
+    clear_encryption_attrs(attrs);
+    merge_attrs(enc_attrs, attrs);     // request & bucket encryption defaults
+    attrs.erase(RGW_ATTR_COMPRESSION); // only true source: zone data
+    op_ret = get_encrypt_filter(&encrypt, filter);
+    if (op_ret < 0) {
+      return nullptr;
+    }
+    if (encrypt != nullptr) {
+      filter = encrypt.get();
+    }
+    // a zonegroup feature is required...
+    const RGWZoneGroup& zonegroup = s->penv.site->get_zonegroup();
+    const bool compress_encrypted = zonegroup.supports(rgw::zone_features::compress_encrypted);
+    const auto& compression_type = obj_ctx.get_driver()->get_compression_type(dest_placement);
+    if (compression_type != "none" &&
+       (encrypt == nullptr || compress_encrypted)) {
+      plugin = get_compressor_plugin(s, compression_type);
+      if (!plugin) {
+       ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
+         << compression_type << dendl;
+      } else {
+       compressor.emplace(s->cct, plugin, filter);
+        filter = &*compressor;
+        // always send incompressible hint when rgw is itself doing compression
+        s->object->set_compressed();
+      }
+    }
+    return filter;
+  };
+  int get_error() override {
+    return op_ret;
+  };
+  void set_src_attrs(std::map<std::string, ceph::buffer::list> &_src) override {
+    src_attrs = filter_encryption_compression_attrs(_src, true);
+    enc_attrs = filter_encryption_compression_attrs(attrs, false);
+  };
+  bool need_copy_data() override {
+    return rgw_need_copy_data( src_attrs, s );
+  }
+};
+
 void RGWCopyObj::execute(optional_yield y)
 {
   if (init_common() < 0)
@@ -6039,7 +6317,9 @@ void RGWCopyObj::execute(optional_yield y)
     return;
   }
 
-  op_ret = s->src_object->copy_object(s->owner,
+  try {
+    RGWCOE_make_filter_pipeline cb { s, this, attrs, false, s->src_object, obj_size };
+    op_ret = s->src_object->copy_object(s->owner,
           s->user->get_id(),
           &s->info,
           source_zone,
@@ -6064,8 +6344,16 @@ void RGWCopyObj::execute(optional_yield y)
           &s->req_id, /* use req_id as tag */
           &etag,
           copy_obj_progress_cb, (void *)this,
+          &cb,
           this,
           s->yield);
+  } catch (int caught_errno) {
+      std::stringstream os;
+      os << "Caught error " << caught_errno << " during copy object";
+      s->err.message = os.str();
+      op_ret = caught_errno;
+      return;
+  }
 
   int ret = rgw::bucketlogging::log_record(driver, rgw::bucketlogging::LoggingType::Standard, s->src_object.get(), s, "REST.COPY.OBJECT_GET", etag, obj_size, this, y, true, true);
   if (ret < 0) {
index 4b277f15ce8966dc23a5053837804dcfb8f16885..10395bcd39810d7f6afcdb8244611ffb58b2ba6a 100644 (file)
@@ -3795,6 +3795,11 @@ int RGWCopyObj_ObjStore_S3::get_params(optional_yield y)
   auto obj_lock_mode_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_MODE");
   auto obj_lock_date_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_RETAIN_UNTIL_DATE");
   auto obj_legal_hold_str = s->info.env->get("HTTP_X_AMZ_OBJECT_LOCK_LEGAL_HOLD");
+  int ret = get_encryption_defaults(s);
+  if (ret < 0) {
+    ldpp_dout(this, 5) << __func__ << "(): get_encryption_defaults() returned ret=" << ret << dendl;
+    return ret;
+  }
   if (obj_lock_mode_str && obj_lock_date_str) {
     boost::optional<ceph::real_time> date = ceph::from_iso_8601(obj_lock_date_str);
     if (boost::none == date || ceph::real_clock::to_time_t(*date) <= ceph_clock_now()) {
@@ -3906,6 +3911,9 @@ void RGWCopyObj_ObjStore_S3::send_response()
   if (!sent_header)
     send_partial_response(0);
 
+  for (auto &it : crypt_http_responses)
+    dump_header(s, it.first, it.second);
+
   if (op_ret == 0) {
     dump_time_exact_seconds(s, "LastModified", mtime);
     if (!etag.empty()) {
index a1a1132ca7fafcc6ce123ea1b3bbb612c4238041..b026cc2ac0a0a52c27f28aeea67778ca563e0b34 100644 (file)
@@ -344,6 +344,7 @@ public:
 };
 
 class RGWCopyObj_ObjStore_S3 : public RGWCopyObj_ObjStore {
+  std::map<std::string, std::string> crypt_http_responses;
   bool sent_header;
 public:
   RGWCopyObj_ObjStore_S3() : sent_header(false) {}
index b68129509fbef3063e5164a88a1a8b4d183c6655..d1abf17779c60e558625088367e8b9252ff80790 100644 (file)
@@ -263,6 +263,21 @@ struct TopicList {
   std::string next_marker;
 };
 
+/**
+ * @brief Read filter when copying data from object to another.
+ */
+class ObjectFilter {
+public:
+  ObjectFilter() { };
+  virtual ~ObjectFilter() = default;
+  virtual int set_compression_attribute() = 0;
+  virtual DataProcessor & get_filter(DataProcessor& next, optional_yield y) = 0;
+  virtual DataProcessor * get_output(DataProcessor& next, RGWObjectCtx& obj_ctx, const rgw_placement_rule&, optional_yield y) = 0;
+  virtual int get_error() = 0;
+  virtual void set_src_attrs(std::map<std::string, ceph::buffer::list> &src_attrs) = 0;
+  virtual bool need_copy_data() = 0;
+};
+
 /** A list of key-value attributes */
   using Attrs = std::map<std::string, ceph::buffer::list>;
 
@@ -1206,6 +1221,7 @@ class Object {
               boost::optional<ceph::real_time> delete_at,
                std::string* version_id, std::string* tag, std::string* etag,
                void (*progress_cb)(off_t, void *), void* progress_data,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider* dpp, optional_yield y) = 0;
 
     /** return logging subsystem */
index a73766b2dd51c920a15f6d083f5ef2c2a52be2f2..0770790a69b941f072ebd4bb4b5ad94fc1c5af81 100644 (file)
@@ -767,6 +767,7 @@ namespace rgw::sal {
       std::string* etag,
       void (*progress_cb)(off_t, void *),
       void* progress_data,
+      rgw::sal::ObjectFilter *read_filter,
       const DoutPrefixProvider* dpp,
       optional_yield y)
   {
index caebc28dabefb8fbff16c3bea9addbaef75383a3..5803390fc5bd224266bab22b60a5d31fa02acd2f 100644 (file)
@@ -554,6 +554,7 @@ protected:
           boost::optional<ceph::real_time> delete_at,
           std::string* version_id, std::string* tag, std::string* etag,
           void (*progress_cb)(off_t, void *), void* progress_data,
+          rgw::sal::ObjectFilter *read_filter,
           const DoutPrefixProvider* dpp, optional_yield y) override;
       virtual RGWAccessControlPolicy& get_acl(void) override { return acls; }
       virtual int set_acl(const RGWAccessControlPolicy& acl) override { acls = acl; return 0; }
index 25844f077485277a5291e57980acb19ed3ebb313..6aa0525d394658ba594c3ff90877623c01950c61 100644 (file)
@@ -1043,6 +1043,7 @@ int FilterObject::copy_object(const ACLOwner& owner,
                              std::string* etag,
                              void (*progress_cb)(off_t, void *),
                              void* progress_data,
+                             rgw::sal::ObjectFilter *read_filter,
                              const DoutPrefixProvider* dpp,
                              optional_yield y)
 {
@@ -1054,7 +1055,8 @@ int FilterObject::copy_object(const ACLOwner& owner,
                           mod_ptr, unmod_ptr, high_precision_time, if_match,
                           if_nomatch, attrs_mod, copy_if_newer, attrs,
                           category, olh_epoch, delete_at, version_id, tag,
-                          etag, progress_cb, progress_data, dpp, y);
+                          etag, progress_cb, progress_data, read_filter,
+                           dpp, y);
 }
 
 RGWAccessControlPolicy& FilterObject::get_acl()
index d58253a7de281c086212b3aa7d575492aca409a3..db9df25844126755f5b47074b94a13027dd79ab4 100644 (file)
@@ -780,6 +780,7 @@ public:
               boost::optional<ceph::real_time> delete_at,
                std::string* version_id, std::string* tag, std::string* etag,
                void (*progress_cb)(off_t, void *), void* progress_data,
+               rgw::sal::ObjectFilter *read_filter,
                const DoutPrefixProvider* dpp, optional_yield y) override;
   virtual RGWAccessControlPolicy& get_acl(void) override;
   virtual int set_acl(const RGWAccessControlPolicy& acl) override { return next->set_acl(acl); }