#include "common/dout.h"
#include "common/errno.h"
#include "QatAccel.h"
+#include "zlib.h"
// -----------------------------------------------------------------------------
#define dout_context g_ceph_context
// -----------------------------------------------------------------------------
// default window size for Zlib 1.2.8, negated for raw deflate
#define ZLIB_DEFAULT_WIN_SIZE -15
+#define GZIP_WRAPPER 16
/* Estimate data expansion after decompression */
static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
delete session;
}
+QzPollingMode_T busy_polling(bool isSet) {
+ return isSet ? QZ_BUSY_POLLING : QZ_PERIODICAL_POLLING;
+}
+
static bool setup_session(const std::string &alg, QatAccel::session_ptr &session) {
int rc;
rc = qzInit(session.get(), QZ_SW_BACKUP_DEFAULT);
rc = qzGetDefaultsDeflate(¶ms);
if (rc != QZ_OK)
return false;
- params.data_fmt = QZ_DEFLATE_RAW;
+
+ params.data_fmt = QZ_DEFLATE_GZIP_EXT;
params.common_params.comp_algorithm = QZ_DEFLATE;
params.common_params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
params.common_params.direction = QZ_DIR_BOTH;
+ params.common_params.polling_mode = busy_polling(g_ceph_context->_conf.get_val<bool>("qat_compressor_busy_polling"));
rc = qzSetupSessionDeflate(session.get(), ¶ms);
if (rc != QZ_OK)
return false;
}
alg_name = alg;
+ windowBits = GZIP_WRAPPER + MAX_WBITS;
+
return true;
}
return -1; // session initialization failed
}
auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
- compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+ compressor_message = windowBits;
+
int begin = 1;
for (auto &i : in.buffers()) {
const unsigned char* c_in = (unsigned char*) i.c_str();
int rc = 0;
bufferlist tmp;
- size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
-
- while (remaining) {
- unsigned int ratio_idx = 0;
- const char* c_in = nullptr;
- unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
- remaining -= len;
- len -= begin;
- c_in += begin;
- begin = 0;
+ unsigned int ratio_idx = 0;
+ const char* c_in = nullptr;
+ p.copy_all(tmp);
+ c_in = tmp.c_str();
+ unsigned int len = std::min<unsigned int>(tmp.length(), compressed_len);
+
+ len -= begin;
+ c_in += begin;
+ begin = 0;
+
+ bufferptr ptr;
+ do {
unsigned int out_len = QZ_HW_BUFF_SZ;
-
- bufferptr ptr;
+ unsigned int len_current = len;
do {
- while (out_len <= len * expansion_ratio[ratio_idx]) {
+ while (out_len <= len_current * expansion_ratio[ratio_idx]) {
out_len *= 2;
}
ptr = buffer::create_small_page_aligned(out_len);
- rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
+ rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len_current, (unsigned char*)ptr.c_str(), &out_len);
ratio_idx++;
} while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
+ c_in += len_current;
+ len -= len_current;
if (rc == QZ_OK) {
dst.append(ptr, 0, out_len);
dout(1) << "QAT compressor NOT OK" << dendl;
return -1;
}
- }
+ } while (len != 0);
return 0;
}
// default window size for Zlib 1.2.8, negated for raw deflate
#define ZLIB_DEFAULT_WIN_SIZE -15
+#define GZIP_WRAPPER 16
// desired memory usage level. increasing to 9 doesn't speed things up
// significantly (helps only on >=16K blocks) and sometimes degrades
int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, std::optional<int32_t> compressor_message)
{
#ifdef HAVE_QATZIP
- // QAT can only decompress with the default window size
- if (qat_enabled && (!compressor_message || *compressor_message == ZLIB_DEFAULT_WIN_SIZE))
+ // QAT can only decompress with existing header, only for 'QZ_DEFLATE_GZIP_EXT'
+ if (qat_enabled && compressor_message.has_value() && *compressor_message == GZIP_WRAPPER + MAX_WBITS)
return qat_accel.decompress(p, compressed_size, out, compressor_message);
#endif
z_stream strm;
const char* c_in;
int begin = 1;
+ bool multisteam = false;
/* allocate inflate state */
strm.zalloc = Z_NULL;
// choose the variation of compressor
if (!compressor_message)
compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+
ret = inflateInit2(&strm, *compressor_message);
if (ret != Z_OK) {
dout(1) << "Decompression init error: init return "
}
have = MAX_LEN - strm.avail_out;
out.append(ptr, 0, have);
- } while (strm.avail_out == 0);
+ // There may be mutil stream to decompress
+ multisteam = (strm.avail_in != 0 && ret == Z_STREAM_END);
+ if (multisteam) inflateReset(&strm);
+ } while (strm.avail_out == 0 || multisteam);
}
/* clean up and return */