From aba2ac13fcf8d2c57b7dce08e757387812ee5bd6 Mon Sep 17 00:00:00 2001 From: Feng Hualong Date: Wed, 16 Feb 2022 14:01:17 +0800 Subject: [PATCH] common/compressor: fix the issue that requests cannot be processed concurrently Currently, a single session cannot be used concurrently, and doing so leads to a crash. So use multiple sessions instead. At the same time, this also improves performance. Fixes: https://tracker.ceph.com/issues/54361 Signed-off-by: Feng Hualong --- src/common/options/global.yaml.in | 5 + src/compressor/CMakeLists.txt | 1 + src/compressor/Compressor.cc | 4 + src/compressor/Compressor.h | 2 +- src/compressor/QatAccel.cc | 149 +++++++++++++++++++++++++----- src/compressor/QatAccel.h | 20 +++- 6 files changed, 151 insertions(+), 30 deletions(-) diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 73db03b637e71..e4e271a1b12e6 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -761,6 +761,11 @@ options: desc: Enable Intel QAT acceleration support for compression if available default: false with_legacy: true +- name: qat_compressor_session_max_number + type: uint + level: advanced + desc: Set the maximum number of session within Qatzip when using QAT compressor + default: 256 - name: plugin_crypto_accelerator type: str level: advanced diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt index 51dd66bf24a7a..0155e9e530a6f 100644 --- a/src/compressor/CMakeLists.txt +++ b/src/compressor/CMakeLists.txt @@ -31,6 +31,7 @@ if(HAVE_BROTLI) endif() add_library(compressor STATIC $) +target_link_libraries(compressor PRIVATE compressor_objs) set(ceph_compressor_libs ceph_snappy diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc index fa0f052f69b15..8b859a8a5b699 100644 --- a/src/compressor/Compressor.cc +++ b/src/compressor/Compressor.cc @@ -26,6 +26,10 @@ namespace TOPNSPC { +#ifdef HAVE_QATZIP + QatAccel Compressor::qat_accel; +#endif + const char* 
Compressor::get_comp_alg_name(int a) { auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms), diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h index 0a45a990a87f1..ddf0171f204c9 100644 --- a/src/compressor/Compressor.h +++ b/src/compressor/Compressor.h @@ -72,7 +72,7 @@ public: #ifdef HAVE_QATZIP bool qat_enabled; - QatAccel qat_accel; + static QatAccel qat_accel; #endif static const char* get_comp_alg_name(int a); diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc index e9b8d45311325..6c6a5e45b711c 100644 --- a/src/compressor/QatAccel.cc +++ b/src/compressor/QatAccel.cc @@ -12,58 +12,148 @@ * */ #include + +#include "common/ceph_context.h" +#include "common/common_init.h" +#include "common/debug.h" +#include "common/dout.h" +#include "common/errno.h" #include "QatAccel.h" +// ----------------------------------------------------------------------------- +#define dout_context g_ceph_context +#define dout_subsys ceph_subsys_compressor +#undef dout_prefix +#define dout_prefix _prefix(_dout) + +static std::ostream& _prefix(std::ostream* _dout) +{ + return *_dout << "QatAccel: "; +} +// ----------------------------------------------------------------------------- + void QzSessionDeleter::operator() (struct QzSession_S *session) { - if (NULL != session->internal) { - qzTeardownSession(session); - qzClose(session); - } + qzTeardownSession(session); delete session; } /* Estimate data expansion after decompression */ static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200}; -QatAccel::QatAccel() { - session.reset(new struct QzSession_S); - memset(session.get(), 0, sizeof(struct QzSession_S)); -} - -QatAccel::~QatAccel() {} - -bool QatAccel::init(const std::string &alg) { - QzSessionParams_T params = {(QzHuffmanHdr_T)0,}; +static bool get_qz_params(const std::string &alg, QzSessionParams_T &params) { int rc; - rc = qzGetDefaults(&params); if (rc != QZ_OK) return false; params.direction = 
QZ_DIR_BOTH; - if (alg == "zlib") + params.is_busy_polling = true; + if (alg == "zlib") { params.comp_algorithm = QZ_DEFLATE; - else + params.data_fmt = QZ_DEFLATE_GZIP_EXT; + } + else { + // later, there also has lz4. return false; + } rc = qzSetDefaults(&params); if (rc != QZ_OK) return false; + return true; +} +static bool setup_session(QatAccel::session_ptr &session, QzSessionParams_T &params) { + int rc; rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT); - if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW) + if (rc != QZ_OK && rc != QZ_DUPLICATE) return false; - rc = qzSetupSession(session.get(), &params); - if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) { - qzTeardownSession(session.get()); - qzClose(session.get()); + if (rc != QZ_OK) { return false; } + return true; +} + +// put the session back to the session pool in a RAII manner +struct cached_session_t { + cached_session_t(QatAccel* accel, QatAccel::session_ptr&& sess) + : accel{accel}, session{std::move(sess)} {} + + ~cached_session_t() { + std::scoped_lock lock{accel->mutex}; + // if the cache size is still under its upper bound, the current session is put into + // accel->sessions. otherwise it's released right + uint64_t sessions_num = g_ceph_context->_conf.get_val("qat_compressor_session_max_number"); + if (accel->sessions.size() < sessions_num) { + accel->sessions.push_back(std::move(session)); + } + } + struct QzSession_S* get() { + assert(static_cast(session)); + return session.get(); + } + + QatAccel* accel; + QatAccel::session_ptr session; +}; + +QatAccel::session_ptr QatAccel::get_session() { + { + std::scoped_lock lock{mutex}; + if (!sessions.empty()) { + auto session = std::move(sessions.back()); + sessions.pop_back(); + return session; + } + } + + // If there are no available session to use, we try allocate a new + // session. 
+ QzSessionParams_T params = {(QzHuffmanHdr_T)0,}; + session_ptr session(new struct QzSession_S()); + memset(session.get(), 0, sizeof(struct QzSession_S)); + if (get_qz_params(alg_name, params) && setup_session(session, params)) { + return session; + } else { + return nullptr; + } +} + +QatAccel::QatAccel() {} + +QatAccel::~QatAccel() { + // First, we should uninitialize all QATzip session that disconnects all session + // from a hardware instance and deallocates buffers. + sessions.clear(); + // Then we close the connection with QAT. + // where the value of the parameter passed to qzClose() does not matter. as long as + // it is not nullptr. + qzClose((QzSession_T*)1); +} + +bool QatAccel::init(const std::string &alg) { + std::scoped_lock lock(mutex); + if (!alg_name.empty()) { + return true; + } + + dout(15) << "First use for QAT compressor" << dendl; + if (alg != "zlib") { + return false; + } + + alg_name = alg; return true; } int QatAccel::compress(const bufferlist &in, bufferlist &out, boost::optional &compressor_message) { + auto s = get_session(); // get a session from the pool + if (!s) { + return -1; // session initialization failed + } + auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction + for (auto &i : in.buffers()) { const unsigned char* c_in = (unsigned char*) i.c_str(); unsigned int len = i.length(); @@ -88,12 +178,18 @@ int QatAccel::decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, boost::optional compressor_message) { + auto s = get_session(); // get a session from the pool + if (!s) { + return -1; // session initialization failed + } + auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction + unsigned int ratio_idx = 0; bool read_more = false; bool joint = false; int rc = 0; bufferlist tmp; - size_t remaining = MIN(p.get_remaining(), compressed_len); + size_t remaining = std::min(p.get_remaining(), compressed_len); while 
(remaining) { if (p.end()) { @@ -106,7 +202,9 @@ int QatAccel::decompress(bufferlist::const_iterator &p, if (read_more) tmp.append(cur_ptr.c_str(), len); len = tmp.length(); + tmp.rebuild_page_aligned(); } + unsigned int out_len = len * expansion_ratio[ratio_idx]; bufferptr ptr = buffer::create_small_page_aligned(out_len); @@ -117,7 +215,8 @@ int QatAccel::decompress(bufferlist::const_iterator &p, if (rc == QZ_DATA_ERROR) { if (!joint) { tmp.append(cur_ptr.c_str(), cur_ptr.length()); - p += remaining; + p += cur_ptr.length(); + remaining -= cur_ptr.length(); joint = true; } read_more = true; @@ -137,8 +236,8 @@ int QatAccel::decompress(bufferlist::const_iterator &p, read_more = false; } - p += remaining; - remaining -= len; + p += cur_ptr.length(); + remaining -= cur_ptr.length(); dst.append(ptr, 0, out_len); } diff --git a/src/compressor/QatAccel.h b/src/compressor/QatAccel.h index ff99e200046f4..33ad588ad9fde 100644 --- a/src/compressor/QatAccel.h +++ b/src/compressor/QatAccel.h @@ -15,20 +15,23 @@ #ifndef CEPH_QATACCEL_H #define CEPH_QATACCEL_H -#include #include +#include +#include +#include +#include + #include "include/buffer.h" -extern "C" struct QzSession_S; //struct QzSession_S comes from QAT libraries +extern "C" struct QzSession_S; // typedef struct QzSession_S QzSession_T; struct QzSessionDeleter { void operator() (struct QzSession_S *session); }; class QatAccel { - std::unique_ptr session; - public: + using session_ptr = std::unique_ptr; QatAccel(); ~QatAccel(); @@ -37,6 +40,15 @@ class QatAccel { int compress(const bufferlist &in, bufferlist &out, boost::optional &compressor_message); int decompress(const bufferlist &in, bufferlist &out, boost::optional compressor_message); int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, boost::optional compressor_message); + + private: + // get a session from the pool or create a new one. 
returns null if session init fails + session_ptr get_session(); + + friend struct cached_session_t; + std::vector sessions; + std::mutex mutex; + std::string alg_name; }; #endif -- 2.39.5