]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
compressor: Extends decompressor interface to be able to provide compressed data...
authorIgor Fedotov <ifedotov@mirantis.com>
Tue, 24 May 2016 17:05:14 +0000 (20:05 +0300)
committerSage Weil <sage@redhat.com>
Wed, 1 Jun 2016 15:40:48 +0000 (11:40 -0400)
Signed-off-by: Igor Fedotov <ifedotov@mirantis.com>
src/compressor/Compressor.h
src/compressor/snappy/SnappyCompressor.h
src/compressor/zlib/CompressionZlib.cc
src/compressor/zlib/CompressionZlib.h
src/os/bluestore/BlueStore.cc
src/test/compressor/compressor_example.h
src/test/compressor/test_compression_snappy.cc
src/test/compressor/test_compression_zlib.cc

index f75c651c40e8144dd4fd104f57908d70706b0d98..e1aa3c2d1e6e47520f6e549b4613f83a01ddc806 100644 (file)
@@ -33,7 +33,7 @@ class Compressor {
   virtual int decompress(const bufferlist &in, bufferlist &out) = 0;
   // this is a bit weird but we need non-const iterator to be in
   // alignment with decode methods
-  virtual int decompress(bufferlist::iterator &p, bufferlist &out) = 0;
+  virtual int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out) = 0;
 
   static CompressorRef create(CephContext *cct, const string &type);
 };
index 2fc748ae7fbbab5066d50d1a935471422f21a50b..677d7a329024a1250fee46f91c81a17c50cc0ca9 100644 (file)
 class CEPH_BUFFER_API BufferlistSource : public snappy::Source {
 
   bufferlist::iterator pb;
+  size_t remaining;
 
  public:
-  explicit BufferlistSource(bufferlist::iterator _pb): pb(_pb) {}
+  explicit BufferlistSource(bufferlist::iterator _pb, size_t _input_len): pb(_pb), remaining(_input_len) {
+    remaining = MIN(remaining, pb.get_remaining());
+  }
   virtual ~BufferlistSource() {}
-  virtual size_t Available() const { return pb.get_remaining(); }
+  virtual size_t Available() const { return remaining; }
   virtual const char* Peek(size_t* len) {
     const char* data = NULL;
     *len = 0;
-    if(pb.get_remaining()) {
+    size_t avail = Available();
+    if(avail) {
       auto ptmp = pb;
-      *len = ptmp.get_ptr_and_advance(pb.get_remaining(), &data);
+      *len = ptmp.get_ptr_and_advance(avail, &data);
     }
     return data;
   }
   virtual void Skip(size_t n) {
+    assert(n <= remaining);
     pb.advance(n);
+    remaining -= n;
   }
   bufferlist::iterator get_pos() const { return pb; }
 };
@@ -47,7 +53,7 @@ class SnappyCompressor : public Compressor {
  public:
   SnappyCompressor() : Compressor("snappy") {}
   int compress(const bufferlist &src, bufferlist &dst) override {
-    BufferlistSource source(const_cast<bufferlist&>(src).begin());
+    BufferlistSource source(const_cast<bufferlist&>(src).begin(), src.length());
     bufferptr ptr(snappy::MaxCompressedLength(src.length()));
     snappy::UncheckedByteArraySink sink(ptr.c_str());
     snappy::Compress(&source, &sink);
@@ -56,9 +62,9 @@ class SnappyCompressor : public Compressor {
   }
   int decompress(const bufferlist &src, bufferlist &dst) override {
     bufferlist::iterator i = const_cast<bufferlist&>(src).begin();
-    return decompress(i, dst);
+    return decompress(i, src.length(), dst);
   }
-  int decompress(bufferlist::iterator &p, bufferlist &dst) override {
+  int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &dst) override {
     size_t res_len = 0;
     // Trick, decompress only need first 32bits buffer
     bufferlist::const_iterator ptmp = p;
@@ -66,7 +72,7 @@ class SnappyCompressor : public Compressor {
     ptmp.copy(4, tmp);
     if (!snappy::GetUncompressedLength(tmp.c_str(), tmp.length(), &res_len))
       return -1;
-    BufferlistSource source(p);
+    BufferlistSource source(p, compressed_len);
     bufferptr ptr(res_len);
     if (snappy::RawUncompress(&source, ptr.c_str())) {
       p = source.get_pos();
index d8a28fa064b033aba74408a10785c7a02c8fc205..27aba8cf464d835a287f317abd8d304d4fe1e51e 100644 (file)
@@ -94,7 +94,7 @@ int CompressionZlib::compress(const bufferlist &in, bufferlist &out)
   return 0;
 }
 
-int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out)
+int CompressionZlib::decompress(bufferlist::iterator &p, size_t compressed_size, bufferlist &out)
 {
   int ret;
   unsigned have;
@@ -115,11 +115,11 @@ int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out)
   }
 
   unsigned char c_out[max_len];
+  size_t remaining = MIN( p.get_remaining(), compressed_size);
+  while(remaining) {
 
-  while(!p.end()) {
-
-    long unsigned int len = p.get_ptr_and_advance(p.get_remaining(), &c_in);
-
+    long unsigned int len = p.get_ptr_and_advance(remaining, &c_in);
+    remaining -= len;
     strm.avail_in = len;
     strm.next_in = (unsigned char*)c_in;
 
@@ -147,5 +147,5 @@ int CompressionZlib::decompress(bufferlist::iterator &p, bufferlist &out)
 int CompressionZlib::decompress(const bufferlist &in, bufferlist &out)
 {
   bufferlist::iterator i = const_cast<bufferlist&>(in).begin();
-  return decompress(i, out);
+  return decompress(i, in.length(), out);
 }
index 21acb65ac1efd98d733236000bc2aa7f1524b1c6..133f91232fb1383b92f2eb20e9806daffe785d88 100644 (file)
@@ -26,7 +26,7 @@ public:
   CompressionZlib() : Compressor("zlib") {}
   int compress(const bufferlist &in, bufferlist &out) override;
   int decompress(const bufferlist &in, bufferlist &out) override;
-  int decompress(bufferlist::iterator &p, bufferlist &out) override;
+  int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out) override;
  };
 
 
index 04b499736c85a17472c330ab4d145fe7a77de040..2526ae3d5869120191fcf51259ea128ceb5b9b30 100644 (file)
@@ -3186,9 +3186,7 @@ int BlueStore::_decompress(bufferlist& source, bufferlist* result)
     derr << __func__ << " can't load decompressor " << chdr.type << dendl;
     r = -EIO;
   } else {
-    bufferlist t;
-    i.copy(chdr.length, t);
-    r = compressor->decompress(t, *result);
+    r = compressor->decompress(i, chdr.length, *result);
     if (r < 0) {
       derr << __func__ << " decompression failed with exit code " << r << dendl;
       r = -EIO;
index f02aafb18263b828c70ea506e939ec146abd1be5..40ab660878c42ab32a0da533619fb88588fd6586 100644 (file)
@@ -43,9 +43,9 @@ public:
     out = in;
     return 0;
   }
-  virtual int decompress(bufferlist::iterator &p, bufferlist &out)
+  virtual int decompress(bufferlist::iterator &p, size_t compressed_len, bufferlist &out)
   {
-    p.copy(p.get_remaining(), out);
+    p.copy(MIN(p.get_remaining(), compressed_len), out);
     return 0;
   }
 };
index e8bfe1bba76e24ea106d08ae3ba3d7d1ed09962a..296de4103b2c675344d6536de9a55300b470d499 100644 (file)
@@ -36,6 +36,13 @@ TEST(SnappyCompressor, compress_decompress)
   bufferlist after;
   res = sp.decompress(out, after);
   EXPECT_EQ(res, 0);
+
+  after.clear();
+  size_t compressed_len = out.length();
+  out.append_zero(12);
+  auto it = out.begin();
+  res = sp.decompress(it, compressed_len, after);
+  EXPECT_EQ(res, 0);
 }
 
 TEST(SnappyCompressor, sharded_input_decompress)
index e3706018ce3048bb1e494d1dd9a02f8f5bcc78c1..642264b3380e55eba71c860e329dfcd06d4c4410 100644 (file)
@@ -27,7 +27,7 @@
 TEST(CompressionZlib, compress_decompress)
 {
   CompressionZlib sp;
-  EXPECT_STREQ(sp.get_type(), "zlib");
+  EXPECT_STREQ(sp.get_type().c_str(), "zlib");
   const char* test = "This is test text";
   int len = strlen(test);
   bufferlist in, out;
@@ -40,12 +40,19 @@ TEST(CompressionZlib, compress_decompress)
   bufferlist exp;
   exp.append(test);
   EXPECT_TRUE(exp.contents_equal(after));
+  after.clear();
+  size_t compressed_len = out.length();
+  out.append_zero(12);
+  auto it = out.begin();
+  res = sp.decompress(it, compressed_len, after);
+  EXPECT_EQ(res, 0);
+  EXPECT_TRUE(exp.contents_equal(after));
 }
 
 TEST(CompressionZlib, compress_decompress_chunk)
 {
   CompressionZlib sp;
-  EXPECT_STREQ(sp.get_type(), "zlib");
+  EXPECT_STREQ(sp.get_type().c_str(), "zlib");
   const char* test = "This is test text";
   buffer::ptr test2 ("1234567890", 10);
   int len = strlen(test);