]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
compressor/zlib:make zlib windowBits configurable for compression 34852/head
authorWangPengfei <wpf_1253@qq.com>
Thu, 30 Apr 2020 08:27:24 +0000 (16:27 +0800)
committerWangPengfei <wpf_1253@qq.com>
Thu, 30 Apr 2020 08:27:24 +0000 (16:27 +0800)
Signed-off-by: WangPengfei <wpf_1253@qq.com>
modified:   src/common/legacy_config_opts.h
modified:   src/common/options.cc
modified:   src/compressor/Compressor.h
modified:   src/compressor/QatAccel.cc
modified:   src/compressor/QatAccel.h
modified:   src/compressor/brotli/BrotliCompressor.cc
modified:   src/compressor/brotli/BrotliCompressor.h
modified:   src/compressor/lz4/LZ4Compressor.h
modified:   src/compressor/snappy/SnappyCompressor.h
modified:   src/compressor/zlib/ZlibCompressor.cc
modified:   src/compressor/zlib/ZlibCompressor.h
modified:   src/compressor/zstd/ZstdCompressor.h
modified:   src/os/bluestore/BlueStore.cc
modified:   src/os/bluestore/bluestore_types.cc
modified:   src/os/bluestore/bluestore_types.h
modified:   src/rgw/rgw_compression.cc
modified:   src/rgw/rgw_compression.h
modified:   src/rgw/rgw_compression_types.h
modified:   src/rgw/rgw_json_enc.cc
modified:   src/rgw/rgw_op.cc
modified:   src/rgw/rgw_rados.cc
modified:   src/test/compressor/compressor_example.h
modified:   src/test/compressor/test_compression.cc
modified:   src/test/rgw/test_rgw_compression.cc

24 files changed:
src/common/legacy_config_opts.h
src/common/options.cc
src/compressor/Compressor.h
src/compressor/QatAccel.cc
src/compressor/QatAccel.h
src/compressor/brotli/BrotliCompressor.cc
src/compressor/brotli/BrotliCompressor.h
src/compressor/lz4/LZ4Compressor.h
src/compressor/snappy/SnappyCompressor.h
src/compressor/zlib/ZlibCompressor.cc
src/compressor/zlib/ZlibCompressor.h
src/compressor/zstd/ZstdCompressor.h
src/os/bluestore/BlueStore.cc
src/os/bluestore/bluestore_types.cc
src/os/bluestore/bluestore_types.h
src/rgw/rgw_compression.cc
src/rgw/rgw_compression.h
src/rgw/rgw_compression_types.h
src/rgw/rgw_json_enc.cc
src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/test/compressor/compressor_example.h
src/test/compressor/test_compression.cc
src/test/rgw/test_rgw_compression.cc

index c60b45eb8bf48fd5726f706ef3ff22d31da9f633..d91638862b9d6fe66bce921e31eda202cda33804 100644 (file)
@@ -82,6 +82,7 @@ SAFE_OPTION(plugin_dir, OPT_STR)
 
 OPTION(compressor_zlib_isal, OPT_BOOL)
 OPTION(compressor_zlib_level, OPT_INT) //regular zlib compression level, not applicable to isa-l optimized version
+OPTION(compressor_zlib_winsize, OPT_INT) //regular zlib compression winsize, not applicable to isa-l optimized version
 OPTION(compressor_zstd_level, OPT_INT) //regular zstd compression level
 
 OPTION(qat_compressor_enabled, OPT_BOOL)
index be1e955ab51ea75d7390d007984c6c682a331caa..93d6641a5add404b75f17eb0dcd2337e61c8915b 100644 (file)
@@ -791,6 +791,11 @@ std::vector<Option> get_global_options() {
     .set_default(5)
     .set_description("Zlib compression level to use"),
 
+    Option("compressor_zlib_winsize", Option::TYPE_INT, Option::LEVEL_ADVANCED)
+    .set_default(-15)
+    .set_min_max(-15,32)
+    .set_description("Zlib compression winsize to use"),
+
     Option("compressor_zstd_level", Option::TYPE_INT, Option::LEVEL_ADVANCED)
     .set_default(1)
     .set_description("Zstd compression level to use"),
index 8b26bacf178a0a6b622c29e352183d7dfd41cfb5..6a4eb2776686a7673c2a705b5b87b4f121d03f2b 100644 (file)
@@ -88,11 +88,11 @@ public:
   CompressionAlgorithm get_type() const {
     return alg;
   }
-  virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out) = 0;
-  virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out) = 0;
+  virtual int compress(const ceph::bufferlist &in, ceph::bufferlist &out, boost::optional<int32_t> &compressor_message) = 0;
+  virtual int decompress(const ceph::bufferlist &in, ceph::bufferlist &out, boost::optional<int32_t> compressor_message) = 0;
   // this is a bit weird but we need non-const iterator to be in
   // alignment with decode methods
-  virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out) = 0;
+  virtual int decompress(ceph::bufferlist::const_iterator &p, size_t compressed_len, ceph::bufferlist &out, boost::optional<int32_t> compressor_message) = 0;
 
   static CompressorRef create(CephContext *cct, const std::string &type);
   static CompressorRef create(CephContext *cct, int alg);
index 7836243b8a34ba41f3227174a95a4365dd33d7f6..fff9e34e846b101ca41017fd81f8f4b07a51ca2e 100644 (file)
@@ -59,7 +59,7 @@ bool QatAccel::init(const std::string &alg) {
   return true;
 }
 
-int QatAccel::compress(const bufferlist &in, bufferlist &out) {
+int QatAccel::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) {
   for (auto &i : in.buffers()) {
     const unsigned char* c_in = (unsigned char*) i.c_str();
     unsigned int len = i.length();
@@ -75,14 +75,15 @@ int QatAccel::compress(const bufferlist &in, bufferlist &out) {
   return 0;
 }
 
-int QatAccel::decompress(const bufferlist &in, bufferlist &out) {
+int QatAccel::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) {
   auto i = in.begin();
-  return decompress(i, in.length(), out);
+  return decompress(i, in.length(), out, compressor_message);
 }
 
 int QatAccel::decompress(bufferlist::const_iterator &p,
                 size_t compressed_len,
-                bufferlist &dst) {
+                bufferlist &dst,
+                boost::optional<int32_t> compressor_message) {
   unsigned int ratio_idx = 0;
   bool read_more = false;
   bool joint = false;
index 295b180eb217ce86221cbbb49d4e05bc582d4ea6..f15e3303701764760c80e9e191a1479e3ce1a802 100644 (file)
@@ -27,9 +27,9 @@ class QatAccel {
 
   bool init(const std::string &alg);
 
-  int compress(const bufferlist &in, bufferlist &out);
-  int decompress(const bufferlist &in, bufferlist &out);
-  int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst);
+  int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message);
+  int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message);
+  int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &dst, boost::optional<int32_t> compressor_message);
 };
 
 #endif
index 27685da35bdd1d37cc09e361bef6be6717751c20..ed4abef4bbfaef58b68012faf6eac96f12623e20 100644 (file)
@@ -5,7 +5,7 @@
 
 #define MAX_LEN (CEPH_PAGE_SIZE)
 
-int BrotliCompressor::compress(const bufferlist &in, bufferlist &out) 
+int BrotliCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message
 {
   BrotliEncoderState* s = BrotliEncoderCreateInstance(nullptr,
                                                       nullptr,
@@ -49,7 +49,8 @@ int BrotliCompressor::compress(const bufferlist &in, bufferlist &out)
 
 int BrotliCompressor::decompress(bufferlist::const_iterator &p,
                                  size_t compressed_size,
-                                 bufferlist &out) 
+                                 bufferlist &out,
+                                boost::optional<int32_t> compressor_message) 
 {
   BrotliDecoderState* s = BrotliDecoderCreateInstance(nullptr,
                                                       nullptr,
@@ -88,8 +89,8 @@ int BrotliCompressor::decompress(bufferlist::const_iterator &p,
   return 0;
 }
 
-int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out) 
+int BrotliCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message
 {  
   auto i = std::cbegin(in);
-  return decompress(i, in.length(), out);
+  return decompress(i, in.length(), out, compressor_message);
 }
index 482fe5e20eb8e30fc29e72c5c6697145a39c5580..373300645f897131a53fca7c42f23d44d5fe5c44 100644 (file)
@@ -22,9 +22,9 @@ class BrotliCompressor : public Compressor
   public:
   BrotliCompressor() : Compressor(COMP_ALG_BROTLI, "brotli") {}
   
-  int compress(const bufferlist &in, bufferlist &out) override;
-  int decompress(const bufferlist &in, bufferlist &out) override;
-  int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out) override;
+  int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) override;
+  int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) override;
+  int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out, boost::optional<int32_t> compressor_message) override;
 };
 
 #endif //CEPH_BROTLICOMPRESSOR_H
index d9ab78a30a4683b630a6f26ec5a75edb8f2ffeca..b97d91f2b2e61180e023edd6c5e9257b1ad9e900 100644 (file)
@@ -35,7 +35,7 @@ class LZ4Compressor : public Compressor {
 #endif
   }
 
-  int compress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
+  int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> &compressor_message) override {
     // older versions of liblz4 introduce bit errors when compressing
     // fragmented buffers.  this was fixed in lz4 commit
     // af127334670a5e7b710bbd6adb71aa7c3ef0cd72, which first
@@ -45,12 +45,12 @@ class LZ4Compressor : public Compressor {
     if (!src.is_contiguous()) {
       ceph::buffer::list new_src = src;
       new_src.rebuild();
-      return compress(new_src, dst);
+      return compress(new_src, dst, compressor_message);
     }
 
 #ifdef HAVE_QATZIP
     if (qat_enabled)
-      return qat_accel.compress(src, dst);
+      return qat_accel.compress(src, dst, compressor_message);
 #endif
     ceph::buffer::ptr outptr = ceph::buffer::create_small_page_aligned(
       LZ4_compressBound(src.length()));
@@ -83,21 +83,22 @@ class LZ4Compressor : public Compressor {
     return 0;
   }
 
-  int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
+  int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> compressor_message) override {
 #ifdef HAVE_QATZIP
     if (qat_enabled)
-      return qat_accel.decompress(src, dst);
+      return qat_accel.decompress(src, dst, compressor_message);
 #endif
     auto i = std::cbegin(src);
-    return decompress(i, src.length(), dst);
+    return decompress(i, src.length(), dst, compressor_message);
   }
 
   int decompress(ceph::buffer::list::const_iterator &p,
                 size_t compressed_len,
-                ceph::buffer::list &dst) override {
+                ceph::buffer::list &dst,
+                boost::optional<int32_t> compressor_message) override {
 #ifdef HAVE_QATZIP
     if (qat_enabled)
-      return qat_accel.decompress(p, compressed_len, dst);
+      return qat_accel.decompress(p, compressed_len, dst, compressor_message);
 #endif
     using ceph::decode;
     uint32_t count;
index 9da7feaa4bc816386f9374da25b0cf5e86cf39a8..25393f8dc1ff473c76de8da711bc99eae3dd2b6f 100644 (file)
@@ -66,10 +66,10 @@ class SnappyCompressor : public Compressor {
 #endif
   }
 
-  int compress(const ceph::bufferlist &src, ceph::bufferlist &dst) override {
+  int compress(const ceph::bufferlist &src, ceph::bufferlist &dst, boost::optional<int32_t> &compressor_message) override {
 #ifdef HAVE_QATZIP
     if (qat_enabled)
-      return qat_accel.compress(src, dst);
+      return qat_accel.compress(src, dst, compressor_message);
 #endif
     BufferlistSource source(const_cast<ceph::bufferlist&>(src).begin(), src.length());
     ceph::bufferptr ptr = ceph::buffer::create_small_page_aligned(
@@ -80,21 +80,22 @@ class SnappyCompressor : public Compressor {
     return 0;
   }
 
-  int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst) override {
+  int decompress(const ceph::bufferlist &src, ceph::bufferlist &dst, boost::optional<int32_t> compressor_message) override {
 #ifdef HAVE_QATZIP
     if (qat_enabled)
-      return qat_accel.decompress(src, dst);
+      return qat_accel.decompress(src, dst, compressor_message);
 #endif
     auto i = src.begin();
-    return decompress(i, src.length(), dst);
+    return decompress(i, src.length(), dst, compressor_message);
   }
 
   int decompress(ceph::bufferlist::const_iterator &p,
                 size_t compressed_len,
-                ceph::bufferlist &dst) override {
+                ceph::bufferlist &dst,
+                boost::optional<int32_t> compressor_message) override {
 #ifdef HAVE_QATZIP
     if (qat_enabled)
-      return qat_accel.decompress(p, compressed_len, dst);
+      return qat_accel.decompress(p, compressed_len, dst, compressor_message);
 #endif
     snappy::uint32 res_len = 0;
     BufferlistSource source_1(p, compressed_len);
index c9fe6f6519b7bbb709fea7ff5393c2513feb47b8..5edaba67242107c500c6edc21b3732ac1f8d1b8d 100644 (file)
@@ -52,7 +52,7 @@ _prefix(std::ostream* _dout)
 // compression ratio.
 #define ZLIB_MEMORY_LEVEL 8
 
-int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out)
+int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
 {
   int ret;
   unsigned have;
@@ -64,12 +64,13 @@ int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out)
   strm.zalloc = Z_NULL;
   strm.zfree = Z_NULL;
   strm.opaque = Z_NULL;
-  ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, ZLIB_DEFAULT_WIN_SIZE, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY);
+  ret = deflateInit2(&strm, cct->_conf->compressor_zlib_level, Z_DEFLATED, cct->_conf->compressor_zlib_winsize, ZLIB_MEMORY_LEVEL, Z_DEFAULT_STRATEGY);
   if (ret != Z_OK) {
     dout(1) << "Compression init error: init return "
          << ret << " instead of Z_OK" << dendl;
     return -1;
   }
+  compressor_message = cct->_conf->compressor_zlib_winsize;
 
   for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
       i != in.buffers().end();) {
@@ -113,7 +114,7 @@ int ZlibCompressor::zlib_compress(const bufferlist &in, bufferlist &out)
 }
 
 #if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64)
-int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
+int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
 {
   int ret;
   unsigned have;
@@ -124,6 +125,7 @@ int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
   /* allocate deflate state */
   isal_deflate_init(&strm);
   strm.end_of_stream = 0;
+  compressor_message = ZLIB_DEFAULT_WIN_SIZE;
 
   for (ceph::bufferlist::buffers_t::const_iterator i = in.buffers().begin();
       i != in.buffers().end();) {
@@ -166,27 +168,27 @@ int ZlibCompressor::isal_compress(const bufferlist &in, bufferlist &out)
 }
 #endif
 
-int ZlibCompressor::compress(const bufferlist &in, bufferlist &out)
+int ZlibCompressor::compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message)
 {
 #ifdef HAVE_QATZIP
   if (qat_enabled)
-    return qat_accel.compress(in, out);
+    return qat_accel.compress(in, out, compressor_message);
 #endif
 #if __x86_64__ && defined(HAVE_BETTER_YASM_ELF64)
   if (isal_enabled)
-    return isal_compress(in, out);
+    return isal_compress(in, out, compressor_message);
   else
-    return zlib_compress(in, out);
+    return zlib_compress(in, out, compressor_message);
 #else
-  return zlib_compress(in, out);
+  return zlib_compress(in, out, compressor_message);
 #endif
 }
 
-int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out)
+int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_size, bufferlist &out, boost::optional<int32_t> compressor_message)
 {
 #ifdef HAVE_QATZIP
   if (qat_enabled)
-    return qat_accel.decompress(p, compressed_size, out);
+    return qat_accel.decompress(p, compressed_size, out, compressor_message);
 #endif
 
   int ret;
@@ -203,7 +205,9 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
   strm.next_in = Z_NULL;
 
   // choose the variation of compressor
-  ret = inflateInit2(&strm, ZLIB_DEFAULT_WIN_SIZE);
+  if (!compressor_message)
+    compressor_message = ZLIB_DEFAULT_WIN_SIZE;
+  ret = inflateInit2(&strm, *compressor_message);
   if (ret != Z_OK) {
     dout(1) << "Decompression init error: init return "
          << ret << " instead of Z_OK" << dendl;
@@ -240,12 +244,12 @@ int ZlibCompressor::decompress(bufferlist::const_iterator &p, size_t compressed_
   return 0;
 }
 
-int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out)
+int ZlibCompressor::decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message)
 {
 #ifdef HAVE_QATZIP
   if (qat_enabled)
-    return qat_accel.decompress(in, out);
+    return qat_accel.decompress(in, out, compressor_message);
 #endif
   auto i = std::cbegin(in);
-  return decompress(i, in.length(), out);
+  return decompress(i, in.length(), out, compressor_message);
 }
index b26d0c2fa983bd4b7aa290d59babd282a7ad389b..b8e984110ad41102e2eccf3a4808ae97ce6c8f93 100644 (file)
@@ -34,12 +34,12 @@ public:
 #endif
   }
 
-  int compress(const ceph::buffer::list &in, ceph::buffer::list &out) override;
-  int decompress(const ceph::buffer::list &in, ceph::buffer::list &out) override;
-  int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &out) override;
+  int compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message) override;
+  int decompress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> compressor_message) override;
+  int decompress(ceph::buffer::list::const_iterator &p, size_t compressed_len, ceph::buffer::list &out, boost::optional<int32_t> compressor_message) override;
 private:
-  int zlib_compress(const ceph::buffer::list &in, ceph::buffer::list &out);
-  int isal_compress(const ceph::buffer::list &in, ceph::buffer::list &out);
+  int zlib_compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message);
+  int isal_compress(const ceph::buffer::list &in, ceph::buffer::list &out, boost::optional<int32_t> &compressor_message);
  };
 
 
index b59f3314fb47ac9b4125a0485132f560f5c36e62..95b492deb87b4df84c4df95242da22406cec8cc3 100644 (file)
@@ -26,7 +26,7 @@ class ZstdCompressor : public Compressor {
  public:
   ZstdCompressor(CephContext *cct) : Compressor(COMP_ALG_ZSTD, "zstd"), cct(cct) {}
 
-  int compress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
+  int compress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> &compressor_message) override {
     ZSTD_CStream *s = ZSTD_createCStream();
     ZSTD_initCStream_srcSize(s, cct->_conf->compressor_zstd_level, src.length());
     auto p = src.begin();
@@ -61,14 +61,15 @@ class ZstdCompressor : public Compressor {
     return 0;
   }
 
-  int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst) override {
+  int decompress(const ceph::buffer::list &src, ceph::buffer::list &dst, boost::optional<int32_t> compressor_message) override {
     auto i = std::cbegin(src);
-    return decompress(i, src.length(), dst);
+    return decompress(i, src.length(), dst, compressor_message);
   }
 
   int decompress(ceph::buffer::list::const_iterator &p,
                 size_t compressed_len,
-                ceph::buffer::list &dst) override {
+                ceph::buffer::list &dst,
+                boost::optional<int32_t> compressor_message) override {
     if (compressed_len < 4) {
       return -1;
     }
index be14eccb9fe0b87d745b2287c1845aaa42b2a760..43e55e0dd824c5ea707b7ebb754e046c59282290 100644 (file)
@@ -9990,7 +9990,7 @@ int BlueStore::_decompress(bufferlist& source, bufferlist* result)
     _set_compression_alert(false, alg_name);
     r = -EIO;
   } else {
-    r = cp->decompress(i, chdr.length, *result);
+    r = cp->decompress(i, chdr.length, *result, chdr.compressor_message);
     if (r < 0) {
       derr << __func__ << " decompression failed with exit code " << r << dendl;
       r = -EIO;
@@ -13672,7 +13672,8 @@ int BlueStore::_do_alloc_write(
 
       // FIXME: memory alignment here is bad
       bufferlist t;
-      int r = c->compress(wi.bl, t);
+      boost::optional<int32_t> compressor_message;
+      int r = c->compress(wi.bl, t, compressor_message);
       uint64_t want_len_raw = wi.blob_length * crr;
       uint64_t want_len = p2roundup(want_len_raw, min_alloc_size);
       bool rejected = false;
@@ -13684,6 +13685,7 @@ int BlueStore::_do_alloc_write(
        bluestore_compression_header_t chdr;
        chdr.type = c->get_type();
        chdr.length = t.length();
+       chdr.compressor_message = compressor_message;
        encode(chdr, wi.compressed_bl);
        wi.compressed_bl.claim_append(t);
 
index 4cfbf2dbc82ca8935a829bfc5108d6451b78ec72..2eaf003f2ca817b06ce35e0e55321a643feffcb6 100644 (file)
@@ -1163,6 +1163,9 @@ void bluestore_compression_header_t::dump(Formatter *f) const
 {
   f->dump_unsigned("type", type);
   f->dump_unsigned("length", length);
+  if (compressor_message) {
+    f->dump_int("compressor_message", *compressor_message);
+  }
 }
 
 void bluestore_compression_header_t::generate_test_instances(
index 3964e19b0a5117ef0aca02e7cc779870eeb44807..a9c3bf3ae3a89e979e12f84c75af756036f0d88e 100644 (file)
@@ -1074,15 +1074,19 @@ WRITE_CLASS_DENC(bluestore_deferred_transaction_t)
 struct bluestore_compression_header_t {
   uint8_t type = Compressor::COMP_ALG_NONE;
   uint32_t length = 0;
+  boost::optional<int32_t> compressor_message;
 
   bluestore_compression_header_t() {}
   bluestore_compression_header_t(uint8_t _type)
     : type(_type) {}
 
   DENC(bluestore_compression_header_t, v, p) {
-    DENC_START(1, 1, p);
+    DENC_START(2, 1, p);
     denc(v.type, p);
     denc(v.length, p);
+    if (struct_v >= 2) {
+      denc(v.compressor_message, p);
+    }
     DENC_FINISH(p);
   }
   void dump(ceph::Formatter *f) const;
index 7db8108d69582eff21dc3a2931f0f04db7516f0f..c60cfb026f0b3b00195be9c1cc31aa5e9d8fb5f6 100644 (file)
@@ -40,7 +40,7 @@ int RGWPutObj_Compress::process(bufferlist&& in, uint64_t logical_offset)
     if ((logical_offset > 0 && compressed) || // if previous part was compressed
         (logical_offset == 0)) {              // or it's the first part
       ldout(cct, 10) << "Compression for rgw is enabled, compress part " << in.length() << dendl;
-      int cr = compressor->compress(in, out);
+      int cr = compressor->compress(in, out, compressor_message);
       if (cr < 0) {
         if (logical_offset > 0) {
           lderr(cct) << "Compression failed with exit code " << cr
@@ -128,7 +128,7 @@ int RGWGetObj_Decompress::handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len
       iter_in_bl.seek(ofs_in_bl);
     }
     iter_in_bl.copy(first_block->len, tmp);
-    int cr = compressor->decompress(tmp, out_bl);
+    int cr = compressor->decompress(tmp, out_bl, cs_info->compressor_message);
     if (cr < 0) {
       lderr(cct) << "Decompression failed with exit code " << cr << dendl;
       return cr;
index 4d7f8638412a01bab3656178b8e609c22c5f33d4..6a2cc6e5685cbed1803b76308f32262bcdfa52cd 100644 (file)
@@ -40,6 +40,7 @@ class RGWPutObj_Compress : public rgw::putobj::Pipe
   CephContext* cct;
   bool compressed{false};
   CompressorRef compressor;
+  boost::optional<int32_t> compressor_message;
   std::vector<compression_block> blocks;
 public:
   RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
@@ -50,6 +51,7 @@ public:
 
   bool is_compressed() { return compressed; }
   vector<compression_block>& get_compression_blocks() { return blocks; }
+  boost::optional<int32_t> get_compressor_message() { return compressor_message; }
 
 }; /* RGWPutObj_Compress */
 
index 780e5eac109a98af83a1bf5284abf3c63d278a3e..e5d98a3d53e6bdac36d6ebcbe1e05571088c2c87 100644 (file)
@@ -44,25 +44,31 @@ WRITE_CLASS_ENCODER(compression_block)
 struct RGWCompressionInfo {
   string compression_type;
   uint64_t orig_size;
+  boost::optional<int32_t> compressor_message;
   vector<compression_block> blocks;
 
   RGWCompressionInfo() : compression_type("none"), orig_size(0) {}
   RGWCompressionInfo(const RGWCompressionInfo& cs_info) : compression_type(cs_info.compression_type),
                                                           orig_size(cs_info.orig_size),
+                                                         compressor_message(cs_info.compressor_message),
                                                           blocks(cs_info.blocks) {}
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(compression_type, bl);
     encode(orig_size, bl);
+    encode(compressor_message, bl);
     encode(blocks, bl);
     ENCODE_FINISH(bl);
   }
 
   void decode(bufferlist::const_iterator& bl) {
-     DECODE_START(1, bl);
+     DECODE_START(2, bl);
      decode(compression_type, bl);
      decode(orig_size, bl);
+     if (struct_v >= 2) {
+       decode(compressor_message, bl);
+     }
      decode(blocks, bl);
      DECODE_FINISH(bl);
   } 
index e5f5f63952a3d445f4103eab2d7b3b2021a9a774..d95f7e8fd37bf95ce74efcbe0c5bbfbfaf76a3b5 100644 (file)
@@ -2051,6 +2051,9 @@ void RGWCompressionInfo::dump(Formatter *f) const
 {
   f->dump_string("compression_type", compression_type);
   f->dump_unsigned("orig_size", orig_size);
+  if (compressor_message) {
+    f->dump_int("compressor_message", *compressor_message);
+  }
   ::encode_json("blocks", blocks, f);
 }
 
index fadba7c73dbe3a637a9c4740a664fd9d65c89849..89638e4f0027ace4f5cf4f0e5ad1d6c30057ef07 100644 (file)
@@ -4089,6 +4089,7 @@ void RGWPutObj::execute()
     RGWCompressionInfo cs_info;
     cs_info.compression_type = plugin->get_type_name();
     cs_info.orig_size = s->obj_size;
+    cs_info.compressor_message = compressor->get_compressor_message();
     cs_info.blocks = move(compressor->get_compression_blocks());
     encode(cs_info, tmp);
     attrs[RGW_ATTR_COMPRESSION] = tmp;
@@ -4394,6 +4395,7 @@ void RGWPostObj::execute()
       RGWCompressionInfo cs_info;
       cs_info.compression_type = plugin->get_type_name();
       cs_info.orig_size = s->obj_size;
+      cs_info.compressor_message = compressor->get_compressor_message();
       cs_info.blocks = move(compressor->get_compression_blocks());
       encode(cs_info, tmp);
       emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
@@ -7294,6 +7296,7 @@ int RGWBulkUploadOp::handle_file(const boost::string_ref path,
     RGWCompressionInfo cs_info;
     cs_info.compression_type = plugin->get_type_name();
     cs_info.orig_size = s->obj_size;
+    cs_info.compressor_message = compressor->get_compressor_message();
     cs_info.blocks = std::move(compressor->get_compression_blocks());
     encode(cs_info, tmp);
     attrs.emplace(RGW_ATTR_COMPRESSION, std::move(tmp));
index a6dbb57d97120e98a830d232f28941bdc5fba944..b74a38db17ed549b530c436df6bb784814815745 100644 (file)
@@ -3848,6 +3848,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     RGWCompressionInfo cs_info;
     cs_info.compression_type = plugin->get_type_name();
     cs_info.orig_size = cb.get_data_len();
+    cs_info.compressor_message = compressor->get_compressor_message();
     cs_info.blocks = move(compressor->get_compression_blocks());
     encode(cs_info, tmp);
     cb.get_attrs()[RGW_ATTR_COMPRESSION] = tmp;
index a2dbd8f660ad0d2b8367a78835d2ed05ab3d4cd2..3afd59a7d252270cc749c542906579ad866c65f8 100644 (file)
@@ -32,18 +32,18 @@ public:
   CompressorExample() : Compressor(COMP_ALG_NONE, "example") {}
   ~CompressorExample() override {}
 
-  int compress(const bufferlist &in, bufferlist &out) override
+  int compress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> &compressor_message) override
   {
     out = in;
     return 0;
   }
 
-  int decompress(const bufferlist &in, bufferlist &out) override
+  int decompress(const bufferlist &in, bufferlist &out, boost::optional<int32_t> compressor_message) override
   {
     out = in;
     return 0;
   }
-  int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out) override
+  int decompress(bufferlist::const_iterator &p, size_t compressed_len, bufferlist &out, boost::optional<int32_t> compressor_message) override
   {
     p.copy(std::min<size_t>(p.get_remaining(), compressed_len), out);
     return 0;
index 90b2981e2dfc407d7747d93b82d414e125bdacd1..a8a66398cbcd4a43abf075feae47faad917c36b0 100644 (file)
@@ -76,10 +76,11 @@ TEST_P(CompressorTest, small_round_trip)
   bufferlist orig;
   orig.append("This is a short string.  There are many strings like it but this one is mine.");
   bufferlist compressed;
-  int r = compressor->compress(orig, compressed);
+  boost::optional<int32_t> compressor_message;
+  int r = compressor->compress(orig, compressed, compressor_message);
   ASSERT_EQ(0, r);
   bufferlist decompressed;
-  r = compressor->decompress(compressed, decompressed);
+  r = compressor->decompress(compressed, decompressed, compressor_message);
   ASSERT_EQ(0, r);
   ASSERT_EQ(decompressed.length(), orig.length());
   ASSERT_TRUE(decompressed.contents_equal(orig));
@@ -95,10 +96,11 @@ TEST_P(CompressorTest, big_round_trip_repeated)
     orig.append("This is a short string.  There are many strings like it but this one is mine.");
   }
   bufferlist compressed;
-  int r = compressor->compress(orig, compressed);
+  boost::optional<int32_t> compressor_message;
+  int r = compressor->compress(orig, compressed, compressor_message);
   ASSERT_EQ(0, r);
   bufferlist decompressed;
-  r = compressor->decompress(compressed, decompressed);
+  r = compressor->decompress(compressed, decompressed, compressor_message);
   ASSERT_EQ(0, r);
   ASSERT_EQ(decompressed.length(), orig.length());
   ASSERT_TRUE(decompressed.contents_equal(orig));
@@ -124,10 +126,11 @@ TEST_P(CompressorTest, big_round_trip_randomish)
     orig.append(bp);
   }
   bufferlist compressed;
-  int r = compressor->compress(orig, compressed);
+  boost::optional<int32_t> compressor_message;
+  int r = compressor->compress(orig, compressed, compressor_message);
   ASSERT_EQ(0, r);
   bufferlist decompressed;
-  r = compressor->decompress(compressed, decompressed);
+  r = compressor->decompress(compressed, decompressed, compressor_message);
   ASSERT_EQ(0, r);
   ASSERT_EQ(decompressed.length(), orig.length());
   ASSERT_TRUE(decompressed.contents_equal(orig));
@@ -178,10 +181,11 @@ TEST_P(CompressorTest, round_trip_osdmap)
     chunk.substr_of(fbl, j*size, l);
     //fbl.rebuild();
     bufferlist compressed;
-    int r = compressor->compress(chunk, compressed);
+    boost::optional<int32_t> compressor_message;
+    int r = compressor->compress(chunk, compressed, compressor_message);
     ASSERT_EQ(0, r);
     bufferlist decompressed;
-    r = compressor->decompress(compressed, decompressed);
+    r = compressor->decompress(compressed, decompressed, compressor_message);
     ASSERT_EQ(0, r);
     ASSERT_EQ(decompressed.length(), chunk.length());
     if (!decompressed.contents_equal(chunk)) {
@@ -205,9 +209,10 @@ TEST_P(CompressorTest, compress_decompress)
   bufferlist after;
   bufferlist exp;
   in.append(test, len);
-  res = compressor->compress(in, out);
+  boost::optional<int32_t> compressor_message;
+  res = compressor->compress(in, out, compressor_message);
   EXPECT_EQ(res, 0);
-  res = compressor->decompress(out, after);
+  res = compressor->decompress(out, after, compressor_message);
   EXPECT_EQ(res, 0);
   exp.append(test);
   EXPECT_TRUE(exp.contents_equal(after));
@@ -215,7 +220,7 @@ TEST_P(CompressorTest, compress_decompress)
   size_t compressed_len = out.length();
   out.append_zero(12);
   auto it = out.cbegin();
-  res = compressor->decompress(it, compressed_len, after);
+  res = compressor->decompress(it, compressed_len, after, compressor_message);
   EXPECT_EQ(res, 0);
   EXPECT_TRUE(exp.contents_equal(after));
 
@@ -228,7 +233,7 @@ TEST_P(CompressorTest, compress_decompress)
   out.clear();
   in.append(data);
   exp = in;
-  res = compressor->compress(in, out);
+  res = compressor->compress(in, out, compressor_message);
   EXPECT_EQ(res, 0);
   compressed_len = out.length();
   out.append_zero(0x10000 - out.length());
@@ -241,7 +246,7 @@ TEST_P(CompressorTest, compress_decompress)
   out.swap(prefix);
   it = out.cbegin();
   it += prefix_len;
-  res = compressor->decompress(it, compressed_len, after);
+  res = compressor->decompress(it, compressed_len, after, compressor_message);
   EXPECT_EQ(res, 0);
   EXPECT_TRUE(exp.contents_equal(after));
 }
@@ -254,7 +259,8 @@ TEST_P(CompressorTest, sharded_input_decompress)
   int len = test.size();
   bufferlist in, out;
   in.append(test.c_str(), len);
-  int res = compressor->compress(in, out);
+  boost::optional<int32_t> compressor_message;
+  int res = compressor->compress(in, out, compressor_message);
   EXPECT_EQ(res, 0);
   EXPECT_GT(out.length(), small_prefix_size);
 
@@ -272,7 +278,7 @@ TEST_P(CompressorTest, sharded_input_decompress)
   }
 
   bufferlist after;
-  res = compressor->decompress(out2, after);
+  res = compressor->decompress(out2, after, compressor_message);
   EXPECT_EQ(res, 0);
 }
 
@@ -286,7 +292,8 @@ void test_compress(CompressorRef compressor, size_t size)
   in.append(data, size);
   for (size_t t = 0; t < 10000; t++) {
     bufferlist out;
-    int res = compressor->compress(in, out);
+    boost::optional<int32_t> compressor_message;
+    int res = compressor->compress(in, out, compressor_message);
     EXPECT_EQ(res, 0);
   }
   free(data);
@@ -300,11 +307,12 @@ void test_decompress(CompressorRef compressor, size_t size)
   }
   bufferlist in, out;
   in.append(data, size);
-  int res = compressor->compress(in, out);
+  boost::optional<int32_t> compressor_message;
+  int res = compressor->compress(in, out, compressor_message);
   EXPECT_EQ(res, 0);
   for (size_t t = 0; t < 10000; t++) {
     bufferlist out_dec;
-    int res = compressor->decompress(out, out_dec);
+    int res = compressor->decompress(out, out_dec, compressor_message);
     EXPECT_EQ(res, 0);
   }
   free(data);
@@ -401,10 +409,11 @@ TEST(ZlibCompressor, zlib_isal_compatibility)
   bufferlist in, out;
   in.append(test, len);
   // isal -> zlib
-  int res = isal->compress(in, out);
+  boost::optional<int32_t> compressor_message;
+  int res = isal->compress(in, out, compressor_message);
   EXPECT_EQ(res, 0);
   bufferlist after;
-  res = zlib->decompress(out, after);
+  res = zlib->decompress(out, after, compressor_message);
   EXPECT_EQ(res, 0);
   bufferlist exp;
   exp.append(static_cast<char*>(test));
@@ -413,9 +422,9 @@ TEST(ZlibCompressor, zlib_isal_compatibility)
   out.clear();
   exp.clear();
   // zlib -> isal
-  res = zlib->compress(in, out);
+  res = zlib->compress(in, out, compressor_message);
   EXPECT_EQ(res, 0);
-  res = isal->decompress(out, after);
+  res = isal->decompress(out, after, compressor_message);
   EXPECT_EQ(res, 0);
   exp.append(static_cast<char*>(test));
   EXPECT_TRUE(exp.contents_equal(after));
@@ -469,10 +478,11 @@ TEST(ZlibCompressor, isal_compress_zlib_decompress_random)
     bufferlist in, out;
     in.append(test, size);
 
-    int res = isal->compress(in, out);
+    boost::optional<int32_t> compressor_message;
+    int res = isal->compress(in, out, compressor_message);
     EXPECT_EQ(res, 0);
     bufferlist after;
-    res = zlib->decompress(out, after);
+    res = zlib->decompress(out, after, compressor_message);
     EXPECT_EQ(res, 0);
     bufferlist exp;
     exp.append(test, size);
@@ -508,10 +518,11 @@ TEST(ZlibCompressor, isal_compress_zlib_decompress_walk)
     bufferlist in, out;
     in.append(test, size);
 
-    int res = isal->compress(in, out);
+    boost::optional<int32_t> compressor_message;
+    int res = isal->compress(in, out, compressor_message);
     EXPECT_EQ(res, 0);
     bufferlist after;
-    res = zlib->decompress(out, after);
+    res = zlib->decompress(out, after, compressor_message);
     EXPECT_EQ(res, 0);
     bufferlist exp;
     exp.append(test, size);
@@ -546,10 +557,11 @@ TEST(QAT, enc_qat_dec_noqat) {
       bufferlist in, out;
       in.append(test, size);
   
-      int res = q->compress(in, out);
+      boost::optional<int32_t> compressor_message;
+      int res = q->compress(in, out, compressor_message);
       EXPECT_EQ(res, 0);
       bufferlist after;
-      res = noq->decompress(out, after);
+      res = noq->decompress(out, after, compressor_message);
       EXPECT_EQ(res, 0);
       bufferlist exp;
       exp.append(test, size);
@@ -582,10 +594,11 @@ TEST(QAT, enc_noqat_dec_qat) {
       bufferlist in, out;
       in.append(test, size);
   
-      int res = noq->compress(in, out);
+      boost::optional<int32_t> compressor_message;
+      int res = noq->compress(in, out, compressor_message);
       EXPECT_EQ(res, 0);
       bufferlist after;
-      res = q->decompress(out, after);
+      res = q->decompress(out, after, compressor_message);
       EXPECT_EQ(res, 0);
       bufferlist exp;
       exp.append(test, size);
index 936b8f7c979395979a430e795870ecea456fc805..b1e3403281ad41fb8f3db927bd1129ce9574d2c1 100644 (file)
@@ -129,6 +129,7 @@ TEST(Compress, LimitedChunkSize)
     RGWCompressionInfo cs_info;
     cs_info.compression_type = plugin->get_type_name();
     cs_info.orig_size = s;
+    cs_info.compressor_message = compressor.get_compressor_message();
     cs_info.blocks = move(compressor.get_compression_blocks());
 
     ut_get_sink_size d_sink;
@@ -167,6 +168,7 @@ TEST(Compress, BillionZeros)
   RGWCompressionInfo cs_info;
   cs_info.compression_type = plugin->get_type_name();
   cs_info.orig_size = size*1000;
+  cs_info.compressor_message = compressor.get_compressor_message();
   cs_info.blocks = move(compressor.get_compression_blocks());
 
   ut_get_sink d_sink;