From: hualong feng Date: Thu, 6 Jun 2024 07:53:03 +0000 (+0800) Subject: compressor: Change data formt to QZ_DEFLATE_GZIP_EXT for QAT zlib X-Git-Tag: v20.0.0~1414^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=855c5d6826dabba0093e65e34be14a2fb1581dd0;p=ceph.git compressor: Change data formt to QZ_DEFLATE_GZIP_EXT for QAT zlib QAT zlib 'QZ_DEFLATE_RAW' data format cannot decompress by QAT hardware. So here we replace 'QZ_DEFLATE_GZIP_EXT' data format with 'QZ_DEFLATE_RAW'. 'QZ_DEFLATE_GZIP_EXT' data format need to add gz_header by deflateSetHeader() in QATzip. And it leads multi stream in one compression for hardware buffer. So the windows bit is important information for decompression, which related to if the inflate remove header. Add busy_polling setting for reducing latency Signed-off-by: Feng,Hualong --- diff --git a/src/common/options/global.yaml.in b/src/common/options/global.yaml.in index 1b355d6e03ad..7905dc5c483f 100644 --- a/src/common/options/global.yaml.in +++ b/src/common/options/global.yaml.in @@ -790,6 +790,11 @@ options: level: advanced desc: Set the maximum number of session within Qatzip when using QAT compressor default: 256 +- name: qat_compressor_busy_polling + type: bool + level: advanced + desc: Set QAT busy bolling to reduce latency at the cost of potentially increasing CPU usage + default: false - name: plugin_crypto_accelerator type: str level: advanced diff --git a/src/compressor/QatAccel.cc b/src/compressor/QatAccel.cc index de19ccfa358e..c1c374a68365 100644 --- a/src/compressor/QatAccel.cc +++ b/src/compressor/QatAccel.cc @@ -19,6 +19,7 @@ #include "common/dout.h" #include "common/errno.h" #include "QatAccel.h" +#include "zlib.h" // ----------------------------------------------------------------------------- #define dout_context g_ceph_context @@ -33,6 +34,7 @@ static std::ostream& _prefix(std::ostream* _dout) // ----------------------------------------------------------------------------- // default window size for Zlib 1.2.8, negated for raw deflate #define ZLIB_DEFAULT_WIN_SIZE -15 +#define GZIP_WRAPPER 16 /* Estimate data expansion after decompression */ static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000}; @@ -42,6 +44,10 @@ void QzSessionDeleter::operator() (struct QzSession_S *session) { delete session; } +QzPollingMode_T busy_polling(bool isSet) { + return isSet ? QZ_BUSY_POLLING : QZ_PERIODICAL_POLLING; +} + static bool setup_session(const std::string &alg, QatAccel::session_ptr &session) { int rc; rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT); @@ -52,10 +58,12 @@ static bool setup_session(const std::string &alg, QatAccel::session_ptr &session rc = qzGetDefaultsDeflate(¶ms); if (rc != QZ_OK) return false; - params.data_fmt = QZ_DEFLATE_RAW; + + params.data_fmt = QZ_DEFLATE_GZIP_EXT; params.common_params.comp_algorithm = QZ_DEFLATE; params.common_params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level; params.common_params.direction = QZ_DIR_BOTH; + params.common_params.polling_mode = busy_polling(g_ceph_context->_conf.get_val("qat_compressor_busy_polling")); rc = qzSetupSessionDeflate(session.get(), ¶ms); if (rc != QZ_OK) return false; @@ -136,6 +144,8 @@ bool QatAccel::init(const std::string &alg) { } alg_name = alg; + windowBits = GZIP_WRAPPER + MAX_WBITS; + return true; } @@ -145,7 +155,8 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out, std::optional(p.get_remaining(), compressed_len); - - while (remaining) { - unsigned int ratio_idx = 0; - const char* c_in = nullptr; - unsigned int len = p.get_ptr_and_advance(remaining, &c_in); - remaining -= len; - len -= begin; - c_in += begin; - begin = 0; + unsigned int ratio_idx = 0; + const char* c_in = nullptr; + p.copy_all(tmp); + c_in = tmp.c_str(); + unsigned int len = std::min(tmp.length(), compressed_len); + + len -= begin; + c_in += begin; + begin = 0; + + bufferptr ptr; + do { unsigned int out_len = QZ_HW_BUFF_SZ; - - bufferptr ptr; + unsigned int len_current = len; do { - while (out_len <= len * expansion_ratio[ratio_idx]) { + while (out_len <= len_current * expansion_ratio[ratio_idx]) { out_len *= 2; } ptr = buffer::create_small_page_aligned(out_len); - rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len); + rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len_current, (unsigned char*)ptr.c_str(), &out_len); ratio_idx++; } while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio)); + c_in += len_current; + len -= len_current; if (rc == QZ_OK) { dst.append(ptr, 0, out_len); @@ -223,7 +237,7 @@ int QatAccel::decompress(bufferlist::const_iterator &p, dout(1) << "QAT compressor NOT OK" << dendl; return -1; } - } + } while (len != 0); return 0; } diff --git a/src/compressor/QatAccel.h b/src/compressor/QatAccel.h index 3533eff9b8fd..3735fa4616e1 100644 --- a/src/compressor/QatAccel.h +++ b/src/compressor/QatAccel.h @@ -49,6 +49,7 @@ class QatAccel { std::vector sessions; std::mutex mutex; std::string alg_name; + int windowBits; }; #endif diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc index 2a0aa006901e..f37edc70a0f7 100644 --- a/src/compressor/zlib/ZlibCompressor.cc +++ b/src/compressor/zlib/ZlibCompressor.cc @@ -49,6 +49,7 @@ _prefix(std::ostream* _dout) // default window size for Zlib 1.2.8, negated for raw deflate #define ZLIB_DEFAULT_WIN_SIZE -15 +#define GZIP_WRAPPER 16 // desired memory usage level. increasing to 9 doesn't speed things up // significantly (helps only on >=16K blocks) and sometimes degrades @@ -205,8 +206,8 @@ int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, std::optiona int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional compressor_message) { #ifdef HAVE_QATZIP - // QAT can only decompress with the default window size - if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE)) + // QAT can only decompress with existing header, only for 'QZ_DEFLATE_GZIP_EXT' + if (qat_enabled && compressor_message.has_value() && *compressor_message == GZIP_WRAPPER + MAX_WBITS) return qat_accel.decompress(p, compressed_size, out, compressor_message); #endif @@ -215,6 +216,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_ z_stream strm; const char* c_in; int begin = 1; + bool multisteam = false; /* allocate inflate state */ strm.zalloc = Z_NULL; @@ -226,6 +228,7 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_ // choose the variation of compressor if (!compressor_message) compressor_message = ZLIB_DEFAULT_WIN_SIZE; + ret = inflateInit2(&strm, *compressor_message); if (ret != Z_OK) { dout(1) << "Decompression init error: init return " @@ -255,7 +258,10 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_ } have = MAX_LEN - strm.avail_out; out.append(ptr, 0, have); - } while (strm.avail_out == 0); + // There may be mutil stream to decompress + multisteam = (strm.avail_in != 0 && ret == Z_STREAM_END); + if (multisteam) inflateReset(&strm); + } while (strm.avail_out == 0 || multisteam); } /* clean up and return */