return *_dout << "QatAccel: ";
}
// -----------------------------------------------------------------------------
+// default window size for Zlib 1.2.8, negated for raw deflate
+#define ZLIB_DEFAULT_WIN_SIZE -15
+
+/* Estimate data expansion after decompression */
+static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200, 1000, 10000};
void QzSessionDeleter::operator() (struct QzSession_S *session) {
qzTeardownSession(session);
delete session;
}
-/* Estimate data expansion after decompression */
-static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
-
static bool get_qz_params(const std::string &alg, QzSessionParams_T ¶ms) {
int rc;
rc = qzGetDefaults(¶ms);
params.is_busy_polling = true;
if (alg == "zlib") {
params.comp_algorithm = QZ_DEFLATE;
- params.data_fmt = QZ_DEFLATE_GZIP_EXT;
+ params.data_fmt = QZ_DEFLATE_RAW;
+ params.comp_lvl = g_ceph_context->_conf->compressor_zlib_level;
}
else {
// later, there also has lz4.
return -1; // session initialization failed
}
auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
-
+ compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+ int begin = 1;
for (auto &i : in.buffers()) {
const unsigned char* c_in = (unsigned char*) i.c_str();
unsigned int len = i.length();
- unsigned int out_len = qzMaxCompressedLength(len, session.get());
+ unsigned int out_len = qzMaxCompressedLength(len, session.get()) + begin;
bufferptr ptr = buffer::create_small_page_aligned(out_len);
- int rc = qzCompress(session.get(), c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
+ unsigned char* c_out = (unsigned char*)ptr.c_str() + begin;
+ int rc = qzCompress(session.get(), c_in, &len, c_out, &out_len, 1);
if (rc != QZ_OK)
return -1;
+ if (begin) {
+ // put a compressor variation mark in front of compressed stream, not used at the moment
+ ptr.c_str()[0] = 0;
+ out_len += begin;
+ begin = 0;
+ }
out.append(ptr, 0, out_len);
+
}
return 0;
return -1; // session initialization failed
}
auto session = cached_session_t{this, std::move(s)}; // returns to the session pool on destruction
+ int begin = 1;
- unsigned int ratio_idx = 0;
- bool read_more = false;
- bool joint = false;
int rc = 0;
bufferlist tmp;
size_t remaining = std::min<size_t>(p.get_remaining(), compressed_len);
while (remaining) {
- if (p.end()) {
- return -1;
- }
-
- bufferptr cur_ptr = p.get_current_ptr();
- unsigned int len = cur_ptr.length();
- if (joint) {
- if (read_more)
- tmp.append(cur_ptr.c_str(), len);
- len = tmp.length();
- tmp.rebuild_page_aligned();
- }
+ unsigned int ratio_idx = 0;
+ const char* c_in = nullptr;
+ unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
+ remaining -= len;
+ len -= begin;
+ c_in += begin;
+ begin = 0;
+ unsigned int out_len = QZ_HW_BUFF_SZ;
+
+ bufferptr ptr;
+ do {
+ while (out_len <= len * expansion_ratio[ratio_idx]) {
+ out_len *= 2;
+ }
- unsigned int out_len = len * expansion_ratio[ratio_idx];
- bufferptr ptr = buffer::create_small_page_aligned(out_len);
+ ptr = buffer::create_small_page_aligned(out_len);
+ rc = qzDecompress(session.get(), (const unsigned char*)c_in, &len, (unsigned char*)ptr.c_str(), &out_len);
+ ratio_idx++;
+ } while (rc == QZ_BUF_ERROR && ratio_idx < std::size(expansion_ratio));
- if (joint)
- rc = qzDecompress(session.get(), (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
- else
- rc = qzDecompress(session.get(), (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
- if (rc == QZ_DATA_ERROR) {
- if (!joint) {
- tmp.append(cur_ptr.c_str(), cur_ptr.length());
- p += cur_ptr.length();
- remaining -= cur_ptr.length();
- joint = true;
- }
- read_more = true;
- continue;
+ if (rc == QZ_OK) {
+ dst.append(ptr, 0, out_len);
+ } else if (rc == QZ_DATA_ERROR) {
+ dout(1) << "QAT compressor DATA ERROR" << dendl;
+ return -1;
} else if (rc == QZ_BUF_ERROR) {
- if (ratio_idx == std::size(expansion_ratio))
- return -1;
- if (joint)
- read_more = false;
- ratio_idx++;
- continue;
+ dout(1) << "QAT compressor BUF ERROR" << dendl;
+ return -1;
} else if (rc != QZ_OK) {
+ dout(1) << "QAT compressor NOT OK" << dendl;
return -1;
- } else {
- ratio_idx = 0;
- joint = false;
- read_more = false;
}
-
- p += cur_ptr.length();
- remaining -= cur_ptr.length();
- dst.append(ptr, 0, out_len);
}
return 0;