This patch adds new QATzip plugin to support QAT for compression.
QATZip is a user space library which builds on top of the Intel
QAT (QuickAssist Technology) user space library, to provide extended
accelerated compression and decompression services by offloading the
actual compression and decompression request(s) to the hardware
QAT accelerators, which are more efficient in terms of cost and power
than general purpose CPUs for those specific compute-intensive
workloads.
Based on QAT accelerators, QATZip can support several compression
algorithm, including deflate, snappy, lz4, etc..
Signed-off-by: Qiaowei Ren <qiaowei.ren@intel.com>
option(WITH_BLUEFS "libbluefs library" OFF)
+option(WITH_QATZIP "Enable QATZIP" OFF)
+if(WITH_QATZIP)
+ find_package(qatzip REQUIRED)
+ set(HAVE_QATZIP ${QATZIP_FOUND})
+endif(WITH_QATZIP)
+
# needs mds and? XXX
option(WITH_LIBCEPHFS "libcephfs client library" ON)
--- /dev/null
+# - Find Qatzip
+# Find the qatzip compression library and includes
+#
+# QATZIP_INCLUDE_DIR - where to find qatzip.h, etc.
+# QATZIP_LIBRARIES - List of libraries when using qatzip.
+# QATZIP_FOUND - True if qatzip found.
+
+find_path(QATZIP_INCLUDE_DIR NAMES qatzip.h)
+
+find_library(QATZIP_LIBRARIES NAMES qatzip)
+
+include(FindPackageHandleStandardArgs)
+find_package_handle_standard_args(qatzip DEFAULT_MSG QATZIP_LIBRARIES QATZIP_INCLUDE_DIR)
+
+mark_as_advanced(
+ QATZIP_LIBRARIES
+ QATZIP_INCLUDE_DIR)
if(NOT WITH_SYSTEM_BOOST)
list(APPEND ceph_common_deps ${ZLIB_LIBRARIES})
endif()
+if(HAVE_QATZIP)
+ list(APPEND ceph_common_deps ${QATZIP_LIBRARIES})
+endif()
set_source_files_properties(${CMAKE_SOURCE_DIR}/src/ceph_ver.c
${CMAKE_SOURCE_DIR}/src/common/version.cc
OPTION(compressor_zlib_isal, OPT_BOOL)
OPTION(compressor_zlib_level, OPT_INT) //regular zlib compression level, not applicable to isa-l optimized version
+OPTION(qat_compressor_enabled, OPT_BOOL)
+
OPTION(plugin_crypto_accelerator, OPT_STR)
OPTION(mempool_debug, OPT_BOOL)
.set_default(5)
.set_description(""),
+ Option("qat_compressor_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
+ .set_default(false)
+ .set_description("enable qat acceleration support for compression"),
+
Option("plugin_crypto_accelerator", Option::TYPE_STR, Option::LEVEL_ADVANCED)
.set_default("crypto_isal")
.set_description(""),
-set(compressor_srcs
+set(compressor_srcs
Compressor.cc)
+if (HAVE_QATZIP)
+ list(APPEND compressor_srcs QatAccel.cc)
+endif()
add_library(compressor_objs OBJECT ${compressor_srcs})
## compressor plugins
if(WITH_EMBEDDED)
include(MergeStaticLibraries)
add_library(cephd_compressor_base STATIC ${compressor_srcs})
+ if(HAVE_QATZIP)
+ target_link_libraries(cephd_compressor_base ${QATZIP_LIBRARIES})
+ endif()
set_target_properties(cephd_compressor_base PROPERTIES COMPILE_DEFINITIONS BUILDING_FOR_EMBEDDED)
set(cephd_compressor_libs
cephd_compressor_base
#include "include/assert.h" // boost clobbers this
#include "include/buffer.h"
#include "include/int_types.h"
+#ifdef HAVE_QATZIP
+ #include "QatAccel.h"
+#endif
class Compressor;
typedef std::shared_ptr<Compressor> CompressorRef;
COMP_FORCE ///< compress always
};
+#ifdef HAVE_QATZIP
+ bool qat_enabled;
+ QatAccel qat_accel;
+#endif
+
static std::string get_comp_alg_name(int a);
static boost::optional<CompressionAlgorithm> get_comp_alg_type(const std::string &s);
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Intel Corporation
+ *
+ * Author: Qiaowei Ren <qiaowei.ren@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "QatAccel.h"
+
+/* Estimate data expansion after decompression */
+static const unsigned int expansion_ratio[] = {5, 20, 50, 100, 200};
+
+QatAccel::~QatAccel() {
+ if (NULL != session.internal) {
+ qzTeardownSession(&session);
+ qzClose(&session);
+ }
+}
+
+bool QatAccel::init(const std::string &alg) {
+ QzSessionParams_T params = {(QzHuffmanHdr_T)0,};
+ int rc;
+
+ rc = qzGetDefaults(¶ms);
+ if (rc != QZ_OK)
+ return false;
+ params.direction = QZ_DIR_BOTH;
+ if (alg == "snappy")
+ params.comp_algorithm = QZ_SNAPPY;
+ else if (alg == "zlib")
+ params.comp_algorithm = QZ_DEFLATE;
+ else if (alg == "lz4")
+ params.comp_algorithm = QZ_LZ4;
+ else
+ return false;
+
+ rc = qzSetDefaults(¶ms);
+ if (rc != QZ_OK)
+ return false;
+
+ rc = qzInit(&session, QZ_SW_BACKUP_DEFAULT);
+ if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW)
+ return false;
+
+ rc = qzSetupSession(&session, ¶ms);
+ if (rc != QZ_OK && rc != QZ_DUPLICATE && rc != QZ_NO_HW ) {
+ qzTeardownSession(&session);
+ qzClose(&session);
+ return false;
+ }
+
+ return true;
+}
+
+int QatAccel::compress(const bufferlist &in, bufferlist &out) {
+ for (auto &i : in.buffers()) {
+ const unsigned char* c_in = (unsigned char*) i.c_str();
+ unsigned int len = i.length();
+ unsigned int out_len = qzMaxCompressedLength(len);
+
+ bufferptr ptr = buffer::create_page_aligned(out_len);
+ int rc = qzCompress(&session, c_in, &len, (unsigned char *)ptr.c_str(), &out_len, 1);
+ if (rc != QZ_OK)
+ return -1;
+ out.append(ptr, 0, out_len);
+ }
+
+ return 0;
+}
+
+int QatAccel::decompress(const bufferlist &in, bufferlist &out) {
+ bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
+ return decompress(i, in.length(), out);
+}
+
+int QatAccel::decompress(bufferlist::iterator &p,
+ size_t compressed_len,
+ bufferlist &dst) {
+ unsigned int ratio_idx = 0;
+ bool read_more = false;
+ bool joint = false;
+ int rc = 0;
+ bufferlist tmp;
+ size_t remaining = MIN(p.get_remaining(), compressed_len);
+
+ while (remaining) {
+ if (p.end()) {
+ return -1;
+ }
+
+ bufferptr cur_ptr = p.get_current_ptr();
+ unsigned int len = cur_ptr.length();
+ if (joint) {
+ if (read_more)
+ tmp.append(cur_ptr.c_str(), len);
+ len = tmp.length();
+ }
+ unsigned int out_len = len * expansion_ratio[ratio_idx];
+ bufferptr ptr = buffer::create_page_aligned(out_len);
+
+ if (joint)
+ rc = qzDecompress(&session, (const unsigned char*)tmp.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
+ else
+ rc = qzDecompress(&session, (const unsigned char*)cur_ptr.c_str(), &len, (unsigned char*)ptr.c_str(), &out_len);
+ if (rc == QZ_DATA_ERROR) {
+ if (!joint) {
+ tmp.append(cur_ptr.c_str(), cur_ptr.length());
+ p.advance(remaining);
+ joint = true;
+ }
+ read_more = true;
+ continue;
+ } else if (rc == QZ_BUF_ERROR) {
+ if (ratio_idx == std::size(expansion_ratio))
+ return -1;
+ if (joint)
+ read_more = false;
+ ratio_idx++;
+ continue;
+ } else if (rc != QZ_OK) {
+ return -1;
+ } else {
+ ratio_idx = 0;
+ joint = false;
+ read_more = false;
+ }
+
+ p.advance(remaining);
+ remaining -= len;
+ dst.append(ptr, 0, out_len);
+ }
+
+ return 0;
+}
--- /dev/null
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2018 Intel Corporation
+ *
+ * Author: Qiaowei Ren <qiaowei.ren@intel.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_QATACCEL_H
+#define CEPH_QATACCEL_H
+
+#include <qatzip.h>
+#include "include/buffer.h"
+
+class QatAccel {
+ QzSession_T session;
+
+ public:
+ QatAccel() : session({0}) {}
+ ~QatAccel();
+
+ bool init(const std::string &alg);
+
+ int compress(const bufferlist &in, bufferlist &out);
+ int decompress(const bufferlist &in, bufferlist &out);
+ int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &dst);
+};
+
+#endif
int factory(CompressorRef *cs, std::ostream *ss) override {
if (compressor == 0) {
- LZ4Compressor *interface = new LZ4Compressor();
+ LZ4Compressor *interface = new LZ4Compressor(cct);
compressor = CompressorRef(interface);
}
*cs = compressor;
#include "compressor/Compressor.h"
#include "include/buffer.h"
#include "include/encoding.h"
+#include "common/config.h"
#include "common/Tub.h"
class LZ4Compressor : public Compressor {
public:
- LZ4Compressor() : Compressor(COMP_ALG_LZ4, "lz4") {}
+ LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
int compress(const bufferlist &src, bufferlist &dst) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(src, dst);
+#endif
bufferptr outptr = buffer::create_page_aligned(
LZ4_compressBound(src.length()));
LZ4_stream_t lz4_stream;
}
int decompress(const bufferlist &src, bufferlist &dst) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(src, dst);
+#endif
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, src.length(), dst);
}
int decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_len, dst);
+#endif
uint32_t count;
std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs;
decode(count, p);
std::ostream *ss) override
{
if (compressor == 0) {
- SnappyCompressor *interface = new SnappyCompressor();
+ SnappyCompressor *interface = new SnappyCompressor(cct);
compressor = CompressorRef(interface);
}
*cs = compressor;
#include <snappy.h>
#include <snappy-sinksource.h>
+#include "common/config.h"
#include "compressor/Compressor.h"
#include "include/buffer.h"
class SnappyCompressor : public Compressor {
public:
- SnappyCompressor() : Compressor(COMP_ALG_SNAPPY, "snappy") {}
+ SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
int compress(const bufferlist &src, bufferlist &dst) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(src, dst);
+#endif
BufferlistSource source(const_cast<bufferlist&>(src).begin(), src.length());
bufferptr ptr = buffer::create_page_aligned(
snappy::MaxCompressedLength(src.length()));
}
int decompress(const bufferlist &src, bufferlist &dst) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(src, dst);
+#endif
bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
return decompress(i, src.length(), dst);
}
int decompress(bufferlist::iterator &p,
size_t compressed_len,
bufferlist &dst) override {
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_len, dst);
+#endif
snappy::uint32 res_len = 0;
BufferlistSource source_1(p, compressed_len);
if (!snappy::GetUncompressedLength(&source_1, &res_len)) {
#include "arch/probe.h"
#include "arch/intel.h"
#include "arch/arm.h"
-#include "common/config.h"
#include "compressor/CompressionPlugin.h"
#include "ZlibCompressor.h"
int ZlibCompressor::compress(const bufferlist &in, bufferlist &out)
{
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.compress(in, out);
+#endif
#if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64)
if (isal_enabled)
return isal_compress(in, out);
int ZlibCompressor::decompress(bufferlist::iterator &p, size_t compressed_size, bufferlist &out)
{
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(p, compressed_size, out);
+#endif
+
int ret;
unsigned have;
z_stream strm;
int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out)
{
+#ifdef HAVE_QATZIP
+ if (qat_enabled)
+ return qat_accel.decompress(in, out);
+#endif
bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
return decompress(i, in.length(), out);
}
#ifndef CEPH_COMPRESSION_ZLIB_H
#define CEPH_COMPRESSION_ZLIB_H
+#include "common/config.h"
#include "compressor/Compressor.h"
class ZlibCompressor : public Compressor {
CephContext *const cct;
public:
ZlibCompressor(CephContext *cct, bool isal)
- : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) {}
+ : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) {
+#ifdef HAVE_QATZIP
+ if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib"))
+ qat_enabled = true;
+ else
+ qat_enabled = false;
+#endif
+ }
int compress(const bufferlist &in, bufferlist &out) override;
int decompress(const bufferlist &in, bufferlist &out) override;
/* Defined if std::map::merge() is supported */
#cmakedefine HAVE_STDLIB_MAP_SPLICING
+/* Defined if Intel QAT compress/decompress is supported */
+#cmakedefine HAVE_QATZIP
+
#endif /* CONFIG_H */
}
#endif // __x86_64__
+
+#ifdef HAVE_QATZIP
+TEST(QAT, enc_qat_dec_noqat) {
+#ifdef HAVE_LZ4
+ const char* alg_collection[] = {"zlib", "lz4", "snappy"};
+#else
+ const char* alg_collection[] = {"zlib", "snappy"};
+#endif
+ for (auto alg : alg_collection) {
+ g_conf->set_val("qat_compressor_enabled", "true");
+ CompressorRef q = Compressor::create(g_ceph_context, alg);
+ g_conf->set_val("qat_compressor_enabled", "false");
+ CompressorRef noq = Compressor::create(g_ceph_context, alg);
+
+ // generate random buffer
+ for (int cnt=0; cnt<100; cnt++) {
+ srand(cnt + 1000);
+ int log2 = (rand()%18) + 1;
+ int size = (rand() % (1 << log2)) + 1;
+
+ char test[size];
+ for (int i=0; i<size; ++i)
+ test[i] = rand()%256;
+ bufferlist in, out;
+ in.append(test, size);
+
+ int res = q->compress(in, out);
+ EXPECT_EQ(res, 0);
+ bufferlist after;
+ res = noq->decompress(out, after);
+ EXPECT_EQ(res, 0);
+ bufferlist exp;
+ exp.append(test, size);
+ EXPECT_TRUE(exp.contents_equal(after));
+ }
+ }
+}
+
+TEST(QAT, enc_noqat_dec_qat) {
+#ifdef HAVE_LZ4
+ const char* alg_collection[] = {"zlib", "lz4", "snappy"};
+#else
+ const char* alg_collection[] = {"zlib", "snappy"};
+#endif
+ for (auto alg : alg_collection) {
+ g_conf->set_val("qat_compressor_enabled", "true");
+ CompressorRef q = Compressor::create(g_ceph_context, alg);
+ g_conf->set_val("qat_compressor_enabled", "false");
+ CompressorRef noq = Compressor::create(g_ceph_context, alg);
+
+ // generate random buffer
+ for (int cnt=0; cnt<100; cnt++) {
+ srand(cnt + 1000);
+ int log2 = (rand()%18) + 1;
+ int size = (rand() % (1 << log2)) + 1;
+
+ char test[size];
+ for (int i=0; i<size; ++i)
+ test[i] = rand()%256;
+ bufferlist in, out;
+ in.append(test, size);
+
+ int res = noq->compress(in, out);
+ EXPECT_EQ(res, 0);
+ bufferlist after;
+ res = q->decompress(out, after);
+ EXPECT_EQ(res, 0);
+ bufferlist exp;
+ exp.append(test, size);
+ EXPECT_TRUE(exp.contents_equal(after));
+ }
+ }
+}
+
+#endif // HAVE_QATZIP