From 0b8ef8e274f434c81c29c826153dffa01d2a592b Mon Sep 17 00:00:00 2001 From: hualong feng Date: Thu, 6 Jun 2024 15:53:03 +0800 Subject: [PATCH] compressor: Change data formt to QZ_DEFLATE_GZIP_EXT for QAT zlib QAT zlib 'QZ_DEFLATE_RAW' data format cannot decompress by QAT hardware. So here we replace 'QZ_DEFLATE_GZIP_EXT' data format with 'QZ_DEFLATE_RAW'. 'QZ_DEFLATE_GZIP_EXT' data format need to add gz_header by deflateSetHeader() in QATzip. And it leads multi stream in one compression for hardware buffer. So the windows bit is important information for decompression, which related to if the inflate remove header. Add busy_polling setting for reducing latency Signed-off-by: Feng,Hualong (cherry picked from commit 855c5d6826dabba0093e65e34be14a2fb1581dd0) --- src/common/options/global.yaml.in | 5 +++ src/compressor/QatAccel.cc | 48 +++++++++++++++++---------- src/compressor/QatAccel.h | 1 + src/compressor/zlib/ZlibCompressor.cc | 12 +++++-- 4 files changed, 46 insertions(+), 20 deletions(-) diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index a40cf8c63d5..51c8c0acfd7 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -767,6 +767,11 @@ options: level: advanced desc: Set the maximum number of session within Qatzip when using QAT compressor default: 256 +- name: qat_compressor_busy_polling + type: bool + level: advanced + desc: Set QAT busy bolling to reduce latency at the cost of potentially increasing CPU usage + default: false - name: plugin_crypto_accelerator type: str level: advanced diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc index de19ccfa358..c1c374a6836 100644 --- a/src/compressor/QatAccel.cc +++ b/src/compressor/QatAccel.cc @@ -19,6 +19,7 @@ #include "common/dout.h" #include "common/errno.h" #include "QatAccel.h" +#include "zlib.h" // ----------------------------------------------------------------------------- #define dout_context g_ceph_context @@ -33,6 +34,7 @@ static std::ostream& _prefix(std::ostream* _dout) // ----------------------------------------------------------------------------- // default window size for Zlib 1.2.8, negated for raw deflate #define ZLIB_DEFAULT_WIN_SIZE -15 +#define GZIP_WRAPPER 16 /* Estimate data expansion after decompression */ static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000}; @@ -42,6 +44,10 @@ void QzSessionDeleter::operator() (struct QzSession_S *session) { delete session; } +QzPollingMode_T busy_polling(bool isSet) { + return isSet ? QZ_BUSY_POLLING : QZ_PERIODICAL_POLLING; +} + static bool setup_session(const std::string &alg, QatAccel::session_ptr &session) { int rc; rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT); @@ -52,10 +58,12 @@ static bool setup_session(const std::string &alg, QatAccel::session_ptr &session rc = qzGetDefaultsDeflate(¶ms); if (rc != QZ_OK) return false; - params.data_fmt = QZ_DEFLATE_RAW; + + params.data_fmt = QZ_DEFLATE_GZIP_EXT; params.common_params.comp_algorithm = QZ_DEFLATE; params.common_params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level; params.common_params.direction = QZ_DIR_BOTH; + params.common_params.polling_mode = busy_polling(g_ceph_context->_conf.get_val("qat_compressor_busy_polling")); rc = qzSetupSessionDeflate(session.get(), ¶ms); if (rc != QZ_OK) return false; @@ -136,6 +144,8 @@ bool QatAccel::init(const std::string &alg) { } alg_name = alg; + windowBits = GZIP_WRAPPER + MAX_WBITS; + return true; } @@ -145,7 +155,8 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional(p.get_remaining(), compressed_len); - - while (remaining) { - unsigned int ratio_idx = 0; - const char* c_in = nullptr; - unsigned int len = p.get_ptr_and_advance(remaining, &c_in); - remaining -= len; - len -= begin; - c_in += begin; - begin = 0; + unsigned int ratio_idx = 0; + const char* c_in = nullptr; + p.copy_all(tmp); + c_in = tmp.c_str(); + unsigned int len = std::min(tmp.length(), compressed_len); + + len -= begin; + c_in += begin; + begin = 0; + + bufferptr ptr; + do { unsigned int out_len = QZ_HW_BUFF_SZ; - - bufferptr ptr; + unsigned int len_current = len; do { - while (out_len <= len * expansion_ratio[ratio_idx]) { + while (out_len <= len_current * expansion_ratio[ratio_idx]) { out_len *= 2; } ptr = buffer::create_small_page_aligned(out_len); - rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len); + rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len_current, (unsigned char*)ptr.c_str(), &out_len); ratio_idx++; } while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio)); + c_in += len_current; + len -= len_current; if (rc == QZ_OK) { dst.append(ptr, 0, out_len); @@ -223,7 +237,7 @@ int QatAccel::decompress(bufferlist::const_iterator &p, dout(1) << "QAT compressor NOT OK" << dendl; return -1; } - } + } while (len != 0); return 0; } diff --git a/src/compressor/QatAccel.h b/src/compressor/QatAccel.h index 3533eff9b8f..3735fa4616e 100644 --- a/src/compressor/QatAccel.h +++ b/src/compressor/QatAccel.h @@ -49,6 +49,7 @@ class QatAccel { std::vector sessions; std::mutex mutex; std::string alg_name; + int windowBits; }; #endif diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc index 2a0aa006901..f37edc70a0f 100644 --- a/src/compressor/zlib/ZlibCompressor.cc +++ b/src/compressor/zlib/ZlibCompressor.cc @@ -49,6 +49,7 @@ _prefix(std::ostream* _dout) // default window size for Zlib 1.2.8, negated for raw deflate #define ZLIB_DEFAULT_WIN_SIZE -15 +#define GZIP_WRAPPER 16 // desired memory usage level. increasing to 9 doesn't speed things up // significantly (helps only on >=16K blocks) and sometimes degrades @@ -205,8 +206,8 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optiona int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional compressor_message) { #ifdef HAVE_QATZIP - // QAT can only decompress with the default window size - if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE)) + // QAT can only decompress with existing header, only for 'QZ_DEFLATE_GZIP_EXT' + if (qat_enabled && compressor_message.has_value() && *compressor_message == GZIP_WRAPPER + MAX_WBITS) return qat_accel.decompress(p, compressed_size, out, compressor_message); #endif @@ -215,6 +216,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_ z_stream strm; const char* c_in; int begin = 1; + bool multisteam = false; /* allocate inflate state */ strm.zalloc = Z_NULL; @@ -226,6 +228,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_ // choose the variation of compressor if (!compressor_message) compressor_message = ZLIB_DEFAULT_WIN_SIZE; + ret = inflateInit2(&strm, *compressor_message); if (ret != Z_OK) { dout(1) << "Decompression init error: init return " @@ -255,7 +258,10 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_ } have = MAX_LEN - strm.avail_out; out.append(ptr, 0, have); - } while (strm.avail_out == 0); + // There may be mutil stream to decompress + multisteam = (strm.avail_in != 0 && ret == Z_STREAM_END); + if (multisteam) inflateReset(&strm); + } while (strm.avail_out == 0 || multisteam); } /* clean up and return */ -- 2.39.5