From 5d2560762edca4a9006878b4e654b812f1d7d04d Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 22 Nov 2023 23:32:40 -0500 Subject: [PATCH] compressor: move QatAccel out of common move the QatAccel instance out of the Compressor base class and into the zlib and lz4 compressors that can use it this avoids having to link QAT into the ceph-common library, and only the plugins where it's necessary had to add LZ4Compressor.cc to store the new static variable Signed-off-by: Casey Bodley --- src/CMakeLists.txt | 5 - src/compressor/CMakeLists.txt | 18 ++- src/compressor/Compressor.cc | 4 - src/compressor/Compressor.h | 8 -- src/compressor/lz4/CMakeLists.txt | 4 + src/compressor/lz4/LZ4Compressor.cc | 149 +++++++++++++++++++++++ src/compressor/lz4/LZ4Compressor.h | 120 ++---------------- src/compressor/snappy/SnappyCompressor.h | 18 --- src/compressor/zlib/CMakeLists.txt | 3 + src/compressor/zlib/ZlibCompressor.cc | 18 +++ src/compressor/zlib/ZlibCompressor.h | 15 ++- 11 files changed, 200 insertions(+), 162 deletions(-) create mode 100644 src/compressor/lz4/LZ4Compressor.cc diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 23196619eb8..df2f3b6c973 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -505,11 +505,6 @@ if(NOT WITH_SYSTEM_BOOST) list(APPEND ceph_common_deps ${ZLIB_LIBRARIES}) endif() -if(HAVE_QATZIP) - # TODO: only the compression plugins should depend on QAT - list(APPEND ceph_common_deps QAT::zip) -endif() - if(WITH_DPDK) list(APPEND ceph_common_deps common_async_dpdk) endif() diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt index d9512e87408..0da71aa1f1b 100644 --- a/src/compressor/CMakeLists.txt +++ b/src/compressor/CMakeLists.txt @@ -1,19 +1,15 @@ - -set(compressor_srcs - Compressor.cc) -if (HAVE_QATZIP) - list(APPEND compressor_srcs QatAccel.cc) -endif() -add_library(compressor_objs OBJECT ${compressor_srcs}) +add_library(compressor_objs OBJECT Compressor.cc) add_dependencies(compressor_objs common-objs) +add_dependencies(compressor_objs legacy-option-headers) + if(HAVE_QATZIP AND HAVE_QAT) - target_link_libraries(compressor_objs PRIVATE + add_library(qat_compressor OBJECT QatAccel.cc) + target_link_libraries(qat_compressor PUBLIC QAT::qat QAT::usdm QAT::zip ) endif() -add_dependencies(compressor_objs legacy-option-headers) ## compressor plugins @@ -31,8 +27,8 @@ if(HAVE_BROTLI) add_subdirectory(brotli) endif() -add_library(compressor STATIC $) -target_link_libraries(compressor PRIVATE compressor_objs) +add_library(compressor STATIC) +target_link_libraries(compressor PUBLIC compressor_objs) set(ceph_compressor_libs ceph_snappy diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc index 43d34c8eb01..a13dfb30ddc 100644 --- a/src/compressor/Compressor.cc +++ b/src/compressor/Compressor.cc @@ -26,10 +26,6 @@ namespace TOPNSPC { -#ifdef HAVE_QATZIP - QatAccel Compressor::qat_accel; -#endif - const char* Compressor::get_comp_alg_name(int a) { auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms), diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h index 276cd875a9a..11f020a0dd2 100644 --- a/src/compressor/Compressor.h +++ b/src/compressor/Compressor.h @@ -23,9 +23,6 @@ #include "include/common_fwd.h" #include "include/buffer.h" #include "include/int_types.h" -#ifdef HAVE_QATZIP - #include "QatAccel.h" -#endif namespace TOPNSPC { @@ -70,11 +67,6 @@ public: COMP_FORCE ///< compress always }; -#ifdef HAVE_QATZIP - bool qat_enabled; - static QatAccel qat_accel; -#endif - static const char* get_comp_alg_name(int a); static std::optional get_comp_alg_type(std::string_view s); diff --git a/src/compressor/lz4/CMakeLists.txt b/src/compressor/lz4/CMakeLists.txt index ff8e14c298c..316493435aa 100644 --- a/src/compressor/lz4/CMakeLists.txt +++ b/src/compressor/lz4/CMakeLists.txt @@ -2,11 +2,15 @@ set(lz4_sources CompressionPluginLZ4.cc + LZ4Compressor.cc ) add_library(ceph_lz4 SHARED ${lz4_sources}) target_link_libraries(ceph_lz4 PRIVATE LZ4::LZ4 compressor $<$:ceph-common>) +if(HAVE_QATZIP AND HAVE_QAT) + target_link_libraries(ceph_lz4 PRIVATE qat_compressor) +endif() set_target_properties(ceph_lz4 PROPERTIES VERSION 2.0.0 SOVERSION 2 diff --git a/src/compressor/lz4/LZ4Compressor.cc b/src/compressor/lz4/LZ4Compressor.cc new file mode 100644 index 00000000000..a209a5ac149 --- /dev/null +++ b/src/compressor/lz4/LZ4Compressor.cc @@ -0,0 +1,149 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "LZ4Compressor.h" +#include "common/ceph_context.h" +#ifdef HAVE_QATZIP + #include "compressor/QatAccel.h" +#endif + +#ifdef HAVE_QATZIP +QatAccel LZ4Compressor::qat_accel; +#endif + +LZ4Compressor::LZ4Compressor(CephContext* cct) + : Compressor(COMP_ALG_LZ4, "lz4") +{ +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) + qat_enabled = true; + else + qat_enabled = false; +#endif +} + +int LZ4Compressor::compress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional &compressor_message) +{ + // older versions of liblz4 introduce bit errors when compressing + // fragmented buffers. this was fixed in lz4 commit + // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first + // appeared in v1.8.2. + // + // workaround: rebuild if not contiguous. + if (!src.is_contiguous()) { + ceph::buffer::list new_src = src; + new_src.rebuild(); + return compress(new_src, dst, compressor_message); + } + +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(src, dst, compressor_message); +#endif + ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned( + LZ4_compressBound(src.length())); + LZ4_stream_t lz4_stream; + LZ4_resetStream(&lz4_stream); + + using ceph::encode; + + auto p = src.begin(); + size_t left = src.length(); + int pos = 0; + const char *data; + unsigned num = src.get_num_buffers(); + encode((uint32_t)num, dst); + while (left) { + uint32_t origin_len = p.get_ptr_and_advance(left, &data); + int compressed_len = LZ4_compress_fast_continue( + &lz4_stream, data, outptr.c_str()+pos, origin_len, + outptr.length()-pos, 1); + if (compressed_len <= 0) + return -1; + pos += compressed_len; + left -= origin_len; + encode(origin_len, dst); + encode((uint32_t)compressed_len, dst); + } + ceph_assert(p.end()); + + dst.append(outptr, 0, pos); + return 0; +} + +int LZ4Compressor::decompress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional compressor_message) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(src, dst, compressor_message); +#endif + auto i = std::cbegin(src); + return decompress(i, src.length(), dst, compressor_message); +} + +int LZ4Compressor::decompress(ceph::buffer::list::const_iterator &p, + size_t compressed_len, + ceph::buffer::list &dst, + std::optional compressor_message) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(p, compressed_len, dst, compressor_message); +#endif + using ceph::decode; + uint32_t count; + decode(count, p); + std::vector > compressed_pairs(count); + uint32_t total_origin = 0; + for (auto& [dst_size, src_size] : compressed_pairs) { + decode(dst_size, p); + decode(src_size, p); + total_origin += dst_size; + } + compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); + + ceph::buffer::ptr dstptr(total_origin); + LZ4_streamDecode_t lz4_stream_decode; + LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); + + ceph::buffer::ptr cur_ptr = p.get_current_ptr(); + ceph::buffer::ptr *ptr = &cur_ptr; + std::optional data_holder; + if (compressed_len != cur_ptr.length()) { + data_holder.emplace(compressed_len); + p.copy_deep(compressed_len, *data_holder); + ptr = &*data_holder; + } + + char *c_in = ptr->c_str(); + char *c_out = dstptr.c_str(); + for (unsigned i = 0; i < count; ++i) { + int r = LZ4_decompress_safe_continue( + &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); + if (r == (int)compressed_pairs[i].first) { + c_in += compressed_pairs[i].second; + c_out += compressed_pairs[i].first; + } else if (r < 0) { + return -1; + } else { + return -2; + } + } + dst.push_back(std::move(dstptr)); + return 0; +} diff --git a/src/compressor/lz4/LZ4Compressor.h b/src/compressor/lz4/LZ4Compressor.h index eca08e1a57a..6939aae7609 100644 --- a/src/compressor/lz4/LZ4Compressor.h +++ b/src/compressor/lz4/LZ4Compressor.h @@ -23,125 +23,29 @@ #include "include/encoding.h" #include "common/config.h" +class QatAccel; class LZ4Compressor : public Compressor { - public: - LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") { #ifdef HAVE_QATZIP - if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) - qat_enabled = true; - else - qat_enabled = false; + bool qat_enabled; + static QatAccel qat_accel; #endif - } - - int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional &compressor_message) override { - // older versions of liblz4 introduce bit errors when compressing - // fragmented buffers. this was fixed in lz4 commit - // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first - // appeared in v1.8.2. - // - // workaround: rebuild if not contiguous. - if (!src.is_contiguous()) { - ceph::buffer::list new_src = src; - new_src.rebuild(); - return compress(new_src, dst, compressor_message); - } -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.compress(src, dst, compressor_message); -#endif - ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned( - LZ4_compressBound(src.length())); - LZ4_stream_t lz4_stream; - LZ4_resetStream(&lz4_stream); - - using ceph::encode; - - auto p = src.begin(); - size_t left = src.length(); - int pos = 0; - const char *data; - unsigned num = src.get_num_buffers(); - encode((uint32_t)num, dst); - while (left) { - uint32_t origin_len = p.get_ptr_and_advance(left, &data); - int compressed_len = LZ4_compress_fast_continue( - &lz4_stream, data, outptr.c_str()+pos, origin_len, - outptr.length()-pos, 1); - if (compressed_len <= 0) - return -1; - pos += compressed_len; - left -= origin_len; - encode(origin_len, dst); - encode((uint32_t)compressed_len, dst); - } - ceph_assert(p.end()); + public: + explicit LZ4Compressor(CephContext* cct); - dst.append(outptr, 0, pos); - return 0; - } + int compress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional &compressor_message) override; - int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(src, dst, compressor_message); -#endif - auto i = std::cbegin(src); - return decompress(i, src.length(), dst, compressor_message); - } + int decompress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional compressor_message) override; int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &dst, - std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(p, compressed_len, dst, compressor_message); -#endif - using ceph::decode; - uint32_t count; - decode(count, p); - std::vector > compressed_pairs(count); - uint32_t total_origin = 0; - for (auto& [dst_size, src_size] : compressed_pairs) { - decode(dst_size, p); - decode(src_size, p); - total_origin += dst_size; - } - compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); - - ceph::buffer::ptr dstptr(total_origin); - LZ4_streamDecode_t lz4_stream_decode; - LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); - - ceph::buffer::ptr cur_ptr = p.get_current_ptr(); - ceph::buffer::ptr *ptr = &cur_ptr; - std::optional data_holder; - if (compressed_len != cur_ptr.length()) { - data_holder.emplace(compressed_len); - p.copy_deep(compressed_len, *data_holder); - ptr = &*data_holder; - } - - char *c_in = ptr->c_str(); - char *c_out = dstptr.c_str(); - for (unsigned i = 0; i < count; ++i) { - int r = LZ4_decompress_safe_continue( - &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); - if (r == (int)compressed_pairs[i].first) { - c_in += compressed_pairs[i].second; - c_out += compressed_pairs[i].first; - } else if (r < 0) { - return -1; - } else { - return -2; - } - } - dst.push_back(std::move(dstptr)); - return 0; - } + std::optional compressor_message) override; }; #endif diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h index 8150f783c15..b635581068a 100644 --- a/src/compressor/snappy/SnappyCompressor.h +++ b/src/compressor/snappy/SnappyCompressor.h @@ -58,19 +58,9 @@ class CEPH_BUFFER_API BufferlistSource : public snappy::Source { class SnappyCompressor : public Compressor { public: SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") { -#ifdef HAVE_QATZIP - if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy")) - qat_enabled = true; - else - qat_enabled = false; -#endif } int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional &compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.compress(src, dst, compressor_message); -#endif BufferlistSource source(const_cast(src).begin(), src.length()); ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned( snappy::MaxCompressedLength(src.length())); @@ -81,10 +71,6 @@ class SnappyCompressor : public Compressor { } int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(src, dst, compressor_message); -#endif auto i = src.begin(); return decompress(i, src.length(), dst, compressor_message); } @@ -93,10 +79,6 @@ class SnappyCompressor : public Compressor { size_t compressed_len, ceph::bufferlist &dst, std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(p, compressed_len, dst, compressor_message); -#endif BufferlistSource source_1(p, compressed_len); uint32_t res_len = 0; if (!snappy::GetUncompressedLength(&source_1, &res_len)) { diff --git a/src/compressor/zlib/CMakeLists.txt b/src/compressor/zlib/CMakeLists.txt index 050ff03fa28..3480ab068c9 100644 --- a/src/compressor/zlib/CMakeLists.txt +++ b/src/compressor/zlib/CMakeLists.txt @@ -91,6 +91,9 @@ endif() add_library(ceph_zlib SHARED ${zlib_sources}) target_link_libraries(ceph_zlib ZLIB::ZLIB compressor $<$:ceph-common>) +if(HAVE_QATZIP AND HAVE_QAT) + target_link_libraries(ceph_zlib qat_compressor) +endif() target_include_directories(ceph_zlib SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/isa-l/include") set_target_properties(ceph_zlib PROPERTIES VERSION 2.0.0 diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc index 9795d79b3ba..2a0aa006901 100644 --- a/src/compressor/zlib/ZlibCompressor.cc +++ b/src/compressor/zlib/ZlibCompressor.cc @@ -17,6 +17,9 @@ #include "ZlibCompressor.h" #include "osd/osd_types.h" #include "isa-l/include/igzip_lib.h" +#ifdef HAVE_QATZIP + #include "compressor/QatAccel.h" +#endif // ----------------------------------------------------------------------------- #include @@ -52,6 +55,21 @@ _prefix(std::ostream* _dout) // compression ratio. #define ZLIB_MEMORY_LEVEL 8 +#ifdef HAVE_QATZIP +QatAccel ZlibCompressor::qat_accel; +#endif + +ZlibCompressor::ZlibCompressor(CephContext *cct, bool isal) + : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) +{ +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib")) + qat_enabled = true; + else + qat_enabled = false; +#endif +} + int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, std::optional &compressor_message) { int ret; diff --git a/src/compressor/zlib/ZlibCompressor.h b/src/compressor/zlib/ZlibCompressor.h index da1c8117e88..33b3ea4d460 100644 --- a/src/compressor/zlib/ZlibCompressor.h +++ b/src/compressor/zlib/ZlibCompressor.h @@ -20,19 +20,18 @@ #include "common/config.h" #include "compressor/Compressor.h" +class QatAccel; + class ZlibCompressor : public Compressor { bool isal_enabled; CephContext *const cct; -public: - ZlibCompressor(CephContext *cct, bool isal) - : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) { #ifdef HAVE_QATZIP - if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib")) - qat_enabled = true; - else - qat_enabled = false; + bool qat_enabled; + static QatAccel qat_accel; #endif - } + + public: + ZlibCompressor(CephContext *cct, bool isal); int compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional &compressor_message) override; int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional compressor_message) override; -- 2.47.3