#include <cstddef>
#include "include/Context.h"
+class optional_yield;
+
class CryptoAccel;
typedef std::shared_ptr<CryptoAccel> CryptoAccelRef;
// Abstract interface for hardware-accelerated AES-256-CBC operations.
// Concrete backends (ISA-L, OpenSSL, QAT) implement it; callers hold
// instances through the shared_ptr CryptoAccelRef typedef above.
class CryptoAccel {
public:
CryptoAccel() {}
// New ctor carrying sizing hints: per-request buffer size and maximum
// in-flight requests. Ignored by the base class; backends may honor them.
+ CryptoAccel(const size_t chunk_size, const size_t max_requests) {}
virtual ~CryptoAccel() {}
// AES-256-CBC parameter sizes: 16-byte IV / block, 32-byte key.
static const int AES_256_IVSIZE = 128/8;
static const int AES_256_KEYSIZE = 256/8;
// Single-buffer CBC ops. The added optional_yield argument lets an async
// implementation suspend the calling coroutine instead of blocking.
virtual bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) = 0;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) = 0;
virtual bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) = 0;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) = 0;
// Batch variants: `iv` is an array of IVs (presumably one per chunk of the
// input — TODO confirm against callers), so several chunks can be submitted
// to the accelerator in a single call.
+ virtual bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) = 0;
+ virtual bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) = 0;
};
#endif
#include "ostream"
#include "crypto/crypto_accel.h"
+#include <boost/asio/io_context.hpp>
// -----------------------------------------------------------------------------
// Plugin base exposing a CryptoAccel factory to the Ceph plugin registry.
class CryptoPlugin : public ceph::Plugin {
~CryptoPlugin()
{}
// Factory now receives the accelerator sizing hints (chunk_size /
// max_requests) so backends like QAT can pre-allocate resources.
virtual int factory(CryptoAccelRef *cs,
- std::ostream *ss) = 0;
+ std::ostream *ss,
+ const size_t chunk_size,
+ const size_t max_requests) = 0;
};
#endif
add_library(ceph_crypto_isal SHARED ${isal_crypto_plugin_srcs})
target_include_directories(ceph_crypto_isal PRIVATE ${isal_dir}/include)
+
# Link spawn for the optional_yield / coroutine support used by the
# updated CryptoAccel interface.
+target_link_libraries(ceph_crypto_isal PRIVATE spawn)
+
set_target_properties(ceph_crypto_isal PROPERTIES
VERSION 1.0.0
SOVERSION 1
bool ISALCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE])
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y)
{
- if ((size % AES_256_IVSIZE) != 0) {
+ if (unlikely((size % AES_256_IVSIZE) != 0)) {
return false;
}
alignas(16) struct cbc_key_data keys_blk;
}
bool ISALCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE])
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y)
{
- if ((size % AES_256_IVSIZE) != 0) {
+ if (unlikely((size % AES_256_IVSIZE) != 0)) {
return false;
}
alignas(16) struct cbc_key_data keys_blk;
#ifndef ISAL_CRYPTO_ACCEL_H
#define ISAL_CRYPTO_ACCEL_H
#include "crypto/crypto_accel.h"
+#include "common/async/yield_context.h"
// ISA-L backend. Only the single-buffer ops are implemented; the batch
// entry points are stubbed to return false so callers can detect the lack
// of batch support and fall back to the non-batch path.
class ISALCryptoAccel : public CryptoAccel {
public:
bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) override;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override;
bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) override;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override;
// Batch ops unsupported by this backend — always report failure.
+ bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override { return false; }
+ bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override { return false; }
};
#endif
~ISALCryptoPlugin()
{}
virtual int factory(CryptoAccelRef *cs,
- std::ostream *ss)
+ std::ostream *ss,
+ const size_t chunk_size,
+ const size_t max_requests)
{
if (cryptoaccel == nullptr)
{
add_library(ceph_crypto_openssl SHARED ${openssl_crypto_plugin_srcs})
# Add spawn for the optional_yield support now required by CryptoAccel.
target_link_libraries(ceph_crypto_openssl
PRIVATE OpenSSL::Crypto
- $<$<PLATFORM_ID:Windows>:ceph-common>)
+ $<$<PLATFORM_ID:Windows>:ceph-common>
+ spawn)
target_include_directories(ceph_crypto_openssl PRIVATE ${OPENSSL_INCLUDE_DIR})
add_dependencies(crypto_plugins ceph_crypto_openssl)
set_target_properties(ceph_crypto_openssl PROPERTIES INSTALL_RPATH "")
bool OpenSSLCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE])
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y)
{
- if ((size % AES_256_IVSIZE) != 0) {
+ if (unlikely((size % AES_256_IVSIZE) != 0)) {
return false;
}
bool OpenSSLCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE])
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y)
{
- if ((size % AES_256_IVSIZE) != 0) {
+ if (unlikely((size % AES_256_IVSIZE) != 0)) {
return false;
}
#define OPENSSL_CRYPTO_ACCEL_H
#include "crypto/crypto_accel.h"
+#include "common/async/yield_context.h"
// OpenSSL backend. Like ISA-L, only single-buffer ops are real; batch
// entry points are stubs returning false (no batch support).
class OpenSSLCryptoAccel : public CryptoAccel {
public:
bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) override;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override;
bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) override;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override;
// Batch ops unsupported by this backend — always report failure.
+ bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override { return false; }
+ bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override { return false; }
};
#endif
public:
explicit OpenSSLCryptoPlugin(CephContext* cct) : CryptoPlugin(cct)
{}
- int factory(CryptoAccelRef *cs, std::ostream *ss) override {
+ int factory(CryptoAccelRef *cs,
+ std::ostream *ss,
+ const size_t chunk_size,
+ const size_t max_requests) override {
if (cryptoaccel == nullptr)
cryptoaccel = CryptoAccelRef(new OpenSSLCryptoAccel);
# Add spawn for the optional_yield / coroutine support used by the
# asynchronous QAT batch path.
target_link_libraries(ceph_crypto_qat PRIVATE
QatDrv::qat_s
- QatDrv::usdm_drv_s)
+ QatDrv::usdm_drv_s
+ spawn)
add_dependencies(crypto_plugins ceph_crypto_qat)
set_target_properties(ceph_crypto_qat PROPERTIES VERSION 1.0.0 SOVERSION 1)
#include "crypto/qat/qat_crypto_accel.h"
// Batch encrypt via the QAT data-plane API; replaces the old single-op
// cbc_encrypt. `iv` is the array of per-chunk IVs, flattened to a single
// pointer for perform_op_batch.
-bool QccCryptoAccel::cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
- const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE])
-{
- if ((size % AES_256_IVSIZE) != 0) {
+
+
+
+bool QccCryptoAccel::cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) {
// CBC operates on whole 16-byte blocks; reject ragged sizes up front.
+ if (unlikely((size % AES_256_IVSIZE) != 0)) {
return false;
}
- return qcccrypto.perform_op_batch(out, in, size,
- const_cast<unsigned char *>(&iv[0]),
- const_cast<unsigned char *>(&key[0]), CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT);
+ return qcccrypto.perform_op_batch(out, in, size,
+ const_cast<unsigned char *>(&iv[0][0]),
+ const_cast<unsigned char *>(&key[0]),
+ CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT, y);
}
// Batch decrypt counterpart of cbc_encrypt_batch — identical shape, only
// the cipher direction differs.
-bool QccCryptoAccel::cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
- const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE])
-{
- if ((size % AES_256_IVSIZE) != 0) {
+bool QccCryptoAccel::cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) {
// CBC operates on whole 16-byte blocks; reject ragged sizes up front.
+ if (unlikely((size % AES_256_IVSIZE) != 0)) {
return false;
}
- return qcccrypto.perform_op_batch(out, in, size,
- const_cast<unsigned char *>(&iv[0]),
- const_cast<unsigned char *>(&key[0]), CPA_CY_SYM_CIPHER_DIRECTION_DECRYPT);
+ return qcccrypto.perform_op_batch(out, in, size,
+ const_cast<unsigned char *>(&iv[0][0]),
+ const_cast<unsigned char *>(&key[0]),
+ CPA_CY_SYM_CIPHER_DIRECTION_DECRYPT, y);
}
#include "crypto/crypto_accel.h"
#include "crypto/qat/qcccrypto.h"
+#include "common/async/yield_context.h"
// QAT backend. Inverse of the software backends: the single-buffer ops are
// stubbed to return false and only the batch entry points are implemented,
// so callers must use the batch path with this accelerator.
class QccCryptoAccel : public CryptoAccel {
public:
QccCrypto qcccrypto;
// Forwards sizing hints to the QAT engine. NOTE(review): the explicit
// `qcccrypto()` in the init list is redundant (default construction would
// happen anyway); init() performs the real setup.
- QccCryptoAccel() { qcccrypto.init(); };
+ QccCryptoAccel(const size_t chunk_size, const size_t max_requests):qcccrypto() { qcccrypto.init(chunk_size, max_requests); };
~QccCryptoAccel() { qcccrypto.destroy(); };
// Single-buffer ops intentionally unsupported — always report failure.
bool cbc_encrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) override;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override { return false; }
bool cbc_decrypt(unsigned char* out, const unsigned char* in, size_t size,
const unsigned char (&iv)[AES_256_IVSIZE],
- const unsigned char (&key)[AES_256_KEYSIZE]) override;
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override { return false; }
+ bool cbc_encrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override;
+ bool cbc_decrypt_batch(unsigned char* out, const unsigned char* in, size_t size,
+ const unsigned char iv[][AES_256_IVSIZE],
+ const unsigned char (&key)[AES_256_KEYSIZE],
+ optional_yield y) override;
};
#endif
{}
~QccCryptoPlugin()
{}
- virtual int factory(CryptoAccelRef *cs, std::ostream *ss)
+ virtual int factory(CryptoAccelRef *cs, std::ostream *ss, const size_t chunk_size, const size_t max_requests)
{
std::lock_guard<std::mutex> l(qat_init);
if (cryptoaccel == nullptr)
- cryptoaccel = CryptoAccelRef(new QccCryptoAccel);
+ cryptoaccel = CryptoAccelRef(new QccCryptoAccel(chunk_size, max_requests));
*cs = cryptoaccel;
return 0;
#include <iostream>
#include "string.h"
#include <pthread.h>
+#include <condition_variable>
#include "common/debug.h"
#include "include/scope_guard.h"
#include "common/dout.h"
#include "common/errno.h"
+#include <atomic>
+#include <utility>
+#include <future>
+#include <chrono>
+
+#include "boost/container/static_vector.hpp"
// -----------------------------------------------------------------------------
#define dout_context g_ceph_context
// -----------------------------------------------------------------------------
/*
- * Poller thread & functions
-*/
+ * Callback function
+ */
// QAT data-plane completion callback. The per-request QatCrypto object is
// carried through pOpData->pCallbackTag and notified via complete().
// NOTE(review): `status` and `verifyResult` are ignored here — operation
// status is presumably checked by the waiter; confirm.
+static void symDpCallback(CpaCySymDpOpData *pOpData,
+ CpaStatus status,
+ CpaBoolean verifyResult)
+{
+ if (nullptr != pOpData->pCallbackTag)
+ {
+ static_cast<QatCrypto*>(pOpData->pCallbackTag)->complete();
+ }
+}
+
static std::mutex qcc_alloc_mutex;
static std::mutex qcc_eng_mutex;
static std::atomic<bool> init_called = { false };
-
-void* QccCrypto::crypt_thread(void *args) {
- struct qcc_thread_args *thread_args = (struct qcc_thread_args *)args;
- thread_args->qccinstance->do_crypt(thread_args);
- return thread_args;
+static std::mutex poll_inst_mutex;
+static std::condition_variable poll_inst_cv;
+
+#define NON_INSTANCE -1
+#define RETRY_MAX_NUM 100
+
// Asynchronously acquire a free QAT instance index. Completes on the
// token's associated executor with either an instance id, or NON_INSTANCE
// when nothing is free and the waiter queue is full. All bookkeeping on
// open_instances / instance_completions happens inside work posted to
// my_pool (NOTE(review): this only serializes access if my_pool is
// effectively single-threaded — confirm).
+template <typename CompletionToken>
+auto QccCrypto::async_get_instance(CompletionToken&& token) {
+ using boost::asio::async_completion;
+ using Signature = void(int);
+ async_completion<CompletionToken, Signature> init(token);
+
+ auto ex = boost::asio::get_associated_executor(init.completion_handler);
+
+ boost::asio::post(my_pool, [this, ex, handler = std::move(init.completion_handler)]()mutable{
+ auto handler1 = std::move(handler);
+ if (!open_instances.empty()) {
+ int avail_inst = open_instances.front();
+ open_instances.pop_front();
+ boost::asio::post(ex, std::bind(handler1, avail_inst));
+ } else if (!instance_completions.full()) {
+ // No instance free: queue the waiter (bounded by the capacity of
+ // instance_completions). Keeping a small backlog of pending requests
+ // lets a freed instance be handed to a waiter immediately, so the QAT
+ // hardware is kept as busy as possible.
+ instance_completions.push_back([this, ex, handler2 = std::move(handler1)](int inst)mutable{
+ boost::asio::post(ex, std::bind(handler2, inst));
+ });
+ } else {
+ // Waiter queue full — fail fast so the caller can fall back.
+ boost::asio::post(ex, std::bind(handler1, NON_INSTANCE));
+ }
+ });
+ return init.result.get();
}
// Return instance `entry` to the pool. If a waiter is queued, hand the
// instance straight to the oldest waiter (FIFO) instead of parking it.
// Replaces the old mutex-guarded push/pop pair (QccGetFreeInstance is
// superseded by async_get_instance above).
void QccCrypto::QccFreeInstance(int entry) {
- std::lock_guard<std::mutex> freeinst(qcc_alloc_mutex);
- open_instances.push(entry);
-}
-
-int QccCrypto::QccGetFreeInstance() {
- int ret = -1;
- std::lock_guard<std::mutex> getinst(qcc_alloc_mutex);
- if (!open_instances.empty()) {
- ret = open_instances.front();
- open_instances.pop();
- }
- return ret;
+ boost::asio::post(my_pool, [this, entry]()mutable{
+ if (!instance_completions.empty()) {
+ instance_completions.front()(entry);
+ instance_completions.pop_front();
+ } else {
+ open_instances.push_back(entry);
+ }
+ });
}
// Tear down QAT user-space state after a failed init: stop the SAL, destroy
// the user-space memory driver, and clear the init flags so a later call
// can retry.
void QccCrypto::cleanup() {
icp_sal_userStop();
qaeMemDestroy();
is_init = false;
- init_stat = stat;
init_called = false;
// NOTE(review): this logs unconditionally, so cleanup() must only ever be
// reached from init failure paths — confirm callers.
derr << "Failure during QAT init sequence. Quitting" << dendl;
}
// Body of the dedicated "qat_poll" thread: repeatedly polls every polled
// data-plane instance to drive completion callbacks (symDpCallback). After
// RETRY_MAX_NUM consecutive rounds in which no instance had responses,
// back off by waiting up to 1ms on poll_inst_cv instead of spinning.
+void QccCrypto::poll_instances(void) {
+ CpaStatus stat = CPA_STATUS_SUCCESS;
+ poll_retry_num = RETRY_MAX_NUM;
+ while (!thread_stop) {
+ int free_instance_num = 0;
+ for (int iter = 0; iter < qcc_inst->num_instances; iter++) {
+ if (qcc_inst->is_polled[iter] == CPA_TRUE) {
// Quota 0 — presumably "poll all available responses"; confirm
// against the icp_sal polling API documentation.
+ stat = icp_sal_CyPollDpInstance(qcc_inst->cy_inst_handles[iter], 0);
+ if (stat != CPA_STATUS_SUCCESS) {
// Non-success here is treated as "nothing pending" on this instance.
+ free_instance_num++;
+ }
+ }
+ }
+ if (free_instance_num == qcc_inst->num_instances) {
+ poll_retry_num--;
+ } else {
+ poll_retry_num = RETRY_MAX_NUM;
+ }
+ if (0 == poll_retry_num) {
// Idle: sleep briefly (or until the predicate flips) before resuming.
+ std::unique_lock lock{poll_inst_mutex};
+ poll_inst_cv.wait_for(lock, std::chrono::milliseconds(1), [this](){return poll_retry_num > 0;});
+ poll_retry_num = RETRY_MAX_NUM;
+ }
+ }
+}
+
/*
* We initialize QAT instance and everything that is common for all ops
*/
-bool QccCrypto::init()
-{
+bool QccCrypto::init(const size_t chunk_size, const size_t max_requests) {
std::lock_guard<std::mutex> l(qcc_eng_mutex);
+ CpaStatus stat = CPA_STATUS_SUCCESS;
+ this->chunk_size = chunk_size;
+ this->max_requests = max_requests;
- if(init_called) {
+ if (init_called) {
dout(10) << "Init sequence already called. Skipping duplicate call" << dendl;
return true;
}
// Find if the usermode memory driver is available. We need to this to
// create contiguous memory needed by QAT.
stat = qaeMemInit();
- if(stat != CPA_STATUS_SUCCESS) {
+ if (stat != CPA_STATUS_SUCCESS) {
derr << "Unable to load memory driver" << dendl;
this->cleanup();
return false;
}
stat = icp_sal_userStart("CEPH");
- if(stat != CPA_STATUS_SUCCESS) {
+ if (stat != CPA_STATUS_SUCCESS) {
derr << "Unable to start qat device" << dendl;
this->cleanup();
return false;
}
qcc_os_mem_alloc((void **)&qcc_inst, sizeof(QCCINST));
- if(qcc_inst == NULL) {
+ if (qcc_inst == NULL) {
derr << "Unable to alloc mem for instance struct" << dendl;
this->cleanup();
return false;
this->cleanup();
return false;
}
+ dout(1) << "Get instances num: " << qcc_inst->num_instances << dendl;
+ if (max_requests > qcc_inst->num_instances) {
+ instance_completions.set_capacity(max_requests - qcc_inst->num_instances);
+ }
+ open_instances.set_capacity(qcc_inst->num_instances);
int iter = 0;
//Start Instances
- for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+ for (iter = 0; iter < qcc_inst->num_instances; iter++) {
stat = cpaCyStartInstance(qcc_inst->cy_inst_handles[iter]);
- if(stat != CPA_STATUS_SUCCESS) {
+ if (stat != CPA_STATUS_SUCCESS) {
derr << "Unable to start instance" << dendl;
this->cleanup();
return false;
qcc_os_mem_alloc((void **)&qcc_inst->is_polled,
((int)qcc_inst->num_instances * sizeof(CpaBoolean)));
CpaInstanceInfo2 info;
- for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+ for (iter = 0; iter < qcc_inst->num_instances; iter++) {
qcc_inst->is_polled[iter] = cpaCyInstanceGetInfo2(qcc_inst->cy_inst_handles[iter],
&info) == CPA_STATUS_SUCCESS ? info.isPolled : CPA_FALSE;
}
// Allocate memory structures for all instances
qcc_os_mem_alloc((void **)&qcc_sess,
((int)qcc_inst->num_instances * sizeof(QCCSESS)));
- if(qcc_sess == NULL) {
+ if (qcc_sess == NULL) {
derr << "Unable to allocate memory for session struct" << dendl;
this->cleanup();
return false;
qcc_os_mem_alloc((void **)&qcc_op_mem,
((int)qcc_inst->num_instances * sizeof(QCCOPMEM)));
- if(qcc_sess == NULL) {
+ if (qcc_sess == NULL) {
derr << "Unable to allocate memory for opmem struct" << dendl;
this->cleanup();
return false;
}
- qcc_os_mem_alloc((void **)&cypollthreads,
- ((int)qcc_inst->num_instances * sizeof(pthread_t)));
- if(cypollthreads == NULL) {
- derr << "Unable to allocate memory for pthreads" << dendl;
- this->cleanup();
- return false;
- }
-
//At this point we are only doing an user-space version.
- //To-Do: Maybe a kernel based one
- for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+ for (iter = 0; iter < qcc_inst->num_instances; iter++) {
stat = cpaCySetAddressTranslation(qcc_inst->cy_inst_handles[iter],
qaeVirtToPhysNUMA);
- if(stat == CPA_STATUS_SUCCESS) {
- // Start HW Polling Thread
- // To-Do: Enable epoll & interrupt based later?
- // QccCyStartPoll(iter);
- // Setup the session structures for crypto operation and populate
- // whatever we can now. Rest will be filled in when crypto operation
- // happens.
- qcc_sess[iter].sess_ctx_sz = 0;
- qcc_sess[iter].sess_ctx = NULL;
- qcc_sess[iter].sess_stp_data.sessionPriority = CPA_CY_PRIORITY_NORMAL;
- qcc_sess[iter].sess_stp_data.symOperation = CPA_CY_SYM_OP_CIPHER;
- open_instances.push(iter);
+ if (stat == CPA_STATUS_SUCCESS) {
+ open_instances.push_back(iter);
qcc_op_mem[iter].is_mem_alloc = false;
- qcc_op_mem[iter].op_complete = false;
- qcc_op_mem[iter].op_result = CPA_STATUS_SUCCESS;
- qcc_op_mem[iter].sym_op_data = NULL;
- qcc_op_mem[iter].buff_meta_size = qcc_op_mem[iter].buff_size = 0;
- qcc_op_mem[iter].src_buff_meta = qcc_op_mem[iter].src_buff
- = qcc_op_mem[iter].iv_buff = NULL;
- qcc_op_mem[iter].src_buff_list = NULL;
- qcc_op_mem[iter].src_buff_flat = NULL;
- qcc_op_mem[iter].num_buffers = 1;
+
+ stat = cpaCySymDpRegCbFunc(qcc_inst->cy_inst_handles[iter], symDpCallback);
+ if (stat != CPA_STATUS_SUCCESS) {
+ dout(1) << "Unable to register callback function for instance " << iter << " with status = " << stat << dendl;
+ return false;
+ }
} else {
- derr << "Unable to find address translations of instance " << iter << dendl;
+ dout(1) << "Unable to find address translations of instance " << iter << dendl;
this->cleanup();
return false;
}
}
+
+ qat_poll_thread = make_named_thread("qat_poll", &QccCrypto::poll_instances, this);
+
is_init = true;
dout(10) << "Init complete" << dendl;
return true;
return false;
}
- unsigned int retry = 0;
- while(retry <= QCC_MAX_RETRIES) {
- if(open_instances.size() == qcc_inst->num_instances) {
- break;
- } else {
- retry++;
- }
- dout(5) << "QAT is still busy and cannot free resources yet" << dendl;
- return false;
+ thread_stop = true;
+ if (qat_poll_thread.joinable()) {
+ qat_poll_thread.join();
}
+ my_pool.join();
dout(10) << "Destroying QAT crypto & related memory" << dendl;
int iter = 0;
// Free up op related memory
for (iter =0; iter < qcc_inst->num_instances; iter++) {
- qcc_contig_mem_free((void **)&(qcc_op_mem[iter].src_buff));
- qcc_contig_mem_free((void **)&(qcc_op_mem[iter].iv_buff));
- qcc_os_mem_free((void **)&(qcc_op_mem[iter].src_buff_list));
- qcc_os_mem_free((void **)&(qcc_op_mem[iter].src_buff_flat));
- qcc_contig_mem_free((void **)&(qcc_op_mem[iter].sym_op_data));
+ for (size_t i = 0; i < MAX_NUM_SYM_REQ_BATCH; i++) {
+ qcc_contig_mem_free((void **)&(qcc_op_mem[iter].src_buff[i]));
+ qcc_contig_mem_free((void **)&(qcc_op_mem[iter].iv_buff[i]));
+ qcc_contig_mem_free((void **)&(qcc_op_mem[iter].sym_op_data[i]));
+ }
}
// Free up Session memory
- for(iter = 0; iter < qcc_inst->num_instances; iter++) {
- cpaCySymRemoveSession(qcc_inst->cy_inst_handles[iter], qcc_sess[iter].sess_ctx);
+ for (iter = 0; iter < qcc_inst->num_instances; iter++) {
+ cpaCySymDpRemoveSession(qcc_inst->cy_inst_handles[iter], qcc_sess[iter].sess_ctx);
qcc_contig_mem_free((void **)&(qcc_sess[iter].sess_ctx));
}
// Stop QAT Instances
- for(iter = 0; iter < qcc_inst->num_instances; iter++) {
+ for (iter = 0; iter < qcc_inst->num_instances; iter++) {
cpaCyStopInstance(qcc_inst->cy_inst_handles[iter]);
}
qcc_os_mem_free((void **)&qcc_sess);
qcc_os_mem_free((void **)&(qcc_inst->cy_inst_handles));
qcc_os_mem_free((void **)&(qcc_inst->is_polled));
- qcc_os_mem_free((void **)&cypollthreads);
qcc_os_mem_free((void **)&qcc_inst);
//Un-init memory driver and QAT HW
return true;
}
-void QccCrypto::do_crypt(qcc_thread_args *thread_args) {
- auto entry = thread_args->entry;
- qcc_op_mem[entry].op_result = cpaCySymPerformOp(qcc_inst->cy_inst_handles[entry],
- NULL,
- qcc_op_mem[entry].sym_op_data,
- qcc_op_mem[entry].src_buff_list,
- qcc_op_mem[entry].src_buff_list,
- NULL);
- qcc_op_mem[entry].op_complete = true;
- free(thread_args);
-}
-
-bool QccCrypto::perform_op(unsigned char* out, const unsigned char* in,
- size_t size, uint8_t *iv, uint8_t *key, CpaCySymCipherDirection op_type)
+bool QccCrypto::perform_op_batch(unsigned char* out, const unsigned char* in, size_t size,
+ Cpa8U *iv,
+ Cpa8U *key,
+ CpaCySymCipherDirection op_type,
+ optional_yield y)
{
if (!init_called) {
dout(10) << "QAT not intialized yet. Initializing now..." << dendl;
- if(!QccCrypto::init()) {
+ if (!QccCrypto::init(chunk_size, max_requests)) {
derr << "QAT init failed" << dendl;
return false;
}
}
- if(!is_init)
+ if (!is_init)
{
- dout(10) << "QAT not initialized in this instance or init failed with possible error " << (int)init_stat << dendl;
+ dout(10) << "QAT not initialized in this instance or init failed" << dendl;
return is_init;
}
-
- int avail_inst = -1;
- unsigned int retrycount = 0;
- while(retrycount <= QCC_MAX_RETRIES) {
- avail_inst = QccGetFreeInstance();
- if(avail_inst != -1) {
- break;
- } else {
- retrycount++;
- usleep(qcc_sleep_duration);
- }
+ CpaStatus status = CPA_STATUS_SUCCESS;
+ int avail_inst = NON_INSTANCE;
+
+ if (y) {
+ yield_context yield = y.get_yield_context();
+ avail_inst = async_get_instance(yield);
+ } else {
+ auto result = async_get_instance(boost::asio::use_future);
+ avail_inst = result.get();
}
- if(avail_inst == -1) {
- derr << "Unable to get an QAT instance. Failing request" << dendl;
+ if (avail_inst == NON_INSTANCE) {
return false;
}
- dout(15) << "Using inst " << avail_inst << dendl;
- // Start polling threads for this instance
- //QccCyStartPoll(avail_inst);
+ dout(15) << "Using dp_batch inst " << avail_inst << dendl;
- auto sg = make_scope_guard([=] {
+ auto sg = make_scope_guard([this, avail_inst] {
//free up the instance irrespective of the op status
dout(15) << "Completed task under " << avail_inst << dendl;
qcc_op_mem[avail_inst].op_complete = false;
* Hold onto to most of them until destructor is called.
*/
if (qcc_op_mem[avail_inst].is_mem_alloc == false) {
-
- qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherAlgorithm =
- CPA_CY_SYM_CIPHER_AES_CBC;
- qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherKeyLenInBytes =
- AES_256_KEY_SIZE;
-
- // Allocate contig memory for buffers that are independent of the
- // input/output
- stat = cpaCyBufferListGetMetaSize(qcc_inst->cy_inst_handles[avail_inst],
- qcc_op_mem[avail_inst].num_buffers, &(qcc_op_mem[avail_inst].buff_meta_size));
- if(stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to get buff meta size" << dendl;
- return false;
- }
-
- // Allocate Buffer List Private metadata
- stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_meta),
- qcc_op_mem[avail_inst].buff_meta_size, 1);
- if(stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to allocate private metadata memory" << dendl;
- return false;
- }
-
- // Allocate Buffer List Memory
- qcc_os_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_list), sizeof(CpaBufferList));
- qcc_os_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff_flat),
- (qcc_op_mem[avail_inst].num_buffers * sizeof(CpaFlatBuffer)));
- if(qcc_op_mem[avail_inst].src_buff_list == NULL || qcc_op_mem[avail_inst].src_buff_flat == NULL) {
- derr << "Unable to allocate bufferlist memory" << dendl;
- return false;
- }
-
- // Allocate IV memory
- stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].iv_buff), AES_256_IV_LEN);
- if(stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to allocate bufferlist memory" << dendl;
- return false;
- }
-
- //Assign src stuff for the operation
- (qcc_op_mem[avail_inst].src_buff_list)->pBuffers = qcc_op_mem[avail_inst].src_buff_flat;
- (qcc_op_mem[avail_inst].src_buff_list)->numBuffers = qcc_op_mem[avail_inst].num_buffers;
- (qcc_op_mem[avail_inst].src_buff_list)->pPrivateMetaData = qcc_op_mem[avail_inst].src_buff_meta;
-
- //Setup OpData
- stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].sym_op_data),
- sizeof(CpaCySymOpData));
- if(stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to allocate opdata memory" << dendl;
- return false;
- }
-
- // Assuming op to be encryption for initiation. This will be reset when we
- // exit this block
- qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherDirection =
- CPA_CY_SYM_CIPHER_DIRECTION_ENCRYPT;
- // Allocate Session memory
- stat = cpaCySymSessionCtxGetSize(qcc_inst->cy_inst_handles[avail_inst],
- &(qcc_sess[avail_inst].sess_stp_data), &(qcc_sess[avail_inst].sess_ctx_sz));
- if(stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to find session size" << dendl;
- return false;
- }
-
- stat = qcc_contig_mem_alloc((void **)&(qcc_sess[avail_inst].sess_ctx),
- qcc_sess[avail_inst].sess_ctx_sz);
- if(stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to allocate contig memory" << dendl;
- return false;
+ for (size_t i = 0; i < MAX_NUM_SYM_REQ_BATCH; i++) {
+ // Allocate IV memory
+ status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].iv_buff[i]), AES_256_IV_LEN, 8);
+ if (status != CPA_STATUS_SUCCESS) {
+ derr << "Unable to allocate iv_buff memory" << dendl;
+ return false;
+ }
+
+ // Allocate src memory
+ status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff[i]), chunk_size, 8);
+ if (status != CPA_STATUS_SUCCESS) {
+ derr << "Unable to allocate src_buff memory" << dendl;
+ return false;
+ }
+
+ //Setup OpData
+ status = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].sym_op_data[i]),
+ sizeof(CpaCySymDpOpData), 8);
+ if (status != CPA_STATUS_SUCCESS) {
+ derr << "Unable to allocate opdata memory" << dendl;
+ return false;
+ }
}
// Set memalloc flag so that we don't go through this exercise again.
qcc_op_mem[avail_inst].is_mem_alloc = true;
- dout(15) << "Instantiation complete for " << avail_inst << dendl;
+ qcc_sess[avail_inst].sess_ctx = nullptr;
+ status = initSession(qcc_inst->cy_inst_handles[avail_inst],
+ &(qcc_sess[avail_inst].sess_ctx),
+ (Cpa8U *)key,
+ op_type);
+ } else {
+ do {
+ cpaCySymDpRemoveSession(qcc_inst->cy_inst_handles[avail_inst], qcc_sess[avail_inst].sess_ctx);
+ status = initSession(qcc_inst->cy_inst_handles[avail_inst],
+ &(qcc_sess[avail_inst].sess_ctx),
+ (Cpa8U *)key,
+ op_type);
+ if (unlikely(status == CPA_STATUS_RETRY)) {
+ dout(1) << "cpaCySymDpRemoveSession and initSession retry" << dendl;
+ }
+ } while (status == CPA_STATUS_RETRY);
}
-
- // Section that runs on every call
- // Identify the operation and assign to session
- qcc_sess[avail_inst].sess_stp_data.cipherSetupData.cipherDirection = op_type;
- qcc_sess[avail_inst].sess_stp_data.cipherSetupData.pCipherKey = (Cpa8U *)key;
-
- stat = cpaCySymInitSession(qcc_inst->cy_inst_handles[avail_inst],
- NULL,
- &(qcc_sess[avail_inst].sess_stp_data),
- qcc_sess[avail_inst].sess_ctx);
- if (stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to init session" << dendl;
+ if (unlikely(status != CPA_STATUS_SUCCESS)) {
+ derr << "Unable to init session with status =" << status << dendl;
return false;
}
- // Allocate actual buffers that will hold data
- if (qcc_op_mem[avail_inst].buff_size != (Cpa32U)size) {
- qcc_contig_mem_free((void **)&(qcc_op_mem[avail_inst].src_buff));
- qcc_op_mem[avail_inst].buff_size = (Cpa32U)size;
- stat = qcc_contig_mem_alloc((void **)&(qcc_op_mem[avail_inst].src_buff),
- qcc_op_mem[avail_inst].buff_size);
- if(stat != CPA_STATUS_SUCCESS) {
- derr << "Unable to allocate contig memory" << dendl;
- return false;
- }
- }
-
- // Copy src & iv into the respective buffers
- memcpy(qcc_op_mem[avail_inst].src_buff, in, size);
- memcpy(qcc_op_mem[avail_inst].iv_buff, iv, AES_256_IV_LEN);
-
- //Assign the reminder of the stuff
- qcc_op_mem[avail_inst].src_buff_flat->dataLenInBytes = qcc_op_mem[avail_inst].buff_size;
- qcc_op_mem[avail_inst].src_buff_flat->pData = qcc_op_mem[avail_inst].src_buff;
+ return symPerformOp(avail_inst,
+ qcc_sess[avail_inst].sess_ctx,
+ in,
+ out,
+ size,
+ reinterpret_cast<Cpa8U*>(iv),
+ AES_256_IV_LEN, y);
+}
- //OpData assignment
- qcc_op_mem[avail_inst].sym_op_data->sessionCtx = qcc_sess[avail_inst].sess_ctx;
- qcc_op_mem[avail_inst].sym_op_data->packetType = CPA_CY_SYM_PACKET_TYPE_FULL;
- qcc_op_mem[avail_inst].sym_op_data->pIv = qcc_op_mem[avail_inst].iv_buff;
- qcc_op_mem[avail_inst].sym_op_data->ivLenInBytes = AES_256_IV_LEN;
- qcc_op_mem[avail_inst].sym_op_data->cryptoStartSrcOffsetInBytes = 0;
- qcc_op_mem[avail_inst].sym_op_data->messageLenToCipherInBytes = qcc_op_mem[avail_inst].buff_size;
+/*
+ * Perform session update
+ */
+CpaStatus QccCrypto::updateSession(CpaCySymSessionCtx sessionCtx,
+ Cpa8U *pCipherKey,
+ CpaCySymCipherDirection cipherDirection) {
+ CpaStatus status = CPA_STATUS_SUCCESS;
+ CpaCySymSessionUpdateData sessionUpdateData = {0};
+
+ sessionUpdateData.flags = CPA_CY_SYM_SESUPD_CIPHER_KEY;
+ sessionUpdateData.flags |= CPA_CY_SYM_SESUPD_CIPHER_DIR;
+ sessionUpdateData.pCipherKey = pCipherKey;
+ sessionUpdateData.cipherDirection = cipherDirection;
+
+ status = cpaCySymUpdateSession(sessionCtx, &sessionUpdateData);
+
+ if (unlikely(status != CPA_STATUS_SUCCESS)) {
+ dout(10) << "cpaCySymUpdateSession failed with status = " << status << dendl;
+ }
- // Perform cipher operation in a thread
- qcc_thread_args* thread_args = new qcc_thread_args();
- thread_args->qccinstance = this;
- thread_args->entry = avail_inst;
+ return status;
+}
- if (pthread_create(&cypollthreads[avail_inst], NULL, crypt_thread, (void *)thread_args) != 0) {
- derr << "Unable to create thread for crypt operation" << dendl;
- return false;
+CpaStatus QccCrypto::initSession(CpaInstanceHandle cyInstHandle,
+ CpaCySymSessionCtx *sessionCtx,
+ Cpa8U *pCipherKey,
+ CpaCySymCipherDirection cipherDirection) {
+ CpaStatus status = CPA_STATUS_SUCCESS;
+ Cpa32U sessionCtxSize = 0;
+ CpaCySymSessionSetupData sessionSetupData;
+ memset(&sessionSetupData, 0, sizeof(sessionSetupData));
+
+ sessionSetupData.sessionPriority = CPA_CY_PRIORITY_NORMAL;
+ sessionSetupData.symOperation = CPA_CY_SYM_OP_CIPHER;
+ sessionSetupData.cipherSetupData.cipherAlgorithm = CPA_CY_SYM_CIPHER_AES_CBC;
+ sessionSetupData.cipherSetupData.cipherKeyLenInBytes = AES_256_KEY_SIZE;
+ sessionSetupData.cipherSetupData.pCipherKey = pCipherKey;
+ sessionSetupData.cipherSetupData.cipherDirection = cipherDirection;
+
+ if (nullptr == *sessionCtx) {
+ status = cpaCySymDpSessionCtxGetSize(cyInstHandle, &sessionSetupData, &sessionCtxSize);
+ if (likely(CPA_STATUS_SUCCESS == status)) {
+ status = qcc_contig_mem_alloc((void **)(sessionCtx), sessionCtxSize);
+ } else {
+ dout(1) << "cpaCySymDpSessionCtxGetSize failed with status = " << status << dendl;
+ }
}
- if (qcc_inst->is_polled[avail_inst] == CPA_TRUE) {
- while (!qcc_op_mem[avail_inst].op_complete) {
- icp_sal_CyPollInstance(qcc_inst->cy_inst_handles[avail_inst], 0);
+ if (likely(CPA_STATUS_SUCCESS == status)) {
+ status = cpaCySymDpInitSession(cyInstHandle,
+ &sessionSetupData,
+ *sessionCtx);
+ if (unlikely(status != CPA_STATUS_SUCCESS)) {
+ dout(1) << "cpaCySymDpInitSession failed with status = " << status << dendl;
}
+ } else {
+ dout(1) << "Session alloc failed with status = " << status << dendl;
}
- pthread_join(cypollthreads[avail_inst], NULL);
+ return status;
+}
- if(qcc_op_mem[avail_inst].op_result != CPA_STATUS_SUCCESS) {
- derr << "Unable to perform crypt operation" << dendl;
- return false;
+template <typename CompletionToken>
+auto QatCrypto::async_perform_op(int avail_inst, std::span<CpaCySymDpOpData*> pOpDataVec, CompletionToken&& token) {
+ CpaStatus status = CPA_STATUS_SUCCESS;
+ using boost::asio::async_completion;
+ using Signature = void(CpaStatus);
+ async_completion<CompletionToken, Signature> init(token);
+ auto ex = boost::asio::get_associated_executor(init.completion_handler);
+ completion_handler = [this, ex, handler = init.completion_handler](CpaStatus stat) {
+ boost::asio::post(ex, std::bind(handler, stat));
+ };
+
+ count = pOpDataVec.size();
+ poll_inst_cv.notify_one();
+ status = cpaCySymDpEnqueueOpBatch(pOpDataVec.size(), pOpDataVec.data(), CPA_TRUE);
+
+ if (status != CPA_STATUS_SUCCESS) {
+ completion_handler(status);
}
+ return init.result.get();
+}
- //Copy data back to out buffer
- memcpy(out, qcc_op_mem[avail_inst].src_buff, size);
- //Always cleanup memory holding user-data at the end
- memset(qcc_op_mem[avail_inst].iv_buff, 0, AES_256_IV_LEN);
- memset(qcc_op_mem[avail_inst].src_buff, 0, qcc_op_mem[avail_inst].buff_size);
+bool QccCrypto::symPerformOp(int avail_inst,
+ CpaCySymSessionCtx sessionCtx,
+ const Cpa8U *pSrc,
+ Cpa8U *pDst,
+ Cpa32U size,
+ Cpa8U *pIv,
+ Cpa32U ivLen,
+ optional_yield y) {
+ CpaStatus status = CPA_STATUS_SUCCESS;
+ Cpa32U one_batch_size = chunk_size * MAX_NUM_SYM_REQ_BATCH;
+ Cpa32U iv_index = 0;
+ size_t perform_retry_num = 0;
+ for (Cpa32U off = 0; off < size; off += one_batch_size) {
+ QatCrypto helper;
+ boost::container::static_vector<CpaCySymDpOpData*, MAX_NUM_SYM_REQ_BATCH> pOpDataVec;
+ for (Cpa32U offset = off, i = 0; offset < size && i < MAX_NUM_SYM_REQ_BATCH; offset += chunk_size, i++) {
+ CpaCySymDpOpData *pOpData = qcc_op_mem[avail_inst].sym_op_data[i];
+ Cpa8U *pSrcBuffer = qcc_op_mem[avail_inst].src_buff[i];
+ Cpa8U *pIvBuffer = qcc_op_mem[avail_inst].iv_buff[i];
+ Cpa32U process_size = offset + chunk_size <= size ? chunk_size : size - offset;
+ // copy source into buffer
+ memcpy(pSrcBuffer, pSrc + offset, process_size);
+ // copy IV into buffer
+ memcpy(pIvBuffer, &pIv[iv_index * ivLen], ivLen);
+ iv_index++;
+
+ //pOpData assignment
+ pOpData->thisPhys = qaeVirtToPhysNUMA(pOpData);
+ pOpData->instanceHandle = qcc_inst->cy_inst_handles[avail_inst];
+ pOpData->sessionCtx = sessionCtx;
+ pOpData->pCallbackTag = &helper;
+ pOpData->cryptoStartSrcOffsetInBytes = 0;
+ pOpData->messageLenToCipherInBytes = process_size;
+ pOpData->iv = qaeVirtToPhysNUMA(pIvBuffer);
+ pOpData->pIv = pIvBuffer;
+ pOpData->ivLenInBytes = ivLen;
+ pOpData->srcBuffer = qaeVirtToPhysNUMA(pSrcBuffer);
+ pOpData->srcBufferLen = process_size;
+ pOpData->dstBuffer = qaeVirtToPhysNUMA(pSrcBuffer);
+ pOpData->dstBufferLen = process_size;
+
+ pOpDataVec.push_back(pOpData);
+ }
- return true;
+ do {
+ poll_retry_num = RETRY_MAX_NUM;
+ if (y) {
+ yield_context yield = y.get_yield_context();
+ status = helper.async_perform_op(avail_inst, std::span<CpaCySymDpOpData*>(pOpDataVec), yield);
+ } else {
+ auto result = helper.async_perform_op(avail_inst, std::span<CpaCySymDpOpData*>(pOpDataVec), boost::asio::use_future);
+ status = result.get();
+ }
+ if (status == CPA_STATUS_RETRY) {
+ if (++perform_retry_num > 3) {
+ cpaCySymDpPerformOpNow(qcc_inst->cy_inst_handles[avail_inst]);
+ return false;
+ }
+ }
+ } while (status == CPA_STATUS_RETRY);
+
+ if (likely(CPA_STATUS_SUCCESS == status)) {
+ for (Cpa32U offset = off, i = 0; offset < size && i < MAX_NUM_SYM_REQ_BATCH; offset += chunk_size, i++) {
+ Cpa8U *pSrcBuffer = qcc_op_mem[avail_inst].src_buff[i];
+ Cpa32U process_size = offset + chunk_size <= size ? chunk_size : size - offset;
+ memcpy(pDst + offset, pSrcBuffer, process_size);
+ }
+ } else {
+ dout(1) << "async_perform_op failed with status = " << status << dendl;
+ break;
+ }
+ }
+
+ Cpa32U max_used_buffer_num = iv_index > MAX_NUM_SYM_REQ_BATCH ? MAX_NUM_SYM_REQ_BATCH : iv_index;
+ for (Cpa32U i = 0; i < max_used_buffer_num; i++) {
+ memset(qcc_op_mem[avail_inst].src_buff[i], 0, chunk_size);
+ memset(qcc_op_mem[avail_inst].iv_buff[i], 0, ivLen);
+ }
+ return (CPA_STATUS_SUCCESS == status);
}
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
+#include <thread>
+#include <mutex>
#include <queue>
+#include <memory>
+#include "common/async/yield_context.h"
+#include "common/ceph_mutex.h"
+#include <vector>
+#include <functional>
+#include <span>
+#include "boost/circular_buffer.hpp"
+#include "boost/asio/thread_pool.hpp"
extern "C" {
#include "cpa.h"
+#include "cpa_cy_sym_dp.h"
+#include "cpa_cy_im.h"
#include "lac/cpa_cy_sym.h"
#include "lac/cpa_cy_im.h"
#include "qae_mem.h"
}
class QccCrypto {
+ friend class QatCrypto;
+ size_t chunk_size{0};
+ size_t max_requests{0};
+
+ boost::asio::thread_pool my_pool{1};
+
+ boost::circular_buffer<std::function<void(int)>> instance_completions;
+
+ template <typename CompletionToken>
+ auto async_get_instance(CompletionToken&& token);
public:
CpaCySymCipherDirection qcc_op_type;
- QccCrypto() {};
- ~QccCrypto() {};
+ QccCrypto() {};
+ ~QccCrypto() { destroy(); };
- bool init();
+ bool init(const size_t chunk_size, const size_t max_requests);
bool destroy();
- bool perform_op(unsigned char* out, const unsigned char* in, size_t size,
- uint8_t *iv,
- uint8_t *key,
- CpaCySymCipherDirection op_type);
+ bool perform_op_batch(unsigned char* out, const unsigned char* in, size_t size,
+ Cpa8U *iv,
+ Cpa8U *key,
+ CpaCySymCipherDirection op_type,
+ optional_yield y);
private:
-
// Currently only supporting AES_256_CBC.
// To-Do: Needs to be expanded
static const size_t AES_256_IV_LEN = 16;
static const size_t AES_256_KEY_SIZE = 32;
- static const size_t QCC_MAX_RETRIES = 5000;
+ static const size_t MAX_NUM_SYM_REQ_BATCH = 32;
/*
* Struct to hold an instance of QAT to handle the crypto operations. These
* single crypto or multi-buffer crypto.
*/
struct QCCSESS {
- CpaCySymSessionSetupData sess_stp_data;
Cpa32U sess_ctx_sz;
CpaCySymSessionCtx sess_ctx;
} *qcc_sess;
// Op common items
bool is_mem_alloc;
bool op_complete;
- CpaStatus op_result;
- CpaCySymOpData *sym_op_data;
- Cpa32U buff_meta_size;
- Cpa32U num_buffers;
- Cpa32U buff_size;
-
- //Src data items
- Cpa8U *src_buff_meta;
- CpaBufferList *src_buff_list;
- CpaFlatBuffer *src_buff_flat;
- Cpa8U *src_buff;
- Cpa8U *iv_buff;
+ CpaCySymDpOpData *sym_op_data[MAX_NUM_SYM_REQ_BATCH];
+ Cpa8U *src_buff[MAX_NUM_SYM_REQ_BATCH];
+ Cpa8U *iv_buff[MAX_NUM_SYM_REQ_BATCH];
} *qcc_op_mem;
- //QAT HW polling thread input structure
- struct qcc_thread_args {
- QccCrypto* qccinstance;
- int entry;
- };
-
-
- /*
- * Function to handle the crypt operation. Will run while the main thread
- * runs the polling function on the instance doing the op
- */
- void do_crypt(qcc_thread_args *thread_args);
-
/*
* Handle queue with free instances to handle op
*/
- std::queue<int> open_instances;
- int QccGetFreeInstance();
+ boost::circular_buffer<int> open_instances;
void QccFreeInstance(int entry);
+ std::thread qat_poll_thread;
+ bool thread_stop{false};
/*
* Contiguous Memory Allocator and de-allocator. We are using the usdm
}
std::atomic<bool> is_init = { false };
- CpaStatus init_stat, stat;
/*
* Function to cleanup memory if constructor fails
* associated callbacks. For synchronous operation (like this one), QAT
* library creates an internal callback for the operation.
*/
- static void* crypt_thread(void* entry);
- CpaStatus QccCyStartPoll(int entry);
- void poll_instance(int entry);
+ void poll_instances(void);
+ std::atomic<size_t> poll_retry_num{0};
+
+ bool symPerformOp(int avail_inst,
+ CpaCySymSessionCtx sessionCtx,
+ const Cpa8U *pSrc,
+ Cpa8U *pDst,
+ Cpa32U size,
+ Cpa8U *pIv,
+ Cpa32U ivLen,
+ optional_yield y);
+
+ CpaStatus initSession(CpaInstanceHandle cyInstHandle,
+ CpaCySymSessionCtx *sessionCtx,
+ Cpa8U *pCipherKey,
+ CpaCySymCipherDirection cipherDirection);
+
+ CpaStatus updateSession(CpaCySymSessionCtx sessionCtx,
+ Cpa8U *pCipherKey,
+ CpaCySymCipherDirection cipherDirection);
+
+
+};
+
+class QatCrypto {
+ private:
+ std::function<void(CpaStatus stat)> completion_handler;
+ std::atomic<std::size_t> count;
+ public:
+ void complete() {
+ if (--count == 0) {
+ completion_handler(CPA_STATUS_SUCCESS);
+ }
+ return ;
+ }
+
+ QatCrypto () : count(0) {}
+ QatCrypto (const QatCrypto &qat) = delete;
+ QatCrypto (QatCrypto &&qat) = delete;
+ void operator=(const QatCrypto &qat) = delete;
+ void operator=(QatCrypto &&qat) = delete;
- pthread_t *cypollthreads;
- static const size_t qcc_sleep_duration = 2;
+ template <typename CompletionToken>
+ auto async_perform_op(int avail_inst, std::span<CpaCySymDpOpData*> pOpDataVec, CompletionToken&& token);
};
#endif //QCCCRYPTO_H
}
-CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct)
+CryptoAccelRef get_crypto_accel(const DoutPrefixProvider* dpp, CephContext *cct, const size_t chunk_size, const size_t max_requests)
{
CryptoAccelRef ca_impl = nullptr;
stringstream ss;
ldpp_dout(dpp, -1) << __func__ << " cannot load crypto accelerator of type " << crypto_accel_type << dendl;
return nullptr;
}
- int err = factory->factory(&ca_impl, &ss);
+ int err = factory->factory(&ca_impl, &ss, chunk_size, max_requests);
if (err) {
ldpp_dout(dpp, -1) << __func__ << " factory return error " << err <<
" with description: " << ss.str() << dendl;
static const size_t AES_256_KEYSIZE = 256 / 8;
static const size_t AES_256_IVSIZE = 128 / 8;
static const size_t CHUNK_SIZE = 4096;
+ static const size_t QAT_MIN_SIZE = 65536;
const DoutPrefixProvider* dpp;
private:
static const uint8_t IV[AES_256_IVSIZE];
size_t size,
off_t stream_offset,
const unsigned char (&key)[AES_256_KEYSIZE],
- bool encrypt)
+ bool encrypt,
+ optional_yield y)
{
static std::atomic<bool> failed_to_get_crypto(false);
CryptoAccelRef crypto_accel;
if (! failed_to_get_crypto.load())
{
- crypto_accel = get_crypto_accel(this->dpp, cct);
+ static size_t max_requests = g_ceph_context->_conf->rgw_thread_pool_size;
+ crypto_accel = get_crypto_accel(this->dpp, cct, CHUNK_SIZE, max_requests);
if (!crypto_accel)
failed_to_get_crypto = true;
}
- bool result = true;
- unsigned char iv[AES_256_IVSIZE];
- for (size_t offset = 0; result && (offset < size); offset += CHUNK_SIZE) {
- size_t process_size = offset + CHUNK_SIZE <= size ? CHUNK_SIZE : size - offset;
- prepare_iv(iv, stream_offset + offset);
- if (crypto_accel != nullptr) {
- if (encrypt) {
- result = crypto_accel->cbc_encrypt(out + offset, in + offset,
- process_size, iv, key);
+ bool result = false;
+ static std::string accelerator = cct->_conf->plugin_crypto_accelerator;
+ if (accelerator == "crypto_qat" && crypto_accel != nullptr && size >= QAT_MIN_SIZE) {
+ // Currently, batch mode is only implemented by the QAT plugin
+ size_t iv_num = size / CHUNK_SIZE;
+ if (size % CHUNK_SIZE) ++iv_num;
+ auto iv = new unsigned char[iv_num][AES_256_IVSIZE];
+ for (size_t offset = 0, i = 0; offset < size; offset += CHUNK_SIZE, i++) {
+ prepare_iv(iv[i], stream_offset + offset);
+ }
+ if (encrypt) {
+ result = crypto_accel->cbc_encrypt_batch(out, in, size, iv, key, y);
+ } else {
+ result = crypto_accel->cbc_decrypt_batch(out, in, size, iv, key, y);
+ }
+ delete[] iv;
+ }
+ if (result == false) {
+ // If QAT has no free instance available, fall back to the non-batch path below
+ result = true;
+ unsigned char iv[AES_256_IVSIZE];
+ for (size_t offset = 0; result && (offset < size); offset += CHUNK_SIZE) {
+ size_t process_size = offset + CHUNK_SIZE <= size ? CHUNK_SIZE : size - offset;
+ prepare_iv(iv, stream_offset + offset);
+ if (crypto_accel != nullptr && accelerator != "crypto_qat") {
+ if (encrypt) {
+ result = crypto_accel->cbc_encrypt(out + offset, in + offset,
+ process_size, iv, key, y);
+ } else {
+ result = crypto_accel->cbc_decrypt(out + offset, in + offset,
+ process_size, iv, key, y);
+ }
} else {
- result = crypto_accel->cbc_decrypt(out + offset, in + offset,
- process_size, iv, key);
+ result = cbc_transform(
+ out + offset, in + offset, process_size,
+ iv, key, encrypt);
}
- } else {
- result = cbc_transform(
- out + offset, in + offset, process_size,
- iv, key, encrypt);
}
}
return result;
off_t in_ofs,
size_t size,
bufferlist& output,
- off_t stream_offset)
+ off_t stream_offset,
+ optional_yield y)
{
bool result = false;
size_t aligned_size = size / AES_256_IVSIZE * AES_256_IVSIZE;
result = cbc_transform(buf_raw,
input_raw + in_ofs,
aligned_size,
- stream_offset, key, true);
+ stream_offset, key, true, y);
if (result && (unaligned_rest_size > 0)) {
/* remainder to encrypt */
if (aligned_size % CHUNK_SIZE > 0) {
off_t in_ofs,
size_t size,
bufferlist& output,
- off_t stream_offset)
+ off_t stream_offset,
+ optional_yield y)
{
bool result = false;
size_t aligned_size = size / AES_256_IVSIZE * AES_256_IVSIZE;
result = cbc_transform(buf_raw,
input_raw + in_ofs,
aligned_size,
- stream_offset, key, false);
+ stream_offset, key, false, y);
if (result && unaligned_rest_size > 0) {
/* remainder to decrypt */
if (aligned_size % CHUNK_SIZE > 0) {
RGWGetObj_BlockDecrypt::RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp,
CephContext* cct,
RGWGetObj_Filter* next,
- std::unique_ptr<BlockCrypt> crypt)
+ std::unique_ptr<BlockCrypt> crypt,
+ optional_yield y)
:
RGWGetObj_Filter(next),
dpp(dpp),
enc_begin_skip(0),
ofs(0),
end(0),
- cache()
+ cache(),
+ y(y)
{
block_size = this->crypt->get_block_size();
}
int RGWGetObj_BlockDecrypt::process(bufferlist& in, size_t part_ofs, size_t size)
{
bufferlist data;
- if (!crypt->decrypt(in, 0, size, data, part_ofs)) {
+ if (!crypt->decrypt(in, 0, size, data, part_ofs, y)) {
return -ERR_INTERNAL_ERROR;
}
off_t send_size = size - enc_begin_skip;
RGWPutObj_BlockEncrypt::RGWPutObj_BlockEncrypt(const DoutPrefixProvider *dpp,
CephContext* cct,
rgw::sal::DataProcessor *next,
- std::unique_ptr<BlockCrypt> crypt)
+ std::unique_ptr<BlockCrypt> crypt,
+ optional_yield y)
: Pipe(next),
dpp(dpp),
cct(cct),
crypt(std::move(crypt)),
- block_size(this->crypt->get_block_size())
+ block_size(this->crypt->get_block_size()),
+ y(y)
{
}
if (proc_size > 0) {
bufferlist in, out;
cache.splice(0, proc_size, &in);
- if (!crypt->encrypt(in, 0, proc_size, out, logical_offset)) {
+ if (!crypt->encrypt(in, 0, proc_size, out, logical_offset, y)) {
return -ERR_INTERNAL_ERROR;
}
int r = Pipe::process(std::move(out), logical_offset);
#include <rgw/rgw_rest.h>
#include <rgw/rgw_rest_s3.h>
#include "rgw_putobj.h"
+#include "common/async/yield_context.h"
/**
* \brief Interface for block encryption methods
off_t in_ofs,
size_t size,
bufferlist& output,
- off_t stream_offset) = 0;
+ off_t stream_offset,
+ optional_yield y) = 0;
/**
* Decrypts data.
off_t in_ofs,
size_t size,
bufferlist& output,
- off_t stream_offset) = 0;
+ off_t stream_offset,
+ optional_yield y) = 0;
};
static const size_t AES_256_KEYSIZE = 256 / 8;
off_t end; /**< stream offset of last byte that is requested */
bufferlist cache; /**< stores extra data that could not (yet) be processed by BlockCrypt */
size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
+ optional_yield y;
int process(bufferlist& cipher, size_t part_ofs, size_t size);
RGWGetObj_BlockDecrypt(const DoutPrefixProvider *dpp,
CephContext* cct,
RGWGetObj_Filter* next,
- std::unique_ptr<BlockCrypt> crypt);
+ std::unique_ptr<BlockCrypt> crypt,
+ optional_yield y);
virtual ~RGWGetObj_BlockDecrypt();
virtual int fixup_range(off_t& bl_ofs,
for operations when enough data is accumulated */
bufferlist cache; /**< stores extra data that could not (yet) be processed by BlockCrypt */
const size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
+ optional_yield y;
public:
RGWPutObj_BlockEncrypt(const DoutPrefixProvider *dpp,
CephContext* cct,
rgw::sal::DataProcessor *next,
- std::unique_ptr<BlockCrypt> crypt);
+ std::unique_ptr<BlockCrypt> crypt,
+ optional_yield y);
int process(bufferlist&& data, uint64_t logical_offset) override;
}; /* RGWPutObj_BlockEncrypt */
res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses);
if (res == 0) {
if (block_crypt != nullptr) {
- auto f = std::make_unique<RGWGetObj_BlockDecrypt>(s, s->cct, cb, std::move(block_crypt));
+ auto f = std::make_unique<RGWGetObj_BlockDecrypt>(s, s->cct, cb, std::move(block_crypt), s->yield);
if (manifest_bl != nullptr) {
res = f->read_manifest(this, *manifest_bl);
if (res == 0) {
res = rgw_s3_prepare_decrypt(s, attrs, &block_crypt, crypt_http_responses_unused);
if (res == 0) {
if (block_crypt != nullptr) {
- auto f = std::unique_ptr<RGWGetObj_BlockDecrypt>(new RGWGetObj_BlockDecrypt(s, s->cct, cb, std::move(block_crypt)));
- //RGWGetObj_BlockDecrypt* f = new RGWGetObj_BlockDecrypt(s->cct, cb, std::move(block_crypt));
+ auto f = std::unique_ptr<RGWGetObj_BlockDecrypt>(new RGWGetObj_BlockDecrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
if (f != nullptr) {
if (manifest_bl != nullptr) {
res = f->read_manifest(this, *manifest_bl);
* We use crypto mode that configured as if we were decrypting. */
res = rgw_s3_prepare_decrypt(s, obj->get_attrs(), &block_crypt, crypt_http_responses);
if (res == 0 && block_crypt != nullptr)
- filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
+ filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
}
/* it is ok, to not have encryption at all */
}
std::unique_ptr<BlockCrypt> block_crypt;
res = rgw_s3_prepare_encrypt(s, attrs, &block_crypt, crypt_http_responses);
if (res == 0 && block_crypt != nullptr) {
- filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
+ filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
}
}
return res;
int res = rgw_s3_prepare_encrypt(s, attrs, &block_crypt,
crypt_http_responses);
if (res == 0 && block_crypt != nullptr) {
- filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt)));
+ filter->reset(new RGWPutObj_BlockEncrypt(s, s->cct, cb, std::move(block_crypt), s->yield));
}
return res;
}