From 0ce6d03bbcd30d516f17646847127b3b0ad649ae Mon Sep 17 00:00:00 2001 From: Feng Hualong Date: Mon, 25 Apr 2022 11:34:49 +0800 Subject: [PATCH] Crypto: Add QAT batch mode Currently the code cannot exploit QAT's acceleration advantage and leaves QAT performing poorly. Implement a QAT batch mode so that QAT's performance can be realized. When the number of concurrent requests scales up and the QAT instances risk becoming a bottleneck, we can fall back to the CPU. Also add a parameter that controls how many requests may wait for a free QAT instance, which avoids leaving QAT idle and keeps it as fully utilized as possible; the maximum queue size is bounded by max_requests. Add optional_yield to RGWPutObj_BlockEncrypt and RGWGetObj_BlockDecrypt so that both coroutine and non-coroutine modes work. Signed-off-by: Feng Hualong --- src/crypto/crypto_accel.h | 17 +- src/crypto/crypto_plugin.h | 5 +- src/crypto/isa-l/CMakeLists.txt | 3 + src/crypto/isa-l/isal_crypto_accel.cc | 10 +- src/crypto/isa-l/isal_crypto_accel.h | 15 +- src/crypto/isa-l/isal_crypto_plugin.h | 4 +- src/crypto/openssl/CMakeLists.txt | 3 +- src/crypto/openssl/openssl_crypto_accel.cc | 10 +- src/crypto/openssl/openssl_crypto_accel.h | 15 +- src/crypto/openssl/openssl_crypto_plugin.h | 5 +- src/crypto/qat/CMakeLists.txt | 3 +- src/crypto/qat/qat_crypto_accel.cc | 37 +- src/crypto/qat/qat_crypto_accel.h | 17 +- src/crypto/qat/qat_crypto_plugin.h | 4 +- src/crypto/qat/qcccrypto.cc | 590 ++++++++++++--------- src/crypto/qat/qcccrypto.h | 124 +++-- src/rgw/rgw_crypt.cc | 87 ++- src/rgw/rgw_crypt.h | 15 +- src/rgw/rgw_rest_s3.cc | 11 +- 19 files changed, 610 insertions(+), 365 deletions(-) diff --git a/src/crypto/crypto_accel.h b/src/crypto/crypto_accel.h index 5c15936099273..f2ba61906b4ac 100644 --- a/src/crypto/crypto_accel.h +++ b/src/crypto/crypto_accel.h @@ -17,21 +17,34 @@ #include #include "include/Context.h" +class optional_yield; + class CryptoAccel; typedef std::shared_ptr<CryptoAccel> CryptoAccelRef; class CryptoAccel { public: CryptoAccel() {} + CryptoAccel(const size_t chunk_size, const size_t max_requests) {} virtual ~CryptoAccel() {} static const int AES_256_IVSIZE = 128/8; static const int AES_256_KEYSIZE = 256/8; virtual bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) = 0; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) = 0; virtual bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) = 0; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) = 0; + virtual bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) = 0; + virtual bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) = 0; }; #endif diff --git a/src/crypto/crypto_plugin.h b/src/crypto/crypto_plugin.h index cf22d5cb42505..1319659715237 100644 --- a/src/crypto/crypto_plugin.h +++ b/src/crypto/crypto_plugin.h @@ -20,6 +20,7 @@ #include "ostream" #include "crypto/crypto_accel.h" +#include // ----------------------------------------------------------------------------- class CryptoPlugin : public
ceph::Plugin { @@ -31,6 +32,8 @@ public: ~CryptoPlugin() {} virtual int factory(CryptoAccelRef *cs, - std::ostream *ss) = 0; + std::ostream *ss, + const size_t chunk_size, + const size_t max_requests) = 0; }; #endif diff --git a/src/crypto/isa-l/CMakeLists.txt b/src/crypto/isa-l/CMakeLists.txt index 2a2ec0bc0cb28..c8d832247d92e 100644 --- a/src/crypto/isa-l/CMakeLists.txt +++ b/src/crypto/isa-l/CMakeLists.txt @@ -29,6 +29,9 @@ endif(HAVE_NASM_X64) add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs}) target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include) + +target_link_libraries(ceph_crypto_isal PRIVATE spawn) + set_target_properties(ceph_crypto_isal PROPERTIES VERSION 1.0.0 SOVERSION 1 diff --git a/src/crypto/isa-l/isal_crypto_accel.cc b/src/crypto/isa-l/isal_crypto_accel.cc index 7dccf64fd09c6..a22cd2c4fa00f 100644 --- a/src/crypto/isa-l/isal_crypto_accel.cc +++ b/src/crypto/isa-l/isal_crypto_accel.cc @@ -18,9 +18,10 @@ bool ISALCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) { - if ((size % AES_256_IVSIZE) != 0) { + if (unlikely((size % AES_256_IVSIZE) != 0)) { return false; } alignas(16) struct cbc_key_data keys_blk; @@ -31,9 +32,10 @@ bool ISALCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, s } bool ISALCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) { - if ((size % AES_256_IVSIZE) != 0) { + if (unlikely((size % AES_256_IVSIZE) != 0)) { return false; } alignas(16) struct cbc_key_data keys_blk; diff --git a/src/crypto/isa-l/isal_crypto_accel.h b/src/crypto/isa-l/isal_crypto_accel.h index 84331bbddd47a..7fffd5122bc97 100644 --- a/src/crypto/isa-l/isal_crypto_accel.h +++ b/src/crypto/isa-l/isal_crypto_accel.h @@ -15,6 +15,7 @@ #ifndef ISAL_CRYPTO_ACCEL_H #define ISAL_CRYPTO_ACCEL_H #include "crypto/crypto_accel.h" +#include "common/async/yield_context.h" class ISALCryptoAccel : public CryptoAccel { public: @@ -23,9 +24,19 @@ class ISALCryptoAccel : public CryptoAccel { bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) override; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override; bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) override; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override; + bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override { return false; } + bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override { return false; } }; #endif diff --git a/src/crypto/isa-l/isal_crypto_plugin.h b/src/crypto/isa-l/isal_crypto_plugin.h index 68e782e69fc0a..50789b777225d 100644 --- a/src/crypto/isa-l/isal_crypto_plugin.h +++ b/src/crypto/isa-l/isal_crypto_plugin.h @@ -31,7 +31,9 @@ public: 
~ISALCryptoPlugin() {} virtual int factory(CryptoAccelRef *cs, - std::ostream *ss) + std::ostream *ss, + const size_t chunk_size, + const size_t max_requests) { if (cryptoaccel == nullptr) { diff --git a/src/crypto/openssl/CMakeLists.txt b/src/crypto/openssl/CMakeLists.txt index 6ede1567f218d..5365ab9a6ca22 100644 --- a/src/crypto/openssl/CMakeLists.txt +++ b/src/crypto/openssl/CMakeLists.txt @@ -7,7 +7,8 @@ set(openssl_crypto_plugin_srcs add_library(ceph_crypto_openssl SHARED ${openssl_crypto_plugin_srcs}) target_link_libraries(ceph_crypto_openssl PRIVATE OpenSSL::Crypto - $<$:ceph-common>) + $<$:ceph-common> + spawn) target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR}) add_dependencies(crypto_plugins ceph_crypto_openssl) set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "") diff --git a/src/crypto/openssl/openssl_crypto_accel.cc b/src/crypto/openssl/openssl_crypto_accel.cc index e6ea0fa7290cd..f99844a3848b4 100644 --- a/src/crypto/openssl/openssl_crypto_accel.cc +++ b/src/crypto/openssl/openssl_crypto_accel.cc @@ -77,9 +77,10 @@ bool evp_transform(unsigned char* out, const unsigned char* in, size_t size, bool OpenSSLCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) { - if ((size % AES_256_IVSIZE) != 0) { + if (unlikely((size % AES_256_IVSIZE) != 0)) { return false; } @@ -91,9 +92,10 @@ bool OpenSSLCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in bool OpenSSLCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) { - if ((size % AES_256_IVSIZE) != 0) { + if (unlikely((size % AES_256_IVSIZE) != 0)) { return false; } diff --git a/src/crypto/openssl/openssl_crypto_accel.h b/src/crypto/openssl/openssl_crypto_accel.h index ad90cbeceaf3e..90edf1ec6ecb7 100644 --- a/src/crypto/openssl/openssl_crypto_accel.h +++ b/src/crypto/openssl/openssl_crypto_accel.h @@ -16,6 +16,7 @@ #define OPENSSL_CRYPTO_ACCEL_H #include "crypto/crypto_accel.h" +#include "common/async/yield_context.h" class OpenSSLCryptoAccel : public CryptoAccel { public: @@ -24,9 +25,19 @@ class OpenSSLCryptoAccel : public CryptoAccel { bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) override; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override; bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) override; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override; + bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override { return false; } + bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override { return false; } }; #endif diff --git a/src/crypto/openssl/openssl_crypto_plugin.h b/src/crypto/openssl/openssl_crypto_plugin.h index 408d9ebdaa7b8..86faf770146a9 
100644 --- a/src/crypto/openssl/openssl_crypto_plugin.h +++ b/src/crypto/openssl/openssl_crypto_plugin.h @@ -25,7 +25,10 @@ class OpenSSLCryptoPlugin : public CryptoPlugin { public: explicit OpenSSLCryptoPlugin(CephContext* cct) : CryptoPlugin(cct) {} - int factory(CryptoAccelRef *cs, std::ostream *ss) override { + int factory(CryptoAccelRef *cs, + std::ostream *ss, + const size_t chunk_size, + const size_t max_requests) override { if (cryptoaccel == nullptr) cryptoaccel = CryptoAccelRef(new OpenSSLCryptoAccel); diff --git a/src/crypto/qat/CMakeLists.txt b/src/crypto/qat/CMakeLists.txt index fb751967a97a1..77791cacf79b3 100644 --- a/src/crypto/qat/CMakeLists.txt +++ b/src/crypto/qat/CMakeLists.txt @@ -13,7 +13,8 @@ add_dependencies(crypto_plugins ceph_crypto_qat) target_link_libraries(ceph_crypto_qat PRIVATE QatDrv::qat_s - QatDrv::usdm_drv_s) + QatDrv::usdm_drv_s + spawn) add_dependencies(crypto_plugins ceph_crypto_qat) set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1) diff --git a/src/crypto/qat/qat_crypto_accel.cc b/src/crypto/qat/qat_crypto_accel.cc index 23f86edfa7e25..6ff601de4ec6b 100644 --- a/src/crypto/qat/qat_crypto_accel.cc +++ b/src/crypto/qat/qat_crypto_accel.cc @@ -15,28 +15,33 @@ #include "crypto/qat/qat_crypto_accel.h" -bool QccCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size, - const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) -{ - if ((size % AES_256_IVSIZE) != 0) { + + + +bool QccCryptoAccel::cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) { + if (unlikely((size % AES_256_IVSIZE) != 0)) { return false; } - return qcccrypto.perform_op(out, in, size, - const_cast(&iv[0]), - const_cast(&key[0]), CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT); + return qcccrypto.perform_op_batch(out, in, size, + const_cast(&iv[0][0]), + const_cast(&key[0]), + CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT, y); } -bool QccCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size, - const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) -{ - if ((size % AES_256_IVSIZE) != 0) { +bool QccCryptoAccel::cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) { + if (unlikely((size % AES_256_IVSIZE) != 0)) { return false; } - return qcccrypto.perform_op(out, in, size, - const_cast(&iv[0]), - const_cast(&key[0]), CPA_CY_SYM_CIPHER_DIRECTION_DECRYPT); + return qcccrypto.perform_op_batch(out, in, size, + const_cast(&iv[0][0]), + const_cast(&key[0]), + CPA_CY_SYM_CIPHER_DIRECTION_DECRYPT, y); } diff --git a/src/crypto/qat/qat_crypto_accel.h b/src/crypto/qat/qat_crypto_accel.h index 5badefc288345..714575799a9e1 100644 --- a/src/crypto/qat/qat_crypto_accel.h +++ b/src/crypto/qat/qat_crypto_accel.h @@ -18,18 +18,29 @@ #include "crypto/crypto_accel.h" #include "crypto/qat/qcccrypto.h" +#include "common/async/yield_context.h" class QccCryptoAccel : public CryptoAccel { public: QccCrypto qcccrypto; - QccCryptoAccel() { qcccrypto.init(); }; + QccCryptoAccel(const size_t chunk_size, const size_t max_requests):qcccrypto() { qcccrypto.init(chunk_size, max_requests); }; ~QccCryptoAccel() { qcccrypto.destroy(); }; bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char 
(&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) override; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override { return false; } bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size, const unsigned char (&iv)[AES_256_IVSIZE], - const unsigned char (&key)[AES_256_KEYSIZE]) override; + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override { return false; } + bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override; + bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size, + const unsigned char iv[][AES_256_IVSIZE], + const unsigned char (&key)[AES_256_KEYSIZE], + optional_yield y) override; }; #endif diff --git a/src/crypto/qat/qat_crypto_plugin.h b/src/crypto/qat/qat_crypto_plugin.h index a8d4df7cb8f7e..52b8e9578d417 100644 --- a/src/crypto/qat/qat_crypto_plugin.h +++ b/src/crypto/qat/qat_crypto_plugin.h @@ -29,11 +29,11 @@ public: {} ~QccCryptoPlugin() {} - virtual int factory(CryptoAccelRef *cs, std::ostream *ss) + virtual int factory(CryptoAccelRef *cs, std::ostream *ss, const size_t chunk_size, const size_t max_requests) { std::lock_guard l(qat_init); if (cryptoaccel == nullptr) - cryptoaccel = CryptoAccelRef(new QccCryptoAccel); + cryptoaccel = CryptoAccelRef(new QccCryptoAccel(chunk_size, max_requests)); *cs = cryptoaccel; return 0; diff --git a/src/crypto/qat/qcccrypto.cc b/src/crypto/qat/qcccrypto.cc index a3f2537264348..35bb5d3459f77 100644 --- a/src/crypto/qat/qcccrypto.cc +++ b/src/crypto/qat/qcccrypto.cc @@ -2,10 +2,17 @@ #include #include "string.h" #include +#include #include "common/debug.h" #include "include/scope_guard.h" #include "common/dout.h" #include "common/errno.h" +#include +#include +#include +#include + +#include "boost/container/static_vector.hpp" // ----------------------------------------------------------------------------- #define dout_context g_ceph_context @@ -20,51 +27,111 @@ static std::ostream& _prefix(std::ostream* _dout) // ----------------------------------------------------------------------------- /* - * Poller thread & functions -*/ + * Callback function + */ +static void symDpCallback(CpaCySymDpOpData *pOpData, + CpaStatus status, + CpaBoolean verifyResult) +{ + if (nullptr != pOpData->pCallbackTag) + { + static_cast(pOpData->pCallbackTag)->complete(); + } +} + static std::mutex qcc_alloc_mutex; static std::mutex qcc_eng_mutex; static std::atomic init_called = { false }; - -void* QccCrypto::crypt_thread(void *args) { - struct qcc_thread_args *thread_args = (struct qcc_thread_args *)args; - thread_args->qccinstance->do_crypt(thread_args); - return thread_args; +static std::mutex poll_inst_mutex; +static std::condition_variable poll_inst_cv; + +#define NON_INSTANCE -1 +#define RETRY_MAX_NUM 100 + +template +auto QccCrypto::async_get_instance(CompletionToken&& token) { + using boost::asio::async_completion; + using Signature = void(int); + async_completion init(token); + + auto ex = boost::asio::get_associated_executor(init.completion_handler); + + boost::asio::post(my_pool, [this, ex, handler = std::move(init.completion_handler)]()mutable{ + auto handler1 = std::move(handler); + if (!open_instances.empty()) { + int avail_inst = open_instances.front(); + open_instances.pop_front(); + boost::asio::post(ex, std::bind(handler1, avail_inst)); + } else if (!instance_completions.full()) { + // keep a few 
objects to wait QAT instance to make sure qat full utilization as much as possible, + // that is, QAT don't need to wait for new objects to ensure + // that QAT will not be in a free state as much as possible + instance_completions.push_back([this, ex, handler2 = std::move(handler1)](int inst)mutable{ + boost::asio::post(ex, std::bind(handler2, inst)); + }); + } else { + boost::asio::post(ex, std::bind(handler1, NON_INSTANCE)); + } + }); + return init.result.get(); } void QccCrypto::QccFreeInstance(int entry) { - std::lock_guard freeinst(qcc_alloc_mutex); - open_instances.push(entry); -} - -int QccCrypto::QccGetFreeInstance() { - int ret = -1; - std::lock_guard getinst(qcc_alloc_mutex); - if (!open_instances.empty()) { - ret = open_instances.front(); - open_instances.pop(); - } - return ret; + boost::asio::post(my_pool, [this, entry]()mutable{ + if (!instance_completions.empty()) { + instance_completions.front()(entry); + instance_completions.pop_front(); + } else { + open_instances.push_back(entry); + } + }); } void QccCrypto::cleanup() { icp_sal_userStop(); qaeMemDestroy(); is_init = false; - init_stat = stat; init_called = false; derr << "Failure during QAT init sequence. Quitting" << dendl; } +void QccCrypto::poll_instances(void) { + CpaStatus stat = CPA_STATUS_SUCCESS; + poll_retry_num = RETRY_MAX_NUM; + while (!thread_stop) { + int free_instance_num = 0; + for (int iter = 0; iter < qcc_inst->num_instances; iter++) { + if (qcc_inst->is_polled[iter] == CPA_TRUE) { + stat = icp_sal_CyPollDpInstance(qcc_inst->cy_inst_handles[iter], 0); + if (stat != CPA_STATUS_SUCCESS) { + free_instance_num++; + } + } + } + if (free_instance_num == qcc_inst->num_instances) { + poll_retry_num--; + } else { + poll_retry_num = RETRY_MAX_NUM; + } + if (0 == poll_retry_num) { + std::unique_lock lock{poll_inst_mutex}; + poll_inst_cv.wait_for(lock, std::chrono::milliseconds(1), [this](){return poll_retry_num > 0;}); + poll_retry_num = RETRY_MAX_NUM; + } + } +} + /* * We initialize QAT instance and everything that is common for all ops */ -bool QccCrypto::init() -{ +bool QccCrypto::init(const size_t chunk_size, const size_t max_requests) { std::lock_guard l(qcc_eng_mutex); + CpaStatus stat = CPA_STATUS_SUCCESS; + this->chunk_size = chunk_size; + this->max_requests = max_requests; - if(init_called) { + if (init_called) { dout(10) << "Init sequence already called. Skipping duplicate call" << dendl; return true; } @@ -76,21 +143,21 @@ bool QccCrypto::init() // Find if the usermode memory driver is available. We need to this to // create contiguous memory needed by QAT. 
stat = qaeMemInit(); - if(stat != CPA_STATUS_SUCCESS) { + if (stat != CPA_STATUS_SUCCESS) { derr << "Unable to load memory driver" << dendl; this->cleanup(); return false; } stat = icp_sal_userStart("CEPH"); - if(stat != CPA_STATUS_SUCCESS) { + if (stat != CPA_STATUS_SUCCESS) { derr << "Unable to start qat device" << dendl; this->cleanup(); return false; } qcc_os_mem_alloc((void **)&qcc_inst, sizeof(QCCINST)); - if(qcc_inst == NULL) { + if (qcc_inst == NULL) { derr << "Unable to alloc mem for instance struct" << dendl; this->cleanup(); return false; @@ -121,12 +188,17 @@ bool QccCrypto::init() this->cleanup(); return false; } + dout(1) << "Get instances num: " << qcc_inst->num_instances << dendl; + if (max_requests > qcc_inst->num_instances) { + instance_completions.set_capacity(max_requests - qcc_inst->num_instances); + } + open_instances.set_capacity(qcc_inst->num_instances); int iter = 0; //Start Instances - for(iter = 0; iter < qcc_inst->num_instances; iter++) { + for (iter = 0; iter < qcc_inst->num_instances; iter++) { stat = cpaCyStartInstance(qcc_inst->cy_inst_handles[iter]); - if(stat != CPA_STATUS_SUCCESS) { + if (stat != CPA_STATUS_SUCCESS) { derr << "Unable to start instance" << dendl; this->cleanup(); return false; @@ -136,7 +208,7 @@ bool QccCrypto::init() qcc_os_mem_alloc((void **)&qcc_inst->is_polled, ((int)qcc_inst->num_instances * sizeof(CpaBoolean))); CpaInstanceInfo2 info; - for(iter = 0; iter < qcc_inst->num_instances; iter++) { + for (iter = 0; iter < qcc_inst->num_instances; iter++) { qcc_inst->is_polled[iter] = cpaCyInstanceGetInfo2(qcc_inst->cy_inst_handles[iter], &info) == CPA_STATUS_SUCCESS ? info.isPolled : CPA_FALSE; } @@ -144,7 +216,7 @@ bool QccCrypto::init() // Allocate memory structures for all instances qcc_os_mem_alloc((void **)&qcc_sess, ((int)qcc_inst->num_instances * sizeof(QCCSESS))); - if(qcc_sess == NULL) { + if (qcc_sess == NULL) { derr << "Unable to allocate memory for session struct" << dendl; this->cleanup(); return false; @@ -152,53 +224,34 @@ bool QccCrypto::init() qcc_os_mem_alloc((void **)&qcc_op_mem, ((int)qcc_inst->num_instances * sizeof(QCCOPMEM))); - if(qcc_sess == NULL) { + if (qcc_sess == NULL) { derr << "Unable to allocate memory for opmem struct" << dendl; this->cleanup(); return false; } - qcc_os_mem_alloc((void **)&cypollthreads, - ((int)qcc_inst->num_instances * sizeof(pthread_t))); - if(cypollthreads == NULL) { - derr << "Unable to allocate memory for pthreads" << dendl; - this->cleanup(); - return false; - } - //At this point we are only doing an user-space version. - //To-Do: Maybe a kernel based one - for(iter = 0; iter < qcc_inst->num_instances; iter++) { + for (iter = 0; iter < qcc_inst->num_instances; iter++) { stat = cpaCySetAddressTranslation(qcc_inst->cy_inst_handles[iter], qaeVirtToPhysNUMA); - if(stat == CPA_STATUS_SUCCESS) { - // Start HW Polling Thread - // To-Do: Enable epoll & interrupt based later? - // QccCyStartPoll(iter); - // Setup the session structures for crypto operation and populate - // whatever we can now. Rest will be filled in when crypto operation - // happens. 
- qcc_sess[iter].sess_ctx_sz = 0; - qcc_sess[iter].sess_ctx = NULL; - qcc_sess[iter].sess_stp_data.sessionPriority = CPA_CY_PRIORITY_NORMAL; - qcc_sess[iter].sess_stp_data.symOperation = CPA_CY_SYM_OP_CIPHER; - open_instances.push(iter); + if (stat == CPA_STATUS_SUCCESS) { + open_instances.push_back(iter); qcc_op_mem[iter].is_mem_alloc = false; - qcc_op_mem[iter].op_complete = false; - qcc_op_mem[iter].op_result = CPA_STATUS_SUCCESS; - qcc_op_mem[iter].sym_op_data = NULL; - qcc_op_mem[iter].buff_meta_size = qcc_op_mem[iter].buff_size = 0; - qcc_op_mem[iter].src_buff_meta = qcc_op_mem[iter].src_buff - = qcc_op_mem[iter].iv_buff = NULL; - qcc_op_mem[iter].src_buff_list = NULL; - qcc_op_mem[iter].src_buff_flat = NULL; - qcc_op_mem[iter].num_buffers = 1; + + stat = cpaCySymDpRegCbFunc(qcc_inst->cy_inst_handles[iter], symDpCallback); + if (stat != CPA_STATUS_SUCCESS) { + dout(1) << "Unable to register callback function for instance " << iter << " with status = " << stat << dendl; + return false; + } } else { - derr << "Unable to find address translations of instance " << iter << dendl; + dout(1) << "Unable to find address translations of instance " << iter << dendl; this->cleanup(); return false; } } + + qat_poll_thread = make_named_thread("qat_poll", &QccCrypto::poll_instances, this); + is_init = true; dout(10) << "Init complete" << dendl; return true; @@ -210,37 +263,32 @@ bool QccCrypto::destroy() { return false; } - unsigned int retry = 0; - while(retry <= QCC_MAX_RETRIES) { - if(open_instances.size() == qcc_inst->num_instances) { - break; - } else { - retry++; - } - dout(5) << "QAT is still busy and cannot free resources yet" << dendl; - return false; + thread_stop = true; + if (qat_poll_thread.joinable()) { + qat_poll_thread.join(); } + my_pool.join(); dout(10) << "Destroying QAT crypto & related memory" << dendl; int iter = 0; // Free up op related memory for (iter =0; iter < qcc_inst->num_instances; iter++) { - qcc_contig_mem_free((void **)&(qcc_op_mem[iter].src_buff)); - qcc_contig_mem_free((void **)&(qcc_op_mem[iter].iv_buff)); - qcc_os_mem_free((void **)&(qcc_op_mem[iter].src_buff_list)); - qcc_os_mem_free((void **)&(qcc_op_mem[iter].src_buff_flat)); - qcc_contig_mem_free((void **)&(qcc_op_mem[iter].sym_op_data)); + for (size_t i = 0; i < MAX_NUM_SYM_REQ_BATCH; i++) { + qcc_contig_mem_free((void **)&(qcc_op_mem[iter].src_buff[i])); + qcc_contig_mem_free((void **)&(qcc_op_mem[iter].iv_buff[i])); + qcc_contig_mem_free((void **)&(qcc_op_mem[iter].sym_op_data[i])); + } } // Free up Session memory - for(iter = 0; iter < qcc_inst->num_instances; iter++) { - cpaCySymRemoveSession(qcc_inst->cy_inst_handles[iter], qcc_sess[iter].sess_ctx); + for (iter = 0; iter < qcc_inst->num_instances; iter++) { + cpaCySymDpRemoveSession(qcc_inst->cy_inst_handles[iter], qcc_sess[iter].sess_ctx); qcc_contig_mem_free((void **)&(qcc_sess[iter].sess_ctx)); } // Stop QAT Instances - for(iter = 0; iter < qcc_inst->num_instances; iter++) { + for (iter = 0; iter < qcc_inst->num_instances; iter++) { cpaCyStopInstance(qcc_inst->cy_inst_handles[iter]); } @@ -249,7 +297,6 @@ bool QccCrypto::destroy() { qcc_os_mem_free((void **)&qcc_sess); qcc_os_mem_free((void **)&(qcc_inst->cy_inst_handles)); qcc_os_mem_free((void **)&(qcc_inst->is_polled)); - qcc_os_mem_free((void **)&cypollthreads); qcc_os_mem_free((void **)&qcc_inst); //Un-init memory driver and QAT HW @@ -260,57 +307,43 @@ bool QccCrypto::destroy() { return true; } -void QccCrypto::do_crypt(qcc_thread_args *thread_args) { - auto entry = thread_args->entry; - 
qcc_op_mem[entry].op_result = cpaCySymPerformOp(qcc_inst->cy_inst_handles[entry], - NULL, - qcc_op_mem[entry].sym_op_data, - qcc_op_mem[entry].src_buff_list, - qcc_op_mem[entry].src_buff_list, - NULL); - qcc_op_mem[entry].op_complete = true; - free(thread_args); -} - -bool QccCrypto::perform_op(unsigned char* out, const unsigned char* in, - size_t size, uint8_t *iv, uint8_t *key, CpaCySymCipherDirection op_type) +bool QccCrypto::perform_op_batch(unsigned char* out, const unsigned char* in, size_t size, + Cpa8U *iv, + Cpa8U *key, + CpaCySymCipherDirection op_type, + optional_yield y) { if (!init_called) { dout(10) << "QAT not intialized yet. Initializing now..." << dendl; - if(!QccCrypto::init()) { + if (!QccCrypto::init(chunk_size, max_requests)) { derr << "QAT init failed" << dendl; return false; } } - if(!is_init) + if (!is_init) { - dout(10) << "QAT not initialized in this instance or init failed with possible error " << (int)init_stat << dendl; + dout(10) << "QAT not initialized in this instance or init failed" << dendl; return is_init; } - - int avail_inst = -1; - unsigned int retrycount = 0; - while(retrycount <= QCC_MAX_RETRIES) { - avail_inst = QccGetFreeInstance(); - if(avail_inst != -1) { - break; - } else { - retrycount++; - usleep(qcc_sleep_duration); - } + CpaStatus status = CPA_STATUS_SUCCESS; + int avail_inst = NON_INSTANCE; + + if (y) { + yield_context yield = y.get_yield_context(); + avail_inst = async_get_instance(yield); + } else { + auto result = async_get_instance(boost::asio::use_future); + avail_inst = result.get(); } - if(avail_inst == -1) { - derr << "Unable to get an QAT instance. Failing request" << dendl; + if (avail_inst == NON_INSTANCE) { return false; } - dout(15) << "Using inst " << avail_inst << dendl; - // Start polling threads for this instance - //QccCyStartPoll(avail_inst); + dout(15) << "Using dp_batch inst " << avail_inst << dendl; - auto sg = make_scope_guard([=] { + auto sg = make_scope_guard([this, avail_inst] { //free up the instance irrespective of the op status dout(15) << "Completed task under " << avail_inst << dendl; qcc_op_mem[avail_inst].op_complete = false; @@ -322,150 +355,221 @@ bool QccCrypto::perform_op(unsigned char* out, const unsigned char* in, * Hold onto to most of them until destructor is called. 
*/ if (qcc_op_mem[avail_inst].is_mem_alloc == false) { - - qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherAlgorithm = - CPA_CY_SYM_CIPHER_AES_CBC; - qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherKeyLenInBytes = - AES_256_KEY_SIZE; - - // Allocate contig memory for buffers that are independent of the - // input/output - stat = cpaCyBufferListGetMetaSize(qcc_inst->cy_inst_handles[avail_inst], - qcc_op_mem[avail_inst].num_buffers, &(qcc_op_mem[avail_inst].buff_meta_size)); - if(stat != CPA_STATUS_SUCCESS) { - derr << "Unable to get buff meta size" << dendl; - return false; - } - - // Allocate Buffer List Private metadata - stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_meta), - qcc_op_mem[avail_inst].buff_meta_size, 1); - if(stat != CPA_STATUS_SUCCESS) { - derr << "Unable to allocate private metadata memory" << dendl; - return false; - } - - // Allocate Buffer List Memory - qcc_os_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_list), sizeof(CpaBufferList)); - qcc_os_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_flat), - (qcc_op_mem[avail_inst].num_buffers * sizeof(CpaFlatBuffer))); - if(qcc_op_mem[avail_inst].src_buff_list == NULL || qcc_op_mem[avail_inst].src_buff_flat == NULL) { - derr << "Unable to allocate bufferlist memory" << dendl; - return false; - } - - // Allocate IV memory - stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].iv_buff), AES_256_IV_LEN); - if(stat != CPA_STATUS_SUCCESS) { - derr << "Unable to allocate bufferlist memory" << dendl; - return false; - } - - //Assign src stuff for the operation - (qcc_op_mem[avail_inst].src_buff_list)->pBuffers = qcc_op_mem[avail_inst].src_buff_flat; - (qcc_op_mem[avail_inst].src_buff_list)->numBuffers = qcc_op_mem[avail_inst].num_buffers; - (qcc_op_mem[avail_inst].src_buff_list)->pPrivateMetaData = qcc_op_mem[avail_inst].src_buff_meta; - - //Setup OpData - stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].sym_op_data), - sizeof(CpaCySymOpData)); - if(stat != CPA_STATUS_SUCCESS) { - derr << "Unable to allocate opdata memory" << dendl; - return false; - } - - // Assuming op to be encryption for initiation. 
This will be reset when we - // exit this block - qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherDirection = - CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT; - // Allocate Session memory - stat = cpaCySymSessionCtxGetSize(qcc_inst->cy_inst_handles[avail_inst], - &(qcc_sess[avail_inst].sess_stp_data), &(qcc_sess[avail_inst].sess_ctx_sz)); - if(stat != CPA_STATUS_SUCCESS) { - derr << "Unable to find session size" << dendl; - return false; - } - - stat = qcc_contig_mem_alloc((void **)&(qcc_sess[avail_inst].sess_ctx), - qcc_sess[avail_inst].sess_ctx_sz); - if(stat != CPA_STATUS_SUCCESS) { - derr << "Unable to allocate contig memory" << dendl; - return false; + for (size_t i = 0; i < MAX_NUM_SYM_REQ_BATCH; i++) { + // Allocate IV memory + status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].iv_buff[i]), AES_256_IV_LEN, 8); + if (status != CPA_STATUS_SUCCESS) { + derr << "Unable to allocate iv_buff memory" << dendl; + return false; + } + + // Allocate src memory + status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff[i]), chunk_size, 8); + if (status != CPA_STATUS_SUCCESS) { + derr << "Unable to allocate src_buff memory" << dendl; + return false; + } + + //Setup OpData + status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].sym_op_data[i]), + sizeof(CpaCySymDpOpData), 8); + if (status != CPA_STATUS_SUCCESS) { + derr << "Unable to allocate opdata memory" << dendl; + return false; + } } // Set memalloc flag so that we don't go through this exercise again. qcc_op_mem[avail_inst].is_mem_alloc = true; - dout(15) << "Instantiation complete for " << avail_inst << dendl; + qcc_sess[avail_inst].sess_ctx = nullptr; + status = initSession(qcc_inst->cy_inst_handles[avail_inst], + &(qcc_sess[avail_inst].sess_ctx), + (Cpa8U *)key, + op_type); + } else { + do { + cpaCySymDpRemoveSession(qcc_inst->cy_inst_handles[avail_inst], qcc_sess[avail_inst].sess_ctx); + status = initSession(qcc_inst->cy_inst_handles[avail_inst], + &(qcc_sess[avail_inst].sess_ctx), + (Cpa8U *)key, + op_type); + if (unlikely(status == CPA_STATUS_RETRY)) { + dout(1) << "cpaCySymDpRemoveSession and initSession retry" << dendl; + } + } while (status == CPA_STATUS_RETRY); } - - // Section that runs on every call - // Identify the operation and assign to session - qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherDirection = op_type; - qcc_sess[avail_inst].sess_stp_data.cipherSetupData.pCipherKey = (Cpa8U *)key; - - stat = cpaCySymInitSession(qcc_inst->cy_inst_handles[avail_inst], - NULL, - &(qcc_sess[avail_inst].sess_stp_data), - qcc_sess[avail_inst].sess_ctx); - if (stat != CPA_STATUS_SUCCESS) { - derr << "Unable to init session" << dendl; + if (unlikely(status != CPA_STATUS_SUCCESS)) { + derr << "Unable to init session with status =" << status << dendl; return false; } - // Allocate actual buffers that will hold data - if (qcc_op_mem[avail_inst].buff_size != (Cpa32U)size) { - qcc_contig_mem_free((void **)&(qcc_op_mem[avail_inst].src_buff)); - qcc_op_mem[avail_inst].buff_size = (Cpa32U)size; - stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff), - qcc_op_mem[avail_inst].buff_size); - if(stat != CPA_STATUS_SUCCESS) { - derr << "Unable to allocate contig memory" << dendl; - return false; - } - } - - // Copy src & iv into the respective buffers - memcpy(qcc_op_mem[avail_inst].src_buff, in, size); - memcpy(qcc_op_mem[avail_inst].iv_buff, iv, AES_256_IV_LEN); - - //Assign the reminder of the stuff - qcc_op_mem[avail_inst].src_buff_flat->dataLenInBytes = 
qcc_op_mem[avail_inst].buff_size; - qcc_op_mem[avail_inst].src_buff_flat->pData = qcc_op_mem[avail_inst].src_buff; + return symPerformOp(avail_inst, + qcc_sess[avail_inst].sess_ctx, + in, + out, + size, + reinterpret_cast(iv), + AES_256_IV_LEN, y); +} - //OpData assignment - qcc_op_mem[avail_inst].sym_op_data->sessionCtx = qcc_sess[avail_inst].sess_ctx; - qcc_op_mem[avail_inst].sym_op_data->packetType = CPA_CY_SYM_PACKET_TYPE_FULL; - qcc_op_mem[avail_inst].sym_op_data->pIv = qcc_op_mem[avail_inst].iv_buff; - qcc_op_mem[avail_inst].sym_op_data->ivLenInBytes = AES_256_IV_LEN; - qcc_op_mem[avail_inst].sym_op_data->cryptoStartSrcOffsetInBytes = 0; - qcc_op_mem[avail_inst].sym_op_data->messageLenToCipherInBytes = qcc_op_mem[avail_inst].buff_size; +/* + * Perform session update + */ +CpaStatus QccCrypto::updateSession(CpaCySymSessionCtx sessionCtx, + Cpa8U *pCipherKey, + CpaCySymCipherDirection cipherDirection) { + CpaStatus status = CPA_STATUS_SUCCESS; + CpaCySymSessionUpdateData sessionUpdateData = {0}; + + sessionUpdateData.flags = CPA_CY_SYM_SESUPD_CIPHER_KEY; + sessionUpdateData.flags |= CPA_CY_SYM_SESUPD_CIPHER_DIR; + sessionUpdateData.pCipherKey = pCipherKey; + sessionUpdateData.cipherDirection = cipherDirection; + + status = cpaCySymUpdateSession(sessionCtx, &sessionUpdateData); + + if (unlikely(status != CPA_STATUS_SUCCESS)) { + dout(10) << "cpaCySymUpdateSession failed with status = " << status << dendl; + } - // Perform cipher operation in a thread - qcc_thread_args* thread_args = new qcc_thread_args(); - thread_args->qccinstance = this; - thread_args->entry = avail_inst; + return status; +} - if (pthread_create(&cypollthreads[avail_inst], NULL, crypt_thread, (void *)thread_args) != 0) { - derr << "Unable to create thread for crypt operation" << dendl; - return false; +CpaStatus QccCrypto::initSession(CpaInstanceHandle cyInstHandle, + CpaCySymSessionCtx *sessionCtx, + Cpa8U *pCipherKey, + CpaCySymCipherDirection cipherDirection) { + CpaStatus status = CPA_STATUS_SUCCESS; + Cpa32U sessionCtxSize = 0; + CpaCySymSessionSetupData sessionSetupData; + memset(&sessionSetupData, 0, sizeof(sessionSetupData)); + + sessionSetupData.sessionPriority = CPA_CY_PRIORITY_NORMAL; + sessionSetupData.symOperation = CPA_CY_SYM_OP_CIPHER; + sessionSetupData.cipherSetupData.cipherAlgorithm = CPA_CY_SYM_CIPHER_AES_CBC; + sessionSetupData.cipherSetupData.cipherKeyLenInBytes = AES_256_KEY_SIZE; + sessionSetupData.cipherSetupData.pCipherKey = pCipherKey; + sessionSetupData.cipherSetupData.cipherDirection = cipherDirection; + + if (nullptr == *sessionCtx) { + status = cpaCySymDpSessionCtxGetSize(cyInstHandle, &sessionSetupData, &sessionCtxSize); + if (likely(CPA_STATUS_SUCCESS == status)) { + status = qcc_contig_mem_alloc((void **)(sessionCtx), sessionCtxSize); + } else { + dout(1) << "cpaCySymDpSessionCtxGetSize failed with status = " << status << dendl; + } } - if (qcc_inst->is_polled[avail_inst] == CPA_TRUE) { - while (!qcc_op_mem[avail_inst].op_complete) { - icp_sal_CyPollInstance(qcc_inst->cy_inst_handles[avail_inst], 0); + if (likely(CPA_STATUS_SUCCESS == status)) { + status = cpaCySymDpInitSession(cyInstHandle, + &sessionSetupData, + *sessionCtx); + if (unlikely(status != CPA_STATUS_SUCCESS)) { + dout(1) << "cpaCySymDpInitSession failed with status = " << status << dendl; } + } else { + dout(1) << "Session alloc failed with status = " << status << dendl; } - pthread_join(cypollthreads[avail_inst], NULL); + return status; +} - if(qcc_op_mem[avail_inst].op_result != CPA_STATUS_SUCCESS) { - derr << "Unable 
to perform crypt operation" << dendl; - return false; +template +auto QatCrypto::async_perform_op(int avail_inst, std::span pOpDataVec, CompletionToken&& token) { + CpaStatus status = CPA_STATUS_SUCCESS; + using boost::asio::async_completion; + using Signature = void(CpaStatus); + async_completion init(token); + auto ex = boost::asio::get_associated_executor(init.completion_handler); + completion_handler = [this, ex, handler = init.completion_handler](CpaStatus stat) { + boost::asio::post(ex, std::bind(handler, stat)); + }; + + count = pOpDataVec.size(); + poll_inst_cv.notify_one(); + status = cpaCySymDpEnqueueOpBatch(pOpDataVec.size(), pOpDataVec.data(), CPA_TRUE); + + if (status != CPA_STATUS_SUCCESS) { + completion_handler(status); } + return init.result.get(); +} - //Copy data back to out buffer - memcpy(out, qcc_op_mem[avail_inst].src_buff, size); - //Always cleanup memory holding user-data at the end - memset(qcc_op_mem[avail_inst].iv_buff, 0, AES_256_IV_LEN); - memset(qcc_op_mem[avail_inst].src_buff, 0, qcc_op_mem[avail_inst].buff_size); +bool QccCrypto::symPerformOp(int avail_inst, + CpaCySymSessionCtx sessionCtx, + const Cpa8U *pSrc, + Cpa8U *pDst, + Cpa32U size, + Cpa8U *pIv, + Cpa32U ivLen, + optional_yield y) { + CpaStatus status = CPA_STATUS_SUCCESS; + Cpa32U one_batch_size = chunk_size * MAX_NUM_SYM_REQ_BATCH; + Cpa32U iv_index = 0; + size_t perform_retry_num = 0; + for (Cpa32U off = 0; off < size; off += one_batch_size) { + QatCrypto helper; + boost::container::static_vector pOpDataVec; + for (Cpa32U offset = off, i = 0; offset < size && i < MAX_NUM_SYM_REQ_BATCH; offset += chunk_size, i++) { + CpaCySymDpOpData *pOpData = qcc_op_mem[avail_inst].sym_op_data[i]; + Cpa8U *pSrcBuffer = qcc_op_mem[avail_inst].src_buff[i]; + Cpa8U *pIvBuffer = qcc_op_mem[avail_inst].iv_buff[i]; + Cpa32U process_size = offset + chunk_size <= size ? chunk_size : size - offset; + // copy source into buffer + memcpy(pSrcBuffer, pSrc + offset, process_size); + // copy IV into buffer + memcpy(pIvBuffer, &pIv[iv_index * ivLen], ivLen); + iv_index++; + + //pOpData assignment + pOpData->thisPhys = qaeVirtToPhysNUMA(pOpData); + pOpData->instanceHandle = qcc_inst->cy_inst_handles[avail_inst]; + pOpData->sessionCtx = sessionCtx; + pOpData->pCallbackTag = &helper; + pOpData->cryptoStartSrcOffsetInBytes = 0; + pOpData->messageLenToCipherInBytes = process_size; + pOpData->iv = qaeVirtToPhysNUMA(pIvBuffer); + pOpData->pIv = pIvBuffer; + pOpData->ivLenInBytes = ivLen; + pOpData->srcBuffer = qaeVirtToPhysNUMA(pSrcBuffer); + pOpData->srcBufferLen = process_size; + pOpData->dstBuffer = qaeVirtToPhysNUMA(pSrcBuffer); + pOpData->dstBufferLen = process_size; + + pOpDataVec.push_back(pOpData); + } - return true; + do { + poll_retry_num = RETRY_MAX_NUM; + if (y) { + yield_context yield = y.get_yield_context(); + status = helper.async_perform_op(avail_inst, std::span(pOpDataVec), yield); + } else { + auto result = helper.async_perform_op(avail_inst, std::span(pOpDataVec), boost::asio::use_future); + status = result.get(); + } + if (status == CPA_STATUS_RETRY) { + if (++perform_retry_num > 3) { + cpaCySymDpPerformOpNow(qcc_inst->cy_inst_handles[avail_inst]); + return false; + } + } + } while (status == CPA_STATUS_RETRY); + + if (likely(CPA_STATUS_SUCCESS == status)) { + for (Cpa32U offset = off, i = 0; offset < size && i < MAX_NUM_SYM_REQ_BATCH; offset += chunk_size, i++) { + Cpa8U *pSrcBuffer = qcc_op_mem[avail_inst].src_buff[i]; + Cpa32U process_size = offset + chunk_size <= size ? 
chunk_size : size - offset; + memcpy(pDst + offset, pSrcBuffer, process_size); + } + } else { + dout(1) << "async_perform_op failed with status = " << status << dendl; + break; + } + } + + Cpa32U max_used_buffer_num = iv_index > MAX_NUM_SYM_REQ_BATCH ? MAX_NUM_SYM_REQ_BATCH : iv_index; + for (Cpa32U i = 0; i < max_used_buffer_num; i++) { + memset(qcc_op_mem[avail_inst].src_buff[i], 0, chunk_size); + memset(qcc_op_mem[avail_inst].iv_buff[i], 0, ivLen); + } + return (CPA_STATUS_SUCCESS == status); } diff --git a/src/crypto/qat/qcccrypto.h b/src/crypto/qat/qcccrypto.h index a36b0898b7330..04cd4d9cafa3a 100644 --- a/src/crypto/qat/qcccrypto.h +++ b/src/crypto/qat/qcccrypto.h @@ -6,9 +6,22 @@ #include #include #include +#include +#include #include +#include +#include "common/async/yield_context.h" +#include +#include "common/ceph_mutex.h" +#include +#include +#include +#include "boost/circular_buffer.hpp" +#include "boost/asio/thread_pool.hpp" extern "C" { #include "cpa.h" +#include "cpa_cy_sym_dp.h" +#include "cpa_cy_im.h" #include "lac/cpa_cy_sym.h" #include "lac/cpa_cy_im.h" #include "qae_mem.h" @@ -18,27 +31,37 @@ extern "C" { } class QccCrypto { + friend class QatCrypto; + size_t chunk_size{0}; + size_t max_requests{0}; + + boost::asio::thread_pool my_pool{1}; + + boost::circular_buffer> instance_completions; + + template + auto async_get_instance(CompletionToken&& token); public: CpaCySymCipherDirection qcc_op_type; - QccCrypto() {}; - ~QccCrypto() {}; + QccCrypto() {}; + ~QccCrypto() { destroy(); }; - bool init(); + bool init(const size_t chunk_size, const size_t max_requests); bool destroy(); - bool perform_op(unsigned char* out, const unsigned char* in, size_t size, - uint8_t *iv, - uint8_t *key, - CpaCySymCipherDirection op_type); + bool perform_op_batch(unsigned char* out, const unsigned char* in, size_t size, + Cpa8U *iv, + Cpa8U *key, + CpaCySymCipherDirection op_type, + optional_yield y); private: - // Currently only supporting AES_256_CBC. // To-Do: Needs to be expanded static const size_t AES_256_IV_LEN = 16; static const size_t AES_256_KEY_SIZE = 32; - static const size_t QCC_MAX_RETRIES = 5000; + static const size_t MAX_NUM_SYM_REQ_BATCH = 32; /* * Struct to hold an instance of QAT to handle the crypto operations. These @@ -62,7 +85,6 @@ class QccCrypto { * single crypto or multi-buffer crypto. */ struct QCCSESS { - CpaCySymSessionSetupData sess_stp_data; Cpa32U sess_ctx_sz; CpaCySymSessionCtx sess_ctx; } *qcc_sess; @@ -76,39 +98,18 @@ class QccCrypto { // Op common items bool is_mem_alloc; bool op_complete; - CpaStatus op_result; - CpaCySymOpData *sym_op_data; - Cpa32U buff_meta_size; - Cpa32U num_buffers; - Cpa32U buff_size; - - //Src data items - Cpa8U *src_buff_meta; - CpaBufferList *src_buff_list; - CpaFlatBuffer *src_buff_flat; - Cpa8U *src_buff; - Cpa8U *iv_buff; + CpaCySymDpOpData *sym_op_data[MAX_NUM_SYM_REQ_BATCH]; + Cpa8U *src_buff[MAX_NUM_SYM_REQ_BATCH]; + Cpa8U *iv_buff[MAX_NUM_SYM_REQ_BATCH]; } *qcc_op_mem; - //QAT HW polling thread input structure - struct qcc_thread_args { - QccCrypto* qccinstance; - int entry; - }; - - - /* - * Function to handle the crypt operation. 
Will run while the main thread - * runs the polling function on the instance doing the op - */ - void do_crypt(qcc_thread_args *thread_args); - /* * Handle queue with free instances to handle op */ - std::queue open_instances; - int QccGetFreeInstance(); + boost::circular_buffer open_instances; void QccFreeInstance(int entry); + std::thread qat_poll_thread; + bool thread_stop{false}; /* * Contiguous Memory Allocator and de-allocator. We are using the usdm @@ -153,7 +154,6 @@ class QccCrypto { } std::atomic is_init = { false }; - CpaStatus init_stat, stat; /* * Function to cleanup memory if constructor fails @@ -166,11 +166,49 @@ class QccCrypto { * associated callbacks. For synchronous operation (like this one), QAT * library creates an internal callback for the operation. */ - static void* crypt_thread(void* entry); - CpaStatus QccCyStartPoll(int entry); - void poll_instance(int entry); + void poll_instances(void); + std::atomic poll_retry_num{0}; + + bool symPerformOp(int avail_inst, + CpaCySymSessionCtx sessionCtx, + const Cpa8U *pSrc, + Cpa8U *pDst, + Cpa32U size, + Cpa8U *pIv, + Cpa32U ivLen, + optional_yield y); + + CpaStatus initSession(CpaInstanceHandle cyInstHandle, + CpaCySymSessionCtx *sessionCtx, + Cpa8U *pCipherKey, + CpaCySymCipherDirection cipherDirection); + + CpaStatus updateSession(CpaCySymSessionCtx sessionCtx, + Cpa8U *pCipherKey, + CpaCySymCipherDirection cipherDirection); + + +}; + +class QatCrypto { + private: + std::function completion_handler; + std::atomic count; + public: + void complete() { + if (--count == 0) { + completion_handler(CPA_STATUS_SUCCESS); + } + return ; + } + + QatCrypto () : count(0) {} + QatCrypto (const QatCrypto &qat) = delete; + QatCrypto (QatCrypto &&qat) = delete; + void operator=(const QatCrypto &qat) = delete; + void operator=(QatCrypto &&qat) = delete; - pthread_t *cypollthreads; - static const size_t qcc_sleep_duration = 2; + template + auto async_perform_op(int avail_inst, std::span pOpDataVec, CompletionToken&& token); }; #endif //QCCCRYPTO_H diff --git a/src/rgw/rgw_crypt.cc b/src/rgw/rgw_crypt.cc index e4fd061685575..f101bc24c46a7 100644 --- a/src/rgw/rgw_crypt.cc +++ b/src/rgw/rgw_crypt.cc @@ -288,7 +288,7 @@ mec_option::empty }; } -CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct) +CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct, const size_t chunk_size, const size_t max_requests) { CryptoAccelRef ca_impl = nullptr; stringstream ss; @@ -300,7 +300,7 @@ CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct) ldpp_dout(dpp, -1) << __func__ << " cannot load crypto accelerator of type " << crypto_accel_type << dendl; return nullptr; } - int err = factory->factory(&ca_impl, &ss); + int err = factory->factory(&ca_impl, &ss, chunk_size, max_requests); if (err) { ldpp_dout(dpp, -1) << __func__ << " factory return error " << err << " with description: " << ss.str() << dendl; @@ -404,6 +404,7 @@ public: static const size_t AES_256_KEYSIZE = 256 / 8; static const size_t AES_256_IVSIZE = 128 / 8; static const size_t CHUNK_SIZE = 4096; + static const size_t QAT_MIN_SIZE = 65536; const DoutPrefixProvider* dpp; private: static const uint8_t IV[AES_256_IVSIZE]; @@ -442,33 +443,55 @@ public: size_t size, off_t stream_offset, const unsigned char (&key)[AES_256_KEYSIZE], - bool encrypt) + bool encrypt, + optional_yield y) { static std::atomic failed_to_get_crypto(false); CryptoAccelRef crypto_accel; if (! 
failed_to_get_crypto.load()) { - crypto_accel = get_crypto_accel(this->dpp, cct); + static size_t max_requests = g_ceph_context->_conf->rgw_thread_pool_size; + crypto_accel = get_crypto_accel(this->dpp, cct, CHUNK_SIZE, max_requests); if (!crypto_accel) failed_to_get_crypto = true; } - bool result = true; - unsigned char iv[AES_256_IVSIZE]; - for (size_t offset = 0; result && (offset < size); offset += CHUNK_SIZE) { - size_t process_size = offset + CHUNK_SIZE <= size ? CHUNK_SIZE : size - offset; - prepare_iv(iv, stream_offset + offset); - if (crypto_accel != nullptr) { - if (encrypt) { - result = crypto_accel->cbc_encrypt(out + offset, in + offset, - process_size, iv, key); + bool result = false; + static std::string accelerator = cct->_conf->plugin_crypto_accelerator; + if (accelerator == "crypto_qat" && crypto_accel != nullptr && size >= QAT_MIN_SIZE) { + // now, batch mode is only for QAT plugin + size_t iv_num = size / CHUNK_SIZE; + if (size % CHUNK_SIZE) ++iv_num; + auto iv = new unsigned char[iv_num][AES_256_IVSIZE]; + for (size_t offset = 0, i = 0; offset < size; offset += CHUNK_SIZE, i++) { + prepare_iv(iv[i], stream_offset + offset); + } + if (encrypt) { + result = crypto_accel->cbc_encrypt_batch(out, in, size, iv, key, y); + } else { + result = crypto_accel->cbc_decrypt_batch(out, in, size, iv, key, y); + } + delete[] iv; + } + if (result == false) { + // If QAT don't have free instance, we can fall back to this + result = true; + unsigned char iv[AES_256_IVSIZE]; + for (size_t offset = 0; result && (offset < size); offset += CHUNK_SIZE) { + size_t process_size = offset + CHUNK_SIZE <= size ? CHUNK_SIZE : size - offset; + prepare_iv(iv, stream_offset + offset); + if (crypto_accel != nullptr && accelerator != "crypto_qat") { + if (encrypt) { + result = crypto_accel->cbc_encrypt(out + offset, in + offset, + process_size, iv, key, y); + } else { + result = crypto_accel->cbc_decrypt(out + offset, in + offset, + process_size, iv, key, y); + } } else { - result = crypto_accel->cbc_decrypt(out + offset, in + offset, - process_size, iv, key); + result = cbc_transform( + out + offset, in + offset, process_size, + iv, key, encrypt); } - } else { - result = cbc_transform( - out + offset, in + offset, process_size, - iv, key, encrypt); } } return result; @@ -479,7 +502,8 @@ public: off_t in_ofs, size_t size, bufferlist& output, - off_t stream_offset) + off_t stream_offset, + optional_yield y) { bool result = false; size_t aligned_size = size / AES_256_IVSIZE * AES_256_IVSIZE; @@ -493,7 +517,7 @@ public: result = cbc_transform(buf_raw, input_raw + in_ofs, aligned_size, - stream_offset, key, true); + stream_offset, key, true, y); if (result && (unaligned_rest_size > 0)) { /* remainder to encrypt */ if (aligned_size % CHUNK_SIZE > 0) { @@ -534,7 +558,8 @@ public: off_t in_ofs, size_t size, bufferlist& output, - off_t stream_offset) + off_t stream_offset, + optional_yield y) { bool result = false; size_t aligned_size = size / AES_256_IVSIZE * AES_256_IVSIZE; @@ -548,7 +573,7 @@ public: result = cbc_transform(buf_raw, input_raw + in_ofs, aligned_size, - stream_offset, key, false); + stream_offset, key, false, y); if (result && unaligned_rest_size > 0) { /* remainder to decrypt */ if (aligned_size % CHUNK_SIZE > 0) { @@ -635,7 +660,8 @@ bool AES_256_ECB_encrypt(const DoutPrefixProvider* dpp, RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp, CephContext* cct, RGWGetObj_Filter* next, - std::unique_ptr crypt) + std::unique_ptr crypt, + optional_yield y) : 
RGWGetObj_Filter(next), dpp(dpp), @@ -644,7 +670,8 @@ RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp, enc_begin_skip(0), ofs(0), end(0), - cache() + cache(), + y(y) { block_size = this->crypt->get_block_size(); } @@ -726,7 +753,7 @@ int RGWGetObj_BlockDecrypt::fixup_range(off_t& bl_ofs, off_t& bl_end) { int RGWGetObj_BlockDecrypt::process(bufferlist& in, size_t part_ofs, size_t size) { bufferlist data; - if (!crypt->decrypt(in, 0, size, data, part_ofs)) { + if (!crypt->decrypt(in, 0, size, data, part_ofs, y)) { return -ERR_INTERNAL_ERROR; } off_t send_size = size - enc_begin_skip; @@ -799,12 +826,14 @@ int RGWGetObj_BlockDecrypt::flush() { RGWPutObj_BlockEncrypt::RGWPutObj_BlockEncrypt(const DoutPrefixProvider *dpp, CephContext* cct, rgw::sal::DataProcessor *next, - std::unique_ptr crypt) + std::unique_ptr crypt, + optional_yield y) : Pipe(next), dpp(dpp), cct(cct), crypt(std::move(crypt)), - block_size(this->crypt->get_block_size()) + block_size(this->crypt->get_block_size()), + y(y) { } @@ -826,7 +855,7 @@ int RGWPutObj_BlockEncrypt::process(bufferlist&& data, uint64_t logical_offset) if (proc_size > 0) { bufferlist in, out; cache.splice(0, proc_size, &in); - if (!crypt->encrypt(in, 0, proc_size, out, logical_offset)) { + if (!crypt->encrypt(in, 0, proc_size, out, logical_offset, y)) { return -ERR_INTERNAL_ERROR; } int r = Pipe::process(std::move(out), logical_offset); diff --git a/src/rgw/rgw_crypt.h b/src/rgw/rgw_crypt.h index 6008dd05eaea6..b0c4886011276 100644 --- a/src/rgw/rgw_crypt.h +++ b/src/rgw/rgw_crypt.h @@ -13,6 +13,7 @@ #include #include #include "rgw_putobj.h" +#include "common/async/yield_context.h" /** * \brief Interface for block encryption methods @@ -55,7 +56,8 @@ public: off_t in_ofs, size_t size, bufferlist& output, - off_t stream_offset) = 0; + off_t stream_offset, + optional_yield y) = 0; /** * Decrypts data. 
@@ -75,7 +77,8 @@ public: off_t in_ofs, size_t size, bufferlist& output, - off_t stream_offset) = 0; + off_t stream_offset, + optional_yield y) = 0; }; static const size_t AES_256_KEYSIZE = 256 / 8; @@ -97,6 +100,7 @@ class RGWGetObj_BlockDecrypt : public RGWGetObj_Filter { off_t end; /**< stream offset of last byte that is requested */ bufferlist cache; /**< stores extra data that could not (yet) be processed by BlockCrypt */ size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */ + optional_yield y; int process(bufferlist& cipher, size_t part_ofs, size_t size); @@ -106,7 +110,8 @@ public: RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp, CephContext* cct, RGWGetObj_Filter* next, - std::unique_ptr crypt); + std::unique_ptr crypt, + optional_yield y); virtual ~RGWGetObj_BlockDecrypt(); virtual int fixup_range(off_t& bl_ofs, @@ -128,11 +133,13 @@ class RGWPutObj_BlockEncrypt : public rgw::putobj::Pipe for operations when enough data is accumulated */ bufferlist cache; /**< stores extra data that could not (yet) be processed by BlockCrypt */ const size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */ + optional_yield y; public: RGWPutObj_BlockEncrypt(const DoutPrefixProvider *dpp, CephContext* cct, rgw::sal::DataProcessor *next, - std::unique_ptr crypt); + std::unique_ptr crypt, + optional_yield y); int process(bufferlist&& data, uint64_t logical_offset) override; }; /* RGWPutObj_BlockEncrypt */ diff --git a/src/rgw/rgw_rest_s3.cc b/src/rgw/rgw_rest_s3.cc index ccb8a397aec25..5860eddb6a7b2 100644 --- a/src/rgw/rgw_rest_s3.cc +++ b/src/rgw/rgw_rest_s3.cc @@ -581,7 +581,7 @@ int RGWGetObj_ObjStore_S3::get_decrypt_filter(std::unique_ptr res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses); if (res == 0) { if (block_crypt != nullptr) { - auto f = std::make_unique(s, s->cct, cb, std::move(block_crypt)); + auto f = std::make_unique(s, s->cct, cb, std::move(block_crypt), s->yield); if (manifest_bl != nullptr) { res = f->read_manifest(this, *manifest_bl); if (res == 0) { @@ -2726,8 +2726,7 @@ int RGWPutObj_ObjStore_S3::get_decrypt_filter( res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses_unused); if (res == 0) { if (block_crypt != nullptr) { - auto f = std::unique_ptr(new RGWGetObj_BlockDecrypt(s, s->cct, cb, std::move(block_crypt))); - //RGWGetObj_BlockDecrypt* f = new RGWGetObj_BlockDecrypt(s->cct, cb, std::move(block_crypt)); + auto f = std::unique_ptr(new RGWGetObj_BlockDecrypt(s, s->cct, cb, std::move(block_crypt), s->yield)); if (f != nullptr) { if (manifest_bl != nullptr) { res = f->read_manifest(this, *manifest_bl); @@ -2759,7 +2758,7 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter( * We use crypto mode that configured as if we were decrypting. 
*/ res = rgw_s3_prepare_decrypt(s, obj->get_attrs(), &block_crypt, crypt_http_responses); if (res == 0 && block_crypt != nullptr) - filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt))); + filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield)); } /* it is ok, to not have encryption at all */ } @@ -2768,7 +2767,7 @@ int RGWPutObj_ObjStore_S3::get_encrypt_filter( std::unique_ptr block_crypt; res = rgw_s3_prepare_encrypt(s, attrs, &block_crypt, crypt_http_responses); if (res == 0 && block_crypt != nullptr) { - filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt))); + filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield)); } } return res; @@ -3316,7 +3315,7 @@ int RGWPostObj_ObjStore_S3::get_encrypt_filter( int res = rgw_s3_prepare_encrypt(s, attrs, &block_crypt, crypt_http_responses); if (res == 0 && block_crypt != nullptr) { - filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt))); + filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield)); } return res; } -- 2.39.5
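
Note on the completion-token pattern: the patch drives both a coroutine path (optional_yield carrying a boost::asio yield_context) and a blocking path (boost::asio::use_future) through the same initiating functions, async_get_instance() and QatCrypto::async_perform_op(). The following standalone sketch shows that pattern outside of Ceph; SlotPool, async_get_slot and the fixed slot numbers are made-up names for illustration only, and it assumes a Boost.Asio version that still provides boost::asio::async_completion (as the patch itself does).

// Minimal sketch of the completion-token pattern used by the patch.
#include <boost/asio.hpp>
#include <deque>
#include <functional>
#include <future>
#include <iostream>

class SlotPool {
  boost::asio::thread_pool pool_{1};        // serializes access to the queue
  std::deque<int> free_slots_{0, 1, 2, 3};  // hypothetical "instances"
 public:
  template <typename CompletionToken>
  auto async_get_slot(CompletionToken&& token) {
    using Signature = void(int);
    boost::asio::async_completion<CompletionToken, Signature> init(token);
    auto ex = boost::asio::get_associated_executor(init.completion_handler);
    boost::asio::post(pool_,
        [this, ex, h = std::move(init.completion_handler)]() mutable {
          int slot = -1;                     // -1 plays the role of NON_INSTANCE
          if (!free_slots_.empty()) {
            slot = free_slots_.front();
            free_slots_.pop_front();
          }
          // complete on the caller's associated executor
          boost::asio::post(ex, std::bind(std::move(h), slot));
        });
    return init.result.get();
  }
};

int main() {
  SlotPool pool;
  // Non-coroutine caller: block on a std::future, as perform_op_batch()
  // does when optional_yield is empty.
  std::future<int> f = pool.async_get_slot(boost::asio::use_future);
  std::cout << "got slot " << f.get() << std::endl;
}

Passing a boost::asio::yield_context instead of use_future suspends the calling coroutine until the handler runs, which is what perform_op_batch() does when optional_yield is set.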
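
Note on the batching arithmetic in QccCrypto::symPerformOp() and cbc_transform(): the payload is cut into chunk_size pieces (CHUNK_SIZE = 4096 in rgw_crypt.cc), one IV is prepared per chunk, and at most MAX_NUM_SYM_REQ_BATCH (32) chunks are enqueued to the hardware per batch. Below is a minimal sketch of just that partitioning, with no QAT calls; the payload length and the printf are illustrative only.

// Sketch of the chunk/batch partitioning; process_size handles the tail chunk.
#include <algorithm>
#include <cstddef>
#include <cstdio>

int main() {
  const size_t chunk_size = 4096;            // CHUNK_SIZE in rgw_crypt.cc
  const size_t max_batch  = 32;              // MAX_NUM_SYM_REQ_BATCH
  const size_t size       = 150 * 1024 + 10; // arbitrary payload length

  size_t iv_num = (size + chunk_size - 1) / chunk_size; // one IV per chunk
  size_t one_batch_size = chunk_size * max_batch;

  size_t batches = 0, ops = 0;
  for (size_t off = 0; off < size; off += one_batch_size) {
    ++batches;
    for (size_t offset = off, i = 0; offset < size && i < max_batch;
         offset += chunk_size, ++i) {
      size_t process_size = std::min(chunk_size, size - offset);
      (void)process_size; // the real code fills one CpaCySymDpOpData here
      ++ops;
    }
  }
  std::printf("size=%zu -> %zu IVs, %zu ops in %zu enqueue batches\n",
              size, iv_num, ops, batches);
}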
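
Note on batch completion: each request in a batch triggers one symDpCallback(), so the QatCrypto helper counts completions and fires its stored handler only when the last one arrives. A minimal standalone sketch of that counting pattern follows, with plain std::thread callers standing in for the QAT driver; BatchCompletion and arm() are illustrative names, not the patch's API.

// Sketch of the reference-counted batch completion used by class QatCrypto.
#include <atomic>
#include <functional>
#include <iostream>
#include <thread>
#include <vector>

class BatchCompletion {
  std::function<void(int)> handler_;  // set when the batch is enqueued
  std::atomic<int> count_{0};         // outstanding requests
 public:
  void arm(int n, std::function<void(int)> h) {
    count_ = n;
    handler_ = std::move(h);
  }
  // called once per finished request (symDpCallback in the patch)
  void complete() {
    if (--count_ == 0) {
      handler_(0);  // 0 plays the role of CPA_STATUS_SUCCESS
    }
  }
};

int main() {
  BatchCompletion batch;
  batch.arm(4, [](int status) {
    std::cout << "batch done, status " << status << std::endl;
  });
  std::vector<std::thread> cbs;
  for (int i = 0; i < 4; ++i) {
    cbs.emplace_back([&batch] { batch.complete(); });
  }
  for (auto& t : cbs) t.join();
}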
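
Note on the poller back-off: the dedicated thread added by the patch (QccCrypto::poll_instances()) spins over the data-plane instances and, after RETRY_MAX_NUM consecutive idle rounds, parks on a condition variable for about a millisecond until the submit path wakes it. Below is a rough standalone sketch of that back-off under simplifying assumptions: poll_hardware_once() stands in for icp_sal_CyPollDpInstance(), a single instance replaces the per-instance loop, and the globals mirror the patch only loosely.

// Sketch of the poll-thread back-off and the submit-side wake-up.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex poll_mutex;
std::condition_variable poll_cv;
std::atomic<size_t> poll_retry_num{0};
std::atomic<bool> thread_stop{false};
constexpr size_t RETRY_MAX_NUM = 100;

// stand-in for icp_sal_CyPollDpInstance(): returns true when work was found
bool poll_hardware_once() { return false; }

void poller() {
  poll_retry_num = RETRY_MAX_NUM;
  while (!thread_stop) {
    if (poll_hardware_once()) {
      poll_retry_num = RETRY_MAX_NUM;       // busy: keep spinning
    } else if (--poll_retry_num == 0) {     // idle too long: back off briefly
      std::unique_lock lock{poll_mutex};
      poll_cv.wait_for(lock, std::chrono::milliseconds(1),
                       [] { return poll_retry_num > 0; });
      poll_retry_num = RETRY_MAX_NUM;
    }
  }
}

void submit() {
  poll_retry_num = RETRY_MAX_NUM;  // as symPerformOp() does before enqueueing
  poll_cv.notify_one();            // wake the poller before submitting work
  // ... enqueue to the hardware here ...
}

int main() {
  std::thread t(poller);
  submit();
  thread_stop = true;
  poll_cv.notify_one();
  t.join();
}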