]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
compressor: move QatAccel out of common
authorCasey Bodley <cbodley@redhat.com>
Thu, 23 Nov 2023 04:32:40 +0000 (23:32 -0500)
committerCasey Bodley <cbodley@redhat.com>
Wed, 7 Feb 2024 16:13:21 +0000 (11:13 -0500)
move the QatAccel instance out of the Compressor base class and into
the zlib and lz4 compressors that can use it

this avoids having to link QAT into the ceph-common library, and only
the plugins where it's necessary

had to add LZ4Compressor.cc to store the new static variable

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/CMakeLists.txt
src/compressor/CMakeLists.txt
src/compressor/Compressor.cc
src/compressor/Compressor.h
src/compressor/lz4/CMakeLists.txt
src/compressor/lz4/LZ4Compressor.cc [new file with mode: 0644]
src/compressor/lz4/LZ4Compressor.h
src/compressor/snappy/SnappyCompressor.h
src/compressor/zlib/CMakeLists.txt
src/compressor/zlib/ZlibCompressor.cc
src/compressor/zlib/ZlibCompressor.h

index 23196619eb8fc971f3967d007133aac26318a36a..df2f3b6c9735829af48a39a884d7bb491781a2cd 100644 (file)
@@ -505,11 +505,6 @@ if(NOT WITH_SYSTEM_BOOST)
   list(APPEND ceph_common_deps ${ZLIB_LIBRARIES})
 endif()
 
-if(HAVE_QATZIP)
-  # TODO: only the compression plugins should depend on QAT
-  list(APPEND ceph_common_deps QAT::zip)
-endif()
-
 if(WITH_DPDK)
   list(APPEND ceph_common_deps common_async_dpdk)
 endif()
index d9512e87408e06b1a900acc6ffa716e7bd28d688..0da71aa1f1b4ce086bdccdb3049f475163162361 100644 (file)
@@ -1,19 +1,15 @@
-
-set(compressor_srcs
-  Compressor.cc)
-if (HAVE_QATZIP)
-  list(APPEND compressor_srcs QatAccel.cc)
-endif()
-add_library(compressor_objs OBJECT ${compressor_srcs})
+add_library(compressor_objs OBJECT Compressor.cc)
 add_dependencies(compressor_objs common-objs)
+add_dependencies(compressor_objs legacy-option-headers)
+
 if(HAVE_QATZIP AND HAVE_QAT)
-  target_link_libraries(compressor_objs PRIVATE
+  add_library(qat_compressor OBJECT QatAccel.cc)
+  target_link_libraries(qat_compressor PUBLIC
                         QAT::qat
                         QAT::usdm
                         QAT::zip
                        )
 endif()
-add_dependencies(compressor_objs legacy-option-headers)
 
 ## compressor plugins
 
@@ -31,8 +27,8 @@ if(HAVE_BROTLI)
   add_subdirectory(brotli)
 endif()
 
-add_library(compressor STATIC $<TARGET_OBJECTS:compressor_objs>)
-target_link_libraries(compressor PRIVATE compressor_objs)
+add_library(compressor STATIC)
+target_link_libraries(compressor PUBLIC compressor_objs)
 
 set(ceph_compressor_libs
     ceph_snappy
index 43d34c8eb01e4991bd2c996302563d69a0276681..a13dfb30ddc786adc4633a7f1e25819c7946c9db 100644 (file)
 
 namespace TOPNSPC {
 
-#ifdef HAVE_QATZIP
-  QatAccel Compressor::qat_accel;
-#endif
-
 const char* Compressor::get_comp_alg_name(int a) {
 
   auto p = std::find_if(std::cbegin(compression_algorithms), std::cend(compression_algorithms),
index 276cd875a9a8c5d55f3c84102c01e524797cd563..11f020a0dd247fc9b5eb6bd6b75a38af56d98287 100644 (file)
@@ -23,9 +23,6 @@
 #include "include/common_fwd.h"
 #include "include/buffer.h"
 #include "include/int_types.h"
-#ifdef HAVE_QATZIP
-  #include "QatAccel.h"
-#endif
 
 namespace TOPNSPC {
 
@@ -70,11 +67,6 @@ public:
     COMP_FORCE                  ///< compress always
   };
 
-#ifdef HAVE_QATZIP
-  bool qat_enabled;
-  static QatAccel qat_accel;
-#endif
-
   static const char* get_comp_alg_name(int a);
   static std::optional<CompressionAlgorithm> get_comp_alg_type(std::string_view s);
 
index ff8e14c298c730ae396835e3f3630c1b8acec29a..316493435aa6643747d108d42fab14de1507b664 100644 (file)
@@ -2,11 +2,15 @@
 
 set(lz4_sources
   CompressionPluginLZ4.cc
+  LZ4Compressor.cc
 )
 
 add_library(ceph_lz4 SHARED ${lz4_sources})
 target_link_libraries(ceph_lz4
   PRIVATE LZ4::LZ4 compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+if(HAVE_QATZIP AND HAVE_QAT)
+  target_link_libraries(ceph_lz4 PRIVATE qat_compressor)
+endif()
 set_target_properties(ceph_lz4 PROPERTIES
   VERSION 2.0.0
   SOVERSION 2
diff --git a/src/compressor/lz4/LZ4Compressor.cc b/src/compressor/lz4/LZ4Compressor.cc
new file mode 100644 (file)
index 0000000..a209a5a
--- /dev/null
@@ -0,0 +1,149 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright contributors to the Ceph project
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "LZ4Compressor.h"
+#include "common/ceph_context.h"
+#ifdef HAVE_QATZIP
+  #include "compressor/QatAccel.h"
+#endif
+
+#ifdef HAVE_QATZIP
+QatAccel LZ4Compressor::qat_accel;
+#endif
+
+LZ4Compressor::LZ4Compressor(CephContext* cct)
+  : Compressor(COMP_ALG_LZ4, "lz4")
+{
+#ifdef HAVE_QATZIP
+  if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
+    qat_enabled = true;
+  else
+    qat_enabled = false;
+#endif
+}
+
+int LZ4Compressor::compress(const ceph::buffer::list &src,
+                            ceph::buffer::list &dst,
+                            std::optional<int32_t> &compressor_message)
+{
+  // older versions of liblz4 introduce bit errors when compressing
+  // fragmented buffers.  this was fixed in lz4 commit
+  // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
+  // appeared in v1.8.2.
+  //
+  // workaround: rebuild if not contiguous.
+  if (!src.is_contiguous()) {
+    ceph::buffer::list new_src = src;
+    new_src.rebuild();
+    return compress(new_src, dst, compressor_message);
+  }
+
+#ifdef HAVE_QATZIP
+  if (qat_enabled)
+    return qat_accel.compress(src, dst, compressor_message);
+#endif
+  ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
+    LZ4_compressBound(src.length()));
+  LZ4_stream_t lz4_stream;
+  LZ4_resetStream(&lz4_stream);
+
+  using ceph::encode;
+
+  auto p = src.begin();
+  size_t left = src.length();
+  int pos = 0;
+  const char *data;
+  unsigned num = src.get_num_buffers();
+  encode((uint32_t)num, dst);
+  while (left) {
+    uint32_t origin_len = p.get_ptr_and_advance(left, &data);
+    int compressed_len = LZ4_compress_fast_continue(
+      &lz4_stream, data, outptr.c_str()+pos, origin_len,
+      outptr.length()-pos, 1);
+    if (compressed_len <= 0)
+      return -1;
+    pos += compressed_len;
+    left -= origin_len;
+    encode(origin_len, dst);
+    encode((uint32_t)compressed_len, dst);
+  }
+  ceph_assert(p.end());
+
+  dst.append(outptr, 0, pos);
+  return 0;
+}
+
+int LZ4Compressor::decompress(const ceph::buffer::list &src,
+                              ceph::buffer::list &dst,
+                              std::optional<int32_t> compressor_message)
+{
+#ifdef HAVE_QATZIP
+  if (qat_enabled)
+    return qat_accel.decompress(src, dst, compressor_message);
+#endif
+  auto i = std::cbegin(src);
+  return decompress(i, src.length(), dst, compressor_message);
+}
+
+int LZ4Compressor::decompress(ceph::buffer::list::const_iterator &p,
+                              size_t compressed_len,
+                              ceph::buffer::list &dst,
+                              std::optional<int32_t> compressor_message)
+{
+#ifdef HAVE_QATZIP
+  if (qat_enabled)
+    return qat_accel.decompress(p, compressed_len, dst, compressor_message);
+#endif
+  using ceph::decode;
+  uint32_t count;
+  decode(count, p);
+  std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs(count);
+  uint32_t total_origin = 0;
+  for (auto& [dst_size, src_size] : compressed_pairs) {
+    decode(dst_size, p);
+    decode(src_size, p);
+    total_origin += dst_size;
+  }
+  compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2);
+
+  ceph::buffer::ptr dstptr(total_origin);
+  LZ4_streamDecode_t lz4_stream_decode;
+  LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0);
+
+  ceph::buffer::ptr cur_ptr = p.get_current_ptr();
+  ceph::buffer::ptr *ptr = &cur_ptr;
+  std::optional<ceph::buffer::ptr> data_holder;
+  if (compressed_len != cur_ptr.length()) {
+    data_holder.emplace(compressed_len);
+    p.copy_deep(compressed_len, *data_holder);
+    ptr = &*data_holder;
+  }
+
+  char *c_in = ptr->c_str();
+  char *c_out = dstptr.c_str();
+  for (unsigned i = 0; i < count; ++i) {
+    int r = LZ4_decompress_safe_continue(
+        &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first);
+    if (r == (int)compressed_pairs[i].first) {
+      c_in += compressed_pairs[i].second;
+      c_out += compressed_pairs[i].first;
+    } else if (r < 0) {
+      return -1;
+    } else {
+      return -2;
+    }
+  }
+  dst.push_back(std::move(dstptr));
+  return 0;
+}
index eca08e1a57ac780b50fe39ddc0965b3b55ecc67d..6939aae7609a19973412030bf082003387739c3c 100644 (file)
 #include "include/encoding.h"
 #include "common/config.h"
 
+class QatAccel;
 
 class LZ4Compressor : public Compressor {
- public:
-  LZ4Compressor(CephContext* cct) : Compressor(COMP_ALG_LZ4, "lz4") {
 #ifdef HAVE_QATZIP
-    if (cct->_conf->qat_compressor_enabled && qat_accel.init("lz4"))
-      qat_enabled = true;
-    else
-      qat_enabled = false;
+  bool qat_enabled;
+  static QatAccel qat_accel;
 #endif
-  }
-
-  int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> &compressor_message) override {
-    // older versions of liblz4 introduce bit errors when compressing
-    // fragmented buffers.  this was fixed in lz4 commit
-    // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
-    // appeared in v1.8.2.
-    //
-    // workaround: rebuild if not contiguous.
-    if (!src.is_contiguous()) {
-      ceph::buffer::list new_src = src;
-      new_src.rebuild();
-      return compress(new_src, dst, compressor_message);
-    }
 
-#ifdef HAVE_QATZIP
-    if (qat_enabled)
-      return qat_accel.compress(src, dst, compressor_message);
-#endif
-    ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
-      LZ4_compressBound(src.length()));
-    LZ4_stream_t lz4_stream;
-    LZ4_resetStream(&lz4_stream);
-
-    using ceph::encode;
-
-    auto p = src.begin();
-    size_t left = src.length();
-    int pos = 0;
-    const char *data;
-    unsigned num = src.get_num_buffers();
-    encode((uint32_t)num, dst);
-    while (left) {
-      uint32_t origin_len = p.get_ptr_and_advance(left, &data);
-      int compressed_len = LZ4_compress_fast_continue(
-        &lz4_stream, data, outptr.c_str()+pos, origin_len,
-        outptr.length()-pos, 1);
-      if (compressed_len <= 0)
-        return -1;
-      pos += compressed_len;
-      left -= origin_len;
-      encode(origin_len, dst);
-      encode((uint32_t)compressed_len, dst);
-    }
-    ceph_assert(p.end());
+ public:
+  explicit LZ4Compressor(CephContext* cct);
 
-    dst.append(outptr, 0, pos);
-    return 0;
-  }
+  int compress(const ceph::buffer::list &src,
+               ceph::buffer::list &dst,
+               std::optional<int32_t> &compressor_message) override;
 
-  int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, std::optional<int32_t> compressor_message) override {
-#ifdef HAVE_QATZIP
-    if (qat_enabled)
-      return qat_accel.decompress(src, dst, compressor_message);
-#endif
-    auto i = std::cbegin(src);
-    return decompress(i, src.length(), dst, compressor_message);
-  }
+  int decompress(const ceph::buffer::list &src,
+                 ceph::buffer::list &dst,
+                 std::optional<int32_t> compressor_message) override;
 
   int decompress(ceph::buffer::list::const_iterator &p,
                 size_t compressed_len,
                 ceph::buffer::list &dst,
-                std::optional<int32_t> compressor_message) override {
-#ifdef HAVE_QATZIP
-    if (qat_enabled)
-      return qat_accel.decompress(p, compressed_len, dst, compressor_message);
-#endif
-    using ceph::decode;
-    uint32_t count;
-    decode(count, p);
-    std::vector<std::pair<uint32_t, uint32_t> > compressed_pairs(count);
-    uint32_t total_origin = 0;
-    for (auto& [dst_size, src_size] : compressed_pairs) {
-      decode(dst_size, p);
-      decode(src_size, p);
-      total_origin += dst_size;
-    }
-    compressed_len -= (sizeof(uint32_t) + sizeof(uint32_t) * count * 2);
-
-    ceph::buffer::ptr dstptr(total_origin);
-    LZ4_streamDecode_t lz4_stream_decode;
-    LZ4_setStreamDecode(&lz4_stream_decode, nullptr, 0);
-
-    ceph::buffer::ptr cur_ptr = p.get_current_ptr();
-    ceph::buffer::ptr *ptr = &cur_ptr;
-    std::optional<ceph::buffer::ptr> data_holder;
-    if (compressed_len != cur_ptr.length()) {
-      data_holder.emplace(compressed_len);
-      p.copy_deep(compressed_len, *data_holder);
-      ptr = &*data_holder;
-    }
-
-    char *c_in = ptr->c_str();
-    char *c_out = dstptr.c_str();
-    for (unsigned i = 0; i < count; ++i) {
-      int r = LZ4_decompress_safe_continue(
-          &lz4_stream_decode, c_in, c_out, compressed_pairs[i].second, compressed_pairs[i].first);
-      if (r == (int)compressed_pairs[i].first) {
-        c_in += compressed_pairs[i].second;
-        c_out += compressed_pairs[i].first;
-      } else if (r < 0) {
-        return -1;
-      } else {
-        return -2;
-      }
-    }
-    dst.push_back(std::move(dstptr));
-    return 0;
-  }
+                std::optional<int32_t> compressor_message) override;
 };
 
 #endif
index 8150f783c157113a8ac97abc76e8cd32e5f79a4e..b635581068aebc37fea9d661fb3576955231527c 100644 (file)
@@ -58,19 +58,9 @@ class CEPH_BUFFER_API BufferlistSource : public snappy::Source {
 class SnappyCompressor : public Compressor {
  public:
   SnappyCompressor(CephContext* cct) : Compressor(COMP_ALG_SNAPPY, "snappy") {
-#ifdef HAVE_QATZIP
-    if (cct->_conf->qat_compressor_enabled && qat_accel.init("snappy"))
-      qat_enabled = true;
-    else
-      qat_enabled = false;
-#endif
   }
 
   int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional<int32_t> &compressor_message) override {
-#ifdef HAVE_QATZIP
-    if (qat_enabled)
-      return qat_accel.compress(src, dst, compressor_message);
-#endif
     BufferlistSource source(const_cast<ceph::bufferlist&>(src).begin(), src.length());
     ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned(
       snappy::MaxCompressedLength(src.length()));
@@ -81,10 +71,6 @@ class SnappyCompressor : public Compressor {
   }
 
   int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, std::optional<int32_t> compressor_message) override {
-#ifdef HAVE_QATZIP
-    if (qat_enabled)
-      return qat_accel.decompress(src, dst, compressor_message);
-#endif
     auto i = src.begin();
     return decompress(i, src.length(), dst, compressor_message);
   }
@@ -93,10 +79,6 @@ class SnappyCompressor : public Compressor {
                 size_t compressed_len,
                 ceph::bufferlist &dst,
                 std::optional<int32_t> compressor_message) override {
-#ifdef HAVE_QATZIP
-    if (qat_enabled)
-      return qat_accel.decompress(p, compressed_len, dst, compressor_message);
-#endif
     BufferlistSource source_1(p, compressed_len);
     uint32_t res_len = 0;
     if (!snappy::GetUncompressedLength(&source_1, &res_len)) {
index 050ff03fa28f1eed55ef83d87cc5e43b6fd88891..3480ab068c96fbdde988fb613167fd089415f67b 100644 (file)
@@ -91,6 +91,9 @@ endif()
 
 add_library(ceph_zlib SHARED ${zlib_sources})
 target_link_libraries(ceph_zlib ZLIB::ZLIB compressor $<$<PLATFORM_ID:Windows>:ceph-common>)
+if(HAVE_QATZIP AND HAVE_QAT)
+  target_link_libraries(ceph_zlib qat_compressor)
+endif()
 target_include_directories(ceph_zlib SYSTEM PRIVATE "${CMAKE_SOURCE_DIR}/src/isa-l/include")
 set_target_properties(ceph_zlib PROPERTIES
   VERSION 2.0.0
index 9795d79b3ba7f337919084559f4d45139f404857..2a0aa006901e3330a05321b907b975b2492c18c6 100644 (file)
@@ -17,6 +17,9 @@
 #include "ZlibCompressor.h"
 #include "osd/osd_types.h"
 #include "isa-l/include/igzip_lib.h"
+#ifdef HAVE_QATZIP
+  #include "compressor/QatAccel.h"
+#endif
 // -----------------------------------------------------------------------------
 
 #include <zlib.h>
@@ -52,6 +55,21 @@ _prefix(std::ostream* _dout)
 // compression ratio.
 #define ZLIB_MEMORY_LEVEL 8
 
+#ifdef HAVE_QATZIP
+QatAccel ZlibCompressor::qat_accel;
+#endif
+
+ZlibCompressor::ZlibCompressor(CephContext *cct, bool isal)
+  : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct)
+{
+#ifdef HAVE_QATZIP
+  if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib"))
+    qat_enabled = true;
+  else
+    qat_enabled = false;
+#endif
+}
+
 int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, std::optional<int32_t> &compressor_message)
 {
   int ret;
index da1c8117e882e8931a9b14e7b4b92aab1e7565de..33b3ea4d4603b37c7cf1279e9fc53082ff9f5d33 100644 (file)
 #include "common/config.h"
 #include "compressor/Compressor.h"
 
+class QatAccel;
+
 class ZlibCompressor : public Compressor {
   bool isal_enabled;
   CephContext *const cct;
-public:
-  ZlibCompressor(CephContext *cct, bool isal)
-    : Compressor(COMP_ALG_ZLIB, "zlib"), isal_enabled(isal), cct(cct) {
 #ifdef HAVE_QATZIP
-    if (cct->_conf->qat_compressor_enabled && qat_accel.init("zlib"))
-      qat_enabled = true;
-    else
-      qat_enabled = false;
+  bool qat_enabled;
+  static QatAccel qat_accel;
 #endif
-  }
+
+ public:
+  ZlibCompressor(CephContext *cct, bool isal);
 
   int compress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> &compressor_message) override;
   int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, std::optional<int32_t> compressor_message) override;