From: Casey Bodley Date: Thu, 23 Nov 2023 04:32:40 +0000 (-0500) Subject: compressor: move QatAccel out of common X-Git-Tag: v19.3.0~58^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=5d2560762edca4a9006878b4e654b812f1d7d04d;p=ceph.git compressor: move QatAccel out of common move the QatAccel instance out of the Compressor base class and into the zlib and lz4 compressors that can use it this avoids having to link QAT into the ceph-common library, and only the plugins where it's necessary had to add LZ4Compressor.cc to store the new static variable Signed-off-by: Casey Bodley --- diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 23196619eb8f..df2f3b6c9735 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -505,11 +505,6 @@ if(NOT WITH_SYSTEM_BOOST) list(APPEND ceph_common_deps ${ZLIB_LIBRARIES}) endif() -if(HAVE_QATZIP) - # TODO: only the compression plugins should depend on QAT - list(APPEND ceph_common_deps QAT::zip) -endif() - if(WITH_DPDK) list(APPEND ceph_common_deps common_async_dpdk) endif() diff --git a/src/compressor/CMakeLists.txt b/src/compressor/CMakeLists.txt index d9512e87408e..0da71aa1f1b4 100644 --- a/src/compressor/CMakeLists.txt +++ b/src/compressor/CMakeLists.txt @@ -1,19 +1,15 @@ - -set(compressor_srcs - Compressor.cc) -if (HAVE_QATZIP) - list(APPEND compressor_srcs QatAccel.cc) -endif() -add_library(compressor_objs OBJECT ${compressor_srcs}) +add_library(compressor_objs OBJECT Compressor.cc) add_dependencies(compressor_objs common-objs) +add_dependencies(compressor_objs legacy-option-headers) + if(HAVE_QATZIP AND HAVE_QAT) - target_link_libraries(compressor_objs PRIVATE + add_library(qat_compressor OBJECT QatAccel.cc) + target_link_libraries(qat_compressor PUBLIC QAT::qat QAT::usdm QAT::zip ) endif() -add_dependencies(compressor_objs legacy-option-headers) ## compressor plugins @@ -31,8 +27,8 @@ if(HAVE_BROTLI) add_subdirectory(brotli) endif() -add_library(compressor STATIC $) -target_link_libraries(compressor PRIVATE compressor_objs) +add_library(compressor STATIC) +target_link_libraries(compressor PUBLIC compressor_objs) set(ceph_compressor_libs ceph_snappy diff --git a/src/compressor/Compressor.cc b/src/compressor/Compressor.cc index 43d34c8eb01e..a13dfb30ddc7 100644 --- a/src/compressor/Compressor.cc +++ b/src/compressor/Compressor.cc @@ -26,10 +26,6 @@ namespace TOPNSPC { -#ifdef HAVE_QATZIP - QatAccel Compressor::qat_accel; -#endif - const char* Compressor::get_comp_alg_name(int a) { auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms), diff --git a/src/compressor/Compressor.h b/src/compressor/Compressor.h index 276cd875a9a8..11f020a0dd24 100644 --- a/src/compressor/Compressor.h +++ b/src/compressor/Compressor.h @@ -23,9 +23,6 @@ #include "include/common_fwd.h" #include "include/buffer.h" #include "include/int_types.h" -#ifdef HAVE_QATZIP - #include "QatAccel.h" -#endif namespace TOPNSPC { @@ -70,11 +67,6 @@ public: COMP_FORCE ///< compress always }; -#ifdef HAVE_QATZIP - bool qat_enabled; - static QatAccel qat_accel; -#endif - static const char* get_comp_alg_name(int a); static std::optional get_comp_alg_type(std::string_view s); diff --git a/src/compressor/lz4/CMakeLists.txt b/src/compressor/lz4/CMakeLists.txt index ff8e14c298c7..316493435aa6 100644 --- a/src/compressor/lz4/CMakeLists.txt +++ b/src/compressor/lz4/CMakeLists.txt @@ -2,11 +2,15 @@ set(lz4_sources CompressionPluginLZ4.cc + LZ4Compressor.cc ) add_library(ceph_lz4 SHARED ${lz4_sources}) target_link_libraries(ceph_lz4 PRIVATE LZ4::LZ4 compressor $<$:ceph-common>) +if(HAVE_QATZIP AND HAVE_QAT) + target_link_libraries(ceph_lz4 PRIVATE qat_compressor) +endif() set_target_properties(ceph_lz4 PROPERTIES VERSION 2.0.0 SOVERSION 2 diff --git a/src/compressor/lz4/LZ4Compressor.cc b/src/compressor/lz4/LZ4Compressor.cc new file mode 100644 index 000000000000..a209a5ac149f --- /dev/null +++ b/src/compressor/lz4/LZ4Compressor.cc @@ -0,0 +1,149 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright contributors to the Ceph project + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "LZ4Compressor.h" +#include "common/ceph_context.h" +#ifdef HAVE_QATZIP + #include "compressor/QatAccel.h" +#endif + +#ifdef HAVE_QATZIP +QatAccel LZ4Compressor::qat_accel; +#endif + +LZ4Compressor::LZ4Compressor(CephContext* cct) + : Compressor(COMP_ALG_LZ4, "lz4") +{ +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) + qat_enabled = true; + else + qat_enabled = false; +#endif +} + +int LZ4Compressor::compress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional &compressor_message) +{ + // older versions of liblz4 introduce bit errors when compressing + // fragmented buffers. this was fixed in lz4 commit + // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first + // appeared in v1.8.2. + // + // workaround: rebuild if not contiguous. + if (!src.is_contiguous()) { + ceph::buffer::list new_src = src; + new_src.rebuild(); + return compress(new_src, dst, compressor_message); + } + +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.compress(src, dst, compressor_message); +#endif + ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned( + LZ4_compressBound(src.length())); + LZ4_stream_t lz4_stream; + LZ4_resetStream(&lz4_stream); + + using ceph::encode; + + auto p = src.begin(); + size_t left = src.length(); + int pos = 0; + const char *data; + unsigned num = src.get_num_buffers(); + encode((uint32_t)num, dst); + while (left) { + uint32_t origin_len = p.get_ptr_and_advance(left, &data); + int compressed_len = LZ4_compress_fast_continue( + &lz4_stream, data, outptr.c_str()+pos, origin_len, + outptr.length()-pos, 1); + if (compressed_len <= 0) + return -1; + pos += compressed_len; + left -= origin_len; + encode(origin_len, dst); + encode((uint32_t)compressed_len, dst); + } + ceph_assert(p.end()); + + dst.append(outptr, 0, pos); + return 0; +} + +int LZ4Compressor::decompress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional compressor_message) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(src, dst, compressor_message); +#endif + auto i = std::cbegin(src); + return decompress(i, src.length(), dst, compressor_message); +} + +int LZ4Compressor::decompress(ceph::buffer::list::const_iterator &p, + size_t compressed_len, + ceph::buffer::list &dst, + std::optional compressor_message) +{ +#ifdef HAVE_QATZIP + if (qat_enabled) + return qat_accel.decompress(p, compressed_len, dst, compressor_message); +#endif + using ceph::decode; + uint32_t count; + decode(count, p); + std::vector > compressed_pairs(count); + uint32_t total_origin = 0; + for (auto& [dst_size, src_size] : compressed_pairs) { + decode(dst_size, p); + decode(src_size, p); + total_origin += dst_size; + } + compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); + + ceph::buffer::ptr dstptr(total_origin); + LZ4_streamDecode_t lz4_stream_decode; + LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); + + ceph::buffer::ptr cur_ptr = p.get_current_ptr(); + ceph::buffer::ptr *ptr = &cur_ptr; + std::optional data_holder; + if (compressed_len != cur_ptr.length()) { + data_holder.emplace(compressed_len); + p.copy_deep(compressed_len, *data_holder); + ptr = &*data_holder; + } + + char *c_in = ptr->c_str(); + char *c_out = dstptr.c_str(); + for (unsigned i = 0; i < count; ++i) { + int r = LZ4_decompress_safe_continue( + &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); + if (r == (int)compressed_pairs[i].first) { + c_in += compressed_pairs[i].second; + c_out += compressed_pairs[i].first; + } else if (r < 0) { + return -1; + } else { + return -2; + } + } + dst.push_back(std::move(dstptr)); + return 0; +} diff --git a/src/compressor/lz4/LZ4Compressor.h b/src/compressor/lz4/LZ4Compressor.h index eca08e1a57ac..6939aae7609a 100644 --- a/src/compressor/lz4/LZ4Compressor.h +++ b/src/compressor/lz4/LZ4Compressor.h @@ -23,125 +23,29 @@ #include "include/encoding.h" #include "common/config.h" +class QatAccel; class LZ4Compressor : public Compressor { - public: - LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") { #ifdef HAVE_QATZIP - if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4")) - qat_enabled = true; - else - qat_enabled = false; + bool qat_enabled; + static QatAccel qat_accel; #endif - } - - int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional &compressor_message) override { - // older versions of liblz4 introduce bit errors when compressing - // fragmented buffers. this was fixed in lz4 commit - // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first - // appeared in v1.8.2. - // - // workaround: rebuild if not contiguous. - if (!src.is_contiguous()) { - ceph::buffer::list new_src = src; - new_src.rebuild(); - return compress(new_src, dst, compressor_message); - } -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.compress(src, dst, compressor_message); -#endif - ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned( - LZ4_compressBound(src.length())); - LZ4_stream_t lz4_stream; - LZ4_resetStream(&lz4_stream); - - using ceph::encode; - - auto p = src.begin(); - size_t left = src.length(); - int pos = 0; - const char *data; - unsigned num = src.get_num_buffers(); - encode((uint32_t)num, dst); - while (left) { - uint32_t origin_len = p.get_ptr_and_advance(left, &data); - int compressed_len = LZ4_compress_fast_continue( - &lz4_stream, data, outptr.c_str()+pos, origin_len, - outptr.length()-pos, 1); - if (compressed_len <= 0) - return -1; - pos += compressed_len; - left -= origin_len; - encode(origin_len, dst); - encode((uint32_t)compressed_len, dst); - } - ceph_assert(p.end()); + public: + explicit LZ4Compressor(CephContext* cct); - dst.append(outptr, 0, pos); - return 0; - } + int compress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional &compressor_message) override; - int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(src, dst, compressor_message); -#endif - auto i = std::cbegin(src); - return decompress(i, src.length(), dst, compressor_message); - } + int decompress(const ceph::buffer::list &src, + ceph::buffer::list &dst, + std::optional compressor_message) override; int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &dst, - std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(p, compressed_len, dst, compressor_message); -#endif - using ceph::decode; - uint32_t count; - decode(count, p); - std::vector > compressed_pairs(count); - uint32_t total_origin = 0; - for (auto& [dst_size, src_size] : compressed_pairs) { - decode(dst_size, p); - decode(src_size, p); - total_origin += dst_size; - } - compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2); - - ceph::buffer::ptr dstptr(total_origin); - LZ4_streamDecode_t lz4_stream_decode; - LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0); - - ceph::buffer::ptr cur_ptr = p.get_current_ptr(); - ceph::buffer::ptr *ptr = &cur_ptr; - std::optional data_holder; - if (compressed_len != cur_ptr.length()) { - data_holder.emplace(compressed_len); - p.copy_deep(compressed_len, *data_holder); - ptr = &*data_holder; - } - - char *c_in = ptr->c_str(); - char *c_out = dstptr.c_str(); - for (unsigned i = 0; i < count; ++i) { - int r = LZ4_decompress_safe_continue( - &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first); - if (r == (int)compressed_pairs[i].first) { - c_in += compressed_pairs[i].second; - c_out += compressed_pairs[i].first; - } else if (r < 0) { - return -1; - } else { - return -2; - } - } - dst.push_back(std::move(dstptr)); - return 0; - } + std::optional compressor_message) override; }; #endif diff --git a/src/compressor/snappy/SnappyCompressor.h b/src/compressor/snappy/SnappyCompressor.h index 8150f783c157..b635581068ae 100644 --- a/src/compressor/snappy/SnappyCompressor.h +++ b/src/compressor/snappy/SnappyCompressor.h @@ -58,19 +58,9 @@ class CEPH_BUFFER_API BufferlistSource : public snappy::Source { class SnappyCompressor : public Compressor { public: SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") { -#ifdef HAVE_QATZIP - if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy")) - qat_enabled = true; - else - qat_enabled = false; -#endif } int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional &compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.compress(src, dst, compressor_message); -#endif BufferlistSource source(const_cast(src).begin(), src.length()); ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned( snappy::MaxCompressedLength(src.length())); @@ -81,10 +71,6 @@ class SnappyCompressor : public Compressor { } int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(src, dst, compressor_message); -#endif auto i = src.begin(); return decompress(i, src.length(), dst, compressor_message); } @@ -93,10 +79,6 @@ class SnappyCompressor : public Compressor { size_t compressed_len, ceph::bufferlist &dst, std::optional compressor_message) override { -#ifdef HAVE_QATZIP - if (qat_enabled) - return qat_accel.decompress(p, compressed_len, dst, compressor_message); -#endif BufferlistSource source_1(p, compressed_len); uint32_t res_len = 0; if (!snappy::GetUncompressedLength(&source_1, &res_len)) { diff --git a/src/compressor/zlib/CMakeLists.txt b/src/compressor/zlib/CMakeLists.txt index 050ff03fa28f..3480ab068c96 100644 --- a/src/compressor/zlib/CMakeLists.txt +++ b/src/compressor/zlib/CMakeLists.txt @@ -91,6 +91,9 @@ endif() add_library(ceph_zlib SHARED ${zlib_sources}) target_link_libraries(ceph_zlib ZLIB::ZLIB compressor $<$:ceph-common>) +if(HAVE_QATZIP AND HAVE_QAT) + target_link_libraries(ceph_zlib qat_compressor) +endif() target_include_directories(ceph_zlib SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/isa-l/include") set_target_properties(ceph_zlib PROPERTIES VERSION 2.0.0 diff --git a/src/compressor/zlib/ZlibCompressor.cc b/src/compressor/zlib/ZlibCompressor.cc index 9795d79b3ba7..2a0aa006901e 100644 --- a/src/compressor/zlib/ZlibCompressor.cc +++ b/src/compressor/zlib/ZlibCompressor.cc @@ -17,6 +17,9 @@ #include "ZlibCompressor.h" #include "osd/osd_types.h" #include "isa-l/include/igzip_lib.h" +#ifdef HAVE_QATZIP + #include "compressor/QatAccel.h" +#endif // ----------------------------------------------------------------------------- #include @@ -52,6 +55,21 @@ _prefix(std::ostream* _dout) // compression ratio. #define ZLIB_MEMORY_LEVEL 8 +#ifdef HAVE_QATZIP +QatAccel ZlibCompressor::qat_accel; +#endif + +ZlibCompressor::ZlibCompressor(CephContext *cct, bool isal) + : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) +{ +#ifdef HAVE_QATZIP + if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib")) + qat_enabled = true; + else + qat_enabled = false; +#endif +} + int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, std::optional &compressor_message) { int ret; diff --git a/src/compressor/zlib/ZlibCompressor.h b/src/compressor/zlib/ZlibCompressor.h index da1c8117e882..33b3ea4d4603 100644 --- a/src/compressor/zlib/ZlibCompressor.h +++ b/src/compressor/zlib/ZlibCompressor.h @@ -20,19 +20,18 @@ #include "common/config.h" #include "compressor/Compressor.h" +class QatAccel; + class ZlibCompressor : public Compressor { bool isal_enabled; CephContext *const cct; -public: - ZlibCompressor(CephContext *cct, bool isal) - : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) { #ifdef HAVE_QATZIP - if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib")) - qat_enabled = true; - else - qat_enabled = false; + bool qat_enabled; + static QatAccel qat_accel; #endif - } + + public: + ZlibCompressor(CephContext *cct, bool isal); int compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional &compressor_message) override; int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional compressor_message) override;