]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
common/compressor: fix the issue that read more data 45806/head
authorFeng Hualong <hualong.feng@intel.com>
Fri, 1 Apr 2022 09:09:41 +0000 (17:09 +0800)
committerFeng Hualong <hualong.feng@intel.com>
Wed, 1 Jun 2022 15:13:37 +0000 (23:13 +0800)
QAT decompression may read more data when object size is less
than 4k. So there use another way to get data.

And We should keep win_size within compressor_message,
So QAT compress is compatible for software compress.

Signed-off-by: Feng Hualong <hualong.feng@intel.com>
src/compressor/QatAccel.cc
src/compressor/zlib/ZlibCompressor.cc

index 6c6a5e45b711caa274e21d40639d2b0fe9b196fa..919f42bfdda6b1c9db3b2a81b8526ae16fdde143 100644 (file)
@@ -31,15 +31,17 @@ static std::ostream& _prefix(std::ostream* _dout)
   return *_dout << "QatAccel: ";
 }
 // -----------------------------------------------------------------------------
+// default window size for Zlib 1.2.8, negated for raw deflate
+#define ZLIB_DEFAULT_WIN_SIZE -15
+
+/* Estimate data expansion after decompression */
+static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
 
 void QzSessionDeleter::operator() (struct QzSession_S *session) {
   qzTeardownSession(session);
   delete session;
 }
 
-/* Estimate data expansion after decompression */
-static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
-
 static bool get_qz_params(const std::string &alg, QzSessionParams_T &params) {
   int rc;
   rc = qzGetDefaults(&params);
@@ -49,7 +51,8 @@ static bool get_qz_params(const std::string &alg, QzSessionParams_T &params) {
   params.is_busy_polling = true;
   if (alg == "zlib") {
     params.comp_algorithm = QZ_DEFLATE;
-    params.data_fmt = QZ_DEFLATE_GZIP_EXT;
+    params.data_fmt = QZ_DEFLATE_RAW;
+    params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
   }
   else {
     // later, there also has lz4.
@@ -153,17 +156,26 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out, boost::optional<in
     return -1; // session initialization failed
   }
   auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
-
+  compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+  int begin = 1;
   for (auto &i : in.buffers()) {
     const unsigned char* c_in = (unsigned char*) i.c_str();
     unsigned int len = i.length();
-    unsigned int out_len = qzMaxCompressedLength(len, session.get());
+    unsigned int out_len = qzMaxCompressedLength(len, session.get()) + begin;
 
     bufferptr ptr = buffer::create_small_page_aligned(out_len);
-    int rc = qzCompress(session.get(), c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
+    unsigned char* c_out = (unsigned char*)ptr.c_str() + begin;
+    int rc = qzCompress(session.get(), c_in, &len, c_out, &out_len, 1);
     if (rc != QZ_OK)
       return -1;
+    if (begin) {
+      // put a compressor variation mark in front of compressed stream, not used at the moment
+      ptr.c_str()[0] = 0;
+      out_len += begin;
+      begin = 0;
+    }
     out.append(ptr, 0, out_len);
+
   }
 
   return 0;
@@ -183,62 +195,45 @@ int QatAccel::decompress(bufferlist::const_iterator &p,
     return -1; // session initialization failed
   }
   auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
+  int begin = 1;
 
-  unsigned int ratio_idx = 0;
-  bool read_more = false;
-  bool joint = false;
   int rc = 0;
   bufferlist tmp;
   size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
 
   while (remaining) {
-    if (p.end()) {
-      return -1;
-    }
-
-    bufferptr cur_ptr = p.get_current_ptr();
-    unsigned int len = cur_ptr.length();
-    if (joint) {
-      if (read_more)
-        tmp.append(cur_ptr.c_str(), len);
-      len = tmp.length();
-      tmp.rebuild_page_aligned();
-    }
+    unsigned int ratio_idx = 0;
+    const char* c_in = nullptr;
+    unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
+    remaining -= len;
+    len -= begin;
+    c_in += begin;
+    begin = 0;
+    unsigned int out_len = QZ_HW_BUFF_SZ;
+
+    bufferptr ptr;
+    do {
+      while (out_len <= len * expansion_ratio[ratio_idx]) {
+        out_len *= 2;
+      }
 
-    unsigned int out_len = len * expansion_ratio[ratio_idx];
-    bufferptr ptr = buffer::create_small_page_aligned(out_len);
+      ptr = buffer::create_small_page_aligned(out_len);
+      rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
+      ratio_idx++;
+    } while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
 
-    if (joint)
-      rc = qzDecompress(session.get(), (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
-    else
-      rc = qzDecompress(session.get(), (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
-    if (rc == QZ_DATA_ERROR) {
-      if (!joint) {
-        tmp.append(cur_ptr.c_str(), cur_ptr.length());
-        p += cur_ptr.length();
-        remaining -= cur_ptr.length();
-        joint = true;
-      }
-      read_more = true;
-      continue;
+    if (rc == QZ_OK) {
+      dst.append(ptr, 0, out_len);
+    } else if (rc == QZ_DATA_ERROR) {
+      dout(1) << "QAT compressor DATA ERROR" << dendl;
+      return -1;
     } else if (rc == QZ_BUF_ERROR) {
-      if (ratio_idx == std::size(expansion_ratio))
-        return -1;
-      if (joint)
-        read_more = false;
-      ratio_idx++;
-      continue;
+      dout(1) << "QAT compressor BUF ERROR" << dendl;
+      return -1;
     } else if (rc != QZ_OK) {
+      dout(1) << "QAT compressor NOT OK" << dendl;
       return -1;
-    } else {
-      ratio_idx = 0;
-      joint = false;
-      read_more = false;
     }
-
-    p += cur_ptr.length();
-    remaining -= cur_ptr.length();
-    dst.append(ptr, 0, out_len);
   }
 
   return 0;
index d1ee1549977bba86e4001a0768d105bf90ca0b8a..4482f44e2bac702c87f97eab5aa46c2f709374d6 100644 (file)
@@ -187,7 +187,8 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, boost::optio
 int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, boost::optional<int32_t> compressor_message)
 {
 #ifdef HAVE_QATZIP
-  if (qat_enabled)
+  // QAT can only decompress with the default window size
+  if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE))
     return qat_accel.decompress(p, compressed_size, out, compressor_message);
 #endif