git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Crypto: Add QAT batch mode
author Feng Hualong <hualong.feng@intel.com>
Mon, 25 Apr 2022 03:34:49 +0000 (11:34 +0800)
committer Feng Hualong <hualong.feng@intel.com>
Fri, 10 Mar 2023 09:02:42 +0000 (17:02 +0800)
The current code does not let QAT exploit its acceleration
advantage and instead leads to poor QAT performance.
This commit implements a QAT batch mode so that the
performance benefit of QAT can actually be realized.

When the number of concurrent requests scales up and the QAT
instances would become a bottleneck, we can fall back to the CPU.

Also add a parameter that sets, as a multiple of the number of QAT
instances, how many requests may wait for a free instance. This keeps
QAT from sitting idle and keeps it as fully utilized as possible.

max_queue_size is up to max_requests

Add optional_yield to RGWPutObj_BlockEncrypt and
RGWGetObj_BlockDecrypt so that they work in both coroutine and
non-coroutine mode.

Signed-off-by: Feng Hualong <hualong.feng@intel.com>
19 files changed:
src/crypto/crypto_accel.h
src/crypto/crypto_plugin.h
src/crypto/isa-l/CMakeLists.txt
src/crypto/isa-l/isal_crypto_accel.cc
src/crypto/isa-l/isal_crypto_accel.h
src/crypto/isa-l/isal_crypto_plugin.h
src/crypto/openssl/CMakeLists.txt
src/crypto/openssl/openssl_crypto_accel.cc
src/crypto/openssl/openssl_crypto_accel.h
src/crypto/openssl/openssl_crypto_plugin.h
src/crypto/qat/CMakeLists.txt
src/crypto/qat/qat_crypto_accel.cc
src/crypto/qat/qat_crypto_accel.h
src/crypto/qat/qat_crypto_plugin.h
src/crypto/qat/qcccrypto.cc
src/crypto/qat/qcccrypto.h
src/rgw/rgw_crypt.cc
src/rgw/rgw_crypt.h
src/rgw/rgw_rest_s3.cc

index 5c159360992736c2e0cc76c8764c03bfa4d84009..f2ba61906b4acab5514cb7804444de7a7d552437 100644 (file)
 #include <cstddef>
 #include "include/Context.h"
 
+class optional_yield;
+
 class CryptoAccel;
 typedef std::shared_ptr<CryptoAccel> CryptoAccelRef;
 
 class CryptoAccel {
  public:
   CryptoAccel() {}
+  CryptoAccel(const size_t chunk_size, const size_t max_requests) {}
   virtual ~CryptoAccel() {}
 
   static const int AES_256_IVSIZE = 128/8;
   static const int AES_256_KEYSIZE = 256/8;
   virtual bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
                    const unsigned char (&iv)[AES_256_IVSIZE],
-                   const unsigned char (&key)[AES_256_KEYSIZE]) = 0;
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) = 0;
   virtual bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
                    const unsigned char (&iv)[AES_256_IVSIZE],
-                   const unsigned char (&key)[AES_256_KEYSIZE]) = 0;
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) = 0;
+  virtual bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+                   const unsigned char iv[][AES_256_IVSIZE],
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) = 0;
+  virtual bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+                   const unsigned char iv[][AES_256_IVSIZE],
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) = 0;
 };
 #endif
index cf22d5cb42505d97f28d13255bdae917216ef01a..1319659715237a347fb2fcbccb22c646b7128d12 100644 (file)
@@ -20,6 +20,7 @@
 #include "ostream"
 
 #include "crypto/crypto_accel.h"
+#include <boost/asio/io_context.hpp>
 // -----------------------------------------------------------------------------
 
 class CryptoPlugin : public ceph::Plugin {
@@ -31,6 +32,8 @@ public:
   ~CryptoPlugin()
   {}
   virtual int factory(CryptoAccelRef *cs,
-                      std::ostream *ss) = 0;
+                      std::ostream *ss,
+                      const size_t chunk_size,
+                      const size_t max_requests) = 0;
 };
 #endif
index 2a2ec0bc0cb286f897779f9f5de080be46e14306..c8d832247d92e924b239e8aa7ab8611ac2dc540a 100644 (file)
@@ -29,6 +29,9 @@ endif(HAVE_NASM_X64)
 
 add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs})
 target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include)
+
+target_link_libraries(ceph_crypto_isal PRIVATE spawn)
+
 set_target_properties(ceph_crypto_isal PROPERTIES
   VERSION 1.0.0
   SOVERSION 1
index 7dccf64fd09c6a0b3370c50dd9d6201e96fabd92..a22cd2c4fa00fed5523f29d14c7bfcce2ce01664 100644 (file)
 
 bool ISALCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
                              const unsigned char (&iv)[AES_256_IVSIZE],
-                             const unsigned char (&key)[AES_256_KEYSIZE])
+                             const unsigned char (&key)[AES_256_KEYSIZE],
+                             optional_yield y)
 {
-  if ((size % AES_256_IVSIZE) != 0) {
+  if (unlikely((size % AES_256_IVSIZE) != 0)) {
     return false;
   }
   alignas(16) struct cbc_key_data keys_blk;
@@ -31,9 +32,10 @@ bool ISALCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, s
 }
 bool ISALCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
                              const unsigned char (&iv)[AES_256_IVSIZE],
-                             const unsigned char (&key)[AES_256_KEYSIZE])
+                             const unsigned char (&key)[AES_256_KEYSIZE],
+                             optional_yield y)
 {
-  if ((size % AES_256_IVSIZE) != 0) {
+  if (unlikely((size % AES_256_IVSIZE) != 0)) {
     return false;
   }
   alignas(16) struct cbc_key_data keys_blk;
index 84331bbddd47a839f24a38a57410192f788debdc..7fffd5122bc97a97178e6e8c8db1fb4e042c401d 100644 (file)
@@ -15,6 +15,7 @@
 #ifndef ISAL_CRYPTO_ACCEL_H
 #define ISAL_CRYPTO_ACCEL_H
 #include "crypto/crypto_accel.h"
+#include "common/async/yield_context.h"
 
 class ISALCryptoAccel : public CryptoAccel {
  public:
@@ -23,9 +24,19 @@ class ISALCryptoAccel : public CryptoAccel {
 
   bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
                    const unsigned char (&iv)[AES_256_IVSIZE],
-                   const unsigned char (&key)[AES_256_KEYSIZE]) override;
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override;
   bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
                    const unsigned char (&iv)[AES_256_IVSIZE],
-                   const unsigned char (&key)[AES_256_KEYSIZE]) override;
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override;
+  bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+                   const unsigned char iv[][AES_256_IVSIZE],
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override { return false; }
+  bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+                   const unsigned char iv[][AES_256_IVSIZE],
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override { return false; }
 };
 #endif
index 68e782e69fc0a07ac8fc96c799fcbc465e304ade..50789b777225d7b4fc47b26b8dab19a7d3bfd854 100644 (file)
@@ -31,7 +31,9 @@ public:
   ~ISALCryptoPlugin()
   {}
   virtual int factory(CryptoAccelRef *cs,
-                      std::ostream *ss)
+                      std::ostream *ss,
+                      const size_t chunk_size,
+                      const size_t max_requests)
   {
     if (cryptoaccel == nullptr)
     {
index 6ede1567f218da60ff5d0ab0e8d7692acac27097..5365ab9a6ca2241b30e807e0b6c38934f06d4af8 100644 (file)
@@ -7,7 +7,8 @@ set(openssl_crypto_plugin_srcs
 add_library(ceph_crypto_openssl SHARED ${openssl_crypto_plugin_srcs})
 target_link_libraries(ceph_crypto_openssl
     PRIVATE OpenSSL::Crypto
-    $<$<PLATFORM_ID:Windows>:ceph-common>)
+    $<$<PLATFORM_ID:Windows>:ceph-common>
+    spawn)
 target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR})
 add_dependencies(crypto_plugins ceph_crypto_openssl)
 set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "")
index e6ea0fa7290cd385bf4483d8f34190fdd6d5e4a7..f99844a3848b488311ebb6101b8f32dc30a82f14 100644 (file)
@@ -77,9 +77,10 @@ bool evp_transform(unsigned char* out, const unsigned char* in, size_t size,
                         
 bool OpenSSLCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
                              const unsigned char (&iv)[AES_256_IVSIZE],
-                             const unsigned char (&key)[AES_256_KEYSIZE])
+                             const unsigned char (&key)[AES_256_KEYSIZE],
+                             optional_yield y)
 {
-  if ((size % AES_256_IVSIZE) != 0) {
+  if (unlikely((size % AES_256_IVSIZE) != 0)) {
     return false;
   }
 
@@ -91,9 +92,10 @@ bool OpenSSLCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in
                              
 bool OpenSSLCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
                              const unsigned char (&iv)[AES_256_IVSIZE],
-                             const unsigned char (&key)[AES_256_KEYSIZE])
+                             const unsigned char (&key)[AES_256_KEYSIZE],
+                             optional_yield y)
 {
-  if ((size % AES_256_IVSIZE) != 0) {
+  if (unlikely((size % AES_256_IVSIZE) != 0)) {
     return false;
   }
 
index ad90cbeceaf3e8f0edae86fb101478e0717e3283..90edf1ec6ecb76c7cc0d2fbd8af71a8ea51a2030 100644 (file)
@@ -16,6 +16,7 @@
 #define OPENSSL_CRYPTO_ACCEL_H
 
 #include "crypto/crypto_accel.h"
+#include "common/async/yield_context.h"
 
 class OpenSSLCryptoAccel : public CryptoAccel {
  public:
@@ -24,9 +25,19 @@ class OpenSSLCryptoAccel : public CryptoAccel {
 
   bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
                    const unsigned char (&iv)[AES_256_IVSIZE],
-                   const unsigned char (&key)[AES_256_KEYSIZE]) override;
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override;
   bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
                    const unsigned char (&iv)[AES_256_IVSIZE],
-                   const unsigned char (&key)[AES_256_KEYSIZE]) override;
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override;
+  bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+                   const unsigned char iv[][AES_256_IVSIZE],
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override { return false; }
+  bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+                   const unsigned char iv[][AES_256_IVSIZE],
+                   const unsigned char (&key)[AES_256_KEYSIZE],
+                   optional_yield y) override { return false; }
 };
 #endif
index 408d9ebdaa7b8eb93055bda1cd82c35924b09f01..86faf770146a976b77ff2c87a28c00ca2baa0de8 100644 (file)
@@ -25,7 +25,10 @@ class OpenSSLCryptoPlugin : public CryptoPlugin {
 public:
   explicit OpenSSLCryptoPlugin(CephContext* cct) : CryptoPlugin(cct)
   {}
-  int factory(CryptoAccelRef *cs, std::ostream *ss) override {
+  int factory(CryptoAccelRef *cs,
+              std::ostream *ss,
+              const size_t chunk_size,
+              const size_t max_requests) override {
     if (cryptoaccel == nullptr)
       cryptoaccel = CryptoAccelRef(new OpenSSLCryptoAccel);
 
index fb751967a97a1cae215b7cdb722be72cc16ea912..77791cacf79b386d5864ded4dbecf42a3ffc2180 100644 (file)
@@ -13,7 +13,8 @@ add_dependencies(crypto_plugins ceph_crypto_qat)
 
 target_link_libraries(ceph_crypto_qat PRIVATE
                       QatDrv::qat_s
-                      QatDrv::usdm_drv_s)
+                      QatDrv::usdm_drv_s
+                      spawn)
 
 add_dependencies(crypto_plugins ceph_crypto_qat)
 set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1)
index 23f86edfa7e25410fcb4919a1ef64e10b9c0ae44..6ff601de4ec6bc1faffab174769d388dba284ab1 100644 (file)
 
 #include "crypto/qat/qat_crypto_accel.h"
 
-bool QccCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
-    const unsigned char (&iv)[AES_256_IVSIZE],
-    const unsigned char (&key)[AES_256_KEYSIZE])
-{
-  if ((size % AES_256_IVSIZE) != 0) {
+
+
+
+bool QccCryptoAccel::cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+        const unsigned char iv[][AES_256_IVSIZE],
+        const unsigned char (&key)[AES_256_KEYSIZE],
+        optional_yield y) {
+  if (unlikely((size % AES_256_IVSIZE) != 0)) {
     return false;
   }
 
-  return qcccrypto.perform_op(out, in, size,
-      const_cast<unsigned char *>(&iv[0]),
-      const_cast<unsigned char *>(&key[0]), CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT);
+  return qcccrypto.perform_op_batch(out, in, size,
+      const_cast<unsigned char *>(&iv[0][0]),
+      const_cast<unsigned char *>(&key[0]),
+      CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT, y);
 }
 
-bool QccCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
-    const unsigned char (&iv)[AES_256_IVSIZE],
-    const unsigned char (&key)[AES_256_KEYSIZE])
-{
-  if ((size % AES_256_IVSIZE) != 0) {
+bool QccCryptoAccel::cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+        const unsigned char iv[][AES_256_IVSIZE],
+        const unsigned char (&key)[AES_256_KEYSIZE],
+        optional_yield y) {
+  if (unlikely((size % AES_256_IVSIZE) != 0)) {
     return false;
   }
 
-  return qcccrypto.perform_op(out, in, size,
-      const_cast<unsigned char *>(&iv[0]),
-      const_cast<unsigned char *>(&key[0]), CPA_CY_SYM_CIPHER_DIRECTION_DECRYPT);
+  return qcccrypto.perform_op_batch(out, in, size,
+      const_cast<unsigned char *>(&iv[0][0]),
+      const_cast<unsigned char *>(&key[0]),
+      CPA_CY_SYM_CIPHER_DIRECTION_DECRYPT, y);
 }
index 5badefc2883451412f25ac93547f1f107574ae24..714575799a9e16a004a619e84f8cdb9f78442e56 100644 (file)
 
 #include "crypto/crypto_accel.h"
 #include "crypto/qat/qcccrypto.h"
+#include "common/async/yield_context.h"
 
 class QccCryptoAccel : public CryptoAccel {
   public:
     QccCrypto qcccrypto;
-    QccCryptoAccel() { qcccrypto.init(); };
+    QccCryptoAccel(const size_t chunk_size, const size_t max_requests):qcccrypto() { qcccrypto.init(chunk_size, max_requests); };
     ~QccCryptoAccel() { qcccrypto.destroy(); };
 
     bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
         const unsigned char (&iv)[AES_256_IVSIZE],
-        const unsigned char (&key)[AES_256_KEYSIZE]) override;
+        const unsigned char (&key)[AES_256_KEYSIZE],
+        optional_yield y) override { return false; }
     bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
         const unsigned char (&iv)[AES_256_IVSIZE],
-        const unsigned char (&key)[AES_256_KEYSIZE]) override;
+        const unsigned char (&key)[AES_256_KEYSIZE],
+        optional_yield y) override { return false; }
+    bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+        const unsigned char iv[][AES_256_IVSIZE],
+        const unsigned char (&key)[AES_256_KEYSIZE],
+        optional_yield y) override;
+    bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+        const unsigned char iv[][AES_256_IVSIZE],
+        const unsigned char (&key)[AES_256_KEYSIZE],
+        optional_yield y) override;
 };
 #endif
index a8d4df7cb8f7e4796cc7fac2be0ba6bf3246cef4..52b8e9578d41756537c5aed14c23bc6b7700de29 100644 (file)
@@ -29,11 +29,11 @@ public:
   {}
   ~QccCryptoPlugin()
   {}
-  virtual int factory(CryptoAccelRef *cs, std::ostream *ss)
+  virtual int factory(CryptoAccelRef *cs, std::ostream *ss, const size_t chunk_size, const size_t max_requests)
   {
     std::lock_guard<std::mutex> l(qat_init);
     if (cryptoaccel == nullptr)
-      cryptoaccel = CryptoAccelRef(new QccCryptoAccel);
+      cryptoaccel = CryptoAccelRef(new QccCryptoAccel(chunk_size, max_requests));
 
     *cs = cryptoaccel;
     return 0;
index a3f2537264348fcb79dec5ff2a0ec48785836b8f..35bb5d3459f775e4036a52f92fd356ee9e412e42 100644 (file)
@@ -2,10 +2,17 @@
 #include  <iostream>
 #include "string.h"
 #include <pthread.h>
+#include <condition_variable>
 #include "common/debug.h"
 #include "include/scope_guard.h"
 #include "common/dout.h"
 #include "common/errno.h"
+#include <atomic>
+#include <utility>
+#include <future>
+#include <chrono>
+
+#include "boost/container/static_vector.hpp"
 
 // -----------------------------------------------------------------------------
 #define dout_context g_ceph_context
@@ -20,51 +27,111 @@ static std::ostream& _prefix(std::ostream* _dout)
 // -----------------------------------------------------------------------------
 
 /*
- * Poller thread & functions
-*/
+ * Callback function
+ */
+static void symDpCallback(CpaCySymDpOpData *pOpData,
+                        CpaStatus status,
+                        CpaBoolean verifyResult)
+{
+  if (nullptr != pOpData->pCallbackTag)
+  {
+    static_cast<QatCrypto*>(pOpData->pCallbackTag)->complete();
+  }
+}
+
 static std::mutex qcc_alloc_mutex;
 static std::mutex qcc_eng_mutex;
 static std::atomic<bool> init_called = { false };
-
-void* QccCrypto::crypt_thread(void *args) {
-  struct qcc_thread_args *thread_args = (struct qcc_thread_args *)args;
-  thread_args->qccinstance->do_crypt(thread_args);
-  return thread_args;
+static std::mutex poll_inst_mutex;
+static std::condition_variable poll_inst_cv;
+
+#define NON_INSTANCE -1
+#define RETRY_MAX_NUM 100
+
+template <typename CompletionToken>
+auto QccCrypto::async_get_instance(CompletionToken&& token) {
+  using boost::asio::async_completion;
+  using Signature = void(int);
+  async_completion<CompletionToken, Signature> init(token);
+
+  auto ex = boost::asio::get_associated_executor(init.completion_handler);
+
+  boost::asio::post(my_pool, [this, ex, handler = std::move(init.completion_handler)]()mutable{
+    auto handler1 = std::move(handler);
+    if (!open_instances.empty()) {
+      int avail_inst = open_instances.front();
+      open_instances.pop_front();
+      boost::asio::post(ex, std::bind(handler1, avail_inst));
+    } else if (!instance_completions.full()) {
+      // Queue a bounded number of requests to wait for a QAT instance so that
+      // QAT stays as fully utilized as possible: a freed instance can pick up
+      // a waiting request immediately instead of idling until a new one arrives.
+      instance_completions.push_back([this, ex, handler2 = std::move(handler1)](int inst)mutable{
+        boost::asio::post(ex, std::bind(handler2, inst));
+      });
+    } else {
+      boost::asio::post(ex, std::bind(handler1, NON_INSTANCE));
+    }
+  });
+  return init.result.get();
 }
 
 void QccCrypto::QccFreeInstance(int entry) {
-  std::lock_guard<std::mutex> freeinst(qcc_alloc_mutex);
-  open_instances.push(entry);
-}
-
-int QccCrypto::QccGetFreeInstance() {
-  int ret = -1;
-  std::lock_guard<std::mutex> getinst(qcc_alloc_mutex);
-  if (!open_instances.empty()) {
-    ret = open_instances.front();
-    open_instances.pop();
-  }
-  return ret;
+  boost::asio::post(my_pool, [this, entry]()mutable{
+    if (!instance_completions.empty()) {
+      instance_completions.front()(entry);
+      instance_completions.pop_front();
+    } else {
+      open_instances.push_back(entry);
+    }
+  });
 }
 
 void QccCrypto::cleanup() {
   icp_sal_userStop();
   qaeMemDestroy();
   is_init = false;
-  init_stat = stat;
   init_called = false;
   derr << "Failure during QAT init sequence. Quitting" << dendl;
 }
 
+void QccCrypto::poll_instances(void) {
+  CpaStatus stat = CPA_STATUS_SUCCESS;
+  poll_retry_num = RETRY_MAX_NUM;
+  while (!thread_stop) {
+    int free_instance_num = 0;
+    for (int iter = 0; iter < qcc_inst->num_instances; iter++) {
+      if (qcc_inst->is_polled[iter] == CPA_TRUE) {
+        stat = icp_sal_CyPollDpInstance(qcc_inst->cy_inst_handles[iter], 0);
+        if (stat != CPA_STATUS_SUCCESS) {
+          free_instance_num++;
+        }
+      }
+    }
+    if (free_instance_num == qcc_inst->num_instances) {
+      poll_retry_num--;
+    } else {
+      poll_retry_num = RETRY_MAX_NUM;
+    }
+    if (0 == poll_retry_num) {
+      std::unique_lock lock{poll_inst_mutex};
+      poll_inst_cv.wait_for(lock, std::chrono::milliseconds(1), [this](){return poll_retry_num > 0;});
+      poll_retry_num = RETRY_MAX_NUM;
+    }
+  }
+}
+
 /*
  * We initialize QAT instance and everything that is common for all ops
 */
-bool QccCrypto::init()
-{
+bool QccCrypto::init(const size_t chunk_size, const size_t max_requests) {
 
   std::lock_guard<std::mutex> l(qcc_eng_mutex);
+  CpaStatus stat = CPA_STATUS_SUCCESS;
+  this->chunk_size = chunk_size;
+  this->max_requests = max_requests;
 
-  if(init_called) {
+  if (init_called) {
     dout(10) << "Init sequence already called. Skipping duplicate call" << dendl;
     return true;
   }
@@ -76,21 +143,21 @@ bool QccCrypto::init()
   // Find if the usermode memory driver is available. We need to this to
   // create contiguous memory needed by QAT.
   stat = qaeMemInit();
-  if(stat != CPA_STATUS_SUCCESS) {
+  if (stat != CPA_STATUS_SUCCESS) {
     derr << "Unable to load memory driver" << dendl;
     this->cleanup();
     return false;
   }
 
   stat = icp_sal_userStart("CEPH");
-  if(stat != CPA_STATUS_SUCCESS) {
+  if (stat != CPA_STATUS_SUCCESS) {
     derr << "Unable to start qat device" << dendl;
     this->cleanup();
     return false;
   }
 
   qcc_os_mem_alloc((void **)&qcc_inst, sizeof(QCCINST));
-  if(qcc_inst == NULL) {
+  if (qcc_inst == NULL) {
     derr << "Unable to alloc mem for instance struct" << dendl;
     this->cleanup();
     return false;
@@ -121,12 +188,17 @@ bool QccCrypto::init()
     this->cleanup();
     return false;
   }
+  dout(1) << "Get instances num: " << qcc_inst->num_instances << dendl;
+  if (max_requests > qcc_inst->num_instances) {
+    instance_completions.set_capacity(max_requests - qcc_inst->num_instances);
+  }
+  open_instances.set_capacity(qcc_inst->num_instances);
 
   int iter = 0;
   //Start Instances
-  for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+  for (iter = 0; iter < qcc_inst->num_instances; iter++) {
     stat = cpaCyStartInstance(qcc_inst->cy_inst_handles[iter]);
-    if(stat != CPA_STATUS_SUCCESS) {
+    if (stat != CPA_STATUS_SUCCESS) {
       derr << "Unable to start instance" << dendl;
       this->cleanup();
       return false;
@@ -136,7 +208,7 @@ bool QccCrypto::init()
   qcc_os_mem_alloc((void **)&qcc_inst->is_polled,
       ((int)qcc_inst->num_instances * sizeof(CpaBoolean)));
   CpaInstanceInfo2 info;
-  for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+  for (iter = 0; iter < qcc_inst->num_instances; iter++) {
     qcc_inst->is_polled[iter] = cpaCyInstanceGetInfo2(qcc_inst->cy_inst_handles[iter],
         &info) == CPA_STATUS_SUCCESS ? info.isPolled : CPA_FALSE;
   }
@@ -144,7 +216,7 @@ bool QccCrypto::init()
   // Allocate memory structures for all instances
   qcc_os_mem_alloc((void **)&qcc_sess,
       ((int)qcc_inst->num_instances * sizeof(QCCSESS)));
-  if(qcc_sess == NULL) {
+  if (qcc_sess == NULL) {
     derr << "Unable to allocate memory for session struct" << dendl;
     this->cleanup();
     return false;
@@ -152,53 +224,34 @@ bool QccCrypto::init()
 
   qcc_os_mem_alloc((void **)&qcc_op_mem,
       ((int)qcc_inst->num_instances * sizeof(QCCOPMEM)));
-  if(qcc_sess == NULL) {
+  if (qcc_sess == NULL) {
     derr << "Unable to allocate memory for opmem struct" << dendl;
     this->cleanup();
     return false;
   }
 
-  qcc_os_mem_alloc((void **)&cypollthreads,
-      ((int)qcc_inst->num_instances * sizeof(pthread_t)));
-  if(cypollthreads == NULL) {
-    derr << "Unable to allocate memory for pthreads" << dendl;
-    this->cleanup();
-    return false;
-  }
-
   //At this point we are only doing an user-space version.
-  //To-Do: Maybe a kernel based one
-  for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+  for (iter = 0; iter < qcc_inst->num_instances; iter++) {
     stat = cpaCySetAddressTranslation(qcc_inst->cy_inst_handles[iter],
                                        qaeVirtToPhysNUMA);
-    if(stat == CPA_STATUS_SUCCESS) {
-      // Start HW Polling Thread
-      // To-Do: Enable epoll & interrupt based later?
-      // QccCyStartPoll(iter);
-      // Setup the session structures for crypto operation and populate
-      // whatever we can now. Rest will be filled in when crypto operation
-      // happens.
-      qcc_sess[iter].sess_ctx_sz = 0;
-      qcc_sess[iter].sess_ctx = NULL;
-      qcc_sess[iter].sess_stp_data.sessionPriority = CPA_CY_PRIORITY_NORMAL;
-      qcc_sess[iter].sess_stp_data.symOperation = CPA_CY_SYM_OP_CIPHER;
-      open_instances.push(iter);
+    if (stat == CPA_STATUS_SUCCESS) {
+      open_instances.push_back(iter);
       qcc_op_mem[iter].is_mem_alloc = false;
-      qcc_op_mem[iter].op_complete = false;
-      qcc_op_mem[iter].op_result = CPA_STATUS_SUCCESS;
-      qcc_op_mem[iter].sym_op_data = NULL;
-      qcc_op_mem[iter].buff_meta_size = qcc_op_mem[iter].buff_size = 0;
-      qcc_op_mem[iter].src_buff_meta = qcc_op_mem[iter].src_buff
-        = qcc_op_mem[iter].iv_buff = NULL;
-      qcc_op_mem[iter].src_buff_list = NULL;
-      qcc_op_mem[iter].src_buff_flat = NULL;
-      qcc_op_mem[iter].num_buffers = 1;
+
+      stat = cpaCySymDpRegCbFunc(qcc_inst->cy_inst_handles[iter], symDpCallback);
+      if (stat != CPA_STATUS_SUCCESS) {
+        dout(1) << "Unable to register callback function for instance " << iter << " with status = " << stat << dendl;
+        return false;
+      }
     } else {
-      derr << "Unable to find address translations of instance " << iter << dendl;
+      dout(1) << "Unable to find address translations of instance " << iter << dendl;
       this->cleanup();
       return false;
     }
   }
+
+  qat_poll_thread = make_named_thread("qat_poll", &QccCrypto::poll_instances, this);
+
   is_init = true;
   dout(10) << "Init complete" << dendl;
   return true;
@@ -210,37 +263,32 @@ bool QccCrypto::destroy() {
     return false;
   }
 
-  unsigned int retry = 0;
-  while(retry <= QCC_MAX_RETRIES) {
-    if(open_instances.size() == qcc_inst->num_instances) {
-      break;
-    } else {
-      retry++;
-    }
-    dout(5) << "QAT is still busy and cannot free resources yet" << dendl;
-    return false;
+  thread_stop = true;
+  if (qat_poll_thread.joinable()) {
+    qat_poll_thread.join();
   }
+  my_pool.join();
 
   dout(10) << "Destroying QAT crypto & related memory" << dendl;
   int iter = 0;
 
   // Free up op related memory
   for (iter =0; iter < qcc_inst->num_instances; iter++) {
-    qcc_contig_mem_free((void **)&(qcc_op_mem[iter].src_buff));
-    qcc_contig_mem_free((void **)&(qcc_op_mem[iter].iv_buff));
-    qcc_os_mem_free((void **)&(qcc_op_mem[iter].src_buff_list));
-    qcc_os_mem_free((void **)&(qcc_op_mem[iter].src_buff_flat));
-    qcc_contig_mem_free((void **)&(qcc_op_mem[iter].sym_op_data));
+    for (size_t i = 0; i < MAX_NUM_SYM_REQ_BATCH; i++) {
+      qcc_contig_mem_free((void **)&(qcc_op_mem[iter].src_buff[i]));
+      qcc_contig_mem_free((void **)&(qcc_op_mem[iter].iv_buff[i]));
+      qcc_contig_mem_free((void **)&(qcc_op_mem[iter].sym_op_data[i]));
+    }
   }
 
   // Free up Session memory
-  for(iter = 0; iter < qcc_inst->num_instances; iter++) {
-    cpaCySymRemoveSession(qcc_inst->cy_inst_handles[iter], qcc_sess[iter].sess_ctx);
+  for (iter = 0; iter < qcc_inst->num_instances; iter++) {
+    cpaCySymDpRemoveSession(qcc_inst->cy_inst_handles[iter], qcc_sess[iter].sess_ctx);
     qcc_contig_mem_free((void **)&(qcc_sess[iter].sess_ctx));
   }
 
   // Stop QAT Instances
-  for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+  for (iter = 0; iter < qcc_inst->num_instances; iter++) {
     cpaCyStopInstance(qcc_inst->cy_inst_handles[iter]);
   }
 
@@ -249,7 +297,6 @@ bool QccCrypto::destroy() {
   qcc_os_mem_free((void **)&qcc_sess);
   qcc_os_mem_free((void **)&(qcc_inst->cy_inst_handles));
   qcc_os_mem_free((void **)&(qcc_inst->is_polled));
-  qcc_os_mem_free((void **)&cypollthreads);
   qcc_os_mem_free((void **)&qcc_inst);
 
   //Un-init memory driver and QAT HW
@@ -260,57 +307,43 @@ bool QccCrypto::destroy() {
   return true;
 }
 
-void QccCrypto::do_crypt(qcc_thread_args *thread_args) {
-  auto entry = thread_args->entry;
-  qcc_op_mem[entry].op_result = cpaCySymPerformOp(qcc_inst->cy_inst_handles[entry],
-      NULL,
-      qcc_op_mem[entry].sym_op_data,
-      qcc_op_mem[entry].src_buff_list,
-      qcc_op_mem[entry].src_buff_list,
-      NULL);
-  qcc_op_mem[entry].op_complete = true;
-  free(thread_args);
-}
-
-bool QccCrypto::perform_op(unsigned char* out, const unsigned char* in,
-    size_t size, uint8_t *iv, uint8_t *key, CpaCySymCipherDirection op_type)
+bool QccCrypto::perform_op_batch(unsigned char* out, const unsigned char* in, size_t size,
+    Cpa8U *iv,
+    Cpa8U *key,
+    CpaCySymCipherDirection op_type,
+    optional_yield y)
 {
   if (!init_called) {
     dout(10) << "QAT not intialized yet. Initializing now..." << dendl;
-    if(!QccCrypto::init()) {
+    if (!QccCrypto::init(chunk_size, max_requests)) {
       derr << "QAT init failed" << dendl;
       return false;
     }
   }
 
-  if(!is_init)
+  if (!is_init)
   {
-    dout(10) << "QAT not initialized in this instance or init failed with possible error " << (int)init_stat << dendl;
+    dout(10) << "QAT not initialized in this instance or init failed" << dendl;
     return is_init;
   }
-
-  int avail_inst = -1;
-  unsigned int retrycount = 0;
-  while(retrycount <= QCC_MAX_RETRIES) {
-    avail_inst = QccGetFreeInstance();
-    if(avail_inst != -1) {
-      break;
-    } else {
-      retrycount++;
-      usleep(qcc_sleep_duration);
-    }
+  CpaStatus status = CPA_STATUS_SUCCESS;
+  int avail_inst = NON_INSTANCE;
+
+  if (y) {
+    yield_context yield = y.get_yield_context();
+    avail_inst = async_get_instance(yield);
+  } else {
+    auto result = async_get_instance(boost::asio::use_future);
+    avail_inst = result.get();
   }
 
-  if(avail_inst == -1) {
-    derr << "Unable to get an QAT instance. Failing request" << dendl;
+  if (avail_inst == NON_INSTANCE) {
     return false;
   }
 
-  dout(15) << "Using inst " << avail_inst << dendl;
-  // Start polling threads for this instance
-  //QccCyStartPoll(avail_inst);
+  dout(15) << "Using dp_batch inst " << avail_inst << dendl;
 
-  auto sg = make_scope_guard([=] {
+  auto sg = make_scope_guard([this, avail_inst] {
       //free up the instance irrespective of the op status
       dout(15) << "Completed task under " << avail_inst << dendl;
       qcc_op_mem[avail_inst].op_complete = false;
@@ -322,150 +355,221 @@ bool QccCrypto::perform_op(unsigned char* out, const unsigned char* in,
    * Hold onto to most of them until destructor is called.
   */
   if (qcc_op_mem[avail_inst].is_mem_alloc == false) {
-
-    qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherAlgorithm =
-                                                     CPA_CY_SYM_CIPHER_AES_CBC;
-    qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherKeyLenInBytes =
-                                                              AES_256_KEY_SIZE;
-
-    // Allocate contig memory for buffers that are independent of the
-    // input/output
-    stat = cpaCyBufferListGetMetaSize(qcc_inst->cy_inst_handles[avail_inst],
-        qcc_op_mem[avail_inst].num_buffers, &(qcc_op_mem[avail_inst].buff_meta_size));
-    if(stat != CPA_STATUS_SUCCESS) {
-      derr << "Unable to get buff meta size" << dendl;
-      return false;
-    }
-
-    // Allocate Buffer List Private metadata
-    stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_meta),
-        qcc_op_mem[avail_inst].buff_meta_size, 1);
-    if(stat != CPA_STATUS_SUCCESS) {
-      derr << "Unable to allocate private metadata memory" << dendl;
-      return false;
-    }
-
-    // Allocate Buffer List Memory
-    qcc_os_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_list), sizeof(CpaBufferList));
-    qcc_os_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_flat),
-              (qcc_op_mem[avail_inst].num_buffers * sizeof(CpaFlatBuffer)));
-    if(qcc_op_mem[avail_inst].src_buff_list == NULL || qcc_op_mem[avail_inst].src_buff_flat == NULL) {
-      derr << "Unable to allocate bufferlist memory" << dendl;
-      return false;
-    }
-
-    // Allocate IV memory
-    stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].iv_buff), AES_256_IV_LEN);
-    if(stat != CPA_STATUS_SUCCESS) {
-      derr << "Unable to allocate bufferlist memory" << dendl;
-      return false;
-    }
-
-    //Assign src stuff for the operation
-    (qcc_op_mem[avail_inst].src_buff_list)->pBuffers = qcc_op_mem[avail_inst].src_buff_flat;
-    (qcc_op_mem[avail_inst].src_buff_list)->numBuffers = qcc_op_mem[avail_inst].num_buffers;
-    (qcc_op_mem[avail_inst].src_buff_list)->pPrivateMetaData = qcc_op_mem[avail_inst].src_buff_meta;
-
-    //Setup OpData
-    stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].sym_op_data),
-        sizeof(CpaCySymOpData));
-    if(stat != CPA_STATUS_SUCCESS) {
-      derr << "Unable to allocate opdata memory" << dendl;
-      return false;
-    }
-
-    // Assuming op to be encryption for initiation. This will be reset when we
-    // exit this block
-    qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherDirection =
-                                            CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT;
-    // Allocate Session memory
-    stat = cpaCySymSessionCtxGetSize(qcc_inst->cy_inst_handles[avail_inst],
-        &(qcc_sess[avail_inst].sess_stp_data), &(qcc_sess[avail_inst].sess_ctx_sz));
-    if(stat != CPA_STATUS_SUCCESS) {
-      derr << "Unable to find session size" << dendl;
-      return false;
-    }
-
-    stat = qcc_contig_mem_alloc((void **)&(qcc_sess[avail_inst].sess_ctx),
-        qcc_sess[avail_inst].sess_ctx_sz);
-    if(stat != CPA_STATUS_SUCCESS) {
-      derr << "Unable to allocate contig memory" << dendl;
-      return false;
+    for (size_t i = 0; i < MAX_NUM_SYM_REQ_BATCH; i++) {
+      // Allocate IV memory
+      status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].iv_buff[i]), AES_256_IV_LEN, 8);
+      if (status != CPA_STATUS_SUCCESS) {
+        derr << "Unable to allocate iv_buff memory" << dendl;
+        return false;
+      }
+
+      // Allocate src memory
+      status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff[i]), chunk_size, 8);
+      if (status != CPA_STATUS_SUCCESS) {
+        derr << "Unable to allocate src_buff memory" << dendl;
+        return false;
+      }
+
+      //Setup OpData
+      status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].sym_op_data[i]),
+          sizeof(CpaCySymDpOpData), 8);
+      if (status != CPA_STATUS_SUCCESS) {
+        derr << "Unable to allocate opdata memory" << dendl;
+        return false;
+      }
     }
 
     // Set memalloc flag so that we don't go through this exercise again.
     qcc_op_mem[avail_inst].is_mem_alloc = true;
-    dout(15) << "Instantiation complete for " << avail_inst << dendl;
+    qcc_sess[avail_inst].sess_ctx = nullptr;
+    status = initSession(qcc_inst->cy_inst_handles[avail_inst],
+                             &(qcc_sess[avail_inst].sess_ctx),
+                             (Cpa8U *)key,
+                             op_type);
+  } else {
+    do {
+      cpaCySymDpRemoveSession(qcc_inst->cy_inst_handles[avail_inst], qcc_sess[avail_inst].sess_ctx);
+      status = initSession(qcc_inst->cy_inst_handles[avail_inst],
+                           &(qcc_sess[avail_inst].sess_ctx),
+                           (Cpa8U *)key,
+                           op_type);
+      if (unlikely(status == CPA_STATUS_RETRY)) {
+        dout(1) << "cpaCySymDpRemoveSession and initSession retry" << dendl;
+      }
+    } while (status == CPA_STATUS_RETRY);
   }
-
-  // Section that runs on every call
-  // Identify the operation and assign to session
-  qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherDirection = op_type;
-  qcc_sess[avail_inst].sess_stp_data.cipherSetupData.pCipherKey = (Cpa8U *)key;
-
-  stat = cpaCySymInitSession(qcc_inst->cy_inst_handles[avail_inst],
-      NULL,
-      &(qcc_sess[avail_inst].sess_stp_data),
-      qcc_sess[avail_inst].sess_ctx);
-  if (stat != CPA_STATUS_SUCCESS) {
-    derr << "Unable to init session" << dendl;
+  if (unlikely(status != CPA_STATUS_SUCCESS)) {
+    derr << "Unable to init session with status =" << status << dendl;
     return false;
   }
 
-  // Allocate actual buffers that will hold data
-  if (qcc_op_mem[avail_inst].buff_size != (Cpa32U)size) {
-    qcc_contig_mem_free((void **)&(qcc_op_mem[avail_inst].src_buff));
-    qcc_op_mem[avail_inst].buff_size = (Cpa32U)size;
-    stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff),
-              qcc_op_mem[avail_inst].buff_size);
-    if(stat != CPA_STATUS_SUCCESS) {
-      derr << "Unable to allocate contig memory" << dendl;
-      return false;
-    }
-  }
-
-  // Copy src & iv into the respective buffers
-  memcpy(qcc_op_mem[avail_inst].src_buff, in, size);
-  memcpy(qcc_op_mem[avail_inst].iv_buff, iv, AES_256_IV_LEN);
-
-  //Assign the reminder of the stuff
-  qcc_op_mem[avail_inst].src_buff_flat->dataLenInBytes = qcc_op_mem[avail_inst].buff_size;
-  qcc_op_mem[avail_inst].src_buff_flat->pData = qcc_op_mem[avail_inst].src_buff;
+  return symPerformOp(avail_inst,
+                      qcc_sess[avail_inst].sess_ctx,
+                      in,
+                      out,
+                      size,
+                      reinterpret_cast<Cpa8U*>(iv),
+                      AES_256_IV_LEN, y);
+}
 
-  //OpData assignment
-  qcc_op_mem[avail_inst].sym_op_data->sessionCtx = qcc_sess[avail_inst].sess_ctx;
-  qcc_op_mem[avail_inst].sym_op_data->packetType = CPA_CY_SYM_PACKET_TYPE_FULL;
-  qcc_op_mem[avail_inst].sym_op_data->pIv = qcc_op_mem[avail_inst].iv_buff;
-  qcc_op_mem[avail_inst].sym_op_data->ivLenInBytes = AES_256_IV_LEN;
-  qcc_op_mem[avail_inst].sym_op_data->cryptoStartSrcOffsetInBytes = 0;
-  qcc_op_mem[avail_inst].sym_op_data->messageLenToCipherInBytes = qcc_op_mem[avail_inst].buff_size;
+/*
+ * Update the cipher key and the cipher direction of an existing session.
+ *
+ * @param sessionCtx      session context handed to cpaCySymUpdateSession()
+ * @param pCipherKey      cipher key to install in the session
+ * @param cipherDirection cipher direction to install in the session
+ * @return the status returned by cpaCySymUpdateSession(); a failure is only
+ *         logged here (debug level 10) and left to the caller to handle
+ */
+CpaStatus QccCrypto::updateSession(CpaCySymSessionCtx sessionCtx,
+                               Cpa8U *pCipherKey,
+                               CpaCySymCipherDirection cipherDirection) {
+  CpaStatus status = CPA_STATUS_SUCCESS;
+  CpaCySymSessionUpdateData sessionUpdateData = {0};
+
+  // Flag both fields so that the key and the direction are updated together.
+  sessionUpdateData.flags = CPA_CY_SYM_SESUPD_CIPHER_KEY;
+  sessionUpdateData.flags |= CPA_CY_SYM_SESUPD_CIPHER_DIR;
+  sessionUpdateData.pCipherKey = pCipherKey;
+  sessionUpdateData.cipherDirection = cipherDirection;
+
+  status = cpaCySymUpdateSession(sessionCtx, &sessionUpdateData);
+
+  if (unlikely(status != CPA_STATUS_SUCCESS)) {
+    dout(10) << "cpaCySymUpdateSession failed with status = " << status << dendl;
+  }
 
-  // Perform cipher operation in a thread
-  qcc_thread_args* thread_args = new qcc_thread_args();
-  thread_args->qccinstance = this;
-  thread_args->entry = avail_inst;
+  return status;
+}
 
-  if (pthread_create(&cypollthreads[avail_inst], NULL, crypt_thread, (void *)thread_args) != 0) {
-    derr << "Unable to create thread for crypt operation" << dendl;
-    return false;
+/*
+ * Set up an AES-256-CBC cipher session on a QAT instance.
+ *
+ * When *sessionCtx is null the required context size is queried with
+ * cpaCySymDpSessionCtxGetSize() and the context is allocated through
+ * qcc_contig_mem_alloc(); otherwise the existing context is reused as is.
+ * The session is then initialised with cpaCySymDpInitSession().
+ *
+ * @param cyInstHandle    instance the session is created on
+ * @param sessionCtx      in/out pointer to the session context
+ * @param pCipherKey      cipher key; the key length is fixed to AES_256_KEY_SIZE
+ * @param cipherDirection cipher direction stored in the session setup data
+ * @return CPA_STATUS_SUCCESS on success, otherwise the first failing status
+ */
+CpaStatus QccCrypto::initSession(CpaInstanceHandle cyInstHandle,
+                             CpaCySymSessionCtx *sessionCtx,
+                             Cpa8U *pCipherKey,
+                             CpaCySymCipherDirection cipherDirection) {
+  CpaStatus status = CPA_STATUS_SUCCESS;
+  Cpa32U sessionCtxSize = 0;
+  CpaCySymSessionSetupData sessionSetupData;
+  memset(&sessionSetupData, 0, sizeof(sessionSetupData));
+
+  sessionSetupData.sessionPriority = CPA_CY_PRIORITY_NORMAL;
+  sessionSetupData.symOperation = CPA_CY_SYM_OP_CIPHER;
+  sessionSetupData.cipherSetupData.cipherAlgorithm = CPA_CY_SYM_CIPHER_AES_CBC;
+  sessionSetupData.cipherSetupData.cipherKeyLenInBytes = AES_256_KEY_SIZE;
+  sessionSetupData.cipherSetupData.pCipherKey = pCipherKey;
+  sessionSetupData.cipherSetupData.cipherDirection = cipherDirection;
+
+  // Only size and allocate the context the first time; later calls reuse it.
+  if (nullptr == *sessionCtx) {
+    status = cpaCySymDpSessionCtxGetSize(cyInstHandle, &sessionSetupData, &sessionCtxSize);
+    if (likely(CPA_STATUS_SUCCESS == status)) {
+      status = qcc_contig_mem_alloc((void **)(sessionCtx), sessionCtxSize);
+    } else {
+      dout(1) << "cpaCySymDpSessionCtxGetSize failed with status = " << status << dendl;
+    }
   }
-  if (qcc_inst->is_polled[avail_inst] == CPA_TRUE) {
-    while (!qcc_op_mem[avail_inst].op_complete) {
-      icp_sal_CyPollInstance(qcc_inst->cy_inst_handles[avail_inst], 0);
+  if (likely(CPA_STATUS_SUCCESS == status)) {
+    status = cpaCySymDpInitSession(cyInstHandle,
+                                   &sessionSetupData,
+                                   *sessionCtx);
+    if (unlikely(status != CPA_STATUS_SUCCESS)) {
+      dout(1) << "cpaCySymDpInitSession failed with status = " << status << dendl;
     }
+  } else {
+    // Reached when either the size query or the allocation above failed.
+    dout(1) << "Session alloc failed with status = " << status << dendl;
   }
-  pthread_join(cypollthreads[avail_inst], NULL);
+  return status;
+}
 
-  if(qcc_op_mem[avail_inst].op_result != CPA_STATUS_SUCCESS) {
-    derr << "Unable to perform crypt operation" << dendl;
-    return false;
+/*
+ * Submit a batch of symmetric operations and wait for it through the given
+ * Asio completion token.
+ *
+ * @param avail_inst instance index (not used in this function body)
+ * @param pOpDataVec operations handed to cpaCySymDpEnqueueOpBatch()
+ * @param token      completion token with signature void(CpaStatus)
+ * @return whatever the completion token yields (init.result.get())
+ */
+template <typename CompletionToken>
+auto QatCrypto::async_perform_op(int avail_inst, std::span<CpaCySymDpOpData*> pOpDataVec, CompletionToken&& token) {
+  CpaStatus status = CPA_STATUS_SUCCESS;
+  using boost::asio::async_completion;
+  using Signature = void(CpaStatus);
+  async_completion<CompletionToken, Signature> init(token);
+  auto ex = boost::asio::get_associated_executor(init.completion_handler);
+  // The stored handler does not run the user's handler inline; it posts it to
+  // the executor associated with that handler.
+  completion_handler = [this, ex, handler = init.completion_handler](CpaStatus stat) {
+    boost::asio::post(ex, std::bind(handler, stat));
+  };
+
+  // complete() counts this down once per call and fires the handler at zero.
+  count = pOpDataVec.size();
+  poll_inst_cv.notify_one();
+  // NOTE(review): the trailing CPA_TRUE is presumably the "perform now" flag
+  // of the QAT data-plane API - confirm against the SDK documentation.
+  status = cpaCySymDpEnqueueOpBatch(pOpDataVec.size(), pOpDataVec.data(), CPA_TRUE);
+
+  // If the enqueue itself failed, report that status through the handler now.
+  if (status != CPA_STATUS_SUCCESS) {
+    completion_handler(status);
   }
+  return init.result.get();
+}
 
-  //Copy data back to out buffer
-  memcpy(out, qcc_op_mem[avail_inst].src_buff, size);
-  //Always cleanup memory holding user-data at the end
-  memset(qcc_op_mem[avail_inst].iv_buff, 0, AES_256_IV_LEN);
-  memset(qcc_op_mem[avail_inst].src_buff, 0, qcc_op_mem[avail_inst].buff_size);
+/*
+ * Run a cipher request of `size` bytes through one QAT instance in batches.
+ *
+ * The input is cut into chunk_size pieces. Up to MAX_NUM_SYM_REQ_BATCH pieces
+ * are copied into the instance's staging buffers, submitted together through
+ * QatCrypto::async_perform_op(), and copied to pDst once the batch succeeds.
+ * Each operation works in place on its staging buffer (srcBuffer and
+ * dstBuffer are the same address).
+ *
+ * @param avail_inst index of the instance and of its qcc_op_mem slot
+ * @param sessionCtx session context stored in every operation
+ * @param pSrc       input data, `size` bytes
+ * @param pDst       output buffer, `size` bytes
+ * @param size       number of bytes to process
+ * @param pIv        consecutive IVs of ivLen bytes each, one per chunk
+ * @param ivLen      length of a single IV in bytes
+ * @param y          when set, wait on its yield context; otherwise block on a
+ *                   future
+ * @return true only if every batch finished with CPA_STATUS_SUCCESS
+ *
+ * The staging buffers touched by this call are zeroed on every exit path,
+ * including the one taken when the retry limit is exceeded.
+ */
+bool QccCrypto::symPerformOp(int avail_inst,
+                              CpaCySymSessionCtx sessionCtx,
+                              const Cpa8U *pSrc,
+                              Cpa8U *pDst,
+                              Cpa32U size,
+                              Cpa8U *pIv,
+                              Cpa32U ivLen,
+                              optional_yield y) {
+  CpaStatus status = CPA_STATUS_SUCCESS;
+  Cpa32U one_batch_size = chunk_size * MAX_NUM_SYM_REQ_BATCH;
+  Cpa32U iv_index = 0;
+  size_t perform_retry_num = 0;
+  for (Cpa32U off = 0; off < size; off += one_batch_size) {
+    QatCrypto helper;
+    boost::container::static_vector<CpaCySymDpOpData*, MAX_NUM_SYM_REQ_BATCH> pOpDataVec;
+    for (Cpa32U offset = off, i = 0; offset < size && i < MAX_NUM_SYM_REQ_BATCH; offset += chunk_size, i++) {
+      CpaCySymDpOpData *pOpData = qcc_op_mem[avail_inst].sym_op_data[i];
+      Cpa8U *pSrcBuffer = qcc_op_mem[avail_inst].src_buff[i];
+      Cpa8U *pIvBuffer = qcc_op_mem[avail_inst].iv_buff[i];
+      // The last chunk of the request may be shorter than chunk_size.
+      Cpa32U process_size = offset + chunk_size <= size ? chunk_size : size - offset;
+      // copy source into buffer
+      memcpy(pSrcBuffer, pSrc + offset, process_size);
+      // copy IV into buffer
+      memcpy(pIvBuffer, &pIv[iv_index * ivLen], ivLen);
+      iv_index++;
+
+      //pOpData assignment
+      pOpData->thisPhys = qaeVirtToPhysNUMA(pOpData);
+      pOpData->instanceHandle = qcc_inst->cy_inst_handles[avail_inst];
+      pOpData->sessionCtx = sessionCtx;
+      pOpData->pCallbackTag = &helper;
+      pOpData->cryptoStartSrcOffsetInBytes = 0;
+      pOpData->messageLenToCipherInBytes = process_size;
+      pOpData->iv = qaeVirtToPhysNUMA(pIvBuffer);
+      pOpData->pIv = pIvBuffer;
+      pOpData->ivLenInBytes = ivLen;
+      pOpData->srcBuffer = qaeVirtToPhysNUMA(pSrcBuffer);
+      pOpData->srcBufferLen = process_size;
+      pOpData->dstBuffer = qaeVirtToPhysNUMA(pSrcBuffer);
+      pOpData->dstBufferLen = process_size;
+
+      pOpDataVec.push_back(pOpData);
+    }
 
-  return true;
+    do {
+      poll_retry_num = RETRY_MAX_NUM;
+      if (y) {
+        yield_context yield = y.get_yield_context();
+        status = helper.async_perform_op(avail_inst, std::span<CpaCySymDpOpData*>(pOpDataVec), yield);
+      } else {
+        auto result = helper.async_perform_op(avail_inst, std::span<CpaCySymDpOpData*>(pOpDataVec), boost::asio::use_future);
+        status = result.get();
+      }
+      if (status == CPA_STATUS_RETRY) {
+        if (++perform_retry_num > 3) {
+          cpaCySymDpPerformOpNow(qcc_inst->cy_inst_handles[avail_inst]);
+          // Give up, but do not return from here: status is still
+          // CPA_STATUS_RETRY, so the failure branch below leaves the batch
+          // loop and the staging buffers are scrubbed before returning false.
+          break;
+        }
+      }
+    } while (status == CPA_STATUS_RETRY);
+
+    if (likely(CPA_STATUS_SUCCESS == status)) {
+      for (Cpa32U offset = off, i = 0; offset < size && i < MAX_NUM_SYM_REQ_BATCH; offset += chunk_size, i++) {
+        Cpa8U *pSrcBuffer = qcc_op_mem[avail_inst].src_buff[i];
+        Cpa32U process_size = offset + chunk_size <= size ? chunk_size : size - offset;
+        memcpy(pDst + offset, pSrcBuffer, process_size);
+      }
+    } else {
+      dout(1) << "async_perform_op failed with status = " << status << dendl;
+      break;
+    }
+  }
+
+  // Always wipe user data and IVs from the staging buffers that were used;
+  // at most MAX_NUM_SYM_REQ_BATCH of them exist per instance.
+  Cpa32U max_used_buffer_num = iv_index > MAX_NUM_SYM_REQ_BATCH ? MAX_NUM_SYM_REQ_BATCH : iv_index;
+  for (Cpa32U i = 0; i < max_used_buffer_num; i++) {
+    memset(qcc_op_mem[avail_inst].src_buff[i], 0, chunk_size);
+    memset(qcc_op_mem[avail_inst].iv_buff[i], 0, ivLen);
+  }
+  return (CPA_STATUS_SUCCESS == status);
 }
index a36b0898b73305103964158c4b23f60f06a69216..04cd4d9cafa3ac4c62253ab698b37a7524d6c275 100644 (file)
@@ -6,9 +6,22 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <pthread.h>
+#include <thread>
+#include <mutex>
 #include <queue>
+#include <memory>
+#include "common/async/yield_context.h"
+#include <memory>
+#include "common/ceph_mutex.h"
+#include <vector>
+#include <functional>
+#include <span>
+#include "boost/circular_buffer.hpp"
+#include "boost/asio/thread_pool.hpp"
 extern "C" {
 #include "cpa.h"
+#include "cpa_cy_sym_dp.h"
+#include "cpa_cy_im.h"
 #include "lac/cpa_cy_sym.h"
 #include "lac/cpa_cy_im.h"
 #include "qae_mem.h"
@@ -18,27 +31,37 @@ extern "C" {
 }
 
 class QccCrypto {
+    friend class QatCrypto;
+    size_t chunk_size{0};
+    size_t max_requests{0};
+
+    boost::asio::thread_pool my_pool{1};
+
+    boost::circular_buffer<std::function<void(int)>> instance_completions;
+
+    template <typename CompletionToken>
+    auto async_get_instance(CompletionToken&& token);
 
   public:
     CpaCySymCipherDirection qcc_op_type;
 
-    QccCrypto() {};
-    ~QccCrypto() {};
+    QccCrypto()  {};
+    ~QccCrypto() { destroy(); };
 
-    bool init();
+    bool init(const size_t chunk_size, const size_t max_requests);
     bool destroy();
-    bool perform_op(unsigned char* out, const unsigned char* in, size_t size,
-        uint8_t *iv,
-        uint8_t *key,
-        CpaCySymCipherDirection op_type);
+    bool perform_op_batch(unsigned char* out, const unsigned char* in, size_t size,
+                          Cpa8U *iv,
+                          Cpa8U *key,
+                          CpaCySymCipherDirection op_type,
+                          optional_yield y);
 
   private:
-
     // Currently only supporting AES_256_CBC.
     // To-Do: Needs to be expanded
     static const size_t AES_256_IV_LEN = 16;
     static const size_t AES_256_KEY_SIZE = 32;
-    static const size_t QCC_MAX_RETRIES = 5000;
+    static const size_t MAX_NUM_SYM_REQ_BATCH = 32;
 
     /*
      * Struct to hold an instance of QAT to handle the crypto operations. These
@@ -62,7 +85,6 @@ class QccCrypto {
      * single crypto or multi-buffer crypto.
      */
     struct QCCSESS {
-      CpaCySymSessionSetupData sess_stp_data;
       Cpa32U sess_ctx_sz;
       CpaCySymSessionCtx sess_ctx;
     } *qcc_sess;
@@ -76,39 +98,18 @@ class QccCrypto {
       // Op common  items
       bool is_mem_alloc;
       bool op_complete;
-      CpaStatus op_result;
-      CpaCySymOpData *sym_op_data;
-      Cpa32U buff_meta_size;
-      Cpa32U num_buffers;
-      Cpa32U buff_size;
-
-      //Src data items
-      Cpa8U *src_buff_meta;
-      CpaBufferList *src_buff_list;
-      CpaFlatBuffer *src_buff_flat;
-      Cpa8U *src_buff;
-      Cpa8U *iv_buff;
+      CpaCySymDpOpData *sym_op_data[MAX_NUM_SYM_REQ_BATCH];
+      Cpa8U *src_buff[MAX_NUM_SYM_REQ_BATCH];
+      Cpa8U *iv_buff[MAX_NUM_SYM_REQ_BATCH];
     } *qcc_op_mem;
 
-    //QAT HW polling thread input structure
-    struct qcc_thread_args {
-      QccCrypto* qccinstance;
-      int entry;
-    };
-
-
-    /*
-     * Function to handle the crypt operation. Will run while the main thread
-     * runs the polling function on the instance doing the op
-     */
-    void do_crypt(qcc_thread_args *thread_args);
-
     /*
      * Handle queue with free instances to handle op
      */
-    std::queue<int> open_instances;
-    int QccGetFreeInstance();
+    boost::circular_buffer<int> open_instances;
     void QccFreeInstance(int entry);
+    std::thread qat_poll_thread;
+    bool thread_stop{false};
 
     /*
      * Contiguous Memory Allocator and de-allocator. We are using the usdm
@@ -153,7 +154,6 @@ class QccCrypto {
     }
 
     std::atomic<bool> is_init = { false };
-    CpaStatus init_stat, stat;
 
     /*
      * Function to cleanup memory if constructor fails
@@ -166,11 +166,49 @@ class QccCrypto {
      * associated callbacks. For synchronous operation (like this one), QAT
      * library creates an internal callback for the operation.
      */
-    static void* crypt_thread(void* entry);
-    CpaStatus QccCyStartPoll(int entry);
-    void poll_instance(int entry);
+    void poll_instances(void);
+    std::atomic<size_t> poll_retry_num{0};
+
+    bool symPerformOp(int avail_inst,
+                      CpaCySymSessionCtx sessionCtx,
+                      const Cpa8U *pSrc,
+                      Cpa8U *pDst,
+                      Cpa32U size,
+                      Cpa8U *pIv,
+                      Cpa32U ivLen,
+                      optional_yield y);
+
+    CpaStatus initSession(CpaInstanceHandle cyInstHandle,
+                          CpaCySymSessionCtx *sessionCtx,
+                          Cpa8U *pCipherKey,
+                          CpaCySymCipherDirection cipherDirection);
+
+    CpaStatus updateSession(CpaCySymSessionCtx sessionCtx,
+                            Cpa8U *pCipherKey,
+                            CpaCySymCipherDirection cipherDirection);
+
+
+};
+
+/*
+ * Completion tracker for one batch of QAT operations. It holds the handler
+ * to invoke and a counter of outstanding operations. The type is neither
+ * copyable nor movable.
+ */
+class QatCrypto {
+ private:
+  // Invoked with the final status of the batch.
+  std::function<void(CpaStatus stat)> completion_handler;
+  // Number of operations that have not yet called complete().
+  std::atomic<std::size_t> count;
+ public:
+  // Mark one operation as finished; the call that brings the counter to zero
+  // fires the handler.
+  // NOTE(review): the handler is always given CPA_STATUS_SUCCESS here, since
+  // no per-operation status is passed in - confirm that this is intended.
+  void complete() {
+    if (--count == 0) {
+      completion_handler(CPA_STATUS_SUCCESS);
+    }
+    return ;
+  }
+
+  QatCrypto () : count(0) {}
+  QatCrypto (const QatCrypto &qat) = delete;
+  QatCrypto (QatCrypto &&qat) = delete;
+  void operator=(const QatCrypto &qat) = delete;
+  void operator=(QatCrypto &&qat) = delete;
 
-    pthread_t *cypollthreads;
-    static const size_t qcc_sleep_duration = 2;
+  // Enqueue pOpDataVec as one batch and wait for it via the completion token.
+  template <typename CompletionToken>
+  auto async_perform_op(int avail_inst, std::span<CpaCySymDpOpData*> pOpDataVec, CompletionToken&& token);
 };
 #endif //QCCCRYPTO_H
index e4fd0616855755c75cb1f26646c136e65bee5728..f101bc24c46a775dc56646a7ad693e1ce4b9ca85 100644 (file)
@@ -288,7 +288,7 @@ mec_option::empty };
 }
 
 
-CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct)
+CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct, const size_t chunk_size, const size_t max_requests)
 {
   CryptoAccelRef ca_impl = nullptr;
   stringstream ss;
@@ -300,7 +300,7 @@ CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct)
     ldpp_dout(dpp, -1) << __func__ << " cannot load crypto accelerator of type " << crypto_accel_type << dendl;
     return nullptr;
   }
-  int err = factory->factory(&ca_impl, &ss);
+  int err = factory->factory(&ca_impl, &ss, chunk_size, max_requests);
   if (err) {
     ldpp_dout(dpp, -1) << __func__ << " factory return error " << err <<
         " with description: " << ss.str() << dendl;
@@ -404,6 +404,7 @@ public:
   static const size_t AES_256_KEYSIZE = 256 / 8;
   static const size_t AES_256_IVSIZE = 128 / 8;
   static const size_t CHUNK_SIZE = 4096;
+  static const size_t QAT_MIN_SIZE = 65536;
   const DoutPrefixProvider* dpp;
 private:
   static const uint8_t IV[AES_256_IVSIZE];
@@ -442,33 +443,55 @@ public:
                      size_t size,
                      off_t stream_offset,
                      const unsigned char (&key)[AES_256_KEYSIZE],
-                     bool encrypt)
+                     bool encrypt,
+                     optional_yield y)
   {
     static std::atomic<bool> failed_to_get_crypto(false);
     CryptoAccelRef crypto_accel;
     if (! failed_to_get_crypto.load())
     {
-      crypto_accel = get_crypto_accel(this->dpp, cct);
+      static size_t max_requests = g_ceph_context->_conf->rgw_thread_pool_size;
+      crypto_accel = get_crypto_accel(this->dpp, cct, CHUNK_SIZE, max_requests);
       if (!crypto_accel)
         failed_to_get_crypto = true;
     }
-    bool result = true;
-    unsigned char iv[AES_256_IVSIZE];
-    for (size_t offset = 0; result && (offset < size); offset += CHUNK_SIZE) {
-      size_t process_size = offset + CHUNK_SIZE <= size ? CHUNK_SIZE : size - offset;
-      prepare_iv(iv, stream_offset + offset);
-      if (crypto_accel != nullptr) {
-        if (encrypt) {
-          result = crypto_accel->cbc_encrypt(out + offset, in + offset,
-                                             process_size, iv, key);
+    bool result = false;
+    static std::string accelerator = cct->_conf->plugin_crypto_accelerator;
+    if (accelerator == "crypto_qat" && crypto_accel != nullptr && size >= QAT_MIN_SIZE) {
+      // now, batch mode is only for QAT plugin
+      size_t iv_num = size / CHUNK_SIZE;
+      if (size % CHUNK_SIZE) ++iv_num;
+      auto iv = new unsigned char[iv_num][AES_256_IVSIZE];
+      for (size_t offset = 0, i = 0; offset < size; offset += CHUNK_SIZE, i++) {
+        prepare_iv(iv[i], stream_offset + offset);
+      }
+      if (encrypt) {
+        result = crypto_accel->cbc_encrypt_batch(out, in, size, iv, key, y);
+      } else {
+        result = crypto_accel->cbc_decrypt_batch(out, in, size, iv, key, y);
+      }
+      delete[] iv;
+    }
+    if (result == false) {
+      // If QAT don't have free instance, we can fall back to this
+      result = true;
+      unsigned char iv[AES_256_IVSIZE];
+      for (size_t offset = 0; result && (offset < size); offset += CHUNK_SIZE) {
+        size_t process_size = offset + CHUNK_SIZE <= size ? CHUNK_SIZE : size - offset;
+        prepare_iv(iv, stream_offset + offset);
+        if (crypto_accel != nullptr && accelerator != "crypto_qat") {
+          if (encrypt) {
+            result = crypto_accel->cbc_encrypt(out + offset, in + offset,
+                                              process_size, iv, key, y);
+          } else {
+            result = crypto_accel->cbc_decrypt(out + offset, in + offset,
+                                              process_size, iv, key, y);
+          }
         } else {
-          result = crypto_accel->cbc_decrypt(out + offset, in + offset,
-                                             process_size, iv, key);
+          result = cbc_transform(
+              out + offset, in + offset, process_size,
+              iv, key, encrypt);
         }
-      } else {
-        result = cbc_transform(
-            out + offset, in + offset, process_size,
-            iv, key, encrypt);
       }
     }
     return result;
@@ -479,7 +502,8 @@ public:
                off_t in_ofs,
                size_t size,
                bufferlist& output,
-               off_t stream_offset)
+               off_t stream_offset,
+               optional_yield y)
   {
     bool result = false;
     size_t aligned_size = size / AES_256_IVSIZE * AES_256_IVSIZE;
@@ -493,7 +517,7 @@ public:
     result = cbc_transform(buf_raw,
                            input_raw + in_ofs,
                            aligned_size,
-                           stream_offset, key, true);
+                           stream_offset, key, true, y);
     if (result && (unaligned_rest_size > 0)) {
       /* remainder to encrypt */
       if (aligned_size % CHUNK_SIZE > 0) {
@@ -534,7 +558,8 @@ public:
                off_t in_ofs,
                size_t size,
                bufferlist& output,
-               off_t stream_offset)
+               off_t stream_offset,
+               optional_yield y)
   {
     bool result = false;
     size_t aligned_size = size / AES_256_IVSIZE * AES_256_IVSIZE;
@@ -548,7 +573,7 @@ public:
     result = cbc_transform(buf_raw,
                            input_raw + in_ofs,
                            aligned_size,
-                           stream_offset, key, false);
+                           stream_offset, key, false, y);
     if (result && unaligned_rest_size > 0) {
       /* remainder to decrypt */
       if (aligned_size % CHUNK_SIZE > 0) {
@@ -635,7 +660,8 @@ bool AES_256_ECB_encrypt(const DoutPrefixProvider* dpp,
 RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp,
                                                CephContext* cct,
                                                RGWGetObj_Filter* next,
-                                               std::unique_ptr<BlockCrypt> crypt)
+                                               std::unique_ptr<BlockCrypt> crypt,
+                                               optional_yield y)
     :
     RGWGetObj_Filter(next),
     dpp(dpp),
@@ -644,7 +670,8 @@ RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp,
     enc_begin_skip(0),
     ofs(0),
     end(0),
-    cache()
+    cache(),
+    y(y)
 {
   block_size = this->crypt->get_block_size();
 }
@@ -726,7 +753,7 @@ int RGWGetObj_BlockDecrypt::fixup_range(off_t& bl_ofs, off_t& bl_end) {
 int RGWGetObj_BlockDecrypt::process(bufferlist& in, size_t part_ofs, size_t size)
 {
   bufferlist data;
-  if (!crypt->decrypt(in, 0, size, data, part_ofs)) {
+  if (!crypt->decrypt(in, 0, size, data, part_ofs, y)) {
     return -ERR_INTERNAL_ERROR;
   }
   off_t send_size = size - enc_begin_skip;
@@ -799,12 +826,14 @@ int RGWGetObj_BlockDecrypt::flush() {
 RGWPutObj_BlockEncrypt::RGWPutObj_BlockEncrypt(const DoutPrefixProvider *dpp,
                                                CephContext* cct,
                                                rgw::sal::DataProcessor *next,
-                                               std::unique_ptr<BlockCrypt> crypt)
+                                               std::unique_ptr<BlockCrypt> crypt,
+                                               optional_yield y)
   : Pipe(next),
     dpp(dpp),
     cct(cct),
     crypt(std::move(crypt)),
-    block_size(this->crypt->get_block_size())
+    block_size(this->crypt->get_block_size()),
+    y(y)
 {
 }
 
@@ -826,7 +855,7 @@ int RGWPutObj_BlockEncrypt::process(bufferlist&& data, uint64_t logical_offset)
   if (proc_size > 0) {
     bufferlist in, out;
     cache.splice(0, proc_size, &in);
-    if (!crypt->encrypt(in, 0, proc_size, out, logical_offset)) {
+    if (!crypt->encrypt(in, 0, proc_size, out, logical_offset, y)) {
       return -ERR_INTERNAL_ERROR;
     }
     int r = Pipe::process(std::move(out), logical_offset);
index 6008dd05eaea61afd0e6f7df8b3f686d5c5a83de..b0c4886011276fd8a5c540596abaf25e3043e522 100644 (file)
@@ -13,6 +13,7 @@
 #include <rgw/rgw_rest.h>
 #include <rgw/rgw_rest_s3.h>
 #include "rgw_putobj.h"
+#include "common/async/yield_context.h"
 
 /**
  * \brief Interface for block encryption methods
@@ -55,7 +56,8 @@ public:
                        off_t in_ofs,
                        size_t size,
                        bufferlist& output,
-                       off_t stream_offset) = 0;
+                       off_t stream_offset,
+                       optional_yield y) = 0;
 
   /**
    * Decrypts data.
@@ -75,7 +77,8 @@ public:
                        off_t in_ofs,
                        size_t size,
                        bufferlist& output,
-                       off_t stream_offset) = 0;
+                       off_t stream_offset,
+                       optional_yield y) = 0;
 };
 
 static const size_t AES_256_KEYSIZE = 256 / 8;
@@ -97,6 +100,7 @@ class RGWGetObj_BlockDecrypt : public RGWGetObj_Filter {
   off_t end; /**< stream offset of last byte that is requested */
   bufferlist cache; /**< stores extra data that could not (yet) be processed by BlockCrypt */
   size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
+  optional_yield y;
 
   int process(bufferlist& cipher, size_t part_ofs, size_t size);
 
@@ -106,7 +110,8 @@ public:
   RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp,
                          CephContext* cct,
                          RGWGetObj_Filter* next,
-                         std::unique_ptr<BlockCrypt> crypt);
+                         std::unique_ptr<BlockCrypt> crypt,
+                         optional_yield y);
   virtual ~RGWGetObj_BlockDecrypt();
 
   virtual int fixup_range(off_t& bl_ofs,
@@ -128,11 +133,13 @@ class RGWPutObj_BlockEncrypt : public rgw::putobj::Pipe
                                           for operations when enough data is accumulated */
   bufferlist cache; /**< stores extra data that could not (yet) be processed by BlockCrypt */
   const size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
+  optional_yield y;
 public:
   RGWPutObj_BlockEncrypt(const DoutPrefixProvider *dpp,
                          CephContext* cct,
                          rgw::sal::DataProcessor *next,
-                         std::unique_ptr<BlockCrypt> crypt);
+                         std::unique_ptr<BlockCrypt> crypt,
+                         optional_yield y);
 
   int process(bufferlist&& data, uint64_t logical_offset) override;
 }; /* RGWPutObj_BlockEncrypt */
index ccb8a397aec258ca2f87b4e1389de659a3b26fd6..5860eddb6a7b20620e2a19bffb98f746c7ae68d1 100644 (file)
@@ -581,7 +581,7 @@ int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>
   res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses);
   if (res == 0) {
     if (block_crypt != nullptr) {
-      auto f = std::make_unique<RGWGetObj_BlockDecrypt>(s, s->cct, cb, std::move(block_crypt));
+      auto f = std::make_unique<RGWGetObj_BlockDecrypt>(s, s->cct, cb, std::move(block_crypt), s->yield);
       if (manifest_bl != nullptr) {
         res = f->read_manifest(this, *manifest_bl);
         if (res == 0) {
@@ -2726,8 +2726,7 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter(
   res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses_unused);
   if (res == 0) {
     if (block_crypt != nullptr) {
-      auto f = std::unique_ptr<RGWGetObj_BlockDecrypt>(new RGWGetObj_BlockDecrypt(s, s->cct, cb, std::move(block_crypt)));
-      //RGWGetObj_BlockDecrypt* f = new RGWGetObj_BlockDecrypt(s->cct, cb, std::move(block_crypt));
+      auto f = std::unique_ptr<RGWGetObj_BlockDecrypt>(new RGWGetObj_BlockDecrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
       if (f != nullptr) {
         if (manifest_bl != nullptr) {
           res = f->read_manifest(this, *manifest_bl);
@@ -2759,7 +2758,7 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter(
        * We use crypto mode that configured as if we were decrypting. */
       res = rgw_s3_prepare_decrypt(s, obj->get_attrs(), &block_crypt, crypt_http_responses);
       if (res == 0 && block_crypt != nullptr)
-        filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
+        filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
     }
     /* it is ok, to not have encryption at all */
   }
@@ -2768,7 +2767,7 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter(
     std::unique_ptr<BlockCrypt> block_crypt;
     res = rgw_s3_prepare_encrypt(s, attrs, &block_crypt, crypt_http_responses);
     if (res == 0 && block_crypt != nullptr) {
-      filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
+      filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
     }
   }
   return res;
@@ -3316,7 +3315,7 @@ int RGWPostObj_ObjStore_S3::get_encrypt_filter(
   int res = rgw_s3_prepare_encrypt(s, attrs, &block_crypt,
                                    crypt_http_responses);
   if (res == 0 && block_crypt != nullptr) {
-    filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
+    filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
   }
   return res;
 }