]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
compressor: Change data formt to QZ_DEFLATE_GZIP_EXT for QAT zlib
authorhualong feng <hualong.feng@intel>
Thu, 6 Jun 2024 07:53:03 +0000 (15:53 +0800)
committerFeng,Hualong <hualong.feng@intel.com>
Mon, 29 Jul 2024 01:36:00 +0000 (01:36 +0000)
QAT zlib 'QZ_DEFLATE_RAW' data format cannot decompress
by QAT hardware. So here we replace 'QZ_DEFLATE_GZIP_EXT' data
format with 'QZ_DEFLATE_RAW'.

'QZ_DEFLATE_GZIP_EXT' data format need to add gz_header
by deflateSetHeader() in QATzip. And it leads multi stream
in one compression for hardware buffer. So the windows bit
is important information for decompression, which related to
if the inflate remove header.

Add busy_polling setting for reducing latency

Signed-off-by: Feng,Hualong <hualong.feng@intel.com>
(cherry picked from commit 855c5d6826dabba0093e65e34be14a2fb1581dd0)

src/common/options/global.yaml.in
src/compressor/QatAccel.cc
src/compressor/QatAccel.h
src/compressor/zlib/ZlibCompressor.cc

index a40cf8c63d5c81d94fabdfa20322e84ca7048d94..51c8c0acfd77a14036cefc6363fc92d448740259 100644 (file)
@@ -767,6 +767,11 @@ options:
   level: advanced
   desc: Set the maximum number of session within Qatzip when using QAT compressor
   default: 256
+- name: qat_compressor_busy_polling
+  type: bool
+  level: advanced
+  desc: Set QAT busy bolling to reduce latency at the cost of potentially increasing CPU usage
+  default: false
 - name: plugin_crypto_accelerator
   type: str
   level: advanced
index de19ccfa358eba0c36ece2be00df509d01079de4..c1c374a6836587b0364b13968220f4ed2df9d0c5 100644 (file)
@@ -19,6 +19,7 @@
 #include "common/dout.h"
 #include "common/errno.h"
 #include "QatAccel.h"
+#include "zlib.h"
 
 // -----------------------------------------------------------------------------
 #define dout_context g_ceph_context
@@ -33,6 +34,7 @@ static std::ostream& _prefix(std::ostream* _dout)
 // -----------------------------------------------------------------------------
 // default window size for Zlib 1.2.8, negated for raw deflate
 #define ZLIB_DEFAULT_WIN_SIZE -15
+#define GZIP_WRAPPER 16
 
 /* Estimate data expansion after decompression */
 static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
@@ -42,6 +44,10 @@ void QzSessionDeleter::operator() (struct QzSession_S *session) {
   delete session;
 }
 
+QzPollingMode_T busy_polling(bool isSet) {
+  return isSet ? QZ_BUSY_POLLING : QZ_PERIODICAL_POLLING;
+}
+
 static bool setup_session(const std::string &alg, QatAccel::session_ptr &session) {
   int rc;
   rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT);
@@ -52,10 +58,12 @@ static bool setup_session(const std::string &alg, QatAccel::session_ptr &session
     rc = qzGetDefaultsDeflate(&params);
     if (rc != QZ_OK)
       return false;
-    params.data_fmt = QZ_DEFLATE_RAW;
+
+    params.data_fmt = QZ_DEFLATE_GZIP_EXT;
     params.common_params.comp_algorithm = QZ_DEFLATE;
     params.common_params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
     params.common_params.direction = QZ_DIR_BOTH;
+    params.common_params.polling_mode = busy_polling(g_ceph_context->_conf.get_val<bool>("qat_compressor_busy_polling"));
     rc = qzSetupSessionDeflate(session.get(), &params);
     if (rc != QZ_OK)
       return false;
@@ -136,6 +144,8 @@ bool QatAccel::init(const std::string &alg) {
   }
 
   alg_name = alg;
+  windowBits = GZIP_WRAPPER + MAX_WBITS;
+
   return true;
 }
 
@@ -145,7 +155,8 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional<int3
     return -1; // session initialization failed
   }
   auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
-  compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+  compressor_message = windowBits;
+
   int begin = 1;
   for (auto &i : in.buffers()) {
     const unsigned char* c_in = (unsigned char*) i.c_str();
@@ -188,28 +199,31 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
 
   int rc = 0;
   bufferlist tmp;
-  size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
-
-  while (remaining) {
-    unsigned int ratio_idx = 0;
-    const char* c_in = nullptr;
-    unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
-    remaining -= len;
-    len -= begin;
-    c_in += begin;
-    begin = 0;
+  unsigned int ratio_idx = 0;
+  const char* c_in = nullptr;
+  p.copy_all(tmp);
+  c_in = tmp.c_str();
+  unsigned int len = std::min<unsigned int>(tmp.length(), compressed_len);
+
+  len -= begin;
+  c_in += begin;
+  begin = 0;
+
+  bufferptr ptr;
+  do {
     unsigned int out_len = QZ_HW_BUFF_SZ;
-
-    bufferptr ptr;
+    unsigned int len_current = len;
     do {
-      while (out_len <= len * expansion_ratio[ratio_idx]) {
+      while (out_len <= len_current * expansion_ratio[ratio_idx]) {
         out_len *= 2;
       }
 
       ptr = buffer::create_small_page_aligned(out_len);
-      rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
+      rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len_current, (unsigned char*)ptr.c_str(), &out_len);
       ratio_idx++;
     } while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
+    c_in += len_current;
+    len -= len_current;
 
     if (rc == QZ_OK) {
       dst.append(ptr, 0, out_len);
@@ -223,7 +237,7 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
       dout(1) << "QAT compressor NOT OK" << dendl;
       return -1;
     }
-  }
 
+  } while (len != 0);
   return 0;
 }
index 3533eff9b8fd488d8fabe69f5b02461d4094cd0d..3735fa4616e17517ad21a789ad064230eff55a7c 100644 (file)
@@ -49,6 +49,7 @@ class QatAccel {
   std::vector<session_ptr> sessions;
   std::mutex mutex;
   std::string alg_name;
+  int windowBits;
 };
 
 #endif
index 2a0aa006901e3330a05321b907b975b2492c18c6..f37edc70a0f70986e1852f51a6d0050d99b9f326 100644 (file)
@@ -49,6 +49,7 @@ _prefix(std::ostream* _dout)
 
 // default window size for Zlib 1.2.8, negated for raw deflate
 #define ZLIB_DEFAULT_WIN_SIZE -15
+#define GZIP_WRAPPER 16
 
 // desired memory usage level. increasing to 9 doesn't speed things up
 // significantly (helps only on >=16K blocks) and sometimes degrades
@@ -205,8 +206,8 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optiona
 int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional<int32_t> compressor_message)
 {
 #ifdef HAVE_QATZIP
-  // QAT can only decompress with the default window size
-  if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE))
+  // QAT can only decompress with existing header, only for 'QZ_DEFLATE_GZIP_EXT'
+  if (qat_enabled && compressor_message.has_value() && *compressor_message == GZIP_WRAPPER + MAX_WBITS)
     return qat_accel.decompress(p, compressed_size, out, compressor_message);
 #endif
 
@@ -215,6 +216,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
   z_stream strm;
   const char* c_in;
   int begin = 1;
+  bool multisteam = false;
 
   /* allocate inflate state */
   strm.zalloc = Z_NULL;
@@ -226,6 +228,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
   // choose the variation of compressor
   if (!compressor_message)
     compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+
   ret = inflateInit2(&strm, *compressor_message);
   if (ret != Z_OK) {
     dout(1) << "Decompression init error: init return "
@@ -255,7 +258,10 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
       }
       have = MAX_LEN - strm.avail_out;
       out.append(ptr, 0, have);
-    } while (strm.avail_out == 0);
+      // There may be mutil stream to decompress
+      multisteam = (strm.avail_in != 0 && ret == Z_STREAM_END);
+      if (multisteam) inflateReset(&strm);
+    } while (strm.avail_out == 0 || multisteam);
   }
 
   /* clean up and return */