]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/compressor: fix the issue that cannot processed concurrently 45050/head 46015/head
authorFeng Hualong <hualong.feng@intel.com>
Wed, 16 Feb 2022 06:01:17 +0000 (14:01 +0800)
committerFeng Hualong <hualong.feng@intel.com>
Mon, 21 Mar 2022 01:22:57 +0000 (09:22 +0800)
Now, one session cannot support concurrent and it will lead to crash.
So there are mutil session using. At same time, it also can improve
the performance.

Fixes: https://tracker.ceph.com/issues/54361
Signed-off-by: Feng Hualong <hualong.feng@intel.com>
src/common/options/global.yaml.in
src/compressor/CMakeLists.txt
src/compressor/Compressor.cc
src/compressor/Compressor.h
src/compressor/QatAccel.cc
src/compressor/QatAccel.h

index 73db03b637e71f868ba5298008b23d5bfe9f2364..e4e271a1b12e612ef7e1232e5b70c65b8b221a03 100644 (file)
@@ -761,6 +761,11 @@ options:
   desc: Enable Intel QAT acceleration support for compression if available
   default: false
   with_legacy: true
+- name: qat_compressor_session_max_number
+  type: uint
+  level: advanced
+  desc: Set the maximum number of session within Qatzip when using QAT compressor
+  default: 256
 - name: plugin_crypto_accelerator
   type: str
   level: advanced
index 51dd66bf24a7aac75ab4aeb0edfe8af3b15d50b0..0155e9e530a6f0995852843d5b4a9ff747d51ff1 100644 (file)
@@ -31,6 +31,7 @@ if(HAVE_BROTLI)
 endif()
 
 add_library(compressor STATIC $<TARGET_OBJECTS:compressor_objs>)
+target_link_libraries(compressor PRIVATE compressor_objs)
 
 set(ceph_compressor_libs
     ceph_snappy
index fa0f052f69b15445f8637434015f9ed39db4dff0..8b859a8a5b699de89c39dc0e935f21211df2c2b8 100644 (file)
 
 namespace TOPNSPC {
 
+#ifdef HAVE_QATZIP
+  QatAccel Compressor::qat_accel;
+#endif
+
 const char* Compressor::get_comp_alg_name(int a) {
 
   auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms),
index 0a45a990a87f149e3dd140c8213edfb403c950c2..ddf0171f204c94c9756852074711d6e45500ed12 100644 (file)
@@ -72,7 +72,7 @@ public:
 
 #ifdef HAVE_QATZIP
   bool qat_enabled;
-  QatAccel qat_accel;
+  static QatAccel qat_accel;
 #endif
 
   static const char* get_comp_alg_name(int a);
index e9b8d453113251f1d73315ea118c995d5dbea093..6c6a5e45b711caa274e21d40639d2b0fe9b196fa 100644 (file)
  *
  */
 #include <qatzip.h>
+
+#include "common/ceph_context.h"
+#include "common/common_init.h"
+#include "common/debug.h"
+#include "common/dout.h"
+#include "common/errno.h"
 #include "QatAccel.h"
 
+// -----------------------------------------------------------------------------
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_compressor
+#undef dout_prefix
+#define dout_prefix _prefix(_dout)
+
+static std::ostream& _prefix(std::ostream* _dout)
+{
+  return *_dout << "QatAccel: ";
+}
+// -----------------------------------------------------------------------------
+
 void QzSessionDeleter::operator() (struct QzSession_S *session) {
-  if (NULL != session->internal) {
-    qzTeardownSession(session);
-    qzClose(session);
-  }
+  qzTeardownSession(session);
   delete session;
 }
 
 /* Estimate data expansion after decompression */
 static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
 
-QatAccel::QatAccel() {
-  session.reset(new struct QzSession_S);
-  memset(session.get(), 0, sizeof(struct QzSession_S));
-}
-
-QatAccel::~QatAccel() {}
-
-bool QatAccel::init(const std::string &alg) {
-  QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
+static bool get_qz_params(const std::string &alg, QzSessionParams_T &params) {
   int rc;
-
   rc = qzGetDefaults(&params);
   if (rc != QZ_OK)
     return false;
   params.direction = QZ_DIR_BOTH;
-  if (alg == "zlib")
+  params.is_busy_polling = true;
+  if (alg == "zlib") {
     params.comp_algorithm = QZ_DEFLATE;
-  else
+    params.data_fmt = QZ_DEFLATE_GZIP_EXT;
+  }
+  else {
+    // later, there also has lz4.
     return false;
+  }
 
   rc = qzSetDefaults(&params);
   if (rc != QZ_OK)
       return false;
+  return true;
+}
 
+static bool setup_session(QatAccel::session_ptr &session, QzSessionParams_T &params) {
+  int rc;
   rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT);
-  if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW)
+  if (rc != QZ_OK && rc != QZ_DUPLICATE)
     return false;
-
   rc = qzSetupSession(session.get(), &params);
-  if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) {
-    qzTeardownSession(session.get());
-    qzClose(session.get());
+  if (rc != QZ_OK) {
     return false;
   }
+  return true;
+}
+
+// put the session back to the session pool in a RAII manner
+struct cached_session_t {
+  cached_session_t(QatAccel* accel, QatAccel::session_ptr&& sess)
+    : accel{accel}, session{std::move(sess)} {}
+
+  ~cached_session_t() {
+    std::scoped_lock lock{accel->mutex};
+    // if the cache size is still under its upper bound, the current session is put into
+    // accel->sessions. otherwise it's released right
+    uint64_t sessions_num = g_ceph_context->_conf.get_val<uint64_t>("qat_compressor_session_max_number");
+    if (accel->sessions.size() < sessions_num) {
+      accel->sessions.push_back(std::move(session));
+    }
+  }
 
+  struct QzSession_S* get() {
+    assert(static_cast<bool>(session));
+    return session.get();
+  }
+
+  QatAccel* accel;
+  QatAccel::session_ptr session;
+};
+
+QatAccel::session_ptr QatAccel::get_session() {
+  {
+    std::scoped_lock lock{mutex};
+    if (!sessions.empty()) {
+      auto session = std::move(sessions.back());
+      sessions.pop_back();
+      return session;
+    }
+  }
+
+  // If there are no available session to use, we try allocate a new
+  // session.
+  QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
+  session_ptr session(new struct QzSession_S());
+  memset(session.get(), 0, sizeof(struct QzSession_S));
+  if (get_qz_params(alg_name, params) && setup_session(session, params)) {
+    return session;
+  } else {
+    return nullptr;
+  }
+}
+
+QatAccel::QatAccel() {}
+
+QatAccel::~QatAccel() {
+  // First, we should uninitialize all QATzip session that disconnects all session
+  // from a hardware instance and deallocates buffers.
+  sessions.clear();
+  // Then we close the connection with QAT.
+  // where the value of the parameter passed to qzClose() does not matter. as long as
+  // it is not nullptr.
+  qzClose((QzSession_T*)1);
+}
+
+bool QatAccel::init(const std::string &alg) {
+  std::scoped_lock lock(mutex);
+  if (!alg_name.empty()) {
+    return true;
+  }
+
+  dout(15) << "First use for QAT compressor" << dendl;
+  if (alg != "zlib") {
+    return false;
+  }
+
+  alg_name = alg;
   return true;
 }
 
 int QatAccel::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) {
+  auto s = get_session(); // get a session from the pool
+  if (!s) {
+    return -1; // session initialization failed
+  }
+  auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
+
   for (auto &i : in.buffers()) {
     const unsigned char* c_in = (unsigned char*) i.c_str();
     unsigned int len = i.length();
@@ -88,12 +178,18 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
                 size_t compressed_len,
                 bufferlist &dst,
                 boost::optional<int32_t> compressor_message) {
+  auto s = get_session(); // get a session from the pool
+  if (!s) {
+    return -1; // session initialization failed
+  }
+  auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
+
   unsigned int ratio_idx = 0;
   bool read_more = false;
   bool joint = false;
   int rc = 0;
   bufferlist tmp;
-  size_t remaining = MIN(p.get_remaining(), compressed_len);
+  size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
 
   while (remaining) {
     if (p.end()) {
@@ -106,7 +202,9 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
       if (read_more)
         tmp.append(cur_ptr.c_str(), len);
       len = tmp.length();
+      tmp.rebuild_page_aligned();
     }
+
     unsigned int out_len = len * expansion_ratio[ratio_idx];
     bufferptr ptr = buffer::create_small_page_aligned(out_len);
 
@@ -117,7 +215,8 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
     if (rc == QZ_DATA_ERROR) {
       if (!joint) {
         tmp.append(cur_ptr.c_str(), cur_ptr.length());
-        p += remaining;
+        p += cur_ptr.length();
+        remaining -= cur_ptr.length();
         joint = true;
       }
       read_more = true;
@@ -137,8 +236,8 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
       read_more = false;
     }
 
-    p += remaining;
-    remaining -= len;
+    p += cur_ptr.length();
+    remaining -= cur_ptr.length();
     dst.append(ptr, 0, out_len);
   }
 
index ff99e200046f440a2037ff71b0b591ca290caf5a..33ad588ad9fdedab08dea71325bfef0414075dac 100644 (file)
 #ifndef CEPH_QATACCEL_H
 #define CEPH_QATACCEL_H
 
-#include <memory>
 #include <boost/optional.hpp>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <vector>
+
 #include "include/buffer.h"
 
-extern "C" struct QzSession_S; //struct QzSession_S comes from QAT libraries
+extern "C" struct QzSession_S; // typedef struct QzSession_S QzSession_T;
 
 struct QzSessionDeleter {
   void operator() (struct QzSession_S *session);
 };
 
 class QatAccel {
-  std::unique_ptr<struct QzSession_S, QzSessionDeleter> session;
-
  public:
+  using session_ptr = std::unique_ptr<struct QzSession_S, QzSessionDeleter>;
   QatAccel();
   ~QatAccel();
 
@@ -37,6 +40,15 @@ class QatAccel {
   int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message);
   int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message);
   int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, boost::optional<int32_t> compressor_message);
+
+ private:
+  // get a session from the pool or create a new one. returns null if session init fails
+  session_ptr get_session();
+
+  friend struct cached_session_t;
+  std::vector<session_ptr> sessions;
+  std::mutex mutex;
+  std::string alg_name;
 };
 
 #endif